Producer发送的消息会进入Broker,在Broker正确处理完消息之后,Consumer才能够获取到消息进行消费。RocketMQ消息处理能力的关键在于Broker相关逻辑的实现。如果能够理清楚Broker的内在细节,那么可以说是真正掌握了RocketMQ的用法。这里尝试对Broker进行分析吧。

Broker结构

Broker的结构从整体上来看其实与Name Server差异不大,基本都是Startup + Controller形式,Controller中再封装大量成员组件,具体逻辑由这些成员组件来进行完成。Broker与Producer/Consumer/NameServer之间都是通过Netty维持长连接。

Broker的整体工作流程比较简单,Broker收到来自Producer的消息,将消息持久化,而后对其创建索引与ConsumeQueue。Consumer发送数据请求到Broker拉取数据。Broker定期注册自身信息到Name Server,同时接受Name Server发送过来的一些请求命令。

Broker数据

Broker维护核心数据都是和消息直接相关,分别是CommitLog、ConsumeQueue、IndexFile,这三个数据存储流程大致可以简化为,

CommitLog

数据写入CommitLog是第一步,CommitLog写入之后通过dispatch形式去写入ConsumeQueue与IndexFile。CommitLog写入能力实际就是Broker处理消息的能力。

消息进入CommitLog的路径很明确,考虑正常消息情况,Netty层收到RequestCode.SEND_MESSAGE、RequestCode.SEND_MESSAGE_V2之后会将其交由SendMessageProcessor处理,会经由DefaultMessageStore.putMessage方法到达CommitLog.putMessage。消息格式持久化写入逻辑在DefaultAppendMessageCallback.doAppend中。这个方法中会将Message对象转化成固定格式的Byte二进制形式进行存储。消息存储格式,

RocketMQ中,不同Topic的消息都会写入同一个CommitLog文件中,CommitLog存放路径位于MessageStoreConfig.storePathCommitLog中。单个CommitLog文件大小限制为MessageStoreConfig.mapedFileSizeCommitLog。

ConsumeQueue

ConsumeQueue对应MessageQueue,记录了每一个Queue中每一条消息在CommitLog中的存储偏移,Consumer能够消费到的消息一定是写入ConsumeQueue之后的消息。其每一条记录的存储格式为,

ConsumeQueue在持久化机制上与CommitLog类似,其路径大小也在MessageStoreConfig中进行配置。

IndexFile

在消息发送消费逻辑下IndexFile倒不是必需品,IndexFile顾名思义就是索引文件,这个索引是针对消息中的Keys进行构建的。其整体结构概念类似文件化了的哈希表,查询添加方式类似哈希表的连链地址法。IndexFile的作用就是给定Topic以及Key查询消息。主要应用场景也是在数据统计以及问题排查上。

单个IndexFile的大小是固定的,其计算方式为,

IndexHeader.INDEX_HEADER_SIZE + (MessageStoreConfig.maxHashSlotNum * 4) + (MessageStoreConfig.maxIndexNum * 20)

其结构为,

总结

CommitLog、ConsumeQueue、IndexFile是Broke人中维护的主要数据,这里暂时只是介绍了其格式与大体用途,后续会再来具体分析其写入过程逻辑。

Broker中还有什么其它数据吗?其实可以去Broker存储目录下去观察,RocketMQ的数据存储都是基于文件的,所以可以直接通过文件系统上存在的内容来进行了解。