RocketMQ提供了“顺序消息”功能。在分析其用法与实现之前,需要先了解下这个功能本身的定义与约束。

功能定义

支持顺序消息即代表消息队列支持按照消息的顺序投递给消费方。RocketMQ是怎么来支持这种顺序性的?

RocketMQ中顺序消息的顺序定义为到达Broker的先后顺序,而不是消息自身的逻辑顺序。因此需要Producer将顺序相关的消息投递到同一个MessageQueue中,这样通过RocketMQ默认的消息处理与存储机制来保证消息投递的顺序性。

因此,使用者需要注意发送顺序消息需要有明确的先后顺序。如果同一个Producer发送顺序消息,那么自然发送顺序回合消息到达Broker顺序一致,如果是不同机器上的多个Producer发送,如果时间很临近,那么逻辑上的顺序性是不能够保证的。

使用方法

了解基本定义与约束之后,直接看下作为调用方通过怎样的接口来使用顺序消息,

发送顺序消息

前面说了顺序消息需要发送到同一个MessageQueue中,因此Producer在发送消息的时候需要指定MessageQueue,一般会选择通过实现MessageQueueSelector来进行分发,

String topic = "Topic";
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.start();
for (int i = 0; i < 10; i++) {
    Message msg = new Message(topic, String.format("msg %d", i).getBytes());
    producer.send(msg, new MessageQueueSelector() {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg) {
            return list.get(0);
        }        
    }, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
        }

        @Override
        public void onException(Throwable throwable) {
        }
    });
}

消费消息

顺序消息的消费方也需要一定处理,通过注册MessageListenerConcurrently的实现来消费顺序消息,

String topic = "Topic";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            System.out.println(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

内部实现

Producer的相关逻辑很简单,在发送消息之前通过MessageQueueSelector的接口来选择MessageQueue,而后再进行正常的消息发送流程就可以了,简单的调用流程为,

DefaultMQProducer.send
DefaultMQProducerImpl.send
DefaultMQProducerImpl.sendSelectImpl
DefaultMQProducerImpl.sendKernelImpl

相对复杂的处理在Consumer端。对于普通消息与顺序消息,Consumer分别通过ConsumeMessageConcurrentlyService、ConsumeMessageOrderlyService来进行处理。

先来推测下,因为存在多个Consumer,不同Consumer之间消费消息的顺序如何进行保证呢?

BROADCASTING模式

广播模式下,每一条消息都会发送给所有Consumer,因此Consumer之间不需要进行协调。每个Consumer按顺序消费消息就可以了。

CLUSTERING模式

如果是CLUSTERING模式,一个消息只会被投递到一个Consumer,怎么实现?

从ConsumeMessageOrderlyService代码来看,它会定期向Broker发送lockBatchMQ操作,尝试对MessageQueue进行加锁。而后每次消费消息的时候创建ConsumeRequest进行异步消费,开始消费之前先判定本地是否获取了ProcessQueue的锁,

this.processQueue.isLocked() && !this.processQueue.isLockExpired())

ConsumeMessageOrderlyService中通过定时任务不断去尝试进行加锁,如果Broker返回成功,则设置本地ProcessQueue的相应状态。Broker端也会记录锁状态,锁存在一个超时机制,默认超时为RebalanceLocalManager.REBALANCE_LOCK_MAX_LIVE_TIME

协调完不同Consumer之后,Consumer内部是通过多线程去进行消费的,因此ConsumeMessage的处理中会对MessageQueue进行锁粒度控制,不同MessageQueue并行消费。

总结

RocketMQ的顺序消息实现的并不复杂,但提供的这个特性有一定约束前提在。此外通过ConsumeMessageOrderlyService可以发现,为了保证消费有序,引入了一个分布式锁的概念实现,顺序消息的响应吞吐比普通消息会差很多。