作者名称:DaenCode
作者简介:啥技术都喜欢捣鼓捣鼓,喜欢分享技术、经验、生活。
人生感悟:尝尽人生百味,方知世间冷暖。
所属专栏:项目所感所想



文章目录

  • 架构图
  • application.properties
  • RabbitMQ配置
  • 消息协议封装
  • 消息类型封装
  • C端消费者
  • B端消费者
  • 发送消息与处理消息
  • 最后

架构图

application.properties

redundancy.mq.redundancy-event-exchange=redundancy.event.exchange
redundancy.mq.add-routing-key=redundancy.add.business.consumer.routing.key
redundancy.mq.add-business-binding-key=redundancy.add.business.*.routing.key
redundancy.mq.add-consumer-binding-key=redundancy.add.*.consumer.routing.key
redundancy.mq.add-business-queue=redundancy.add.business.queue
redundancy.mq.add-consumer-queue=redundancy.add.consumer.queue

RabbitMQ配置

package top.daencode.config;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * Copyright (C) 2023-11-29智源恩创网络科技工作室 * * @BelongsProject: architecture-solution * @BelongsPackage: top.daencode.mq * @author: DaenCode * @createTime: 2023-11-2915:08 * @description: TODO * @version: 1.0 */@Configuration@Slf4j@Data@ConfigurationProperties(prefix = "redundancy.mq")public class RabbitMqForRedundancyConfig {/** * 交换机 */private String redundancyEventExchange;/** * 添加路由key */private String addRoutingkey;/** * B端添加绑定key */private String addBusinessBindingKey;/** * C端添加绑定key */private String addConsumerBindingKey;/** * B端添加队列 */private String addBusinessQueue;/** * C端添加队列 */private String addConsumerQueue;/** * 创建冗余双写交换机 * @return */@Beanpublic Exchange redundancyEventExchange(){return new TopicExchange(redundancyEventExchange);}/** * 创建B端添加队列 * @return */@Beanpublic Queue addBusinessQueue(){return new Queue(addBusinessQueue,true,false,false);}/** * 创建C端添加队列 * @return */@Beanpublic Queue addConsumerQueue(){return new Queue(addConsumerQueue,true,false,false);}/** * B端绑定关系 */@Beanpublic Binding addBusinessBinding(){return new Binding(addBusinessQueue, Binding.DestinationType.QUEUE,redundancyEventExchange,addBusinessBindingKey,null);}/** * C端交换机绑定到队列 * @return */@Beanpublic Binding addConsumerBinding(){return new Binding(addConsumerQueue, Binding.DestinationType.QUEUE,redundancyEventExchange,addConsumerBindingKey,null);}}

消息协议封装

/**
 * Message envelope exchanged over the message queue.
 *
 * <p>Accessors, the builder and both constructors are generated by Lombok.
 * The field order below defines the parameter order of the all-args constructor.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class EventMessage implements Serializable {

    /** Message queue id. */
    private String messageId;

    /** Event type. */
    private String eventMessageType;

    /** Business id. */
    private String bizId;

    /** Message body. */
    private String content;

    /** Remark describing an exception. */
    private String remark;
}

消息类型封装

/**
 * Event types carried in an event message's type field, grouped by operation
 * (add / delete / update). Each operation has a base type plus a
 * {@code _BUSINESS} and a {@code _CONSUMER} variant.
 *
 * <p>Constant order is preserved; do not reorder, since ordinals would change.
 */
public enum EventMessageTypeEnum {

    // ---- add ----
    REDUNDANCY_ADD,
    REDUNDANCY_ADD_BUSINESS,
    REDUNDANCY_ADD_CONSUMER,

    // ---- delete ----
    REDUNDANCY_DEL,
    REDUNDANCY_DEL_BUSINESS,
    REDUNDANCY_DEL_CONSUMER,

    // ---- update ----
    REDUNDANCY_UPDATE,
    REDUNDANCY_UPDATE_BUSINESS,
    REDUNDANCY_UPDATE_CONSUMER;
}

C端消费者

@Component@Slf4j@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.consumer.queue")})public class RedundancyAddConsumerMQListener {@Autowiredprivate DetailService detailService;/** * 消费消息 * @param eventMessage * @param message * @param channel */@RabbitHandlerpublic void handleAddConsumer(EventMessage eventMessage, Message message, Channel channel){try {eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name());boolean flag= detailService.handleAddDetail(eventMessage);} catch (Exception e) {log.error("handleAddConsumer--消费失败{}",eventMessage);}}}

B端消费者

@Component@Slf4j@RabbitListener(queuesToDeclare = {@Queue("redundancy.add.business.queue")})public class RedundancyAddBusinessMQListener {@Autowiredprivate DetailService detailService;/** * 消费消息 * @param eventMessage * @param message * @param channel */@RabbitHandlerpublic void handleAddBusiness(EventMessage eventMessage, Message message, Channel channel){try {eventMessage.setEventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name());boolean flag= detailService.handleAddDetail(eventMessage);} catch (Exception e) {log.error("handleAddBusiness--消费失败{}",eventMessage);}}}

发送消息与处理消息

 /** * 发送新增消息 * @param detailRequest */@Overridepublic void addDetail(DetailRequest detailRequest) {detailRequest.setBId(IDUtil.generateRandomNumber(5));detailRequest.setCId(IDUtil.generateRandomNumber(5));//构造消息EventMessage eventMessage = EventMessage.builder().messageId(IDUtil.generateRandomNumber(5).toString()).content(JsonUtil.obj2Json(detailRequest)).eventMessageType(EventMessageTypeEnum.REDUNDANCY_ADD.name()).build();rabbitTemplate.convertAndSend(rabbitMqForRedundancyConfig.getRedundancyEventExchange(),rabbitMqForRedundancyConfig.getAddRoutingkey(),eventMessage);}//处理新增消息@Overridepublic boolean handleAddDetail(EventMessage eventMessage) {String messageType= eventMessage.getEventMessageType();DetailRequest detailRequest=JsonUtil.json2Obj(eventMessage.getContent(), DetailRequest.class);if (messageType.equals(EventMessageTypeEnum.REDUNDANCY_ADD_CONSUMER.name())){CDetailDO cDetailDOIndb=cDetailMapper.selectOne(new QueryWrapper<CDetailDO>().eq("c_id",detailRequest.getCId()).eq("detail",detailRequest.getDetail()));if (cDetailDOIndb==null){CDetailDO cDetailDO = CDetailDO.builder().bId(detailRequest.getBId()).cId(detailRequest.getCId()).detail(detailRequest.getDetail()).build();cDetailMapper.insert(cDetailDO);}else {log.error("handleAddDetail---REDUNDANCY_ADD_CONSUMER重复{}",eventMessage);}} else if (messageType.equals(EventMessageTypeEnum.REDUNDANCY_ADD_BUSINESS.name())) {BDetailDO bDetailDOIndb=bDetailMapper.selectOne(new QueryWrapper<BDetailDO>().eq("b_id",detailRequest.getCId()).eq("detail",detailRequest.getDetail()));if (bDetailDOIndb==null){BDetailDO bDetailDO = BDetailDO.builder().bId(detailRequest.getBId()).cId(detailRequest.getCId()).detail(detailRequest.getDetail()).build();bDetailMapper.insert(bDetailDO);}else {log.error("handleAddDetail---REDUNDANCY_ADD_BUSINESS重复{}",eventMessage);}}return false;}

最后

最后,感谢大家对本文的阅读,希望对大家有帮助。