RocketMQ提供了延时消息实现,不过这个延时是一定级别的延迟,默认在MessageStoreConfig.messageDelayLevel定义,

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这里来看下RocketMQ实现这一功能的相关实现。

消息存储

延时消息的存储过程与一般消息存在差异,在CommitLog.putMessage中对其做了特殊处理。

设置了Delay Level的消息,在存盘之前Broker会修改Topic为延时Topic,SCHEDULE_TOPIC_XXXX,同时备份原Topic信息到消息属性当中。而后则按正常消息存储流程进行处理。不同的Delay Level会对应到不同的MessageQueue中。

消息投递

因为修改了Topic,所以数据存盘之后Consumer并不能消费到消息。在消费之前需要将消息重新投递回初始的Topic,再经由正常消费逻辑进行处理。

重新投递这一逻辑在ScheduleMessageService中实现。从代码来看,延时的处理策略不复杂。

ScheduleMessageService中通过一个Timer来延时触发投递消息检查。每一个Delay Level对应一个DeliverDelayedMessageTimerTask。当对应Task被触发时会去检查当前Delay Level对应的MessageQueue中待处理的消息时间是否到达,如果没有满足条件则再次定时检查,如果延时时间满足,则恢复Message原Topic,并通过DefaultMessageStore.putMessage重新进入正常消息处理流程。

定时消息

RocketMQ没有提供定时消息功能,如何在MQ中实现支持任意延迟的消息对这个问题进行了分析讨论,个人觉得描述的比较清楚。

如果后续考虑在RocketMQ上实现定时消息功能的话,可以按照类似的思路进行实施。