RabbitMQ整合RabbitMQ
/** * 使用RabbitMQ * 1、引入ampq场景,RabbitAutoConfiguration 就会自动生效 * 2、给容器中自动配置了 * RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate * 所有的属性都是在 * @EnableConfigurationProperties(RabbitProperties.class) * @ConfigurationProperties(prefix = "spring.rabbitmq") * public class RabbitProperties * 3、给配置文件中配置 spring.rabbitmq 信息 * 4、@EnableRabbit 开启功能 * 5、监听消息:使用 @RabbitListener,必须有 @EnableRabbit * @RabbitListener:类 + 方法上 * @RabbitHandler: 只能标在方法上 */
org.springframework.boot spring-boot-starter-amqp
# rabbit 配置文件spring.rabbitmq.host=192.168.106.101spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=/
测试
package com.atguigu.gulimall.order;import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;import lombok.extern.slf4j.Slf4j;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.amqp.core.AmqpAdmin;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.Date;@Slf4j@RunWith(SpringRunner.class)@SpringBootTestpublic class GulimallOrderApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Autowired RabbitTemplate rabbitTemplate; /** * 1、创建Exchange[hello.java.exchange]、Queue、Binding * - 使用 AmqpAdmin 进行创建 * * 2、如何收发消息 -> RabbitTemplate * 如果发送的消息是个对象,使用序列化机制,将对象写出去,对象实现 Serializable 接口 * 自定义序列化添加配置 * @Configuration * public class MyRabbitConfig { * @Bean * public MessageConverter messageConverter() { * return new Jackson2JsonMessageConverter(); * } * } */ @Test public void sendMessageTest() { String msg = "Hello World"; OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity(); orderReturnApplyEntity.setId(1L); orderReturnApplyEntity.setSkuName("华为"); orderReturnApplyEntity.setCreateTime(new Date()); rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", orderReturnApplyEntity); log.info("消息发送完成:{}", orderReturnApplyEntity); } @Test public void createExchange() { //amqpAdmin /** * DirectExchange * public DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments) */ DirectExchange exchange = new DirectExchange("hello.java.exchange", true,false); amqpAdmin.declareExchange(exchange); log.info("Exchange[{}]创建成功", "hello.java.exchange"); } @Test public void createQueue() { /** * public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) */ Queue queue = new Queue("hello-java-queue", true, false,true); amqpAdmin.declareQueue(queue); log.info("Queue[{}]创建成功", "hello-java-queue"); } @Test public void createBinding() { /** * public Binding(String destination【目的地】, * DestinationType destinationType【目的地类型】, * String exchange【交换机】, * String routingKey【路由键】, * Map arguments)【参数】 * 将 exchange 指定交换机和 destination目的地进行绑定,使用routingKey作为指定路由键 */ Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null); amqpAdmin.declareBinding(binding); log.info("Binding == 创建成功"); }}
测试监听消息
/** * queues:声明需要监听的所欲队列 * * org.springframework.amqp.core.Message; * * 参数可以写以下类型 * 1、Message message;原生消息详细信息,头 + 体 * 2、T OrderReturnApplyEntity content * 3、Channel channel:当前传输数据的通道 * * Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只有一个人收到此消息 * 1、订单服务启动多个:同一个消息,只能有一个客户端收到 * 2、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息 *///@RabbitListener(queues = {"hello-java-queue"})@RabbitHanderpublic void receiveMessage(Message message, OrderReturnReasonEntity content) { System.out.println("接收到消息....:"+ message + "===>内容;" + content + "类型是:" + message.getClass()); byte[] body = message.getBody(); //消息头属性信息 MessageProperties properties = message.getMessageProperties(); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消息处理完成=》" + content.getName());}
@RabbitListener
简介:
1.用于标注在监听类或监听方法上,接收消息,需要指定监听的队列(数组)2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit3.@RabbitListener即可以标注在方法上又可以标注在类上标注在类上:表示该类是监听类,使得@RabbitHandler注解生效标注在方法上:表示该方法时监听方法,会监听指定队列获得消息4.一般只标注在方法上,并配合@RabbitHandler使用,重载的方式接收不同消息对象
@RabbitHandler
作用:
配合@RabbitListener,使用方法重载的方法接收不同的消息类型
简介:
1.用于标注在监听方法上,接收消息,不需要指定监听的队列2.使用该注解之前,需要在启动类加上该注解:@EnableRabbit3.@RabbitListener只可以标注在方法,重载的方式接收不同消息对象
发送端消息确认配置
1、配置
2、定制 RabbitTemplate,设置确认回调
# rabbit 配置文件spring.rabbitmq.host=192.168.106.101spring.rabbitmq.port=5672spring.rabbitmq.virtual-host=/# 开启发送端确认spring.rabbitmq.publisher-confirms=true#开启发送端消息抵达确认spring.rabbitmq.publisher-returns=true#只要抵达队列。以异步发送优先回调returnconfirmspring.rabbitmq.template.mandatory=true# 手动ack消息spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.atguigu.gulimall.order.config;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import javax.annotation.PostConstruct;@Configurationpublic class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } /** * 定制 rabbitTemplate * 1、服务收到消息就回调 * 1、spring.rabbitmq.publisher-confirms=true * 2、设置确认回调ConfirmCallback * 2、消息正确地打队列进行回调 * 1、spring.rabbitmq.publisher-returns=true * spring.rabbitmq.template.mandatory=true * 2、设置消息抵达队列的回调 * 3、消费端确认【保证每一个消息被正确消费,此时才可以让broker删除】 * 1、默认是自动确认,只要消息接受到,自动确认,服务端就会移除这个消息 * 2、手动确认默认,只要没有明确告诉MQ,货物被签收,没有ACK,消息一直是unacked状态。 * 即使Cosumer宕机,消息也不会丢失,会重新变成Ready,等待下一次新的consumer链接发给他 * 3、如果手动确认:Channel channel -> long deliveryTag = properties.getDeliveryTag(); -> channel.basicAck(deliveryTag, false); * channel.basicAck(deliveryTag, false); 签收 * channel.basicNack(deliveryTag, false, true); 拒签 */ @PostConstruct // MyRabbitConfig 对象创建完成以后执行这个方法 public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * 只要抵达服务器,ack就确认为true * @param correlationData 当前消息的唯一关联数据(消息的唯一id) * @param ack 是否成功或者失败 * @param cause 失败的原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm..." + correlationData + "==> ack:" + ack + "==> cause:" + cause); } }); //设置消息抵达队列的回调 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息没有投递给指定的队列,就触发失败回调 * @param message 投递失败的消息详细信息 * @param replyCode 回复的状态码 * @param replyText 回复的文本内容 * @param exchange 消息发给那个交换机 * @param routingKey 当时这个消息使用哪个路由键 */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("Fail Message:" + message + "==> replyTest:" + replyText + "==>exchange" + exchange + "==>routingKey:" + routingKey); } }); }}
/** * queues:声明需要监听的所欲队列 * * org.springframework.amqp.core.Message; *
* 参数可以写以下类型 * 1、Message message;原生消息详细信息,头 + 体 * 2、T OrderReturnApplyEntity content * 3、Channel channel:当前传输数据的通道 *
* Queue:可以很多人都来监听,只要收到消息,队列删除消息,而且只有一个人收到此消息 * 1、订单服务启动多个:同一个消息,只能有一个客户端收到 * 2、只有一个消息完全处理完,方法运行结束,我们就可以接受到下一个消息 */@RabbitListener(queues = {"hello-java-queue"})public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException { //System.out.println("接收到消息....:"+ message + "===>内容;" + content + "类型是:" + message.getClass()); System.out.println("接收到消息....:" + content); byte[] body = message.getBody(); //消息头属性信息 MessageProperties properties = message.getMessageProperties(); /*try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("消息处理完成=》" + content.getName()); long deliveryTag = properties.getDeliveryTag(); System.out.println("deliverTag: " + deliveryTag); if (deliveryTag % 2 == 0) { //收货 // 签收获取,非批量模式 channel.basicAck(deliveryTag, false); } else { //requeue 重新入队 //basicNack(long deliveryTag, boolean multiple, boolean requeue) channel.basicNack(deliveryTag, false, true); System.out.println("没有签收的货物....." + deliveryTag); }}
最终整合
1.导入mq依赖 org.springframework.boot spring-boot-starter-amqp2.ware模块导入配置spring: rabbitmq: host: 192.168.56.10 port: 5672 # 虚拟主机 virtual-host: / # 开启发送端发送确认,无论是否到达broker都会触发回调【发送端确认机制+本地事务表】 publisher-confirm-type: correlated # 开启发送端抵达队列确认,消息未被队列接收时触发回调【发送端确认机制+本地事务表】 publisher-returns: true # 消息在没有被队列接收时是否强行退回 template: mandatory: true # 消费者手动确认模式,关闭自动确认,否则会消息丢失 listener: simple: acknowledge-mode: manual3.添加注解// 开启rabbit@EnableRabbit4.创建配置类/** * @Author: wanzenghui * @Date: 2021/12/15 0:04 */@Configurationpublic class MyRabbitConfig { @Autowired RabbitTemplate rabbitTemplate; @Bean public MessageConverter messageConverter() { // 使用json序列化器来序列化消息,发送消息时,消息对象会被序列化成json格式 return new Jackson2JsonMessageConverter(); } /** * 定制RabbitTemplate * 1、服务收到消息就会回调 * 1、spring.rabbitmq.publisher-confirms: true * 2、设置确认回调 * 2、消息正确抵达队列就会进行回调 * 1、spring.rabbitmq.publisher-returns: true * spring.rabbitmq.template.mandatory: true * 2、设置确认回调ReturnCallback * * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) */ @PostConstruct // (MyRabbitConfig对象创建完成以后,执行这个方法) public void initRabbitTemplate() { /** * 发送消息触发confirmCallback回调 * @param correlationData:当前消息的唯一关联数据(如果发送消息时未指定此值,则回调时返回null) * @param ack:消息是否成功收到(ack=true,消息抵达Broker) * @param cause:失败的原因 */ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { System.out.println("发送消息触发confirmCallback回调" + "\ncorrelationData ===> " + correlationData + "\nack ===> " + ack + "" + "\ncause ===> " + cause); System.out.println("================================================="); }); /** * 消息未到达队列触发returnCallback回调 * 只要消息没有投递给指定的队列,就触发这个失败回调 * @param message:投递失败的消息详细信息 * @param replyCode:回复的状态码 * @param replyText:回复的文本内容 * @param exchange:接收消息的交换机 * @param routingKey:接收消息的路由键 */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 需要修改数据库 消息的状态【后期定期重发消息】 System.out.println("消息未到达队列触发returnCallback回调" + "\nmessage ===> " + message + "\nreplyCode ===> " + replyCode + "\nreplyText ===> " + replyText + "\nexchange ===> " + exchange + "\nroutingKey ===> " + routingKey); System.out.println("=================================================="); }); }}5.创建ware解锁库存的延时队列、死信队列、交换机、绑定关系/** * 创建队列,交换机,延时队列,绑定关系 的configuration * 1.Broker中的Queue、Exchange、Binding不存在的情况下,会自动创建(在RabbitMQ),不会重复创建覆盖 * 2.懒加载,只有第一次使用的时候才会创建(例如监听队列) */@Configurationpublic class MyRabbitMQConfig { /** * 用于首次创建队列、交换机、绑定关系的监听 * @param message */ @RabbitListener(queues = "stock.release.stock.queue") public void handle(Message message) { } /** * 交换机 * Topic,可以绑定多个队列 */ @Bean public Exchange stockEventExchange() { //String name, boolean durable, boolean autoDelete, Map arguments return new TopicExchange("stock-event-exchange", true, false); } /** * 死信队列 */ @Bean public Queue stockReleaseStockQueue() { //String name, boolean durable, boolean exclusive, boolean autoDelete, Map arguments return new Queue("stock.release.stock.queue", true, false, false); } /** * 延时队列 */ @Bean public Queue stockDelay() { HashMap arguments = new HashMap(); arguments.put("x-dead-letter-exchange", "stock-event-exchange"); arguments.put("x-dead-letter-routing-key", "stock.release"); // 消息过期时间 2分钟 arguments.put("x-message-ttl", 120000); return new Queue("stock.delay.queue", true, false, false,arguments); } /** * 绑定:交换机与死信队列 */ @Bean public Binding stockLocked() { //String destination, DestinationType destinationType, String exchange, String routingKey, // Map arguments return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null); } /** * 绑定:交换机与延时队列 */ @Bean public Binding stockLockedBinding() { return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null); }}