pom.xml导入RocketMQ依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>
application.yml中添加配置
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: x
    access-key: myaccesskey
    secret-key: mysecretKey
    send-message-timeout: 10000
    tls-enable: true
  consumer:
    group: x
    access-key: myaccesskey
    secret-key: mysecretKey
    tls-enable: true
创建MQ工具类
/**
 * Thin wrapper around {@link RocketMQTemplate} that exposes the send operations
 * used by the application. Every method delegates directly to the template.
 */
public class MqUtil {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the utility on top of an existing template.
     *
     * @param rocketMQTemplate template that every send call is delegated to
     */
    public MqUtil(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Sends a single notification.
     *
     * @param topic   topic
     * @param message message
     */
    public void convertAndSend(String topic, Object message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }

    /**
     * Sends a batch of notifications via {@code RocketMQTemplate#syncSend}.
     *
     * @param topic    topic
     * @param messages collection of messages
     * @param <T>      message type carried by the collection
     * @return the {@link SendResult} returned by the template
     */
    public <T extends Message> SendResult syncSend(String topic, Collection<T> messages) {
        // Collection<T> (not a raw Collection) so the declared type parameter is
        // actually checked and the call binds to the template's batch overload
        // without an unchecked conversion.
        return rocketMQTemplate.syncSend(topic, messages);
    }

    /**
     * Sends a batch of notifications via {@code RocketMQTemplate#asyncSend}.
     *
     * @param topic        topic
     * @param messages     collection of messages
     * @param sendCallback callback function handed to the template
     * @param <T>          message type carried by the collection
     */
    public <T extends Message> void asyncSend(String topic, Collection<T> messages, SendCallback sendCallback) {
        rocketMQTemplate.asyncSend(topic, messages, sendCallback);
    }
}
注入工具类Bean
/**
 * Bean factory method for the {@link MqUtil} helper.
 *
 * @param rocketMQTemplate template passed to the helper's constructor
 * @return a new {@link MqUtil} built from the given template
 */
@Bean
public MqUtil mqUtil(RocketMQTemplate rocketMQTemplate) {
    MqUtil util = new MqUtil(rocketMQTemplate);
    return util;
}
测试发消息
/** Helper under test, injected via {@code @Resource}. */
@Resource
private MqUtil mqUtil;

/**
 * Sends one string payload to {@code TopicConstant.TOPIC_B} through the helper.
 */
@Test
public void test() {
    final String payload = "123456";
    mqUtil.convertAndSend(TopicConstant.TOPIC_B, payload);
}
订阅接收消息
@Slf4j@Service@RequiredArgsConstructor(onConstructor = @__({@Autowired}))@RocketMQMessageListener(consumerGroup = GroupConstant.GROUP_A, topic = TopicConstant.TOPIC_B)public class TopicAConsumer implements RocketMQListener {@Overridepublic void onMessage(Message message) {log.info(JsonUtil.toJsonStr(message));}}