1. pom.xml导入RocketMQ依赖

    org.apache.rocketmqrocketmq-spring-boot-starter2.2.2
  2. application.yml中添加配置

    rocketmq:name-server: 127.0.0.1:9876producer:group: xaccess-key: myaccesskeysecret-key: mysecretKeysend-message-timeout: 10000tls-enable: trueconsumer:group: xaccess-key: myaccesskeysecret-key: mysecretKeytls-enable: true
  3. 创建MQ工具类

    public class MqUtil {private final RocketMQTemplate rocketMQTemplate;public MqUtil(RocketMQTemplate rocketMQTemplate) {this.rocketMQTemplate = rocketMQTemplate;}/** * 单条通知发送 * * @param topic 主题 * @param message 消息 */public void convertAndSend(String topic, Object message) {rocketMQTemplate.convertAndSend(topic, message);}/** * 批量通知发送 * * @param topic主题 * @param messages 消息集合 */public <T extends Message> SendResult syncSend(String topic, Collection messages) {return rocketMQTemplate.syncSend(topic, messages);}/** * 批量通知发送 * * @param topic主题 * @param messages 消息集合 * @param sendCallback 回调函数 */public <T extends Message> void asyncSend(String topic, Collection messages, SendCallback sendCallback) {rocketMQTemplate.asyncSend(topic, messages, sendCallback);}}
  4. 注入工具类Bean

    @Beanpublic MqUtil mqUtil(RocketMQTemplate rocketMQTemplate) {return new MqUtil(rocketMQTemplate);}
  5. 测试发消息

    @Resourceprivate MqUtil mqUtil;@Testpublic void test() {mqUtil.convertAndSend(TopicConstant.TOPIC_B, "123456");}
  6. 订阅接收消息

    @Slf4j@Service@RequiredArgsConstructor(onConstructor = @__({@Autowired}))@RocketMQMessageListener(consumerGroup = GroupConstant.GROUP_A, topic = TopicConstant.TOPIC_B)public class TopicAConsumer implements RocketMQListener {@Overridepublic void onMessage(Message message) {log.info(JsonUtil.toJsonStr(message));}}