CommitLog的存储结构之前已经分析了,那么就具体来看下Broker是怎么将数据落地写入到磁盘的。

线程模型

在消息写入上,Broker的实现可以认为是“并行处理、单一写入”。

Broker与Producer之间通过Netty维持长连接。Producer发送过来的消息在经过处理之后会交由BrokerController.sendMessageExecutor线程组进行处理。不过在最终写入CommitLog内存数据的时候进行了一步加锁处理。在写入数据之前,需要申请CommitLog.putMessageLock锁。内存中CommitLog写入完成后会释放锁,之后的步骤也是并发处理的。

从代码上可以看到,消息在进入Broker之后经过的调用流程不长,不考虑数据持久化的情况下,在性能上影响最大的就是加锁那一步的处理。

同步刷盘/异步刷盘

通过MessageStoreConfig.flushDiskType可以配置Broker刷盘方式。存在两种刷盘方式,同步刷盘与异步刷盘。

CommitLog.putMessage结束的最后调用CommitLog.handleDiskFlush方法,此方法中会根据刷盘策略,分别进行同步、异步处理。实际代码逻辑中,同步、异步方式,写入文件都是在独立的FlushCommitLogService中实现的。两者的差别主要在同步模式下,原线程进行了等待处理,异步模式下,原线程只是进行通知而不进行等待。

MappedFile

Broker收到的消息都是存储在CommitLog中,CommitLog会对应一个个独立的数据文件。I/O操作一般都是最为耗时的,RocketMQ为了提升I/O效率,使用了NIO中提供的FileChannel.map方法,将文件以mmap形式映射到内存中。

Broker收到消息之后会将消息写入MappedFile的mappedByteBuffer当中,而后再由刷盘实现将内存中变更的数据刷入磁盘当中。在异步刷盘模式下,因为刷盘操作位于独立线程,因此可以获得很好的性能。

TransientStorePool

与数据存储相关的还有TransientStorePool这一实现。通过MessageStoreConfig.transientStorePoolEnable可以进行配置,不过只在异步刷盘模式下生效。

TransientStorePool与MappedFile在数据处理上的差异在什么地方呢?分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。

那么与直接使用MappedByteBuffer相比差别在什么地方呢?修改MappedByteBuffer实际会将数据写入文件对应的Page Cache中,而TransientStorePool方案下写入的则为纯粹的内存。因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。

方案选择

同步/异步方式下,能够明显感知到的是Broker的处理性能。同步的优点则是能够保证不丢消息。所以同步/异步的选择就是在性能与消息可靠性上做选择。TransientStorePool能够进一步提升性能,但也增加了异常情况下的消息丢失数量。所以具体根据需求场景去进行选择吧。

通常情况下,消息队列本身就没有被看做是一个特别稳定的消息源,因此异步的选择更为常见。