soliloquize 2018-07-19T22:28:55+08:00 RocketMQ (11) Ordered Message Implementation 2018-07-19 22:28:55 numerical /2018/07/19/RocketMQ十一顺序消息实现

RocketMQ provides an "ordered message" feature. Before analyzing its usage and implementation, it helps to first understand what the feature guarantees and the constraints it comes with.

Feature Definition

Supporting ordered messages means the message queue can deliver messages to consumers in the order they were produced. How does RocketMQ support this ordering?

In RocketMQ, the order of ordered messages is defined by the order in which they arrive at the Broker, not by any logical order inside the messages themselves. The Producer therefore has to deliver the messages whose order matters to the same MessageQueue, so that RocketMQ's default message processing and storage mechanisms preserve the delivery order.

Consequently, users need to make sure ordered messages are sent with a well-defined sequence. If a single Producer sends the ordered messages, the send order naturally matches the order in which they reach the Broker; but if multiple Producers on different machines send them at nearly the same time, the logical ordering cannot be guaranteed.

Usage

With the basic definition and constraints in mind, let's look at the interfaces a caller uses to work with ordered messages.

Sending Ordered Messages

As noted above, ordered messages must be sent to the same MessageQueue, so the Producer needs to specify the MessageQueue when sending. This is usually done by implementing MessageQueueSelector,

String topic = "Topic";
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.start();
for (int i = 0; i < 10; i++) {
    Message msg = new Message(topic, String.format("msg %d", i).getBytes());
    producer.send(msg, new MessageQueueSelector() {
        @Override
        public MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg) {
            return mqs.get(0);
        }
    }, null, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
        }

        @Override
        public void onException(Throwable throwable) {
        }
    });
}

Consuming Messages

The consumer side of ordered messages also needs some special handling: an implementation of MessageListenerOrderly is registered to consume ordered messages,

String topic = "Topic";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
        for (MessageExt msg : list) {
            System.out.println(msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
    }
});
consumer.start();

Internal Implementation

The Producer-side logic is straightforward: before sending, the MessageQueueSelector picks the MessageQueue, and then the normal send flow proceeds. The simplified call chain is,

DefaultMQProducer.send
DefaultMQProducerImpl.send
DefaultMQProducerImpl.sendSelectImpl
DefaultMQProducerImpl.sendKernelImpl

The more involved handling is on the Consumer side. For ordinary and ordered messages, the Consumer uses ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService respectively.

Let's reason about it first: since multiple Consumers may exist, how is the consumption order guaranteed across different Consumers?

BROADCASTING Mode

In broadcasting mode every message is delivered to all Consumers, so no coordination between Consumers is needed; each Consumer simply consumes messages in order.

CLUSTERING Mode

In CLUSTERING mode a message is delivered to only one Consumer, so how is ordering implemented?

Looking at the ConsumeMessageOrderlyService code, it periodically sends lockBatchMQ requests to the Broker, trying to lock the MessageQueues. Then, each time messages are to be consumed, a ConsumeRequest is created for asynchronous consumption, and before consumption starts it checks whether the lock on the local ProcessQueue has been obtained,

this.processQueue.isLocked() && !this.processQueue.isLockExpired()

ConsumeMessageOrderlyService keeps trying to acquire the lock through a scheduled task; if the Broker returns success, the corresponding state is set on the local ProcessQueue. The Broker also records the lock state, and the lock has a timeout mechanism whose default is RebalanceLockManager.REBALANCE_LOCK_MAX_LIVE_TIME.

After the coordination across Consumers, a single Consumer still consumes with multiple threads internally, so the consumeMessage handling applies locking at the MessageQueue granularity: messages within one MessageQueue are consumed serially while different MessageQueues are consumed in parallel.
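
A simplified sketch of this per-queue locking idea follows (not the actual RocketMQ source; QueueLockTable and the other names are invented for illustration): each MessageQueue is bound to its own lock object, so consumption within one queue is serialized while different queues proceed in parallel.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class QueueLockTable<K> {
    private final ConcurrentMap<K, Object> locks = new ConcurrentHashMap<>();

    // Return the lock object bound to this queue, creating it on first use.
    Object lockFor(K queue) {
        return locks.computeIfAbsent(queue, k -> new Object());
    }
}

class OrderlyConsumeSketch {
    private final QueueLockTable<String> table = new QueueLockTable<>();

    void consume(String messageQueue, Runnable handleOneBatch) {
        // Serialize consumption within one MessageQueue; different queues
        // acquire different lock objects and therefore run concurrently.
        synchronized (table.lockFor(messageQueue)) {
            handleOneBatch.run();
        }
    }
}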

Summary

RocketMQ's ordered message implementation is not complicated, but the feature does come with preconditions and constraints. In addition, as ConsumeMessageOrderlyService shows, a distributed-lock style mechanism is introduced to keep consumption in order, so the latency and throughput of ordered messages are considerably worse than for ordinary messages.

Basic Usage of java.time 2018-07-18 21:09:13 numerical /2018/07/18/javatime的基本使用方法

JDK 8 ships a new date and time library under the java.time package. This post records its basic usage and lists the common date/time operations.

Common Usage

Getting the Current Time

LocalDateTime dateTime = LocalDateTime.now();
LocalDate date = LocalDate.now();
LocalTime time = LocalTime.now();

LocalDateTime holds both a date and a time of day, LocalDate holds only the date, and LocalTime only the time. These classes are quite close to the corresponding module in Python, which makes for a handy comparison,

now = datetime.datetime.now()
date = now.date()
time = now.time()

Static Construction

LocalDateTime and the related classes provide a set of static factory methods to build instances,

LocalDateTime end = LocalDateTime.of(2012, 12, 21, 3, 14, 35);

Thanks to Java method overloading, time objects can be constructed at several levels of precision.
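
For example, instances can be built from separate date and time parts or from individual fields (Month here is java.time.Month):

LocalDate date = LocalDate.of(2012, 12, 21);
LocalTime time = LocalTime.of(3, 14);                        // seconds default to 0
LocalDateTime fromParts = LocalDateTime.of(date, time);
LocalDateTime withEnum = LocalDateTime.of(2012, Month.DECEMBER, 21, 3, 14, 35);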

Formatting and Parsing

Formatting is done through the DateTimeFormatter class, which ships a set of predefined formatters and can also be built from a pattern string,

LocalDateTime now = LocalDateTime.now();
now.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
LocalDateTime end = LocalDateTime.parse("2012-12-21 03:14:35", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

Converting Between Date-Times and Instants

The current time in milliseconds used to be obtained with,

System.currentTimeMillis();

java.time provides the Instant class to obtain the same point in time,

Instant.now().toEpochMilli();

LocalDateTime and Instant can be converted back and forth,

LocalDateTime now = LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());
now.toInstant(ZoneOffset.UTC).toEpochMilli();

An Instant can also be built from an epoch millisecond value,

Instant.ofEpochMilli(System.currentTimeMillis());

Date and Time Arithmetic

There are two main kinds of calculations. The first takes a time plus an amount of change and yields the resulting time,

LocalDateTime now = LocalDateTime.now();
now = now.plus(1, ChronoUnit.DAYS);

LocalDateTime provides a family of plus/minus/with methods for this.

The other kind computes the difference between two times,

LocalDateTime start = LocalDateTime.of(2018, 1, 1, 0, 0, 0);
LocalDateTime end = LocalDateTime.of(2018, 1, 2, 1, 0, 1);
Duration duration = Duration.between(start, end);
duration.toDays();
duration.toHours();
duration.toMinutes();
duration.getSeconds();
start.until(end, ChronoUnit.DAYS);
start.until(end, ChronoUnit.HOURS);
start.until(end, ChronoUnit.MINUTES);
start.until(end, ChronoUnit.SECONDS);

Both approaches yield the difference; going through Duration may be more convenient for further calculations.

Time Zone Handling

LocalDateTime and friends represent local times and carry no time zone information. The zone-aware counterpart is ZonedDateTime,

ZonedDateTime now = ZonedDateTime.of(LocalDateTime.now(), ZoneId.of("Asia/Shanghai"));

To display the same instant under different time zones,

long now = System.currentTimeMillis();
LocalDateTime start = LocalDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneId.of("Asia/Shanghai"));
LocalDateTime end = LocalDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneId.of("Asia/Tokyo"));
System.out.println(Duration.between(start, end).toHours());

Notes

The classes provided by java.time are essentially all immutable and thread-safe, which avoids the old thread-safety problems of SimpleDateFormat. In new projects it is best to switch all time handling to the new API.
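
For instance, since DateTimeFormatter is immutable, one shared instance can safely be reused from multiple threads:

class TimeUtil {
    // A single shared formatter is safe here, unlike SimpleDateFormat.
    static final DateTimeFormatter FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String format(LocalDateTime t) {
        return t.format(FORMATTER);
    }

    static LocalDateTime parse(String s) {
        return LocalDateTime.parse(s, FORMATTER);
    }
}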

RocketMQ (10) CommitLog Implementation 2018-07-07 08:40:38 numerical /2018/07/07/RocketMQ十CommitLog实现

RocketMQ messages are persisted through the org.apache.rocketmq.store.CommitLog class. Its key members are,

public class CommitLog {
    private final MappedFileQueue mappedFileQueue;
    private final DefaultMessageStore defaultMessageStore;
    private final FlushCommitLogService flushCommitLogService;
    private final FlushCommitLogService commitLogService;
    private final AppendMessageCallback appendMessageCallback;
}

After the Broker receives a new message, CommitLog.putMessage is eventually called and the message needs to be stored.

Obtaining a MappedFile

Storing a message first requires an object to store it in. CommitLog maintains a queue of MappedFiles through MappedFileQueue, and the actual data is stored in a MappedFile.

MappedFileQueue handles the allocation and creation of MappedFiles; when no usable MappedFile exists, a new one has to be created,

public class MappedFileQueue {
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {...}
}

The storage path for MappedFiles is maintained in MessageStoreConfig,

public class MessageStoreConfig {
    @ImportantField
    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
    @ImportantField
    private String storePathCommitLog = System.getProperty("user.home") + File.separator + "store" + File.separator + "commitlog";
    // CommitLog file size,default is 1G
    private int mapedFileSizeCommitLog = 1024 * 1024 * 1024;
}

This also shows that a MappedFile is 1 GB by default. Each MappedFile is named after the offset of its first message, zero-padded to 20 digits,

public class MappedFileQueue {
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        ...
        String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
        ...
    }
}

public class UtilAll {
    public static String offset2FileName(final long offset) {
        final NumberFormat nf = NumberFormat.getInstance();
        nf.setMinimumIntegerDigits(20);
        nf.setMaximumFractionDigits(0);
        nf.setGroupingUsed(false);
        return nf.format(offset);
    }
}
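
A quick illustration of the naming, with values computed from the code above and assuming the default 1 GB file size:

UtilAll.offset2FileName(0L);           // "00000000000000000000" -> first file
UtilAll.offset2FileName(1073741824L);  // "00000000001073741824" -> second file, starting at 1 GB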

Once the parameters needed for a MappedFile are ready, the allocation is handed to AllocateMappedFileService. AllocateMappedFileService runs as a separate thread and keeps checking for pending allocation requests,

public class AllocateMappedFileService {
    public void run() {
        while (!this.isStopped() && this.mmapOperation()) {
        }
    }
}

MappedFile provides its persistence capability through the Java NIO interfaces,

public class MappedFile {
    protected FileChannel fileChannel;
    protected ByteBuffer writeBuffer = null;
    protected TransientStorePool transientStorePool = null;
}

When data is actually written, the handling differs depending on whether a writeBuffer is present,

public class MappedFile {
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        ...
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        ...
    }
}

The fileChannel maps the file directly into memory via the NIO API,

public class MappedFile {
    private void init(final String fileName, final int fileSize) throws IOException {
        ...
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        ...
    }
}

The writeBuffer, on the other hand, is direct memory that TransientStorePool allocates via ByteBuffer.allocateDirect.
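
A rough sketch of the idea behind TransientStorePool (not the actual implementation; DirectBufferPool and its method names are made up here): pre-allocate a few direct buffers at startup and lend them out as writeBuffer, so writes first land in off-heap memory and are committed to the file channel later.

import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

class DirectBufferPool {
    private final Deque<ByteBuffer> buffers = new ConcurrentLinkedDeque<>();

    DirectBufferPool(int poolSize, int fileSize) {
        for (int i = 0; i < poolSize; i++) {
            // allocateDirect reserves memory outside the Java heap
            buffers.offerLast(ByteBuffer.allocateDirect(fileSize));
        }
    }

    ByteBuffer borrowBuffer() {
        return buffers.pollFirst();
    }

    void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(buffer.capacity());
        buffers.offerLast(buffer);
    }
}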

Writing Data

After a MappedFile has been obtained, the logic that writes data into the buffer lives in,

class DefaultAppendMessageCallback implements AppendMessageCallback {
    public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {...}
}

The space occupied by a single message is calculated as follows,

public class CommitLog {
    private static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        final int msgLen = 4 //TOTALSIZE
            + 4 //MAGICCODE
            + 4 //BODYCRC
            + 4 //QUEUEID
            + 4 //FLAG
            + 8 //QUEUEOFFSET
            + 8 //PHYSICALOFFSET
            + 4 //SYSFLAG
            + 8 //BORNTIMESTAMP
            + 8 //BORNHOST
            + 8 //STORETIMESTAMP
            + 8 //STOREHOSTADDRESS
            + 4 //RECONSUMETIMES
            + 8 //Prepared Transaction Offset
            + 4 + (bodyLength > 0 ? bodyLength : 0) //BODY
            + 1 + topicLength //TOPIC
            + 2 + (propertiesLength > 0 ? propertiesLength : 0) //propertiesLength
            + 0;
        return msgLen;
    }
}

The default upper limit on the size of a single message is defined in MessageStoreConfig; the actual maximum is 4 MB,

public class MessageStoreConfig {
    // The maximum size of a single log file,default is 512K
    private int maxMessageSize = 1024 * 1024 * 4;
}

The code that actually fills in the data is shown below; it reveals where each part of the stored message comes from,

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
    ...
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);
    ...
}

RocketMQ (9) Broker Message-Send Call Flow 2018-07-07 08:38:42 numerical /2018/07/07/RocketMQ九Broker发送消息调用流程

The post on the RocketMQ Producer implementation listed the several ways the Producer can send messages. Producer messages are sent to the Broker, so here let's look at the chain of processing after the Broker receives a message.

BrokerController

The Broker's core implementation lives in org.apache.rocketmq.broker.BrokerController; the fields mainly related to receiving messages are,

public class BrokerController {
    private final BlockingQueue<Runnable> sendThreadPoolQueue;
    private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private MessageStore messageStore;
    private RemotingServer remotingServer;
    private RemotingServer fastRemotingServer;
    private ExecutorService sendMessageExecutor;
}

During initialization the Broker creates a separate thread pool for each of the features it provides; the one related to receiving messages is sendMessageExecutor,

public boolean initialize() throws CloneNotSupportedException {
    ...
    this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
        this.brokerConfig.getSendMessageThreadPoolNums(),
        this.brokerConfig.getSendMessageThreadPoolNums(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.sendThreadPoolQueue,
        new ThreadFactoryImpl("SendMessageThread_"));
    ...
}

After the RemotingServer is initialized, the processors for the corresponding request codes are registered,

public void registerProcessor() {
    SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
    sendProcessor.registerSendMessageHook(sendMessageHookList);
    sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
    this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}

Once a message has arrived at the Broker over the network, the concrete handling is delegated to org.apache.rocketmq.broker.processor.SendMessageProcessor.

SendMessageProcessor

The entry point for handling a send-message request is,

public class SendMessageProcessor {
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {...}
}

A SendMessageContext is built when the message is received and is used to record message-tracing information. At the beginning and end of the actual message handling, the interception methods defined by the registered SendMessageHooks are executed. The concrete processing then flows to,

public class SendMessageProcessor {
    private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        final RemotingCommand request,
        final SendMessageContext sendMessageContext,
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {...}
}
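
As for the SendMessageHook mentioned above, here is a minimal sketch of a hook, assuming the broker-side interface exposes hookName/sendMessageBefore/sendMessageAfter and that SendMessageContext offers getters such as getTopic/getMsgId (worth double-checking against the version at hand); hooks are registered on the BrokerController via registerSendMessageHook.

public class LoggingSendMessageHook implements SendMessageHook {
    @Override
    public String hookName() {
        return "loggingSendMessageHook";
    }

    @Override
    public void sendMessageBefore(SendMessageContext context) {
        // invoked before the message is handed to the store
        System.out.println("about to store message for topic " + context.getTopic());
    }

    @Override
    public void sendMessageAfter(SendMessageContext context) {
        // invoked after the store attempt, whether it succeeded or not
        System.out.println("finished storing message, msgId=" + context.getMsgId());
    }
}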

Calls worth noting inside sendMessage include,

public class AbstractSendMessageProcessor {
    protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) {...}
}

msgCheck performs some checks on the topic; if the topic of the incoming message has not been created yet, it is created at this point. One thing that is not clear yet: the logic contains many checks involving MixAll.RETRY_GROUP_TOPIC_PREFIX, but searching the code only turns up the comparisons, not where such topics are set. Judging from the name these are retry-related topics; hopefully the concrete logic will turn up later.

MessageExtBrokerInner/MessageExt

The post on the RocketMQ message definition analyzed the Message class. When the Broker receives and parses a message, it creates a MessageExtBrokerInner from the Message data, adding some new fields,

public class MessageExt extends Message {
    private int queueId;
    private int storeSize;
    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;
    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;
    private long preparedTransactionOffset;
}

public class MessageExtBrokerInner extends MessageExt {
    private String propertiesString;
    private long tagsCode;
}

In the subsequent message flow, what is actually handled is the MessageExtBrokerInner.

After the preliminary topic checks, the message is handed to org.apache.rocketmq.store.DefaultMessageStore.

DefaultMessageStore

DefaultMessageStore is where the Broker's core logic resides; message storage, high-availability features and more are implemented there. For now let's stay focused on the message-send path, whose entry point is,

public class DefaultMessageStore {
    public PutMessageResult putMessage(MessageExtBrokerInner msg) {...}
}

Before the message is actually appended, it is checked first,

if (msg.getTopic().length() > Byte.MAX_VALUE) {
    log.warn("putMessage message topic length too long " + msg.getTopic().length());
    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}

if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
    log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
    return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}

The topic length is limited, and so is the size of the message properties: Byte.MAX_VALUE and Short.MAX_VALUE respectively, i.e. 127 bytes and roughly 32 KB.

The message handling finally reaches org.apache.rocketmq.store.CommitLog.

CommitLog

The relevant CommitLog entry point when sending a message is,

public class CommitLog {
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {...}
}

The concrete storage layout will be analyzed separately; once putMessage completes, the Producer is notified of the send result.

RocketMQ (8) NameServer Features and Implementation 2018-07-07 08:32:06 numerical /2018/07/07/RocketMQ八NameServer功能与实现

In RocketMQ the NameServer maintains the global configuration, Broker-related information, and so on. Brokers register their information with the NameServer, while Producers/Consumers pull data from it. A NameServer can serve on its own or as part of a cluster.

NameServers in RocketMQ are, however, unaware of each other: when there are multiple NameServers, each holds the same data and Brokers connect to all of them. Running multiple NameServers is mainly about surviving crashes.

Key Classes

NamesrvStartup

The org.apache.rocketmq.namesrv.NamesrvStartup class handles startup requests from the command line; the actual NameServer logic is not implemented directly in it.

NamesrvController

org.apache.rocketmq.namesrv.NamesrvController is the class where the NameServer's main logic lives.

public class NamesrvController {
    private final NamesrvConfig namesrvConfig;
    private final NettyServerConfig nettyServerConfig;
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("NSScheduledThread"));
    private final KVConfigManager kvConfigManager;
    private final RouteInfoManager routeInfoManager;
    private RemotingServer remotingServer;
    private BrokerHousekeepingService brokerHousekeepingService;
    private ExecutorService remotingExecutor;
    private Configuration configuration;
}

The NameServer's main functions are implemented by the members listed above.

NamesrvConfig

org.apache.rocketmq.namesrv.NamesrvConfig holds the configuration the NameServer needs; when NamesrvStartup starts, the corresponding fields are read from the configuration file according to the arguments passed in.

public class NamesrvConfig {
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;
}

NettyServerConfig

org.apache.rocketmq.remoting.netty.NettyServerConfig supplies the network-communication parameters when the NameServer starts; it is likewise populated from the configuration file during NamesrvStartup.

public class NettyServerConfig implements Cloneable {
    private int listenPort = 8888;
    private int serverWorkerThreads = 8;
    private int serverCallbackExecutorThreads = 0;
    private int serverSelectorThreads = 3;
    private int serverOnewaySemaphoreValue = 256;
    private int serverAsyncSemaphoreValue = 64;
    private int serverChannelMaxIdleTimeSeconds = 120;
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean serverPooledByteBufAllocatorEnable = true;
}

KVConfigManager

org.apache.rocketmq.namesrv.kvconfig.KVConfigManager records KV configuration in the NameServer,

public class KVConfigManager {
    private final HashMap<String/* Namespace */, HashMap<String/* Key */, String/* Value */>> configTable = new HashMap<String, HashMap<String, String>>();
}

Its data is kept in the configTable HashMap. KVConfigManager itself provides the interfaces for persisting and loading it. configTable is a two-level HashMap; for now, a namespace can be understood as isolating one group of configuration. What values the namespace takes in real deployments is still unclear; so far the only related definition found in the project code is NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG.

RouteInfoManager

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager stores the key data of a RocketMQ cluster: Broker, Topic and related information,

public class RouteInfoManager {
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

The field names largely speak for themselves about what kind of data each one stores.

Initialization Flow

After startup the NameServer initializes itself; the code enters,

public class NamesrvController {
    public boolean initialize() {...}
}

The rough steps are,

  • Load the KVConfig
  • Initialize the RemotingServer, which handles network connections
    • Call RemotingServer.registerDefaultProcessor to register the message processor
  • Start scheduled tasks that scan for inactive Brokers; the intervals are hard-coded (see the snippet below)

    ```java
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    ```

Once startup completes, the NameServer listens for messages from the network, using DefaultRequestProcessor as the default entry point for handling network commands, and waits for connections from Producers/Consumers/Brokers.

Broker Registration

After a Broker starts it registers itself with the NameServer; on the NameServer side the flow is,

  • DefaultRequestProcessor.processRequest handles requests of type RequestCode.REGISTER_BROKER
    • The current RocketMQ version dispatches to DefaultRequestProcessor.registerBrokerWithFilterServer
      • Call RouteInfoManager.registerBroker to perform the registration

RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
    requestHeader.getClusterName(),
    requestHeader.getBrokerAddr(),
    requestHeader.getBrokerName(),
    requestHeader.getBrokerId(),
    requestHeader.getHaServerAddr(),
    registerBrokerBody.getTopicConfigSerializeWrapper(),
    registerBrokerBody.getFilterServerList(),
    ctx.channel());

Now let's look more closely at the implementation inside RouteInfoManager.registerBroker.

The Broker information is added to clusterAddrTable, which maintains the set of Broker names under each cluster,

Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
    brokerNames = new HashSet<String>();
    this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);

The Broker information is also added to brokerAddrTable, which maps a Broker name to its BrokerData. Master and slave Brokers share the same name but have different ids, the master's id being 0, and the BrokerData records the addresses of one master plus its slaves,

BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
    registerFirst = true;
    brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
    this.brokerAddrTable.put(brokerName, brokerData);
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

When a master Broker registers, depending on certain conditions the code enters RouteInfoManager.createAndUpdateQueueData to update topicQueueTable; the TopicConfig needed for the QueueData is also passed in by the Broker.

When a slave Broker registers, the corresponding master Broker information is filled in,

if (MixAll.MASTER_ID != brokerId) {
    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    if (masterAddr != null) {
        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
        if (brokerLiveInfo != null) {
            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
            result.setMasterAddr(masterAddr);
        }
    }
}

RocketMQ (7) Consumer Implementation 2018-07-07 08:23:45 numerical /2018/07/07/RocketMQ七Consumer实现

After a brief look at how the Consumer is used, let's continue with how it is implemented internally.

Type Definitions

DefaultMQPushConsumer

org.apache.rocketmq.client.consumer.DefaultMQPushConsumer is the entry point of the Consumer functionality; first, its key members,

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
    private String consumerGroup;
    private MessageModel messageModel = MessageModel.CLUSTERING;
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
    private MessageListener messageListener;
    private OffsetStore offsetStore;
}

  • DefaultMQPushConsumerImpl, the actual implementation class; externally-called interfaces are forwarded to it
  • AllocateMessageQueueStrategy, the strategy for allocating MessageQueues to Consumers, with the following implementations,
    • AllocateMessageQueueAveragely
    • AllocateMessageQueueAveragelyByCircle
    • AllocateMessageQueueByConfig
    • AllocateMessageQueueByMachineRoom
    • AllocateMessageQueueConsistentHash
  • OffsetStore, storage for consumption offsets, with two implementations,
    • LocalFileOffsetStore, kept locally on the Consumer
    • RemoteBrokerOffsetStore, kept on the Broker

Most of the remaining fields are configuration parameters. Their names already suggest that the PushConsumer is in fact implemented on top of Pull internally; it merely exposes a Push-style interface to the caller.

DefaultMQPushConsumerImpl

DefaultMQPushConsumerImpl is the core implementation class; its key members are,

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private ConsumeMessageService consumeMessageService;
}

  • RebalanceImpl, the Consumer-side load-balancing logic
    • RebalancePushImpl
    • RebalancePullImpl
  • MQClientInstance, the network-communication implementation; the Producer holds a corresponding instance as well
  • PullAPIWrapper, which wraps the pull methods that are ultimately issued
  • ConsumeMessageService, with two implementations,
    • ConsumeMessageOrderlyService, the message-handling service for orderly consumption
    • ConsumeMessageConcurrentlyService, the message-handling service for concurrent consumption

Startup Flow

Consumer startup is similar to the Producer's; the entry point is the Consumer's start method,

public class DefaultMQPushConsumer {
    public void start() throws MQClientException {...}
}

Control then transfers to DefaultMQPushConsumerImpl,

public class DefaultMQPushConsumerImpl {
    public synchronized void start() throws MQClientException {...}
}

The real startup logic all happens here,

  • Handle the subscribe logic
  • Create the MQClientInstance
  • Initialize the RebalanceImpl
  • Initialize the OffsetStore according to the message model
  • Initialize the ConsumeMessageService according to the configuration
  • Start the internal services

Message Pull Flow

After startup the Consumer begins pulling message data from the Broker. The org.apache.rocketmq.client.impl.consumer.PullMessageService is responsible for pulling data from the Broker, again passing the pull operation down level by level,

  • PullMessageService.pullMessage
    • DefaultMQPushConsumerImpl.pullMessage
      • PullAPIWrapper.pullKernelImpl

When a pull completes, a new request is put back into the pull queue, so from then on pulls keep executing at intervals.

Where does the very first pull come from, then? Reading the code shows that the initial operation originates from RebalanceService,

  • RebalanceService.run
    • MQClientInstance.doRebalance
      • DefaultMQPushConsumerImpl.doRebalance
        • RebalanceImpl.doRebalance
          • RebalanceImpl.rebalanceByTopic
            • RebalanceImpl.updateProcessQueueTableInRebalance
              • RebalancePushImpl.dispatchPullRequest
                • DefaultMQPushConsumerImpl.executePullRequestImmediately

Message Consumption Flow

After the Consumer pulls and messages are actually returned, the consumption flow begins. On a successful pull, PullCallback.onSuccess is invoked,

public class DefaultMQPushConsumerImpl {
    public void pullMessage(final PullRequest pullRequest) {
        ...
        PullCallback pullCallback = new PullCallback() {...}
        ...
    }
}

Inside the callback the consumption is handed over to the ConsumeMessageService instance, and ConsumeRequest.run eventually invokes the consumeMessage method defined by the MessageListener.

Auxiliary Structures

PullRequest

The Consumer's pull request is encapsulated in,

public class PullRequest {
    private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
}

A pull request targets a MessageQueue under a Topic and carries the offset of the next message.

ProcessQueue

ProcessQueue is the Consumer-local structure backing message consumption,

public class ProcessQueue {
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
}

Pulled messages are processed here.

LocalFileOffsetStore/RemoteBrokerOffsetStore

These are the two OffsetStore implementations; depending on the message model, offsets are stored locally on the Consumer or on the Broker.

RocketMQ (6) Consumer Usage 2018-07-06 06:53:39 numerical /2018/07/06/RocketMQ六Consumer使用

The Consumer is the message consumer in RocketMQ. After the Producer has sent messages to the Broker, RocketMQ offers two ways, Push and Pull, for the Consumer to consume them. As before, let's start from the samples in the RocketMQ Quick Start.

PushConsumer

First, the Push style that RocketMQ provides,

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SampleTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

Message Filtering

The Consumer subscribes to a topic through the subscribe method, and the subscription can filter messages. There are several ways to define the filter.

Filter by tag with an expression; the expression supports the || operator, e.g. tag1 || tag2 || tag3, as well as *,

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String subExpression) throws MQClientException {...}
}
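
For example, on the consumer from the sample above,

consumer.subscribe("SampleTopic", "TagA || TagB"); // only messages tagged TagA or TagB
consumer.subscribe("SampleTopic", "*");            // or, to receive everything under the topic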

Filter by providing an implementation class of org.apache.rocketmq.common.filter.MessageFilter,

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {...}
}

Filter through org.apache.rocketmq.client.consumer.MessageSelector, which supports two kinds of expressions: SQL92 and tag filtering,

public class DefaultMQPushConsumer {
    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {...}
}
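
For example (SQL92 filtering additionally requires the broker to enable property filtering, e.g. enablePropertyFilter=true),

consumer.subscribe("SampleTopic", MessageSelector.bySql("a between 0 and 3"));
consumer.subscribe("SampleTopic", MessageSelector.byTag("TagA || TagB"));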

Consumption Starting Point

Note the setConsumeFromWhere call in the sample above; it sets the point from which the Consumer starts consuming. org.apache.rocketmq.common.consumer.ConsumeFromWhere defines the supported options,

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
} 

  • CONSUME_FROM_LAST_OFFSET, consume only messages that arrive after the Consumer starts
  • CONSUME_FROM_FIRST_OFFSET, consume all messages that currently exist
  • CONSUME_FROM_TIMESTAMP, consume messages from a given time onwards

Orderly vs. Concurrent Consumption

When consuming, the Consumer sets its message-handling callback through registerMessageListener, and there are two variants,

  • MessageListenerOrderly, consume messages in order
  • MessageListenerConcurrently, consume messages concurrently

If message order does not matter, concurrent consumption performs better.
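
A registration sketch for the orderly variant looks like,

consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.printf("%s consume %s%n", Thread.currentThread().getName(), msg);
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});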

Broadcast vs. Cluster Consumption

When there are multiple Consumers, consumption can further be controlled by setting the message model,

// Consumer
consumer.setMessageModel(MessageModel.BROADCASTING);

public enum MessageModel {
    BROADCASTING("BROADCASTING"),
    CLUSTERING("CLUSTERING");
}

  • MessageModel.BROADCASTING, broadcast mode: every Consumer in a ConsumerGroup receives each message
  • MessageModel.CLUSTERING, cluster mode: only one subscriber in a ConsumerGroup receives a given message

PullConsumer

RocketMQ also supports consuming messages in pull mode; the relevant implementation is,

public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer {
    ...
}

The PullConsumer exposes a set of pull methods, which take noticeably more code to use than the PushConsumer.
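
A rough pull loop might look like the following sketch (using fetchSubscribeMessageQueues/pull/updateConsumeOffset; error handling and offset bootstrapping are omitted),

public static void pullSample() throws Exception {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("pull_consumer_group");
    consumer.start();
    for (MessageQueue mq : consumer.fetchSubscribeMessageQueues("SampleTopic")) {
        long offset = consumer.fetchConsumeOffset(mq, true);
        PullResult result = consumer.pull(mq, "*", offset, 32);
        if (result.getPullStatus() == PullStatus.FOUND) {
            for (MessageExt msg : result.getMsgFoundList()) {
                System.out.println(msg);
            }
        }
        // remember where to continue from next time
        consumer.updateConsumeOffset(mq, result.getNextBeginOffset());
    }
    consumer.shutdown();
}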

RocketMQ (5) Producer Implementation 2018-07-06 06:51:21 numerical /2018/07/06/RocketMQ五Producer实现

After a first look at what the Producer offers, let's examine its internal implementation.

Type Definitions

RocketMQ provides org.apache.rocketmq.client.producer.DefaultMQProducer as the default implementation. DefaultMQProducer implements the MQProducer interface, which in turn extends the MQAdmin interface; DefaultMQProducer additionally extends ClientConfig.

MQAdmin

The MQAdmin interface defines a set of methods for operating on and querying Topics/MessageQueues,

public interface MQAdmin {
    void createTopic(final String key, final String newTopic, final int queueNum) throws MQClientException;
    void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException;
    long searchOffset(final MessageQueue mq, final long timestamp) throws MQClientException;
    long maxOffset(final MessageQueue mq) throws MQClientException;
    long minOffset(final MessageQueue mq) throws MQClientException;
    long earliestMsgStoreTime(final MessageQueue mq) throws MQClientException;
    MessageExt viewMessage(final String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
    QueryResult queryMessage(final String topic, final String key, final int maxNum, final long begin, final long end) throws MQClientException, InterruptedException;
    MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException;
}

MQProducer

MQProducer, in turn, defines the set of send methods mentioned earlier: synchronous, asynchronous, batch sending, and so on.

This inheritance and interface layout does not feel entirely reasonable, though. The MQConsumer interface likewise extends MQAdmin, and the Consumer implementation class also extends ClientConfig. A cleaner design would arguably have no inheritance between MQProducer and MQAdmin, and would hold ClientConfig as an instance instead.

ClientConfig

Here is the definition of org.apache.rocketmq.client.ClientConfig,

public class ClientConfig {
    public static final String SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY = "com.rocketmq.sendMessageWithVIPChannel";
    private String namesrvAddr = System.getProperty(MixAll.NAMESRV_ADDR_PROPERTY, System.getenv(MixAll.NAMESRV_ADDR_ENV));
    private String clientIP = RemotingUtil.getLocalAddress();
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    private int pollNameServerInterval = 1000 * 30;
    private int heartbeatBrokerInterval = 1000 * 30;
    private int persistConsumerOffsetInterval = 1000 * 5;
    private boolean unitMode = false;
    private String unitName;
    private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
    private boolean useTLS = TlsSystemConfig.tlsEnable;
}

Because both the Producer and Consumer implementation classes extend ClientConfig, its fields cover what both of them need.

  • pollNameServerInterval, the interval for periodically polling the NameServer
  • heartbeatBrokerInterval, the heartbeat interval with the Broker
  • persistConsumerOffsetInterval, the interval at which the Consumer persists offsets
  • unitMode, whose purpose is not clear yet
  • vipChannelEnabled, the Broker starts two communication services; some operational requests go over the VIP channel, separated from the ordinary message channel

DefaultMQProducer

Finally, the concrete Producer implementation; again, the field definitions first,

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    private String producerGroup;
    private String createTopicKey = MixAll.DEFAULT_TOPIC;
    private volatile int defaultTopicQueueNums = 4;
    private int sendMsgTimeout = 3000;
    private int compressMsgBodyOverHowmuch = 1024 * 4;
    private int retryTimesWhenSendFailed = 2;
    private int retryTimesWhenSendAsyncFailed = 2;
    private boolean retryAnotherBrokerWhenNotStoreOK = false;
    private int maxMessageSize = 1024 * 1024 * 4;
}

Looking at the code, DefaultMQProducer itself defines only a handful of parameter fields, and their purposes are mostly obvious from the names. The interfaces declared in MQProducer and MQAdmin are actually implemented in DefaultMQProducerImpl; DefaultMQProducer is just a thin wrapper around it.

DefaultMQProducerImpl

Most of the Producer's real logic is implemented in DefaultMQProducerImpl,

public class DefaultMQProducerImpl implements MQProducerInner {
    private final Logger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
    private final RPCHook rpcHook;
    protected BlockingQueue<Runnable> checkRequestQueue;
    protected ExecutorService checkExecutor;
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
    private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
}

  • topicPublishInfoTable, records the routing information for each topic; the details come later
  • sendMessageHookList, the hooks executed before and after sending a message
  • rpcHook, the hook invoked when network requests are sent and received
  • checkExecutor/checkRequestQueue, related to transactional message logic
  • serviceState, the current state of the Producer
    • ServiceState defines all the states: CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED
  • mQClientFactory, the implementation that actually handles network communication
  • checkForbiddenHookList, the CheckForbiddenHook implementations run when sending; no class implementing CheckForbiddenHook has been found in the code so far
  • zipCompressLevel, the compression level used when compressing message bodies
  • mqFaultStrategy, wraps a small set of fault-handling interfaces

MQClientInstance

MQClientInstance actually handles the communication between Producer/Consumer and Broker/NameServer; it will be analyzed separately later.

Producer Initialization Flow

Starting from the Producer usage sample,

DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
producer.start();

The concrete start logic is in,

public class DefaultMQProducerImpl {
    public void start(final boolean startFactory) throws MQClientException {...}
}

Its rough flow is,

  • Check that the group name is valid
  • Create the MQClientInstance
  • Register itself with the MQClientInstance
  • Call MQClientInstance.start
    • Fetch the NameServer address information
    • Call MQClientAPIImpl.start to bring up the network connection
    • Start a number of scheduled tasks
  • Start sending heartbeats to the Broker

Producer Send Call Flow

Taking synchronous sending as an example, the call flow enters,

public class DefaultMQProducerImpl {
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        ...
    }
}

Before sending, the message is validated: whether the topic is legal and whether the message size is within the limit,

Validators.checkMessage(msg, this.defaultMQProducer);

Also before sending, the routing information for the message is looked up,

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    ...
}

If the routing information is not available locally, it is fetched from the NameServer, synchronously over the network,

// org.apache.rocketmq.client.impl.factory.MQClientInstance
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    ...
}

A MessageQueue is then selected for sending, and the concrete send logic moves on to,

public class DefaultMQProducerImpl {
    private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    ...
    }
}

Inside sendKernelImpl the channel to communicate over is chosen, the various hook functions are executed, the SendMessageRequestHeader is built, and finally the message is sent out through MQClientAPIImpl.

Auxiliary Data Classes

TopicRouteData

TopicRouteData contains the broker information associated with a topic, among other things,

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

This data is obtained from the NameServer.

BrokerData

BrokerData describes one Broker group, which consists of one master plus several slaves. The master Broker's brokerId is 0, and the information is registered with the NameServer after the Broker starts,

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

QueueData

QueueData is created when the Broker registers with the NameServer,

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;
}

The initialization and creation of the data above will be analyzed in detail when we get to the NameServer code.

RocketMQ (4) Producer Usage 2018-07-06 06:48:58 numerical /2018/07/06/RocketMQ四Producer使用

The Producer is responsible for sending messages into RocketMQ. Having looked at the Message definition earlier, let's now see how to use the Producer to send Messages, and at the same time walk through the features introduced in the RocketMQ Quick Start.

Send Modes

Sync Send

Synchronous sending: the sender waits for the send result,

public static void syncSend() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.start();
    byte[] body = ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET);
    Message msg = new Message("SampleTopic", "SampleTag", body);
    SendResult result = producer.send(msg);
    System.out.printf("%s%n", result);
    producer.shutdown();
}

From the caller's point of view, sending synchronously just means calling DefaultMQProducer.send; internally, send delivers the message to the Broker and waits for the response.

Both synchronous and asynchronous sending in RocketMQ are reliable. Intuitively, synchronous calls limit the sender's throughput, so asynchronous sending is probably used more often in practice.

Async Send

Asynchronous sending: the caller handles the send result in a callback,

public static void asyncSend() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.start();
    byte[] body = "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET);
    Message msg = new Message("SampleTopic", "SampleTag", body);
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult result) {
            System.out.printf("send success %s", result.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            System.out.printf("send fail %s", e);
            e.printStackTrace();
        }
    });
    producer.shutdown();
}

With asynchronous sending you have to think about threading, but it should noticeably improve the sender's efficiency. An open question is whether sending too fast would in turn put pressure on the Broker.

Oneway Send

Oneway means the sender simply does not care whether the send succeeded: the interface has neither a return value nor a callback. For unimportant messages where loss is acceptable, this mode may be a good fit.

public static void onewaySend() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.start();
    byte[] body = ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET);
    Message msg = new Message("SampleTopic", "SampleTag", body);
    producer.sendOneway(msg);
    producer.shutdown();
}

Features

Batch Sending

The Producer offers a batch-send interface,

public static void batchSend() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.start();
    String topic = "SampleTopic";
    String tag = "SampleTag";
    List<Message> messages = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        String key = String.format("ID%d", i);
        byte[] body = String.format("Hello %d", i).getBytes();
        messages.add(new Message(topic, tag, key, body));
    }
    producer.send(messages);
}

The documentation says a single batch must not exceed 1 MB, otherwise it has to be split into several sends. In the actual code, however, the default maximum message size is defined in DefaultMQProducer.maxMessageSize and is configured to 4 MB.

Passing a List of Messages only works with synchronous sending; to send a batch asynchronously, you can first build an org.apache.rocketmq.common.message.MessageBatch and then go through the asynchronous send interface. Internally the Producer likewise builds a MessageBatch from the Message list.

Scheduled Messages

RocketMQ supports scheduled (delayed) messages; as mentioned in the Message definition post, delayed delivery is configured by setting MessageConst.PROPERTY_DELAY_TIME_LEVEL.

public static void scheduleSend() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.start();
    byte[] body = ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET);
    Message msg = new Message("SampleTopic", "SampleTag", body);
    msg.setDelayTimeLevel(3);
    SendResult result = producer.send(msg);
    System.out.printf("%s%n", result);
    producer.shutdown();
}

Ordered Messages

Taking the example straight from the official documentation,

public static void orderSend() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_group_name");
    producer.start();
    String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 100; i++) {
        int orderId = i % 10;
        byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msg = new Message("SampleTopic", tags[i % tags.length], "KEY" + i, body);
        SendResult result = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                return mqs.get(((Integer) arg) % mqs.size());
            }
        }, orderId);
        System.out.printf("%s%n", result);
    }
    producer.shutdown();
}

RocketMQ provides an ordered-message feature. Judging from the interfaces, it does not implement global ordering; instead, messages whose order must be preserved are sent to the same MessageQueue for processing, and the order of messages within one MessageQueue is guaranteed.

Transactional Messages

The RocketMQ 4.2.0 official documentation does not mention support for transactional messages, yet the code still contains transaction-message related changes, so it is unclear whether the feature is officially usable.

RocketMQ (3) Message Definition 2018-07-05 06:40:19 numerical /2018/07/05/RocketMQ三Message定义

RocketMQ is a message queue, so let's start with the data definition of a message itself.

The most basic message class is org.apache.rocketmq.common.message.Message,

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
}

topic

Indicates the first-level category a Message belongs to; every message must set a topic. Both sending and consumption are handled in terms of topics.

flag

A flag at the network-communication layer. Code that appears related to it includes,

// org.apache.rocketmq.remoting.protocol.RemotingCommand

class RemotingCommand {
    @JSONField(serialize = false)
    public boolean isOnewayRPC() {
        int bits = 1 << RPC_ONEWAY;
        return (this.flag & bits) == bits;
    }

    @JSONField(serialize = false)
    public boolean isResponseType() {
        int bits = 1 << RPC_TYPE;
        return (this.flag & bits) == bits;
    }
}

The flag field of RemotingCommand is passed down into it level by level.

body

Holds the binary payload of the message.

properties

Records the remaining parameters of a Message, such as the tag. org.apache.rocketmq.common.message.MessageConst defines the built-in properties,

public class MessageConst {
    public static final String PROPERTY_KEYS = "KEYS";
    public static final String PROPERTY_TAGS = "TAGS";
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
    public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
    public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
    public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
    public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
    public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
    public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
    public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
    public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
    public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
    public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
    public static final String PROPERTY_MSG_REGION = "MSG_REGION";
    public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
    public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
    public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
    public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
}

Message provides several setXXX methods, and the values they take are likewise stored in properties,

class Message {
    public void setTags(String tags) {...}
    public void setKeys(Collection<String> keys) {...}
    public void setDelayTimeLevel(int level) {...}
    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
    public void setBuyerId(String buyerId) {...}
}

They use the following MessageConst definitions respectively,

  • MessageConst.PROPERTY_TAGS carries the Message's tag; the tag is a message's second-level category, and consumption can filter on it.
  • MessageConst.PROPERTY_KEYS carries business-level keys configured by the application; the Consumer can use them for things such as handling duplicate sends.
  • MessageConst.PROPERTY_DELAY_TIME_LEVEL indicates the delay level of the Message; RocketMQ offers a limited delayed-message capability.
  • MessageConst.PROPERTY_WAIT_STORE_MSG_OK indicates whether the Message waits for storage to complete.
  • MessageConst.PROPERTY_BUYER_ID, no usage found in the code, so its purpose is unclear for now.

Besides the built-in properties above, Message also provides an interface for setting custom properties,

class Message {
    public void putUserProperty(final String name, final String value) {...}
}
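
For example, a custom property set by the producer travels in the same properties map and can be read back later,

Message msg = new Message("SampleTopic", "SampleTag", "Hello RocketMQ".getBytes());
msg.putUserProperty("orderId", "20180705");
// the value lives in the properties map and can be fetched back
String orderId = msg.getUserProperty("orderId");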

For a caller of RocketMQ, understanding the definitions here is enough to know what a Message can carry.