RabbitMQ的安装使用


RabbitMQ是什么?

MQ全称为Message Queue,消息队列,在程序之间发送消息来通信,而不是通过彼此调用通信。
RabbitMQ 主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

为什么使用RabbitMQ?

优点:
1、实现应用系统的解耦,客户端只关心发送消息,而不关心处理。
2、异步提升效率,在主业务逻辑发送消息,异步去处理消息
3、流量削峰,将请求放到mq消息队列中,mysql每秒去拉取请求消费,避免请求全部一下子全部打到mysql,请求过多而崩溃

怎么使用RabbitMQ?

1.安装windows的客户端,参考链接3

图片[1] - RabbitMQ的安装使用 - MaxSSL
图片[2] - RabbitMQ的安装使用 - MaxSSL
图片[3] - RabbitMQ的安装使用 - MaxSSL

2.java 代码引入相关jar包
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.3.RELEASE</version></dependency>
3.编写发送,接收消息的工具类
延迟队列配置
package com.next.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @desc 延迟队列配置 */@Configurationpublic class RabbitDelayMqConfig {@Bean("delayDirectExchange")public DirectExchange delayDirectExchange() {DirectExchange directExchange = new DirectExchange(QueueConstants.DELAY_EXCHANGE, true, false);//交换机开启延迟设置true,延迟才会生效directExchange.setDelayed(true);return directExchange;}@Bean("delayNotifyQueue")public Queue delayNotifyQueue() {return new Queue(QueueConstants.DELAY_QUEUE);}@Bean("delayBindingNotify")public Binding delayBindingNotify(@Qualifier("delayDirectExchange") DirectExchange delayDirectExchange, @Qualifier("delayNotifyQueue") Queue delayNotifyQueue) {return BindingBuilder.bind(delayNotifyQueue).to(delayDirectExchange).with(QueueConstants.DELAY_ROUTING);}}
队列配置
package com.next.mq;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Primary;/** * @desc 队列配置 */@Configurationpublic class RabbitMqConfig {@Bean("directExchange")@Primarypublic DirectExchange directExchange() {return new DirectExchange(QueueConstants.COMMON_EXCHANGE, true, false);}@Bean("notifyQueue")@Primarypublic Queue notifyQueue() {return new Queue(QueueConstants.COMMON_QUEUE);}@Bean("bindingNotify")@Primarypublic Binding bindingNotify(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("notifyQueue") Queue notifyQueue) {return BindingBuilder.bind(notifyQueue).to(directExchange).with(QueueConstants.COMMON_ROUTING);}}
发送消息工具类
package com.next.mq;import com.next.util.JsonMapper;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageDeliveryMode;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;import javax.annotation.Resource;import java.util.UUID;/** * @desc 客户端工具类 -- 发送消息 */@Component@Slf4jpublic class RabbitMqClient {@Resourceprivate RabbitTemplate rabbitTemplate;//发送同步消息public void send(MessageBody messageBody) {try {//生成唯一的消息idString uuid = UUID.randomUUID().toString();//初始话消息CorrelationData correlationData = new CorrelationData(uuid);//使用模板工具类rabbitTemplate 来发消息rabbitTemplate.convertAndSend(QueueConstants.COMMON_EXCHANGE, QueueConstants.COMMON_ROUTING,JsonMapper.obj2String(messageBody), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);//记录日志log.info("message send, {}", message);return message;}}, correlationData);} catch (Exception e) {//日志打印,以便定位问题log.error("message send exception, msg:{}", messageBody.toString(), e);}}/** * @desc 发送延迟消息 */public void sendDelay(MessageBody messageBody, int delayMillSeconds) {try {//设置消息延迟时间messageBody.setDelay(delayMillSeconds);String uuid = UUID.randomUUID().toString();CorrelationData correlationData = new CorrelationData(uuid);//延迟交换机和路由rabbitTemplate.convertAndSend(QueueConstants.DELAY_EXCHANGE, QueueConstants.DELAY_ROUTING,JsonMapper.obj2String(messageBody), new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化//设置消息延迟的时间(毫秒值)message.getMessageProperties().setDelay(delayMillSeconds);log.info("delay message send, {}", message);return message;}}, correlationData);} catch (Exception e) {log.error("delay message send exception, msg:{}", messageBody.toString(), e);}}}
接收消息工具类
package com.next.mq;import com.next.dto.RollbackSeatDto;import com.next.model.TrainOrder;import com.next.service.TrainOrderService;import com.next.service.TrainSeatService;import com.next.util.JsonMapper;import lombok.extern.slf4j.Slf4j;import org.codehaus.jackson.type.TypeReference;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * @desc rabbitmq的server端 - 延迟接收消息 * 用处:在主流程里面发送消息,异步流程里面接收消息,处理。提升代码性能 */@Component@Slf4jpublic class RabbitDelayMqServer {@Resourceprivate TrainSeatService trainSeatService;@Resourceprivate TrainOrderService trainOrderService;@RabbitListener(queues = QueueConstants.DELAY_QUEUE)public void receive(String message) {log.info("delay queue receive message, {}", message);try {MessageBody messageBody = JsonMapper.string2Obj(message, new TypeReference<MessageBody>() {});if (messageBody == null) {return;}switch (messageBody.getTopic()) {case QueueTopic.SEAT_PLACE_ROLLBACK:RollbackSeatDto dto = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<RollbackSeatDto>() {});trainSeatService.batchRollbackSeat(dto.getTrainSeat(), dto.getFromStationIdList(), messageBody.getDelay());break;case QueueTopic.ORDER_PAY_DELAY_CHECK:TrainOrder trainOrder = JsonMapper.string2Obj(messageBody.getDetail(), new TypeReference<TrainOrder>() {});trainOrderService.delayCheckOrder(trainOrder);break;default:log.warn("delay queue receive message, {}, no need handle", message);}} catch (Exception e) {log.error("delay queue message handle exception, msg:{}", message, e);}}}

参考链接:
1.rabbitMQ到底是个啥东西?
2.超详细!!!Windows下安装RabbitMQ的步骤详解
3.windows安装rabbitmq和环境erlang(最详细版,包括对应关系,安装错误解决方法)
4.RabbitMQ安装或启动后,无法访问http://localhost:15672/

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享