目录

介绍

使用场景

‍ 模拟案例

准备工作

写法一(死信队列TTL)

RabbitMQ配置文件

生产者

消费者

测试

写法二 (死信队列TTL)

RabbitMQ配置文件

生产者

消费者

测试

写法三 (插件版本-推荐)

插件安装

RabbitMQ配置文件

生产者

消费者

测试

延迟队列方法推荐


介绍

顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

使用场景

  • 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
  • 用户注册成功后,如果3天没有登录则进行短信提醒
  • 优惠券过期前发送短信进行提醒
  • ….

以上场景都可以用延时队列来完成


‍ 模拟案例

需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费

准备工作

导入RabbitMQ依赖

    org.springframework.boot    spring-boot-starter-amqp

配置RabbitMQ连接相关信息

#MySQLspring:  rabbitmq:    host: 127.0.0.1    port: 5672     username: xxxx    password: xxxserver:  port: 8087

写法一(死信队列TTL)

生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

RabbitMQ配置文件

import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;/** * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key */@Configurationpublic class RabbitMQConfiguration {   // 延迟交换机   public static final String DELAY_EXCHANGE_NAME = "delay.exchange";   // 延迟队列   public static final String DELAY_QUEUE_NAME_A = "delay.queue.a";   public static final String DELAY_QUEUE_NAME_B = "delay.queue.b";   // 延迟队列路由key   public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a";   public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b";   // 死信交换机   public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";   // 死信队列   public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a";   public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b";   // 私信队列路由key   public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a";   public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b";   // 声明延迟交换机   @Bean("delayExchange")   public DirectExchange delayExchange() {      return new DirectExchange(DELAY_EXCHANGE_NAME);   }   // 声明死信交换机   @Bean("deadLetterExchange")   public DirectExchange deadLetterExchange() {      return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);   }   // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机   @Bean("delayQueueA")   public Queue delayQueueA() {      HashMap args = new HashMap();      // 声明队列绑定的死信交换机      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);      // 声明队列的属性路由key      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A);      // 声明队列的消息TTL存活时间      args.put("x-message-ttl", 10000);      return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build();   }   // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机   @Bean("delayQueueB")   public Queue delayQueueB() {      HashMap args = new HashMap();      // 声明队列绑定的死信交换机      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);      // 声明队列的属性路由key      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B);      // 声明队列的消息TTL存活时间      args.put("x-message-ttl", 60000);      return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build();   }   // 声明死信队列A,用于接收延迟10S的消息   @Bean("deadLetterQueueA")   public Queue deadLetterQueueA() {      return new Queue(DEAD_LETTER_QUEUE_NAME_A);   }   // 声明死信队列B,用于接收延迟60S的消息   @Bean("deadLetterQueueB")   public Queue deadLetterQueueB() {      return new Queue(DEAD_LETTER_QUEUE_NAME_B);   }   // 设置延迟队列A的绑定关系   @Bean   public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,                                @Qualifier("delayExchange") DirectExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A);   }   // 设置延迟队列B的绑定关系   @Bean   public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,                                @Qualifier("delayExchange") DirectExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B);   }   // 设置死信队列A的绑定关系   @Bean   public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,                                @Qualifier("deadLetterExchange") DirectExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A);   }   // 设置死信队列B的绑定关系   @Bean   public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B);   }}

此配置文件的代码关系图如下

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;/** * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */@Componentpublic class DelayMessageProducer {   @Resource   private RabbitTemplate rabbitTemplate;   public void send(String message,int type) {      switch (type){         case 1: // 10s的消息            // param:队列名称、路由key、消息            rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message);            break;         case 2:// 60s的消息            rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message);            break;      }   }}

消费者

import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;/** * @author 小影 * @create: 2022/8/18 11:19 * @describe:死信消费者 */@Slf4j@Componentpublic class DeadLetterQueueConsumer {   /**    * 监听私信队列A    * @param message    * @param channel 作手动回执、确认    */   @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A)   public void receiveA(Message message, Channel channel) {      String msg = new String(message.getBody());      log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg);   }   /**    * 监听私信队列B    * @param message    * @param channel 作手动回执、确认    */   @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B)   public void receiveB(Message message, Channel channel) {      String msg = new String(message.getBody());      log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg);   }}

测试

@Slf4j@RestController@RequestMapping("rabbitmq")public class RabbitMqController {   @Resource   private DelayMessageProducer producer;   @GetMapping("send")   public void send(String message, Integer type) {      log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type));      producer.send(message, type);   }}

分别请求

http://localhost:8089/rabbitmq/send” />

如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED – inequivalent arg ‘type’ for exchange ‘delay.exchange’ in vhost ‘/’: received ”x-delayed-message” but current is ‘direct’, class-id=40, method-id=10

可能是mq已经存在交换机了先去删掉

弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦


写法二 (死信队列TTL)

生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

RabbitMQ配置文件

import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;/** * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key */@Configurationpublic class RabbitMQConfiguration {   // 延迟交换机   public static final String DELAY_EXCHANGE_NAME = "delay.exchange";   // 延迟队列   public static final String DELAY_QUEUE_NAME = "delay.queue";   // 延迟队列路由key   public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";   // 死信交换机   public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";   // 死信队列   public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";   // 私信队列路由key   public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey";   // 声明延迟交换机   @Bean("delayExchange")   public DirectExchange delayExchange() {      return new DirectExchange(DELAY_EXCHANGE_NAME);   }   // 声明死信交换机   @Bean("deadLetterExchange")   public DirectExchange deadLetterExchange() {      return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);   }   // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机   @Bean("delayQueue")   public Queue delayQueue() {      HashMap args = new HashMap();      // 声明队列绑定的死信交换机      args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);      // 声明队列的属性路由key      args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);      return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();   }   // 声明死信队列   @Bean("deadLetterQueue")   public Queue deadLetterQueue() {      return new Queue(DEAD_LETTER_QUEUE_NAME);   }   // 设置延迟队列的绑定关系   @Bean   public Binding delayBinding(@Qualifier("delayQueue") Queue queue,                               @Qualifier("delayExchange") DirectExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);   }   // 设置死信队列的绑定关系   @Bean   public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,                                    @Qualifier("deadLetterExchange") DirectExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);   }}

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;/** * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */@Componentpublic class DelayMessageProducer {   @Resource   private RabbitTemplate rabbitTemplate;   /**    *    * @param message 消息    * @param delayTime 存活时间    */   public void send(String message,String delayTime) {      // param:延迟交换机,路由KEY,存活时间      rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {         msg.getMessageProperties().setExpiration(delayTime);         return msg;      });   }}

消费者

import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;/** * @author 小影 * @create: 2022/8/18 11:19 * @describe:死信消费者 */@Slf4j@Componentpublic class DeadLetterQueueConsumer {   /**    * 监听私信队列A    * @param message    * @param channel 作手动回执、确认    */   @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME)   public void receiveA(Message message, Channel channel) {      String msg = new String(message.getBody());      log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg);   }}

测试

@Slf4j@RestController@RequestMapping("rabbitmq")public class RabbitMqController {   @Resource   private DelayMessageProducer producer;   @GetMapping("send")   public void send(String message, String delayTime) {      log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);      producer.send(message, delayTime);   }}

分别请求

http://localhost:8089/rabbitmq/send” />

插件安装

1.进入mq官网社区插件:Community Plugins — RabbitMQ

2.找到rabbitmq_delayed_message_exchange

选择对应版本的ez文件下载

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

注:我的MQ是通过yum安装的

1.在系统中查看安装的rabbitmq

rpm -qa |grep rabbitmq

2.查询mq的安装的相关文件目录

rpm -ql rabbitmq-server-3.10.7-1.el8.noarch

翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好

然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面

3.重启RabbitMQ服务

systemctl restart rabbitmq-server.service

4.重启插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange


RabbitMQ配置文件

/** * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key */@Configurationpublic class RabbitMQConfiguration {   // 延迟交换机   public static final String DELAY_EXCHANGE_NAME = "delay.exchange";   // 延迟队列   public static final String DELAY_QUEUE_NAME = "delay.queue";   // 延迟队列路由key   public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";   // 声明延迟交换机   @Bean("delayExchange")   public CustomExchange delayExchange() {      HashMap args = new HashMap();      args.put("x-delayed-type", "direct");      return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);   }   // 声明延迟队列   @Bean("delayQueue")   public Queue delayQueue() {      return new Queue(DELAY_QUEUE_NAME);   }   // 设置延迟队列的绑定关系   @Bean   public Binding delayBinding(@Qualifier("delayQueue") Queue queue,                               @Qualifier("delayExchange") CustomExchange exchange) {      return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();   }}

生产者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;/** * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */@Componentpublic class DelayMessageProducer {   @Resource   private RabbitTemplate rabbitTemplate;   /**    *    * @param message 消息    * @param delayTime 存活时间    */   public void send(String message,Integer delayTime) {      // param:延迟交换机,路由KEY,存活时间      rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {         msg.getMessageProperties().setDelay(delayTime);         return msg;      });   }}

消费者

import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;/** * @author 小影 * @create: 2022/8/18 11:19 * @describe:消费者 */@Slf4j@Componentpublic class DeadLetterQueueConsumer {   /*    * 监听私信队列    * @param message    * @param channel 作手动回执、确认    */   @RabbitListener(queues = DELAY_QUEUE_NAME)   public void receiveA(Message message, Channel channel) {      String msg = new String(message.getBody());      log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg);   }}

测试

@Slf4j@RestController@RequestMapping("rabbitmq")public class RabbitMqController {   @Resource   private DelayMessageProducer producer;   @GetMapping("send")   public void send(String message, Integer delayTime) {      log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);      producer.send(message, delayTime);   }}

启动项目查看rabbitmq的可视化界面

如下图此时生成的交换机是x-delayed-message类型的

分别发送:

http://localhost:8089/rabbitmq/send” />

结局并不是60秒先被消费,完成了我们的意愿。

原理:

  1. 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
  2. 交换机里面的插件就会一直监听这个时间
  3. 时间到了把对应数据取出来,放入队列,让消费者进行消费

延迟队列方法推荐

这是小编在开发学习使用和总结,这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!