Background

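Spring Cloud Stream mainly addresses the problem of switching between different message middleware: application code is written against the framework's abstractions, so it stays decoupled from the specific middleware in use.
Link: supported middleware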


The rest of this post uses Kafka for testing.

Add the dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2021.0.3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Write the configuration file

Configuration file

Reference notes

spring:
  application:
    name: streamDemo
  cloud:
    function:
      definition: streamDemo
    stream:
      bindings:
        streamDemo-out-0:
          destination: streamDemo
          group: ${spring.application.name}
        streamDemo-in-0:
          destination: streamDemo
          group: ${spring.application.name}
      kafka:
        binder:
          auto-create-topics: true
          replication-factor: 1
          brokers: kafkaServer:9092
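The binding names in the configuration above follow Spring Cloud Stream's functional naming convention: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below only illustrates how those names are composed. `BindingNames` is a hypothetical helper written for this post, not a class from the framework.

```java
/** Hypothetical helper (not part of Spring Cloud Stream) that composes binding names. */
final class BindingNames {

    private BindingNames() {
    }

    /** Input binding name for a functional bean, e.g. "streamDemo-in-0". */
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Output binding name for a functional bean, e.g. "streamDemo-out-0". */
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // "streamDemo" is the value of spring.cloud.function.definition in the configuration.
        System.out.println(input("streamDemo", 0));  // prints streamDemo-in-0
        System.out.println(output("streamDemo", 0)); // prints streamDemo-out-0
    }
}
```

In this setup both bindings point at the same destination, `streamDemo`, so a message sent through `streamDemo-out-0` should arrive at the consumer bound to `streamDemo-in-0`.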

Producer code

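// javax.annotation.Resource is assumed here because Spring Cloud 2021.0.x runs on Spring Boot 2.x.
import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class MiddleMessageProducer {

    // Must match the output binding name declared in the configuration file.
    private static final String pvRecordChannel = "streamDemo-out-0";

    @Resource
    private StreamBridge streamBridge;

    // Wraps the payload in a Message and sends it to the output binding.
    public void saveLogInfo(String msg) {
        streamBridge.send(pvRecordChannel, MessageBuilder.withPayload(msg).build());
    }
}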

Consumer code

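import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MessageConsumer {

    // The bean name "streamDemo" must match spring.cloud.function.definition;
    // the framework binds it to the streamDemo-in-0 input binding.
    @Bean
    public Consumer<String> streamDemo() {
        return request -> {
            log.info("Received message: {}", request);
        };
    }
}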

Verification
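Because the consumer is a plain `java.util.function.Consumer<String>`, its handler logic can be checked without a running broker by calling `accept` directly. The following is a minimal sketch under that assumption; `ConsumerCheck` and `recordingConsumer` are hypothetical names, and the handler records messages in a list instead of logging them so the result can be inspected. It does not replace an end-to-end check against Kafka.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical broker-free check of a handler shaped like the streamDemo bean. */
final class ConsumerCheck {

    private ConsumerCheck() {
    }

    /** Same shape as the streamDemo bean, but records each payload instead of logging it. */
    static Consumer<String> recordingConsumer(List<String> sink) {
        return request -> sink.add("Received message: " + request);
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        Consumer<String> handler = recordingConsumer(received);

        // Simulate what the binder does when a message arrives: pass the payload to the function.
        handler.accept("hello");

        System.out.println(received); // prints [Received message: hello]
    }
}
```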