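Background
Spring Cloud Stream mainly addresses the problem of switching between message middleware: application code is decoupled from the specific middleware in use.
Link: supported middleware
The rest of this article uses Kafka for testing.
Add dependencies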
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Write the configuration file
Configuration file
Reference documentation
spring:
  application:
    name: streamDemo
  cloud:
    function:
      definition: streamDemo
    stream:
      bindings:
        streamDemo-out-0:
          destination: streamDemo
          group: ${spring.application.name}
        streamDemo-in-0:
          destination: streamDemo
          group: ${spring.application.name}
      kafka:
        binder:
          auto-create-topics: true
          replication-factor: 1
          brokers: kafkaServer:9092
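The binding names in the configuration follow Spring Cloud Stream's functional naming convention: <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs. That is why the streamDemo function receives messages through the streamDemo-in-0 binding. The following is a minimal sketch of that convention in plain Java. The BindingNames class and its methods are hypothetical helpers written for illustration only and are not part of Spring Cloud Stream.

```java
// Hypothetical helper (NOT part of Spring Cloud Stream): builds binding names
// following the functional convention <functionName>-<in|out>-<index>.
final class BindingNames {

    private BindingNames() {
    }

    // Name of the input binding for the given function and argument index.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for the given function and result index.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // Uses the function name from spring.cloud.function.definition.
        System.out.println(input("streamDemo", 0));
        System.out.println(output("streamDemo", 0));
    }
}
```

The two names printed by main are the same keys that appear under spring.cloud.stream.bindings in the configuration file.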
Producer code
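import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MiddleMessageProducer {

    // Output binding name; must match the binding declared in the configuration file.
    private static final String pvRecordChannel = "streamDemo-out-0";

    @Resource
    private StreamBridge streamBridge;

    public void saveLogInfo(String msg) {
        streamBridge.send(pvRecordChannel, MessageBuilder.withPayload(msg).build());
    }
}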
Consumer code
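import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {

    // The bean name must match spring.cloud.function.definition (streamDemo).
    @Bean
    public Consumer<String> streamDemo() {
        return request -> log.info("Received message: {}", request);
    }
}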
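Because the consumer is an ordinary java.util.function.Consumer, its logic can be exercised without a Kafka broker by calling accept directly. The sketch below assumes a hypothetical RecordingConsumer class that stores messages in a list instead of logging them. It only illustrates the idea and is not a replacement for the bean above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the streamDemo bean: collects messages instead of logging them.
final class RecordingConsumer {

    private final List<String> received = new ArrayList<>();

    // Same shape as the streamDemo() bean: a Consumer<String> that handles the payload.
    Consumer<String> streamDemo() {
        return request -> received.add(request);
    }

    // Messages handled so far, in arrival order.
    List<String> received() {
        return received;
    }

    public static void main(String[] args) {
        RecordingConsumer demo = new RecordingConsumer();
        // Invoke the function directly, as the binder would for each incoming message.
        demo.streamDemo().accept("hello");
        System.out.println("Received: " + demo.received());
    }
}
```

Keeping the handler a plain function is what lets the same code run unchanged when the binder is swapped for another middleware.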