文章目录
- 添加 RocketMQ 依赖
- 消费者 Consumer
- YAML 配置
- 创建监听器
- 消息过滤
- Tag 过滤
- 生产者 Producer
- YAML 配置
- 发送同步消息
- 发送异步消息
- 发送单向消息
- 发送延迟消息
- 发送顺序消息
- 发送批量消息
- 发送集合消息
添加 RocketMQ 依赖
在 Maven 仓库【https://mvnrepository.com/】中搜索 RocketMQ 依赖:
在 SpringBoot 项目的 Pom.xml 文件中添加对应 MQ 版本的依赖:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.2</version></dependency>
消费者 Consumer
YAML 配置
在 SpringBoot 项目的 yml 配置文件中添加以下配置:
rocketmq:name-server: 192.168.68.121:9876 # rocketMq的nameServer地址
创建监听器
创建一个 MQMsgListener 类用于监听 RocketMQ 的消息,类上标注注解:@Component
、@RocketMQMessageListener
,该类需要实现 RocketMQListener 接口,并使用泛型指定接收的消息类型:
@Component@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}}
@RocketMQMessageListener
注解参数如下:
参数 | 描述 |
---|---|
topic | 消费者订阅的主题 |
consumerGroup | 消费者组 |
consumeMode | 消费模式:并发接收消息 | 有序接收消息【ConsumeMode.CONCURRENTLY or ConsumeMode.ORDERLY 】 |
messageModel | 消息模式:集群模式 | 广播模式【MessageModel.CLUSTERING or MessageModel.BROADCASTING 】 |
selectorType | 过滤消息的方式:Tag | SQL92【SelectorType.TAG or SelectorType.SQL92 】 |
selectorExpression | 过滤消息的表达式:Tag | SQL92【`tag1 |
maxReconsumeTimes | 消息消费失败后,可被重复投递的最大次数。消息重试只针对集群消费模式生效。 |
delayLevelWhenNextConsume | 并发模式的消息重试策略。-1,无需重试,直接放入死信队列(%DLQ%+消费组) |
消息过滤
Tag 过滤
消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。
编写并启动消费者项目订阅 tagTopic 主题:
@Component@RocketMQMessageListener(topic = "tagTopic",consumerGroup = "boot-mq-group-consumer",selectorType = SelectorType.TAG,selectorExpression = "java")public class MQMsgListener implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println(message);}}
编写生产者 Controller,使用 RocketMQTemplate 的 syncSend()
方法发送一个带 Tag 的同步消息:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/tag")public String sendSyncMessage() {SendResult result = rocketMQTemplate.syncSend("tagTopic:java", "这是一个带有 java tag 的消息");return "发送状态:" + result.getSendStatus() + "
消息id:" + result.getMsgId();}}
运行项目,访问接口:http://localhost:8080/send/tag
查看 RocketMQ 控制台,可以看到消息带有 java tag:
查看消费者项目的 IDEA 控制台:
生产者 Producer
YAML 配置
在 SpringBoot 项目的 yml 配置文件中添加以下配置:
rocketmq:name-server: 192.168.68.121:9876 # rocketMq的nameServer地址producer:group: boot-mq-group-producer # 生产者组名
注:生产者需要标注生产者组名,否则会报异常:
'org.apache.rocketmq.spring.core.RocketMQTemplate' that could not be found.
发送同步消息
编写 Controller,使用 RocketMQTemplate 的 syncSend() 方法发送同步消息,并将消息发送的结果进行打印:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/sync/{msg}")public String sendSyncMessage(@PathVariable String msg){SendResult result = rocketMQTemplate.syncSend("syncTopic", msg);return "发送状态:"+result.getSendStatus()+"
消息id:"+result.getMsgId();}}
运行项目,访问接口:http://localhost:8080/send/sync/同步消息
访问控制台,查看【syncTopic】主题,可以看到队列中存在一条消息:
发送异步消息
不同于同步消息,异步消息在发出后,并不会等待服务端返回响应,直接继续向下执行,发送方通过回调接口接收服务端响应,并处理响应结果。
编写 Controller,使用 RocketMQTemplate 的 asyncSend()
方法发送异步消息,并使用回调接口打印发送的结果:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/async/{msg}")public String sendAsyncMessage(@PathVariable String msg) {rocketMQTemplate.asyncSend("asyncTopic", msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("异步消息发送成功");}@Overridepublic void onException(Throwable throwable) {System.out.println("异步消息发送失败");}});System.out.println("异步消息已发送完成");return "发送异步消息";}}
运行项目,访问接口:http://localhost:8080/send/async/异步消息,查看 IDEA 控制台:
访问控制台,查看【asyncTopic】主题,可以看到队列中存在一条消息:
发送单向消息
编写 Controller,使用 RocketMQTemplate 的 sendOneWay()
方法发送单向消息:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/oneWay/{msg}")public String sendOneWayMessage(@PathVariable String msg) {rocketMQTemplate.sendOneWay("oneWayTopic",msg);return "单向消息发送成功";}}
运行项目,访问接口:http://localhost:8080/send/oneWay/单向消息
访问控制台,查看【oneWayTopic】主题,可以看到队列中存在一条消息:
发送延迟消息
编写并启动消费者项目订阅 delayTopic 主题:
@Component@RocketMQMessageListener(topic = "delayTopic",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"\n消息内容:"+msg+"\n消息收到时间:"+new Date());}}
编写生产者 Controller,使用 RocketMQTemplate 的 syncSend()
方法发送同步消息:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/delay/{msg}")public String sendDelayMessage(@PathVariable String msg) {Message<String> message = MessageBuilder.withPayload(msg).build();// 延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"SendResult result = rocketMQTemplate.syncSend("delayTopic", message, 2000, 3);return "发送状态:" + result.getSendStatus() + "
消息id:" + result.getMsgId()+"
消息发送时间:"+new Date();}}
运行项目,访问接口:http://localhost:8080/send/delay/延迟消息
查看消费者项目的 IDEA 控制台,可以看到过去了10s,对应我们设置的延迟级别。
发送顺序消息
编写订单类,用于模拟【下订单->发短信->物流->签收】的顺序流程:
public class Order {//订单号private String orderId;//订单名称private String orderName;//订单的流程顺序private String seq;}
编写并启动两个消费者项目订阅 orderlyTopic 主题,并将消费模式设置为顺序消费模式:
@Component@RocketMQMessageListener(topic = "orderlyTopic",consumerGroup="boot-mq-group-consumer",consumeMode = ConsumeMode.ORDERLY)public class MQMsgListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order message) {System.out.println("消费者:"+message);}}
编写生产者 Controller,使用 RocketMQTemplate 的 syncSendOrderly()
方法发送同步顺序消息:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/orderly")public String sendOrderlyMessage() {List<Order> orders = Arrays.asList(new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "发短信", "1"),new Order(UUID.randomUUID().toString(), "物流", "1"),new Order(UUID.randomUUID().toString(), "签收", "1"),new Order(UUID.randomUUID().toString(), "下订单", "2"),new Order(UUID.randomUUID().toString(), "发短信", "2"),new Order(UUID.randomUUID().toString(), "物流", "2"),new Order(UUID.randomUUID().toString(), "签收", "2"));//控制流程:下订单->发短信->物流->签收//将 seq 作为 hashKey,这样 seq 相同的会放在同一个队列里面,顺序消费orders.forEach(order -> {rocketMQTemplate.syncSendOrderly("orderlyTopic",order,order.getSeq());});return "发送成功";}}
运行项目,访问接口:http:localhost:8080/send/orderly
查看 RocketMQ 控制台,可以看到我们的消息分别存储在两个队列中:
查看消费者项目的 IDEA 控制台,按照消息的顺序进行消费:
发送批量消息
编写并启动消费者项目订阅 batchOrderly 主题:
@Component@RocketMQMessageListener(topic = "batchOrderly",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order message) {System.out.println(Thread.currentThread().getName()+":"+message);}}
编写生产者 Controller,将消息打包成 Collection msgs
传入 syncSend()
方法中发送:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/batch")public String sendOrderlyMessage() {List<Message> messages = Arrays.asList(MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build(),MessageBuilder.withPayload(new Order(UUID.randomUUID().toString(), "下订单", "1")).build());return rocketMQTemplate.syncSend("batchOrderly", messages).getSendStatus().toString();}}
运行项目,访问接口:http:localhost:8080/send/batch
查看 RocketMQ 控制台,可以看到队列中一次传入4条消息:
查看消费者项目的 IDEA 控制台,多个线程并发进行消费:
发送集合消息
编写并启动消费者项目订阅 listTopic 主题:
@Component@RocketMQMessageListener(topic = "listTopic",consumerGroup="boot-mq-group-consumer")public class MQMsgListener implements RocketMQListener<List<Order>> {@Overridepublic void onMessage(List<Order> orders) {orders.forEach(o -> {System.out.println(Thread.currentThread().getName()+":"+o);});}}
编写生产者 Controller,将集合传入 syncSend()
方法中发送:
@RestControllerpublic class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@GetMapping("/send/list")public String sendOrderlyMessage() {List<Order> orders = Arrays.asList(new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "下订单", "1"),new Order(UUID.randomUUID().toString(), "下订单", "1"));rocketMQTemplate.syncSend("listTopic",orders);return "发送成功";}}
运行项目,访问接口:http:localhost:8080/send/list
查看 RocketMQ 控制台,可以看到队列中一条消息:
查看消费者项目的 IDEA 控制台,进行消费: