Bus 简介
Spring Cloud Bus 是 Spring Cloud 体系内的消息总线,支持 RabbitMQ 和 Kafka 两种消息中间件。所谓消息总线,简单理解就是一个消息中心,众多微服务实例都可以连接到总线上,实例可以往消息中心发送或接收信息,例如:实例 A 发送一条消息到总线上,总线上的实例 B 可以接收到信息(实例 B 订阅了实例 A),消息总线充当一个中间者的角色,使得实例 A 和实例 B 解耦
Spring Cloud Bus 实战
Spring Cloud Bus 可以将 Spring 事件机制和 Stream 结合在一起,具体机制如下:
- 在需要发布或者监听事件的应用中增加
@RemoteApplicationEventScan
注解,通过该注解
可以启动 Stream 中消息通道的绑定 - 对于事件发布,需要承
ApplicationEvent
的扩展类RemoteApplicationEvent
,通过ApplicationContext.publishEvent()
发布事件时,Spring Cloud Bus 会对所要发布的事件进行包装,形成消息,通过默认的 Spring Cloud Bus 消息通道发送到消息中间件 - 对于事件监听者,则不需要进行任何变更,仍旧按照 Spring 的方式实现消息的监听i
安装并启动 ZooKeeper 和 Kafka,创建事件发布者项目,引入依赖
org.springframework.cloud spring-cloud-starter-bus-kafka
定义用户事件类 UserEvent,实现 RemoteApplicationEvent
@Data@Slf4j@EqualsAndHashCode(callSuper = true)public class UserEvent extends RemoteApplicationEvent { public UserEvent(Object source, String originService, String destination) { super(source, originService, destination); }}
- originService:对于事件发布者来说 originService 就是自己
- destinationService:将事件发布到哪些微服务实例,配置的格式为
{serviceld):{appContextId)
,在配置时 serviceld 和 appContextld 可以使用通配符,比如userservice:**
会将事件发布给 userservice 微服务
发布消息代码如下
@Slf4j@RestControllerpublic class TestCon { @Autowired private ApplicationContextHolder holder; @GetMapping("/test/userEvent") public void userAdd() { User user = new User(); user.setId("2"); user.setName("tom"); user.setAge(50); ApplicationContext ctx = ApplicationContextHolder.getApplicationContext(); UserEvent event = new UserEvent(user, ctx.getId(), "*:**"); ctx.publishEvent(event); }}
在配置文件中添加如下配置:
spring: cloud: stream: default-binder: kafka kafka: binder: brokers: localhost:9092
在启动类添加 @RemoteApplicationEventScan
注解
@SpringBootApplication@RemoteApplicationEventScanpublic class Server01Application { public static void main(String[] args) { SpringApplication.run(Server01Application.class, args); }}
创建事件接收者项目,引入和事件发布者同样的依赖,将 UserEvent 类复制到该模块下,实现事件监听类UserEventListener
@Slf4j@Componentpublic class UserEventListener implements ApplicationListener { @Override public void onApplicationEvent(UserEvent event) { log.info("收到用户事件: {}", event); }}
加上事件发布者同样的配置和启动类注解
启动两个项目,请求事件发布者的 /test/userEvent
接口,即可发布和接收事件