目录
介绍
使用场景
模拟案例
准备工作
写法一(死信队列TTL)
RabbitMQ配置文件
生产者
消费者
测试
写法二 (死信队列TTL)
RabbitMQ配置文件
生产者
消费者
测试
写法三 (插件版本-推荐)
插件安装
RabbitMQ配置文件
生产者
消费者
测试
延迟队列方法推荐
介绍
顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
使用场景
- 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
- 用户注册成功后,如果3天没有登录则进行短信提醒
- 优惠券过期前发送短信进行提醒
- ….
以上场景都可以用延时队列来完成
模拟案例
需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费
准备工作
导入RabbitMQ依赖
org.springframework.boot spring-boot-starter-amqp
配置RabbitMQ连接相关信息
#MySQLspring: rabbitmq: host: 127.0.0.1 port: 5672 username: xxxx password: xxxserver: port: 8087
写法一(死信队列TTL)
生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)
RabbitMQ配置文件
import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;/** * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key */@Configurationpublic class RabbitMQConfiguration { // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_QUEUE_NAME_A = "delay.queue.a"; public static final String DELAY_QUEUE_NAME_B = "delay.queue.b"; // 延迟队列路由key public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a"; public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b"; // 死信交换机 public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; // 死信队列 public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a"; public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b"; // 私信队列路由key public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a"; public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b"; // 声明延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机 @Bean("delayQueueA") public Queue delayQueueA() { HashMap args = new HashMap(); // 声明队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // 声明队列的属性路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A); // 声明队列的消息TTL存活时间 args.put("x-message-ttl", 10000); return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build(); } // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机 @Bean("delayQueueB") public Queue delayQueueB() { HashMap args = new HashMap(); // 声明队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // 声明队列的属性路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B); // 声明队列的消息TTL存活时间 args.put("x-message-ttl", 60000); return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build(); } // 声明死信队列A,用于接收延迟10S的消息 @Bean("deadLetterQueueA") public Queue deadLetterQueueA() { return new Queue(DEAD_LETTER_QUEUE_NAME_A); } // 声明死信队列B,用于接收延迟60S的消息 @Bean("deadLetterQueueB") public Queue deadLetterQueueB() { return new Queue(DEAD_LETTER_QUEUE_NAME_B); } // 设置延迟队列A的绑定关系 @Bean public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A); } // 设置延迟队列B的绑定关系 @Bean public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B); } // 设置死信队列A的绑定关系 @Bean public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A); } // 设置死信队列B的绑定关系 @Bean public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B); }}
此配置文件的代码关系图如下
生产者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;/** * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */@Componentpublic class DelayMessageProducer { @Resource private RabbitTemplate rabbitTemplate; public void send(String message,int type) { switch (type){ case 1: // 10s的消息 // param:队列名称、路由key、消息 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message); break; case 2:// 60s的消息 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message); break; } }}
消费者
import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;/** * @author 小影 * @create: 2022/8/18 11:19 * @describe:死信消费者 */@Slf4j@Componentpublic class DeadLetterQueueConsumer { /** * 监听私信队列A * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A) public void receiveA(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg); } /** * 监听私信队列B * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B) public void receiveB(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg); }}
测试
@Slf4j@RestController@RequestMapping("rabbitmq")public class RabbitMqController { @Resource private DelayMessageProducer producer; @GetMapping("send") public void send(String message, Integer type) { log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type)); producer.send(message, type); }}
分别请求:
http://localhost:8089/rabbitmq/send” />
如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED – inequivalent arg ‘type’ for exchange ‘delay.exchange’ in vhost ‘/’: received ”x-delayed-message” but current is ‘direct’, class-id=40, method-id=10
可能是mq已经存在交换机了先去删掉
弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦
写法二 (死信队列TTL)
生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)
RabbitMQ配置文件
import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;/** * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key */@Configurationpublic class RabbitMQConfiguration { // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_QUEUE_NAME = "delay.queue"; // 延迟队列路由key public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey"; // 死信交换机 public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange"; // 死信队列 public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue"; // 私信队列路由key public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey"; // 声明延迟交换机 @Bean("delayExchange") public DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); } // 声明死信交换机 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机 @Bean("delayQueue") public Queue delayQueue() { HashMap args = new HashMap(); // 声明队列绑定的死信交换机 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); // 声明队列的属性路由key args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build(); } // 声明死信队列 @Bean("deadLetterQueue") public Queue deadLetterQueue() { return new Queue(DEAD_LETTER_QUEUE_NAME); } // 设置延迟队列的绑定关系 @Bean public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY); } // 设置死信队列的绑定关系 @Bean public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY); }}
生产者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;/** * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */@Componentpublic class DelayMessageProducer { @Resource private RabbitTemplate rabbitTemplate; /** * * @param message 消息 * @param delayTime 存活时间 */ public void send(String message,String delayTime) { // param:延迟交换机,路由KEY,存活时间 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setExpiration(delayTime); return msg; }); }}
消费者
import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.time.LocalDateTime;import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;/** * @author 小影 * @create: 2022/8/18 11:19 * @describe:死信消费者 */@Slf4j@Componentpublic class DeadLetterQueueConsumer { /** * 监听私信队列A * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME) public void receiveA(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg); }}
测试
@Slf4j@RestController@RequestMapping("rabbitmq")public class RabbitMqController { @Resource private DelayMessageProducer producer; @GetMapping("send") public void send(String message, String delayTime) { log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime); producer.send(message, delayTime); }}
分别请求
http://localhost:8089/rabbitmq/send” />
插件安装
1.进入mq官网社区插件:Community Plugins — RabbitMQ
2.找到rabbitmq_delayed_message_exchange
选择对应版本的ez文件下载
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
注:我的MQ是通过yum安装的
1.在系统中查看安装的rabbitmq
rpm -qa |grep rabbitmq
2.查询mq的安装的相关文件目录
rpm -ql rabbitmq-server-3.10.7-1.el8.noarch
翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好
然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面
3.重启RabbitMQ服务
systemctl restart rabbitmq-server.service
4.重启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
RabbitMQ配置文件
/** * @author 小影 * @create: 2022/8/18 10:26 * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key */@Configurationpublic class RabbitMQConfiguration { // 延迟交换机 public static final String DELAY_EXCHANGE_NAME = "delay.exchange"; // 延迟队列 public static final String DELAY_QUEUE_NAME = "delay.queue"; // 延迟队列路由key public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey"; // 声明延迟交换机 @Bean("delayExchange") public CustomExchange delayExchange() { HashMap args = new HashMap(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args); } // 声明延迟队列 @Bean("delayQueue") public Queue delayQueue() { return new Queue(DELAY_QUEUE_NAME); } // 设置延迟队列的绑定关系 @Bean public Binding delayBinding(@Qualifier("delayQueue") Queue queue, @Qualifier("delayExchange") CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs(); }}
生产者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;/** * @author 小影 * @create: 2022/8/18 11:13 * @describe:延迟消息生产者 */@Componentpublic class DelayMessageProducer { @Resource private RabbitTemplate rabbitTemplate; /** * * @param message 消息 * @param delayTime 存活时间 */ public void send(String message,Integer delayTime) { // param:延迟交换机,路由KEY,存活时间 rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> { msg.getMessageProperties().setDelay(delayTime); return msg; }); }}
消费者
import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;/** * @author 小影 * @create: 2022/8/18 11:19 * @describe:消费者 */@Slf4j@Componentpublic class DeadLetterQueueConsumer { /* * 监听私信队列 * @param message * @param channel 作手动回执、确认 */ @RabbitListener(queues = DELAY_QUEUE_NAME) public void receiveA(Message message, Channel channel) { String msg = new String(message.getBody()); log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg); }}
测试
@Slf4j@RestController@RequestMapping("rabbitmq")public class RabbitMqController { @Resource private DelayMessageProducer producer; @GetMapping("send") public void send(String message, Integer delayTime) { log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime); producer.send(message, delayTime); }}
启动项目查看rabbitmq的可视化界面
如下图此时生成的交换机是x-delayed-message类型的
分别发送:
http://localhost:8089/rabbitmq/send” />
结局并不是60秒先被消费,完成了我们的意愿。
原理:
- 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
- 交换机里面的插件就会一直监听这个时间
- 时间到了把对应数据取出来,放入队列,让消费者进行消费
延迟队列方法推荐
这是小编在开发学习使用和总结,这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!