延迟消息是在业务场景中比较常用的功能,可以作为延迟队列。比如订单n分钟未支付自动取消,活动倒计时,定时发消息都可以使用到延迟消息。
一、延迟消息介绍
生产者将消息发送到Broker,消费端不会立即消费,需要到达指定延迟时间才能被消费端消费。
RocketMQ的延迟消息,默认支持设置18个延迟等级,每一个等级对应一个延迟时间。在Broker中会创建一个默认的SCHEDULE_TOPIC_XXXX的topic,topic下有18个队列,对应18个延迟等级。消息发送过来,会先把消息存储在topic名字为SCHEDULE_TOPIC_XXXX的队列内,等着延迟时间到了,在转发到目标队列,之后消费者在消费。
由于broker是集群模式部署,每个节点都有18个队列。默认的延迟级别不满足业务需求也可以通过Message.setDelayTimeLevel( )方法进行设置,如下代码所示,修改最后一项时间配置2h为20h,这样第18等级对应的延迟时间就变更为20h。或者在配置后面新增一个20h,这样就会变成19个等级。
//默认private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";//修改成20hprivate String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 20h";//新增20h,等级就变成了19private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 20h";
二、延迟消息举例
生产者发送消息时,设置Message的DelayTimeLevel字段,设置消息的延迟等级。比如设置DelayTimeLevel=4 延迟等级,则对应的延迟为30s。消费者30s之后可以消费到该消息。
Message msg = new Message(topic, tag, body);//设置延迟level为4 对应30s。传输对应等级,不传输具体时间限制。//level设置为0表示不延迟msg.setDelayTimeLevel(level);
三、延时消息实现原理
RocketMQ延迟消息在Broker内部流程图:
步骤说明:
1.Broker接收到写入的消息,先将目标Topic和队列信息存储到消息的属性(Message对象内的Map properties)中,之后修改Message内的目标的Topic为SCHEDULE_TOPIC_XXXX,并根据生产者设置的延迟级别(delayTimeLevel)存入特定的queue。QueueId=delayTimeLevel-1。把消息内容写入到CommitLog中。
对应源码位置:org.apache.rocketmq.store.CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {......// Delay Delivery 是否有延迟级别if (msg.getDelayTimeLevel() > 0) {//判断生产者传输的延迟等级是否超过了设置的最大值,超过了需要修改为设置的最大取值if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}//修改topic为延迟队列topic SCHEDULE_TOPIC_XXXXtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;//根据延迟等级获取队列id msg.getDelayTimeLevel() - 1。确定放到SCHEDULE_TOPIC_XXXX中的那个队列queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId 记录原始的topic 和 queueId。MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));//更新消息发送的topic和queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}.....}
2.异步的把CommitLog中的信息存储到Topic为SCHEDULE_TOPIC_XXXX的CosumeQueue中。CosumeQueue存储结构为CommitLog offset记录在commitLog中的位置、size记录存储消息的大小和MessageTag HashCode记录消息的Tag的哈希值,对于延迟消息记录的是投递时间的时间戳。投递时间 = 消息存储时间(storeTimestamp) + 延迟级别对应的时间。
对应源码位置:org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize
public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC, final boolean readBody) {// Timing message processing 定时消息处理逻辑{String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);//判断topic是否是延迟队列if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(topic) && t != null) {int delayLevel = Integer.parseInt(t);//判断生产者传输的延迟等级是否超过了设置的最大值,超过了需要修改为设置的最大取值if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();}//延迟界别大于0需要 计算 投递时间戳当作消息Tag的哈希值存储到CosumeQueue中。if (delayLevel > 0) {tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,storeTimestamp);}}}}
3.broker内部的ScheduleMessageService类,作为延迟服务消费SCHEDULE_TOPIC_XXXX的Topic消息,启动时会创建一个定时器timer。根据延迟级别的个数,启动数量相等的TimerTask,每一个TimerTask负责一个延迟级别的消费和投递。每隔1秒钟执行一次DeliverDelayedMessageTimerTask任务,将到期的消息从延迟队列中写入正常Topic中。每个TimerTask检查消息是否到期,只会检查第一条,如果没到期,其他的不会继续检查。如果到期了,投递成功继续检查下一条消息是否过期。
对应源码位置:org.apache.rocketmq.store.schedule.ScheduleMessageService#start
private final ConcurrentMap delayLevelTable = new ConcurrentHashMap(32);public void start() {//CAS操作 保证了同一时间只会有一个DeliverDelayedMessageTimerTask执行。executeOnTimeupif (started.compareAndSet(false, true)) {//创建定时器timerthis.timer = new Timer("ScheduleMessageTimerThread", true);//遍历延迟队列针对每个级别创建一个TimerTaskfor (Map.Entry entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {//每隔1秒钟执行一次延迟队列任务,将到期的消息从延迟队列中写入正常Topic中this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {if (started.get()) ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());}}
4.消息到期后需要投递到目标的topic中。把第一步记录的目标Topic和队列信息获取到,重新设置Topic存储到CommitLog中,由于之前tagsCode存储的是消息投递时间,需要重新计算tag的哈希值之后在存储。
对应源码位置org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#messageTimeup
private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {MessageExtBrokerInner msgInner = new MessageExtBrokerInner();msgInner.setBody(msgExt.getBody());msgInner.setFlag(msgExt.getFlag());MessageAccessor.setProperties(msgInner, msgExt.getProperties());TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());//重新计算tagcodelong tagsCodeValue = MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());msgInner.setTagsCode(tagsCodeValue);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));msgInner.setSysFlag(msgExt.getSysFlag());msgInner.setBornTimestamp(msgExt.getBornTimestamp());msgInner.setBornHost(msgExt.getBornHost());msgInner.setStoreHost(msgExt.getStoreHost());msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());msgInner.setWaitStoreMsgOK(false);MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);int queueId = Integer.parseInt(queueIdStr);msgInner.setQueueId(queueId);return msgInner;}
5.将消息直接投递到目标Topic的CosumeQueue中。
6.消费者消费目标topic中的数据。