一、工程简介
1、生产者(test-11-rabbitmq-producer,spring boot 版本 2.4.1)
1)pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
2)application.yml 文件配置
server: port: 8012spring: application: name: test-12-rabbitmq-consumer-01 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /
2、消费者(test-12-rabbitmq-consumer-01,spring boot 版本 2.4.1)
1)pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
2)application.yml 文件配置
server: port: 8012spring: application: name: test-12-rabbitmq-consumer-01 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /
3、消费者(test-12-rabbitmq-consumer-02,spring boot 版本 2.4.1)
1)pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
2)application.yml 文件配置
server: port: 8013spring: application: name: test-13-rabbitmq-consumer-02 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: /
二、RabbitMQ常用工作模式简单实现
1、简单模式(Simple)
Ⅰ、模式特点
① 一个生产者,一个消费者,点对点消费消息
② 默认使用 direct 交换机(RabbitMQ中默认使用 direct 交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
③ 工作流程:生产者生产消息并将消息发送到交换机 –> 交换机将消息路由到绑定的队列 –> 队列收到消息 –> 消费者监听队列获取消息进行消费
Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下新增 RabbitMQ 的配置类 RabbitmqConfig,内容如下:
package com.test.rabbitmq_producer.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 消息生产者配置类 * 配置类需要配置哪些东西: * 1、定义队列 * 2、定义交换机 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...) * 4、声明队列 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。) */@Slf4j@Configurationpublic class RabbitmqConfig { //定义队列 public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列 //声明-简单模式测试队列 @Bean(SIMPLE_QUEUE) public Queue SIMPLE_QUEUE() { return new Queue(SIMPLE_QUEUE); }}
② 在生产者工程 test-11-rabbitmq-producer 下新增测试类 ModelProducerTest,内容如下:
package com.test.rabbitmq_consumer02.mq;import com.test.rabbitmq_consumer02.config.RabbitmqConfig;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 常用模式测试类 -- 生产者 */@Slf4j@RunWith(SpringJUnit4ClassRunner.class)@SpringBootTestpublic class ModelProducerTest { /** * 简单模式(Simple) * 一对一消费,一条消息只有一个消费者可以消费 * 备注: * 1、一个队列中的一条消息只会被一个消费者消费一次 * 2、简单模式中,使用默认交换机direct */ @Test public void simpleSendMessageTest() { for (int i = 1; i <= 5; i++) { String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"); //要发送的消息内容 String message = "hello,简单模式(Simple),发送第 " + i + " 条消息,发送时间:" + currTime; rabbitTemplate.convertAndSend(RabbitmqConfig.SIMPLE_QUEUE,message); System.out.println("简单模式(Simple),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message); } }}
Ⅲ、消费者实现
① 将生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 复制一份到 消费者工程 test-12-rabbitmq-consumer-01 下,无需修改
② 在 消费者工程 test-12-rabbitmq-consumer-01 下新增一个消息监听类 ModelListener,内容如下:
package com.test.rabbitmq_consumer01.mq;import com.test.rabbitmq_consumer01.config.RabbitmqConfig;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 常用模式测试类 -- 消费者 */@Configurationpublic class ModelListener { /** * 简单模式(Simple) * 一对一消费,一条消息只有一个消费者可以消费 * 备注: * 1、一个队列中的一条消息只会被一个消费者消费一次 * 2、简单模式中,使用默认交换机direct */ @RabbitListener(queues = {RabbitmqConfig.SIMPLE_QUEUE})//监听 RabbitmqConfig.SIMPLE_QUEUE 队列 public void simpleReceive(String message) { System.out.println("consumer-01,简单模式(Simple),接收到的消息:" + message); }}
Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 执行测试类 ModelProducerTest 中的 simpleSendMessageTest() 方法,发送消息
④ 消息监听结果, ModelListener 的 simpleReceive() 方法中收到消息如下:
2、工作队列模式(Work Queues)
Ⅰ、模式特点
① 一个生产者,多个消费者(默认使用轮询策略,可配置能者多劳模式,谁完成的快,谁多做一点)
② 虽然有多个消费者,但是只有一个队列,一条消息只能被一个消费者获取
② 默认使用 direct 交换机(RabbitMQ中默认使用 direct 交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。)
③ 工作流程:生产者生产消息并将消息发送到交换机 –> 交换机将消息路由到绑定的队列 –> 队列收到消息 –> 消费者监听队列获取消息进行消费
Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加一个队列,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:
package com.test.rabbitmq_producer.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 消息生产者配置类 * 配置类需要配置哪些东西: * 1、定义队列 * 2、定义交换机 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...) * 4、声明队列 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。) */@Slf4j@Configurationpublic class RabbitmqConfig { //定义队列 public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列 public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列 //声明-简单模式测试队列 @Bean(SIMPLE_QUEUE) public Queue SIMPLE_QUEUE() { return new Queue(SIMPLE_QUEUE); }//声明-工作队列模式测试队列 @Bean(WORK_QUEUES) public Queue WORK_QUEUES() { return new Queue(WORK_QUEUES); }}
② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 workQueuesSendMessageTest(),内容如下:
/** * 工作队列模式(Work Queues) * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费(轮询策略),可配置能者多劳模式,谁完成的快,谁多做一点 * 备注:一个队列中的一条消息只会被一个消费者消费一次 */ @Test public void workQueuesSendMessageTest() { for (int i = 1; i <= 10; i++) { String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"); //要发送的消息内容 String message = "hello,工作队列模式(Work Queues),发送第 " + i + " 条消息,发送时间:" + currTime; rabbitTemplate.convertAndSend(RabbitmqConfig.WORK_QUEUES,message); System.out.println("工作队列模型(Work Queues),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message); } }
Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 workQueuesReceive(),内容如下:
/** * 工作队列模式(Work Queues) * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费(轮询策略),可配置能者多劳模式,谁完成的快,谁多做一点 * 备注:一个队列中的一条消息只会被一个消费者消费一次 */ @RabbitListener(queues = {RabbitmqConfig.WORK_QUEUES}) public void workQueuesReceive(String message) { System.out.println("consumer-01,工作队列模型(Work Queues),消费者,接收到的消息:" + message); }
② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 workQueuesReceive(),内容如下:
/** * 工作队列模式(Work Queues) * 多个消费者,你一个我一个分配消费消息,有预取机制,默认公平消费(轮询策略),可配置能者多劳模式,谁完成的快,谁多做一点 * 备注:一个队列中的一条消息只会被一个消费者消费一次 */ @RabbitListener(queues = {RabbitmqConfig.WORK_QUEUES}) public void workQueuesReceive(String message) { System.out.println("consumer-02,工作队列模型(Work Queues),消费者,接收到的消息:" + message); }
Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 workQueuesSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 workQueuesReceive() 方法中收到消息如下:
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 workQueuesReceive() 方法中收到消息如下:
Ⅴ、结论:
① 默认情况下,工作队列模式(Work Queues)使用轮询策略,多个消费者你一条我一条的公平消费
② 工作队列模式和简单模式的区别仅在于消费者数量
Ⅵ、问题:怎样取消工作队列模式的预取机制(轮询)?
解决:在生产者和所有消费者工程的 application.yml 文件中添加如下配置
application.yml 文件修改重启服务后,test-12-rabbitmq-consumer-01 的 workQueuesReceive() 方法中收到消息如下:
application.yml 文件修改重启服务后,test-12-rabbitmq-consumer-02 的 workQueuesReceive() 方法中收到消息如下:
3、发布订阅模式(Publish/Subscribe)
Ⅰ、模式特点
① 一个生产者,多个消费者,一条消息可以被多个消费者同时消费
② 使用 fanout 交换机
③ 工作流程:生产者生产消息并将消息发送给交换机 –> 交换机收到消息后将消息转发给绑定该交换机的所有队列 –> 队列收到消息 –> 消费者监听队列获取消息进行消费
Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加两个队列和一个自定义交换机,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:
package com.test.rabbitmq_producer.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 消息生产者配置类 * 配置类需要配置哪些东西: * 1、定义队列 * 2、定义交换机 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...) * 4、声明队列 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。) */@Slf4j@Configurationpublic class RabbitmqConfig { //定义队列 public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列 public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列 public static final String PUBLISH_SUBSCRIBE_QUEUES_1 = "publish.subscribe.queues.01";//发布订阅模式测试队列 public static final String PUBLISH_SUBSCRIBE_QUEUES_2 = "publish.subscribe.queues.02";//发布订阅模式测试队列//定义交换机public static final String PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE = "fanout_exchange";//发布订阅模式测试交换机 //声明-简单模式测试队列 @Bean(SIMPLE_QUEUE) public Queue SIMPLE_QUEUE() { return new Queue(SIMPLE_QUEUE); }//声明-工作队列模式测试队列 @Bean(WORK_QUEUES) public Queue WORK_QUEUES() { return new Queue(WORK_QUEUES); } //声明-发布订阅模式测试队列01 @Bean(PUBLISH_SUBSCRIBE_QUEUES_1) public Queue PUBLISH_SUBSCRIBE_QUEUES_1() { return new Queue(PUBLISH_SUBSCRIBE_QUEUES_1); } //声明-发布订阅模式测试队列02 @Bean(PUBLISH_SUBSCRIBE_QUEUES_2) public Queue PUBLISH_SUBSCRIBE_QUEUES_2() { return new Queue(PUBLISH_SUBSCRIBE_QUEUES_2); }/** * 配置交换机--fanout交换机 * @return */@Bean(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) public Exchange PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE() { return ExchangeBuilder.fanoutExchange(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE).durable(true).build(); }/** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_1(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_1) Queue queue, @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.01").noargs(); } /** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_2(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_2) Queue queue, @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.02").noargs(); }}
② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 publishSubscribeSendMessageTest(),内容如下:
/** * 发布订阅模式(Publish/Subscribe) * 发布订阅模式使用 fanout 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) */ @Test public void publishSubscribeSendMessageTest() throws InterruptedException { for (int i = 1; i <= 10; i++) { String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"); //要发送的消息内容 String message = "hello,publish.subscribe.queues,发送第 " + i + " 条消息,发送时间:" + currTime; String routingKey = ""; if (i % 2 == 0) { message = message.replace("queues", "queues.01"); routingKey = "publish.subscribe.queues.01"; } else { message = message.replace("queues", "queues.02"); routingKey = "publish.subscribe.queues.02"; } /** * convertAndSend(String exchange, String routingKey, Object object): * 参数一(String exchange):交换机名称 * 参数二(String routingKey):路由规则 * 参数三(Object object):发送的消息内容 */ rabbitTemplate.convertAndSend(RabbitmqConfig.PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE,routingKey,message); System.out.println("发布订阅模型(Publish/Subscribe),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);// Thread.sleep(2000); } }
Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 publishSubscribeQueuesReceive(),内容如下:
/** * 发布订阅模式(Publish/Subscribe) * 发布订阅模式使用 fanout 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) */ @RabbitListener(queues = {RabbitmqConfig.PUBLISH_SUBSCRIBE_QUEUES_1}) public void publishSubscribeQueuesReceive(String message) throws InterruptedException { Thread.sleep(100); System.out.println("consumer-01,发布订阅模型(Publish/Subscribe),消费者一*绑定队列*PUBLISH_SUBSCRIBE_QUEUES_1,接收到的消息:" + message); }
② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 publishSubscribeQueuesReceive(),内容如下:
/** * 发布订阅模式(Publish/Subscribe) * 发布订阅模式使用 fanout 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) */ @RabbitListener(queues = {RabbitmqConfig.PUBLISH_SUBSCRIBE_QUEUES_2}) public void publishSubscribeQueuesReceive(String message) throws InterruptedException { Thread.sleep(100); System.out.println("consumer-02,发布订阅模型(Publish/Subscribe),消费者二*绑定队列*PUBLISH_SUBSCRIBE_QUEUES_2,接收到的消息:" + message); }
Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 publishSubscribeSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 publishSubscribeQueuesReceive() 方法中收到消息如下:
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 publishSubscribeQueuesReceive() 方法中收到消息如下:
结论:在发布订阅模式(Publish/Subscribe)下一条消息是可以被多个消费者同时消费的(多个队列绑定同一个交换机)。
4、路由模式(Routing)
Ⅰ、模式特点
① 一个生产者,一个交换机,多个队列,多个消费者,消费者监听队列,队列跟交换机绑定
② 使用 direct 交换机
③ 队列绑定交换机的时候必须指定 RoutingKey,不支持通配符
④ 发送消息的时候必须指定 RoutingKey
⑤ RoutingKey 可以重复
⑥ 工作流程:生产者生产消息并将消息发送给交换机 –> 交换机收到消息后根据路由规则将消息转发给绑定该交换机且符合路由规则的队列 –> 队列收到消息 –> 消费者监听队列获取消息进行消费
Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加两个队列和一个自定义交换机,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:
package com.test.rabbitmq_producer.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 消息生产者配置类 * 配置类需要配置哪些东西: * 1、定义队列 * 2、定义交换机 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...) * 4、声明队列 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。) */@Slf4j@Configurationpublic class RabbitmqConfig { //定义队列 public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列 public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列 public static final String PUBLISH_SUBSCRIBE_QUEUES_1 = "publish.subscribe.queues.01";//发布订阅模式测试队列 public static final String PUBLISH_SUBSCRIBE_QUEUES_2 = "publish.subscribe.queues.02";//发布订阅模式测试队列 public static final String ROUTING_QUEUES_SMS = "routing.queues.sms";//路由模式测试队列 public static final String ROUTING_QUEUES_EMAIL = "routing.queues.email";//路由模式测试队列//定义交换机public static final String PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE = "fanout_exchange";//发布订阅模式测试交换机public static final String ROUTING_DIRECT_TEST_EXCHANGE = "direct_exchange";//路由模式测试交换机 //声明-简单模式测试队列 @Bean(SIMPLE_QUEUE) public Queue SIMPLE_QUEUE() { return new Queue(SIMPLE_QUEUE); }//声明-工作队列模式测试队列 @Bean(WORK_QUEUES) public Queue WORK_QUEUES() { return new Queue(WORK_QUEUES); } //声明-发布订阅模式测试队列01 @Bean(PUBLISH_SUBSCRIBE_QUEUES_1) public Queue PUBLISH_SUBSCRIBE_QUEUES_1() { return new Queue(PUBLISH_SUBSCRIBE_QUEUES_1); } //声明-发布订阅模式测试队列02 @Bean(PUBLISH_SUBSCRIBE_QUEUES_2) public Queue PUBLISH_SUBSCRIBE_QUEUES_2() { return new Queue(PUBLISH_SUBSCRIBE_QUEUES_2); }//声明-路由模式短信测试队列 @Bean(ROUTING_QUEUES_SMS) public Queue ROUTING_QUEUES_SMS() { return new Queue(ROUTING_QUEUES_SMS); } //声明-路由模式邮件测试队列 @Bean(ROUTING_QUEUES_EMAIL) public Queue ROUTING_QUEUES_EMAIL() { return new Queue(ROUTING_QUEUES_EMAIL); }/** * 配置交换机--fanout交换机 * @return */@Bean(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) public Exchange PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE() { return ExchangeBuilder.fanoutExchange(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE).durable(true).build(); }/** * 配置交换机--direct交换机 * @return */ @Bean(ROUTING_DIRECT_TEST_EXCHANGE) public Exchange ROUTING_DIRECT_TEST_EXCHANGE() { return ExchangeBuilder.directExchange(ROUTING_DIRECT_TEST_EXCHANGE).durable(true).build(); }/** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_1(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_1) Queue queue, @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.01").noargs(); } /** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_2(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_2) Queue queue, @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.02").noargs(); }/** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_ROUTING_QUEUES_SMS(@Qualifier(ROUTING_QUEUES_SMS) Queue queue, @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("routing.queues.sms").noargs(); } /** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_ROUTING_QUEUES_EMAIL(@Qualifier(ROUTING_QUEUES_EMAIL) Queue queue, @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("routing.queues.email").noargs(); }}
② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 routingSendMessageTest(),内容如下:
/** * 路由模式(Routing) * Routing模式 使用 Direct 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) * 3、路由模式不支持使用通配符,convertAndSend() 方法中指定的 routingKey 和队列绑定交换机时 with() 方法中配置的 routingKey 必须完全匹配才能将消息发送到具体的队列上 */ @Test public void routingSendMessageTest() throws InterruptedException { for (int i = 1; i <= 10; i++) { String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"); //要发送的消息内容 String message = "hello,routing.queues,发送第 " + i + " 条消息,发送时间:" + currTime; String routingKey = ""; if (i % 2 == 0) { //发送短信 message = message.replace("queues", "queues.sms"); routingKey = "routing.queues.sms"; } else { //发送邮件 message = message.replace("queues", "queues.email"); routingKey = "routing.queues.email"; } /** * convertAndSend(String exchange, String routingKey, Object object): * 参数一(String exchange):交换机名称 * 参数二(String routingKey):指定的路由,Routing模式下指定的路由必须和 RabbitmqConfig 配置类中队列和交换机绑定时 with() 中配置的 routingKey 要一致才行,不支持使用通配符 * 参数三(Object object):发送的消息内容 */ rabbitTemplate.convertAndSend(RabbitmqConfig.ROUTING_DIRECT_TEST_EXCHANGE,routingKey,message); System.out.println("路由模式(Routing),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);// Thread.sleep(1000); } }
Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 routingQueuesReceive(),内容如下:
/** * 路由模式(Routing) * Routing模式 使用 Direct 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) * 3、路由模式不支持使用通配符,convertAndSend() 方法中指定的 routingKey 和队列绑定交换机时 with() 方法中配置的 routingKey 必须完全匹配才能将消息发送到具体的队列上 */ @RabbitListener(queues = {RabbitmqConfig.ROUTING_QUEUES_SMS})//queues需要手动先创建队列 public void routingQueuesReceive(String message) { System.out.println("consumer-01,路由模式(Routing),消费者一*绑定队列*ROUTING_QUEUES_SMS,接收到的消息:" + message); }
② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 routingQueuesReceive(),内容如下:
/** * 路由模式(Routing) * Routing模式 使用 Direct 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) * 3、路由模式不支持使用通配符,convertAndSend() 方法中指定的 routingKey 和队列绑定交换机时 with() 方法中配置的 routingKey 必须完全匹配才能将消息发送到具体的队列上 */ @RabbitListener(queues = {RabbitmqConfig.ROUTING_QUEUES_EMAIL}) public void routingQueuesReceive(String message) throws InterruptedException { Thread.sleep(100); System.out.println("consumer-02,路由模式(Routing),消费者二*绑定队列*ROUTING_QUEUES_EMAIL,接收到的消息:" + message); }
Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 routingSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 routingQueuesReceive() 方法中收到消息如下:
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 routingQueuesReceive() 方法中收到消息如下:
问题:
1、路由模式(Routing)和 发布订阅模式(Publish/Subscribe)的区别在哪?
交换机不同
路由模式使用的是 direct 交换机,direct 交换机会把消息路由到那些 binding key 与 routing key 完全匹配的队列中。(不仅要队列绑定交换机,还要验证 binding key 与 routing key 是否匹配,两个条件均满足才能收到消息)
发布订阅模式使用的是 fanout 交换机,fanout 交换机会把所有发送到该交换机的消息路由到所有与它绑定的队列中。(只要队列绑定了交换机就能收到消息,不会验证 binding key 与 routing key 是否匹配)
2、什么是 routing key ?什么是 binding key ?
routing key:convertAndSend(String exchange, String routingKey, Object object)
binding key:BindingBuilder.bind(queue).to(exchange).with(“publish.subscribe.queues.02”).noargs(); – with() 中的就是 binding key
5、主题模式(Topic,也叫通配符模式)
Ⅰ、模式特点
① 使用 topic 交换机
② 其他和路由模式基本一样,只是在路由模式的基础上增加了 使用通配符来进行 RoutingKey 匹配的功能
Ⅱ、生产者实现
① 在生产者工程 test-11-rabbitmq-producer 下 RabbitMQ 的配置类 RabbitmqConfig 中添加两个队列和一个自定义交换机,添加完成后将 RabbitmqConfig 复制到 rabbitmq-consumer-01 和 rabbitmq-consumer-02 工程中,生产者和所有消费者中的配置类完全一样即可,添加后的内容如下:
package com.test.rabbitmq_producer.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * RabbitMQ 消息生产者配置类 * 配置类需要配置哪些东西: * 1、定义队列 * 2、定义交换机 * 3、配置交换机类型等相关信息(比如说:是否持久化、是否自动删除...) * 4、声明队列 * 5、队列绑定交换机(RabbitMQ默认使用direct交换机,每个新建的队列都会自动绑定到 direct 交换机上,且绑定的路由键 routingKey 名称和队列名称相同。如果想让队列绑定其他交换机,可以手动进行绑定。) */@Slf4j@Configurationpublic class RabbitmqConfig { //定义队列 public static final String SIMPLE_QUEUE = "simple.queue";//简单模式测试队列 public static final String WORK_QUEUES = "work.queues";//工作队列模式测试队列 public static final String PUBLISH_SUBSCRIBE_QUEUES_1 = "publish.subscribe.queues.01";//发布订阅模式测试队列 public static final String PUBLISH_SUBSCRIBE_QUEUES_2 = "publish.subscribe.queues.02";//发布订阅模式测试队列 public static final String ROUTING_QUEUES_SMS = "routing.queues.sms";//路由模式测试队列 public static final String ROUTING_QUEUES_EMAIL = "routing.queues.email";//路由模式测试队列 public static final String TOPIC_QUEUES_SMS = "topic_queues_sms";//主题模式短信队列 public static final String TOPIC_QUEUES_EMAIL = "topic_queues_email";//主题模式邮件队列//定义交换机public static final String PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE = "fanout_exchange";//发布订阅模式测试交换机public static final String ROUTING_DIRECT_TEST_EXCHANGE = "direct_exchange";//路由模式测试交换机public static final String TOPIC_TEST_EXCHANGE = "topic_exchange";//主题模式测试交换机 //声明-简单模式测试队列 @Bean(SIMPLE_QUEUE) public Queue SIMPLE_QUEUE() { return new Queue(SIMPLE_QUEUE); }//声明-工作队列模式测试队列 @Bean(WORK_QUEUES) public Queue WORK_QUEUES() { return new Queue(WORK_QUEUES); } //声明-发布订阅模式测试队列01 @Bean(PUBLISH_SUBSCRIBE_QUEUES_1) public Queue PUBLISH_SUBSCRIBE_QUEUES_1() { return new Queue(PUBLISH_SUBSCRIBE_QUEUES_1); } //声明-发布订阅模式测试队列02 @Bean(PUBLISH_SUBSCRIBE_QUEUES_2) public Queue PUBLISH_SUBSCRIBE_QUEUES_2() { return new Queue(PUBLISH_SUBSCRIBE_QUEUES_2); }//声明-路由模式短信测试队列 @Bean(ROUTING_QUEUES_SMS) public Queue ROUTING_QUEUES_SMS() { return new Queue(ROUTING_QUEUES_SMS); } //声明-路由模式邮件测试队列 @Bean(ROUTING_QUEUES_EMAIL) public Queue ROUTING_QUEUES_EMAIL() { return new Queue(ROUTING_QUEUES_EMAIL); }//声明-主题模式短信队列 @Bean(TOPIC_QUEUES_SMS) public Queue TOPIC_QUEUES_SMS() { return new Queue(TOPIC_QUEUES_SMS); } //声明-主题模式邮件队列 @Bean(TOPIC_QUEUES_EMAIL) public Queue TOPIC_QUEUES_EMAIL() { return new Queue(TOPIC_QUEUES_EMAIL); }/** * 配置交换机--fanout交换机 * @return */@Bean(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) public Exchange PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE() { return ExchangeBuilder.fanoutExchange(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE).durable(true).build(); }/** * 配置交换机--direct交换机 * @return */ @Bean(ROUTING_DIRECT_TEST_EXCHANGE) public Exchange ROUTING_DIRECT_TEST_EXCHANGE() { return ExchangeBuilder.directExchange(ROUTING_DIRECT_TEST_EXCHANGE).durable(true).build(); }/** * 配置交换机--topic交换机 * @return */ @Bean(TOPIC_TEST_EXCHANGE) public Exchange TOPIC_TEST_EXCHANGE() { return ExchangeBuilder.topicExchange(TOPIC_TEST_EXCHANGE).durable(true).build(); }/** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_1(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_1) Queue queue, @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.01").noargs(); } /** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_PUBLISH_SUBSCRIBE_QUEUES_2(@Qualifier(PUBLISH_SUBSCRIBE_QUEUES_2) Queue queue, @Qualifier(PUBLISH_SUBSCRIBE_FANOUT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("publish.subscribe.queues.02").noargs(); }/** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_ROUTING_QUEUES_SMS(@Qualifier(ROUTING_QUEUES_SMS) Queue queue, @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("routing.queues.sms").noargs(); } /** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_ROUTING_QUEUES_EMAIL(@Qualifier(ROUTING_QUEUES_EMAIL) Queue queue, @Qualifier(ROUTING_DIRECT_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("routing.queues.email").noargs(); }/** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_TOPIC_QUEUES_SMS(@Qualifier(TOPIC_QUEUES_SMS) Queue queue, @Qualifier(TOPIC_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("topic.queues.sms.*").noargs(); } /** * 队列绑定到交换机 * @param queue 队列 * @param exchange 交换机 * @return */ @Bean public Binding BINDING_TOPIC_QUEUES_EMAIL(@Qualifier(TOPIC_QUEUES_EMAIL) Queue queue, @Qualifier(TOPIC_TEST_EXCHANGE) Exchange exchange) { //在 with() 中指定 routingKey ,routingKey 中的 # 号表示 通配符,可以匹配任何字符 return BindingBuilder.bind(queue).to(exchange).with("topic.queues.*.email").noargs(); }}
② 在 生产者工程 test-11-rabbitmq-producer 下测试类 ModelProducerTest 中新增一个方法 topicSendMessageTest(),内容如下:
/** * 主题模式(Topic,也叫通配符模式) * Topic模式 使用 topic 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) * 3、主题模式 相比于 路由模式 增加了通配符的使用 * RabbitMQ 通配符有哪些: * 1、 # :代表没有或一个或多个单词(不是一个字符哦!单词与单词之间用“.”分割); * 2、 * :代表没有或一个英文单词(不是一个字符哦); * 通配符具体使用案例: * 1、topic.queues.sms.# 能匹配 topic.queues.sms、topic.queues.sms.111(111 可以换成任何字符) * 2、topic.queues.#.email 能匹配 topic.queues.email、topic.queues.111.email(111 可以换成任何字符) */ @Test public void topicSendMessageTest() throws InterruptedException { for (int i = 1; i <= 9; i++) { String currTime = TimeFormatUtil.getTimeByMs(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"); //要发送的消息内容 String message = "hello,topic.queues,发送第 " + i + " 条消息,发送时间:" + currTime; String routingKey = ""; if (i % 3 == 0) {//3、6、9 //发送短信 message = message.replace("queues", "queues.sms"); routingKey = "topic.queues.sms"; } else if (i % 3 == 1){//1、4、7 //发送邮件 message = message.replace("queues", "queues.email"); routingKey = "topic.queues.email"; } else {//2、5、8 //短信和邮件都发送 message = message.replace("queues", "queues.sms.email"); routingKey = "topic.queues.sms.email"; } /** * convertAndSend(String exchange, String routingKey, Object object): * 参数一(String exchange):交换机名称 * 参数二(String routingKey):指定的路由,Topic模式下支持使用通配符进行匹配 * 参数三(Object object):发送的消息内容 */ rabbitTemplate.convertAndSend(RabbitmqConfig.TOPIC_TEST_EXCHANGE,routingKey,message); System.out.println("主题模式(Topic,也叫通配符模式),生产者,发送第 " + i + " 条消息,发送的消息内容:" + message);// Thread.sleep(500); } }
Ⅲ、消费者实现
① 在 消费者工程 test-12-rabbitmq-consumer-01 下消息监听类 ModelListener 中新增一个监听方法 topicQueuesReceive(),内容如下:
/** * 主题模式(Topic,也叫通配符模式) * Topic模式 使用 topic 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) * 3、主题模式 相比于 路由模式 增加了通配符的使用 * RabbitMQ 通配符有哪些: * 1、 # :代表没有或一个或多个单词(不是一个字符哦!单词与单词之间用“.”分割); * 2、 * :代表没有或一个英文单词(不是一个字符哦); * 通配符具体使用案例: * 1、topic.queues.sms.# 能匹配 topic.queues.sms、topic.queues.sms.111(111 可以换成任何字符) * 2、topic.queues.#.email 能匹配 topic.queues.email、topic.queues.111.email(111 可以换成任何字符) */ @RabbitListener(queues = {RabbitmqConfig.TOPIC_QUEUES_SMS}) public void topicQueuesReceive(String message) { System.out.println("consumer-01,主题模式(Topic,也叫通配符模式),消费者一*绑定队列*TOPIC_QUEUES_SMS,接收到的消息:" + message); }
② 在 消费者工程 test-12-rabbitmq-consumer-02 下消息监听类 ModelListener 中新增一个监听方法 routingQueuesReceive(),内容如下:
/** * 主题模式(Topic,也叫通配符模式) * Topic模式 使用 topic 类型的交换机,将接收到的消息路由到每一个跟其绑定的queue(队列) * 备注: * 1、生产者将消息发送给交换机,并指定路由规则 * 2、交换机将收到的所有消息按照路由规则转发到跟当前交换机绑定的所有队列上(每个和当前交换机绑定的队列都会收到消息,然后被消费者消费,也就是说一条消息可以被多个消费者同时消费) * 3、主题模式 相比于 路由模式 增加了通配符的使用 * RabbitMQ 通配符有哪些: * 1、 # :代表没有或一个或多个单词(不是一个字符哦!单词与单词之间用“.”分割); * 2、 * :代表没有或一个英文单词(不是一个字符哦); * 通配符具体使用案例: * 1、topic.queues.sms.# 能匹配 topic.queues.sms、topic.queues.sms.111(111 可以换成任何字符) * 2、topic.queues.#.email 能匹配 topic.queues.email、topic.queues.111.email(111 可以换成任何字符) */ @RabbitListener(queues = {RabbitmqConfig.TOPIC_QUEUES_EMAIL}) public void topicQueuesReceive(String message) { System.out.println("consumer-02,主题模式(Topic,也叫通配符模式),消费者二*绑定队列*TOPIC_QUEUES_EMAIL,接收到的消息:" + message); }
Ⅳ、测试
① 启动生产者工程 test-11-rabbitmq-producer
② 启动消费者工程 test-12-rabbitmq-consumer-01
③ 启动消费者工程 test-12-rabbitmq-consumer-02
④ 执行测试类 ModelProducerTest 中的 topicSendMessageTest() 方法,发送消息
⑤ 消息监听结果, test-12-rabbitmq-consumer-01 的 topicQueuesReceive() 方法中收到消息如下:
⑥ 消息监听结果, test-12-rabbitmq-consumer-02 的 topicQueuesReceive() 方法中收到消息如下:
问题:
1、主题模式(Topic,也叫通配符模式)和 路由模式(Routing)的区别在哪?
主题模式只是在路由模式的基础上增加了通配符匹配,其他没区别。