SOLILOQUIZE RocketMQ os.sh参数说明 2018-09-02 22:15:11 numerical /2018/09/02/RocketMQ-ossh参数说明

RocketMQ在消息读写上的处理是其性能的关键,重度依赖底层操作系统的特性。os.sh中提供了一组默认的操作系统参数配置,这里一条条来分析下。

命令

sudo sysctl -w vm.extra_free_kbytes=2000000

这个参数应该也是用来控制空闲内存大小的。但是很多发行版没有这个参数,一般根据部署环境看是否支持,不支持的话不进行设置应该也没关系。

sudo sysctl -w vm.min_free_kbytes=1000000

设置系统需要保留的最小内存大小。当系统内存小于该数值时,则不再进行内存分配。这个命令默认被注释掉,根据文档看,如果设置了不恰当的值,比如比实际内存大,则系统可能直接就会崩溃掉。实际数值应该根据RocketMQ部署机器的内存进行计算,经验数值大概是机器内存的5% - 10%。

这个数值设置的过高,则内存浪费。若设置的过低,那么在内存消耗将近时,RocketMQ的Page Cache写入操作可能会很慢,导致服务不可用。

sudo sysctl -w vm.overcommit_memory=1

控制是否允许内存overcommit。设为1,则是允许。当应用申请内存时,系统都会认为存在足够的内存,准许申请。

sudo sysctl -w vm.drop_caches=1

设置为1会释放page cache。这个操作是一次性的,在执行该命令时释放page cache,没有持续性作用。

sudo sysctl -w vm.zone_reclaim_mode=0

该配置用于控制zone内存使用完之后的处理策略,设为0则禁止zone reclaim。对于大量使用缓存的应用来说,一般都需要禁止掉。

sudo sysctl -w vm.max_map_count=655360

Rocket使用MMAP映射文件到内存,因此需要设置映射文件的数量,避免MMAP操作失败。

sudo sysctl -w vm.dirty_background_ratio=50

设置内存中的脏数据占比,当超过该值时,内核后台线程将数据脏数据刷入磁盘。

sudo sysctl -w vm.dirty_ratio=50

设置内存的脏数据占比,当超过该至时,应用进程会主动将数据刷入磁盘。

sudo sysctl -w vm.dirty_writeback_centisecs=360000

内核线程会定期启动将内存中的旧数据刷入磁盘,通过此参数可以控制启动间隔。

sudo sysctl -w vm.page-cluster=3

设置一次读操作会加载几个数据页。

sudo sysctl -w vm.swappiness=1

vm.swappiness用于控制使用系统swap空间比例,一般设置为0是禁止使用swap,设为1估计是某些系统不支持设置为0.

RocketMQ的数据存储基于MMAP,大量使用内存,因此需要禁止swap,否则性能会急剧下降。

echo 'ulimit -n 655350' >> /etc/profile

设置可以打开的文件符号数。RocketMQ的存储都是基于文件的,因此稍稍设大默认值。

echo '* hard nofile 655350' >> /etc/security/limits.conf

设置一个进程能够打开的文件数。

echo 'deadline' > /sys/block/${DISK}/queue/scheduler

RocketMQ使用MMAP映射文件到内存,在存在新消息的时候都是追加顺序写,投递消息的时候则是根据Offset从CommitLog进行随机读取,Deadline调度方法会在调度时间内合并随机读为书序读,因此对RocketMQ性能有帮助。

总结

RocketMQ的存储模型以来在MMAP纸上,因此os.sh大部分参数控制都与内存管理相关。这些参数对Broker的性能有明显影响。默认提供的数值可以作为一个参考,实际还是可以根据这些参数的定义与实际机器配置来测试得到更优的配置。

参考

RocketMQ 延时消息实现 2018-09-02 13:35:47 numerical /2018/09/02/RocketMQ-延时消息实现

RocketMQ提供了延时消息实现,不过这个延时是一定级别的延迟,默认在MessageStoreConfig.messageDelayLevel定义,

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这里来看下RocketMQ实现这一功能的相关实现。

消息存储

延时消息的存储过程与一般消息存在差异,在CommitLog.putMessage中对其做了特殊处理。

设置了Delay Level的消息,在存盘之前Broker会修改Topic定延时Topic,SCHEDULE_TOPIC_XXXX,同时备份原Topic信息到消息属性当中。而后则按正常消息存储流程进行处理。不同的Delay Level会对应到不同的MessageQueue中。

消息投递

因为修改了Topic,所以数据存盘之后Consumer并不能消费到消息。在消费之前需要将消息重新投递回初始的Topic,再经由正常消费逻辑进行处理。

重新投递这一逻辑在ScheduleMessageService中实现。从代码来看,延时的处理策略不复杂。

ScheduleMessageService中通过一个Timer来延时触发投递消息检查。每一个Delay Level对应一个DeliverDelayedMessageTimerTask。当对应Task被触发时会去检查当前Delay Level对应的MessageQueue中待处理的消息时间是否到达,如果没有满足条件则再次定时检查,如果延时时间满足,则恢复Message原Topic,并通过DefaultMessageStore.putMessage重新进入正常消息处理流程。

定时消息

RocketMQ没有提供定时消息功能,如何在MQ中实现支持任意延迟的消息对这个问题进行了分析讨论,个人觉得描述的比较清楚。

如果后续考虑在RocketMQ上实现定时消息功能的话,可以按照类似的思路进行实施。

RocketMQ 消息消费失败的处理策略 2018-09-02 10:24:39 numerical /2018/09/02/RocketMQ-消息消费失败的处理策略

Consumer从Broker拉取到消息之后进行消费,但是消费并不一定都是顺利的,不可避免会遇到一些异常情况,这种情况下RocketMQ提供了怎样的处理机制?以PushConsumer为例来看下与之相关的操作。

Consumer消费异常处理流程

在Consumer使用的时候需要注册MessageListener,对于PushConsumer来说需要注册MessageListenerConcurrently,其中消费消息的接口会返回处理状态,分别是,

  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS,消费成功
  • ConsumeConcurrentlyStatus.RECONSUME_LATER,推迟消费

MessageListener是在ConsumeMessageConcurrentlyService中被调用的,可以看到上述两个状态会分别映射到CMResult定义的枚举值,

  • CMResult.CR_SUCCESS,消费成功
  • CMResult.CR_LATER,推迟消费
  • CMResult.CR_ROLLBACK,事务消息回滚
  • CMResult.CR_COMMIT,事务消息投递
  • CMResult.CR_THROW_EXCEPTION,消费过程异常
  • CMResult.CR_RETURN_NULL,消费结果状态为null

消息消费的结果会在ConsumeMessageConcurrentlyService.processConsumeResult中进行处理。

从代码看返回ConsumeConcurrentlyStatus.RECONSUME_LATER状态之后的处理策略是将该组消息发送回Broker,等待后续消息。发送回的消息会设置重试Topic,重试Topic命名为:"%RETRY%" + Consumer组名。原先实际的Topic会暂存到消息属性当中,以及设置delayLevel以及reconsumeTimes。

Consumer消费的时候可以设置consumeMessageBatchMaxSize来控制传入MessageLisenter的消息数量,这里的失败处理策略是,其中只要有一条消息消费失败就认为全部失败,这一批消息都会发送回Broker。因此consumeMessageBatchMaxSize这个值的设置需要注意,否则容易出现消息重复消费问题。

Broker消费失败消息处理流程

Broker端对应的处理位于SendMessageProcessor.consumerSendMsgBack方法中。对于Consumer发送失败返回的消息,Broker会将其放入重试Topic中。

重试消息的重新投递逻辑与延迟消息一致,等待delayLevel对应的延时一到,Broker会尝试重新进行投递处理。

DelayLevel对应的延时级别是固定的,RocketMQ对应的配置为MessageStoreConfig.messageDelayLevel,默认的级别为,

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

鉴于RocketMQ的实现机制,可以去调整每一个级别对应的时间,但可以看出一是时间精度不够细,二是级别为固定级别。

自定义消费失败处理

Consumer对消费失败可以进行一定程度的介入,默认失败后的级别设置为msg.getReconsumeTimes() + 1,如果Consumer明确知道未来可以成功消费的时间,那么久可以主动去设置重试次数与重试级别来进行控制。

顺序消息非顺序消息的差异

在消息失败处理上,顺序消息与非顺序消息是有明显差异的。对于顺序消息来说,如果消费失败后将其延迟消费,那么顺序性实际就被破坏掉了。

所以顺序消息消费失败的话,消息消费不会再推进,知道失败的消息消费成功为止。

死信

RocketMQ中的消息无法无限次重新消费,当然了,手动修改重试次数是可以的,不介入的话不行。当重试次数超过所有延迟级别之后。消息会进入死信,死信Topic的命名为:%DLQ% + Consumer组名。

进入死信之后的消息肯定不会再投递了,不过可以通过接口去查询当前RocketMQ中私信队列的消息。如果在上层实现自有命令,那么可以将消息从四信中移出并重新投递。

RocketMQ 顺序消息实现 2018-08-29 22:17:27 numerical /2018/08/29/RocketMQ-顺序消息实现

RocketMQ提供了“顺序消息”功能。在分析其用法与实现之前,需要先了解下这个功能本身的定义与约束。

功能定义

支持顺序消息即代表消息队列支持按照消息的顺序投递给消费方。RocketMQ是怎么来支持这种顺序性的?

RocketMQ中顺序消息的顺序定义为到达Broker的先后顺序,而不是消息自身的逻辑顺序。因此需要Producer将顺序相关的消息投递到同一个MessageQueue中,这样通过RocketMQ默认的消息处理与存储机制来保证消息投递的顺序性。

因此,使用者需要注意发送顺序消息需要有明确的先后顺序。如果同一个Producer发送顺序消息,那么自然发送顺序回合消息到达Broker顺序一致,如果是不同机器上的多个Producer发送,如果时间很临近,那么逻辑上的顺序性是不能够保证的。

使用方法

了解基本定义与约束之后,直接看下作为调用方通过怎样的接口来使用顺序消息,

发送顺序消息

前面说了顺序消息需要发送到同一个MessageQueue中,因此Producer在发送消息的时候需要指定MessageQueue,一般会选择通过实现MessageQueueSelector来进行分发,

String topic = "Topic";
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.start();
for (int i = 0; i < 10; i++) {
    Message msg = new Message(topic, String.format("msg %d", i).getBytes());
    producer.send(msg, new MessageQueueSelector() {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg) {
            return list.get(0);
        }        
    }, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
        }

        @Override
        public void onException(Throwable throwable) {
        }
    });
}

消费消息

顺序消息的消费方也需要一定处理,通过注册MessageListenerConcurrently的实现来消费顺序消息,

String topic = "Topic";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            System.out.println(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

内部实现

Producer的相关逻辑很简单,在发送消息之前通过MessageQueueSelector的接口来选择MessageQueue,而后再进行正常的消息发送流程就可以了,简单的调用流程为,

DefaultMQProducer.send
DefaultMQProducerImpl.send
DefaultMQProducerImpl.sendSelectImpl
DefaultMQProducerImpl.sendKernelImpl

相对复杂的处理在Consumer端。对于普通消息与顺序消息,Consumer分别通过ConsumeMessageConcurrentlyService、ConsumeMessageOrderlyService来进行处理。

先来推测下,因为存在多个Consumer,不同Consumer之间消费消息的顺序如何进行保证呢?

BROADCASTING模式

广播模式下,每一条消息都会发送给所有Consumer,因此Consumer之间不需要进行协调。每个Consumer按顺序消费消息就可以了。

CLUSTERING模式

如果是CLUSTERING模式,一个消息只会被投递到一个Consumer,怎么实现?

从ConsumeMessageOrderlyService代码来看,它会定期向Broker发送lockBatchMQ操作,尝试对MessageQueue进行加锁。而后每次消费消息的时候创建ConsumeRequest进行异步消费,开始消费之前先判定本地是否获取了ProcessQueue的锁,

this.processQueue.isLocked() && !this.processQueue.isLockExpired())

ConsumeMessageOrderlyService中通过定时任务不断去尝试进行加锁,如果Broker返回成功,则设置本地ProcessQueue的相应状态。Broker端也会记录锁状态,锁存在一个超时机制,默认超时为RebalanceLocalManager.REBALANCE_LOCK_MAX_LIVE_TIME

协调完不同Consumer之后,Consumer内部是通过多线程去进行消费的,因此ConsumeMessage的处理中会对MessageQueue进行锁粒度控制,不同MessageQueue并行消费。

总结

RocketMQ的顺序消息实现的并不复杂,但提供的这个特性有一定约束前提在。此外通过ConsumeMessageOrderlyService可以发现,为了保证消费有序,引入了一个分布式锁的概念实现,顺序消息的响应吞吐比普通消息会差很多。

RocketMQ Consumer注意点 2018-08-28 23:15:01 numerical /2018/08/28/RocketMQ-Consumer注意点

RocketMQ中提供了DefaultMQPushConsumer、DefaultMQPullConsumer,这里主要分析下DefaultMQPushConsumer实现中一些需要注意的地方。

消息获取

DefaultMQPushConsumer对外层暴露的接口像是推送模式,但实际阅读代码就会发现,其消息获取逻辑也还是由Consumer向Broker去进行拉取而获得的。

PullMessageService是用于拉取消息的独立线程,在Consumer启动时该线程会启动。PullMessageService实例位于MQClientInstance中,一个进程中的多个Consumer Group会共享同一个PullMessageService。

PullMessageService会一直发起拉取消息请求,如果顺利拉取到消息,将会将消息存放到ProcessQueue中等待消费。不过预先拉取的消息存在一定限制,

  • DefaultMQPushConsumer.pullThresholdForQueue,限制预取的消息条数,默认1000条
  • DefaultMQPushConsumer.pullThresholdSizeForQueue,限制预取的消息大小,默认100MB

预取缓存在本地是基于MessageQueue进行区分的,因此同一个Consumer订阅不同Topic之间不会产生互相影响。

均衡处理

RocketMQ Consumer实现中包含了对消息队列消费的负载均衡处理。RebalancePushImpl、RebalancePullImpl分别对应Push、Pull下的均衡处理。

RebalanceService是Consumer中的一个独立线程,按照固定等待间隔,持续触发RebalanceImpl的doRebalance接口,进行均衡处理。

具体均衡逻辑操作在ReplaceImpl.rebalanceByTopic中,均衡的处理也是基于MessageQueue。均衡的目的在于将Topic下的MessageQueue按照策略分配给不同的Consumer实例进行消费处理。

RocketMQ中提供的MessageQueue分配策略实现有,

  • AllocateMachineRoomNearby,根据机房进行分配
  • AllocateMessageQueueAveragely,均分哈希策略
  • AllocateMessageQueueAveragelyByCircle,环形哈希策略
  • AllocateMessageQueueByConfig,根据配置进行分配
  • AllocateMessageQueueByMachineRoom,机房哈希
  • AllocateMessageQueueConsistentHash,一致性哈希

默认的策略是AllocateMessageQueueAveragely。

从这个均衡处理可以看到一个MessageQueue只会有一个Consumer进行消费。因此在实际运行环境中,Topic下面MessageQueue的数量值得关注,MessageQueue数量少而Consumer多的话,很多Consumer会消费不到消息,造成性能损失。

OffsetStore

OffsetStore这个概念用于表达Consumer消费位点的存储。不同消费模式下游不同的OffsetStore实现,

  • LocalFileOffsetStore,存储在本地,对应BROADCASTING消费模式
  • RemoteBrokerOffsetStore,存储在Broker,对应CLUSTERING消费模式

CLUSTERING模式下一条消息只会发送给一个Consumer,因此记录在Broker。BROADCASTING则是发送给所有Consumer,因此记录在各Consumer端。BROADCASTING模式需要注意一些,因为本地增加了一个有状态的记录文件,如果文件丢失则会再重新消费。

RocketMQ Consumer使用方法 2018-08-27 22:23:20 numerical /2018/08/27/RocketMQ-Consumer使用方法

Consumer也是RocketMQ中的很重要一环,RocketMQ提供了Consumer相关接口,Consumer在上层系统调用。RocketMQ的一些功能特性需要Consumer配合才能真正有效,因此Consumer的使用方法是很有必要去了解的。

消费模式

在使用层面上,RocketMQ提供了Push/Pull两种方式让Consumer消费数据。

DefaultMQPushConsumer

先来看下Push方式下的Consumer使用,RocketMQ提供了DefaultPushConsumer,实现,

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SampleGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SampleTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

在Push模式下,调用方注册MessageListener,消息的消费逻辑在MessageListner的接口中被异步调用。

DefaultMQPullConsumer

RocketMQ中的DefaultMQPullConsumer是默认的Pull方式实现,

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("SampleGroup");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("SampleToic");
for (MessageQueue mq : mqs) {
    while (true) {
        try {
            long offset = consumer.fetchConsumeOffset(mq, false);
            PullResult pullResult = consumer.pull(mq, "*", offset, MAX_PULL_NUM);
            if (pullResult.getPullStatus() != PullStatus.FOUND) {
                break;
            }
            for (MessageExt msg : pullResult.getMsgFoundList()) {
                System.out.println("Msg pulled");
            }
            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            consumer.getDefaultMQPullConsumerImpl().updateConsumeOffsetToBroker(mq, pullResult.getNextBeginOffset(), true);
        } catch (Exception e) {
            logger.error("handlePull", e);
            break;
        }
    }
}

单纯从上面的使用方式来看,Pull方式用起来略微繁琐一些。两者的差异有,

  • Push方式下消息消费逻辑时被异步调用的,Pull方式下是在调用方线程直接调用
  • Push方式消息消费有专门的线程池进行处理,处理效率更高
  • Push方式下消费进度信息自动进行了记录,Pull方式下需要手动调用相关接口更新消费偏移
  • Push方式不能细粒度控制,Pull方式可以自主选择拉取消息的时机以及处理方式
  • Push方式下消息消费失败重试走默认流程,Pull方式可以根据需求主动控制是重新拉取、等待还是放弃

各项参数

消费模式

当存在多个Consumer时,消息消费还可以通过设置消费模式,RocketMQ默认为CLUSTERING模式,

// Consumer
consumer.setMessageModel(MessageModel.BROADCASTING);
定义描述
MessageModel.BROADCASTING广播模式, 一个ConsumerGroup的每一个Consumer都会收到消息
MessageModel.CLUSTERING集群模式, 一个ConsumerGroup下只有一个订阅者会收到消息

消费起点

上面样例代码中需要注意setConsumeFromWhere这个调用,这个方法设置了Consumer从个消息开始消费,ConsumeFramWhere定义了支持的几种方式,

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
} 
定义描述
CONSUME_FROM_LAST_OFFSET只消费Consumer启动之后新来的消息
CONSUME_FROM_FIRST_OFFSET消费当前已经存在的所有消息
CONSUME_FROM_TIMESTAMP消费某个时间之后的消息

顺序消费/并行消费

Consumer在消费消息的时候需要通过registerMessageListener设置消息处理函数,存在两种方式,

定义描述
MessageListenerOrderly顺序消费消息
MessageListenerConcurrently并行消费消息

非顺序消息的话都应该使用MessageListenerConcurrently。顺序消息只能使用MessageListenerOrderly,如果顺序消息下用错了MessageListener,实际消费到的消息是没有顺序可言的。

消息过滤

Consumer通过subscribe方法订阅消息topic,subscribe订阅时可以进行过滤,存在几种过滤定义方式,

根据表达式过滤tag,表达式支持||操作,例如tag1 || tag2 || tag3,以及*

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String subExpression) throws MQClientException {...}
}

提供MessageFilter的实现类进行过滤,

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {...}
}

通过MessageSelector进行过滤,MessageSelector支持两种表达式,一是SQL92,另外则是tag过滤,

public class DefaultMQPushConsumer {
    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {...}
}

如无特别复杂需求的话,使用Tag过滤最好,ConsumeQueue的条目中记录了Tag的Hash值,可以更高效的进行过滤。

RocketMQ CommitLog刷盘机制 2018-08-25 17:50:35 numerical /2018/08/25/RocketMQ-CommitLog刷盘机制

CommitLog的存储结构之前已经分析了,那么就具体来看下Broker是怎么将数据落地写入到磁盘的。

线程模型

在消息写入上,Broker的实现可以认为是“并行处理、单一写入”。

Broker与Producer之间通过Netty维持长连接。Producer发送过来的消息在经过处理之后会交由BrokerController.sendMessageExecutor线程组进行处理。不过在最终写入CommitLog内存数据的时候进行了一步加锁处理。在写入数据之前,需要申请CommitLog.putMessageLock锁。内存中CommitLog写入完成后会释放锁,之后的步骤也是并发处理的。

从代码上可以看到,消息在进入Broker之后经过的调用流程不长,不考虑数据持久化的情况下,在性能上影响最大的就是加锁那一步的处理。

同步刷盘/异步刷盘

通过MessageStoreConfig.flushDiskType可以配置Broker刷盘方式。存在两种刷盘方式,同步刷盘与异步刷盘。

CommitLog.putMessage结束的最后调用CommitLog.handleDiskFlush方法,此方法中会根据刷盘策略,分别进行同步、异步处理。实际代码逻辑中,同步、异步方式,写入文件都是在独立的FlushCommitLogService中实现的。两者的差别主要在同步模式下,原线程进行了等待处理,异步模式下,原线程只是进行通知而不进行等待。

MappedFile

Broker收到的消息都是存储在CommitLog中,CommitLog会对应一个个独立的数据文件。I/O操作一般都是最为耗时的,RocketMQ为了提升I/O效率,使用了NIO中提供的FileChannel.map方法,将文件以mmap形式映射到内存中。

Broker收到消息之后会将消息写入MappedFile的mappedByteBuffer当中,而后再由刷盘实现将内存中变更的数据刷入磁盘当中。在异步刷盘模式下,因为刷盘操作位于独立线程,因此可以获得很好的性能。

TransientStorePool

与数据存储相关的还有TransientStorePool这一实现。通过MessageStoreConfig.transientStorePoolEnable可以进行配置,不过只在异步刷盘模式下生效。

TransientStorePool与MappedFile在数据处理上的差异在什么地方呢?分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。

那么与直接使用MappedByteBuffer相比差别在什么地方呢?修改MappedByteBuffer实际会将数据写入文件对应的Page Cache中,而TransientStorePool方案下写入的则为纯粹的内存。因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。

方案选择

同步/异步方式下,能够明显感知到的是Broker的处理性能。同步的优点则是能够保证不丢消息。所以同步/异步的选择就是在性能与消息可靠性上做选择。TransientStorePool能够进一步提升性能,但也增加了异常情况下的消息丢失数量。所以具体根据需求场景去进行选择吧。

通常情况下,消息队列本身就没有被看做是一个特别稳定的消息源,因此异步的选择更为常见。

RocketMQ Broker数据处理 2018-08-25 12:58:46 numerical /2018/08/25/RocketMQ-Broker数据处理

Producer发送的消息会进入Broker,在Broker正确处理完消息之后,Consumer才能够获取到消息进行消费。RocketMQ消息处理能力的关键在于Broker相关逻辑的实现。如果能够理清楚Broker的内在细节,那么可以说是真正掌握了RocketMQ的用法。这里尝试对Broker进行分析吧。

Broker结构

Broker的结构从整体上来看其实与Name Server差异不大,基本都是Startup + Controller形式,Controller中再封装大量成员组件,具体逻辑由这些成员组件来进行完成。Broker与Producer/Consumer/NameServer之间都是通过Netty维持长连接。

Broker的整体工作流程比较简单,Broker收到来自Producer的消息,将消息持久化,而后对其创建索引与ConsumeQueue。Consumer发送数据请求到Broker拉取数据。Broker定期注册自身信息到Name Server,同时接受Name Server发送过来的一些请求命令。

Broker数据

Broker维护核心数据都是和消息直接相关,分别是CommitLog、ConsumeQueue、IndexFile,这三个数据存储流程大致可以简化为,

CommitLog

数据写入CommitLog是第一步,CommitLog写入之后通过dispatch形式去写入ConsumeQueue与IndexFile。CommitLog写入能力实际就是Broker处理消息的能力。

消息进入CommitLog的路径很明确,考虑正常消息情况,Netty层收到RequestCode.SEND_MESSAGE、RequestCode.SEND_MESSAGE_V2之后会将其交由SendMessageProcessor处理,会经由DefaultMessageStore.putMessage方法到达CommitLog.putMessage。消息格式持久化写入逻辑在DefaultAppendMessageCallback.doAppend中。这个方法中会将Message对象转化成固定格式的Byte二进制形式进行存储。消息存储格式,

RocketMQ中,不同Topic的消息都会写入同一个CommitLog文件中,CommitLog存放路径位于MessageStoreConfig.storePathCommitLog中。单个CommitLog文件大小限制为MessageStoreConfig.mapedFileSizeCommitLog。

ConsumeQueue

ConsumeQueue对应MessageQueue,记录了每一个Queue中每一条消息在CommitLog中的存储偏移,Consumer能够消费到的消息一定是写入ConsumeQueue之后的消息。其每一条记录的存储格式为,

ConsumeQueue在持久化机制上与CommitLog类似,其路径大小也在MessageStoreConfig中进行配置。

IndexFile

在消息发送消费逻辑下IndexFile倒不是必需品,IndexFile顾名思义就是索引文件,这个索引是针对消息中的Keys进行构建的。其整体结构概念类似文件化了的哈希表,查询添加方式类似哈希表的连链地址法。IndexFile的作用就是给定Topic以及Key查询消息。主要应用场景也是在数据统计以及问题排查上。

单个IndexFile的大小是固定的,其计算方式为,

IndexHeader.INDEX_HEADER_SIZE + (MessageStoreConfig.maxHashSlotNum * 4) + (MessageStoreConfig.maxIndexNum * 20)

其结构为,

总结

CommitLog、ConsumeQueue、IndexFile是Broke人中维护的主要数据,这里暂时只是介绍了其格式与大体用途,后续会再来具体分析其写入过程逻辑。

Broker中还有什么其它数据吗?其实可以去Broker存储目录下去观察,RocketMQ的数据存储都是基于文件的,所以可以直接通过文件系统上存在的内容来进行了解。

RocketMQ Producer使用方法 2018-08-19 11:00:15 numerical /2018/08/18/RocketMQ-Producer使用方法

RocketMQ系统中的消息来自于Producer,Producer的功能就是发送消息,这里简要了解下发送消息的一些需要注意的地方。

调用流程

RocketMQ提供的Producer实现为DefaultMQProducer。Producer在创建的时候必须指定Producer Group Name,在正式发送消息之前需要调用start方法初始化Producer,

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.start();

Producer创建完成之后则可以发送消息,消息能够发送的前提是存在对应的路由信息,基本发送流程可以简化为,

消息发送方式

Producer发送消息存在三种模式,分别为:Sync、Async、Oneway,

同步发送(Sync)

同步发送,在调用发送结果之后会等待发送结果,对于调用方来说式同步使用方式,

Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(msg);

异步发送(Async)

异步发送,调用方在发送消息之后立即返回,消息发送的结果处理在回调中进行,

Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult result) {
        System.out.printf("send success %s", result.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
        System.out.printf("send fail %s", e);
        e.printStackTrace();
    }
});

发送且无返回结果(One Way)

同步发送、异步发送都有一个发送结果的处理,但是对应Oneway来说,就是管发不管结果,成功失败与否发送方表示不在意,


Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);

三种方式的差异与适用场景

其实Producer这三种发送方式从代码实现角度来说,根本的差别都在NettyRemotingAbstract中。RocketMQ各构件之间都是通过Netty进行网络通信的。同步发送最终的调用为,NettyRemotingAbstract.invokeSyncImpl,在写入数据之后直接等待结果返回,

public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) {
    ...
    RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
    ...
}

发送超时的配置为DefaultMQProducer.sendMsgTimeout,通过set接口进行配置。

对于异步、Oneway发送来说,两者在发送时的处理是类似的,分别通过semaphoreAsync、semaphoreOneway控制同时发送的数量。这两个参数在NettyClientConfig中进行配置,默认值为,

public class NettyClientConfig {
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
}

在使用场景方面,如果发送方对消息发送要立即掌控发送成功与否,那么比较适合用同步发送。如果是批量发送很多消息,那么异步发送比较合适。Oneway理论上比较适合发送那些可以丢弃的消息,但实际中估计比较少用。

消息功能特性

RocketMQ针对消息提供了一些功能特写,大部分都是Producer/Consumer互相配合着来的,这里主要看下Producer的处理,

消息过滤处理

Producer发送消息可以设置Tags,这样在Consumer消费的时候可以根据Tags区进行过滤。

根据Key查询消息

RocketMQ中的消息可以通过MsgId进行查询,也可以通过Topic + Key的方式进行查询,这里的Key,就是通过Message.setKeys设置的,也是在Producer发送消息之前进行配置的。

延时消息

RocketMQ支持延时消息,但是延时消息的级别是固定的,只能选择预定义的延时级别,默认的级别配置在MessageStoreConfig中定义,

public class MessageStoreConfig {
    ...
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    ...
}

在消息发送时通过Message.setDelayTimeLevel设置对应级别。

顺序消息

RocketMQ也提供了顺序消息支持。对于Producer来说,想使用顺序消息特性就需要将消息有序的发送到同一个MessageQueue当中,这样才能利用RocketMQ的存取特性实现顺序消息功能。

Producer提供了发送指定MessageQueue的接口,对于发送方来说,需要显示定义MessageQueue的选择规则,

Message msg = new Message("SampleTopic", "SampleTag", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        return mqs.get(((Integer) arg) % mqs.size());
    }
}, orderId);

顺序消息是需要Producer/Consumer配合实现,严格顺序则在RocketMQ的运维部署上也存在一定要求。

批量发送

Producer提供了批量发送接口,

List<Message> messages = new ArrayList<>();
for (int i = 0; i < 3; i++) {
    String key = String.format("ID%d", i);
    byte[] body = String.format("Hello %d", i).getBytes();
    messages.add(new Message("SampleTopic", "SampleTag", key, body));
}
producer.send(messages);

文档上描述一次发送的批量消息大小不能超过1MB,否则需要分批发送。不过实际代码中默认的最大消息大小在DefaultMQProducer.maxMessageSize中定义,默认是配成了4MB。

如果以Message列表形式进行传参只有同步发送方式,如果想异步Batch发送消息,可以先构建org.apache.rocketmq.common.message.MessageBatch,然后再走异步发送消息接口。实际上在Producer内部也是从Message列表构建MessageBatch来进行处理的。

发送Hook

RocketMQ在发送流程上提供了注册Hook相关接口。可以设置两种Hook,一种是RPCHook,另外是SendMessageHook。

RPCHook是在每次有Netty网络操作时调用。SendMessageHook是在发送消息前、发送消息后分别调用其接口。这些Hook里面可以做一些Hack的处理,或者是简单的日志统计。

注意事项

Producer提供的接口实际是比较简单的,要说需要注意的话就是要关注发送的日志。日志相关配置在ClientLogger中,默认写入的位置是~/logs/rocketmqlogs/rocketmq_client.log。

如果出现Producer发送消息,但Consumer无法消费到消息。那么首先要排查下Producer是否真正发出,那么一是要看rocketmq_client.log中是否存在异常日志,二是需要到Broker中去看消息的发送统计,最后再看是否Consumer那一方是否存在异常。

RocketMQ Name Server的功能与定位 2018-08-18 16:33:57 numerical /2018/08/18/RocketMQ-Name-Server的功能与定位

RocketMQ 架构与核心概念中已经提到过Name Server的作用,这里再具体分析一下。

结构

Name Server可以存在多个实例,但是多个实例彼此之间是无关联的。但是它们的数据会相同,Broker/Producer/Consumer往Name Server写数据的时候,实际是往所有实例中去写,以此来保证多个Name Server实例间的数据一致性。通过这种简单的同构方式来获得高可用能力。

功能

为什么需要Name Server这个角色的存在?Producer、Consumer在发送、消费的消息都是与Broker打交到,那它们是怎么知道与哪个Broker去交互呢?类似这样的信息就需要从Name Server去获取。Name Server提供的最主要的功能就是Broker信息的发现、Topic路由信息的维护。Name Server承担的就是服务发现、配置分发功能。

那为什么RocketMQ中实现了Name Server,而不是选择与一个提供类似功能的第三方服务去进行集成呢?个人推测,

  • 一是因为RocketMQ这个项目发起的时候,并不存在一个可靠好用的替代者
  • 二是Name Server只需要提供这么简单的功能,没有必要引入一个复杂的第三方
  • 三是替换成第三方服务后需要调整原先实现,增加了系统的复杂度,且没有实质上的改进

Topic信息维护

Name Server提供的最重要的功能就是Topic路由信息的维护,任何发送消费的消息都归属于某个Topic,发送消费都需要和Broker打交道。Name Server维护了Topic/Broker的关联关系。Topic/Broker之间的关系则是Broker定期上报Name Server,Name Server自身定期检查Broker信息是否有效,如果Broker宕机或下线,对应的路由信息会被移除。Name Server中用于维护相关信息的代码实现主要位于RouteInfoManager,其关键用于记录的成员为,

public class RouteInfoManager {
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}

Broker信息维护

其实和上面内容一样,Broker信息维护在RouteInfoManager当中,当在命令行下通过命令去操控修改某cluster或broker上配置时,都是通过Name Server先行获取实际目标地址,再来进行后续操作。

分布式KV

除了既定的Topic路由信息之外,Name Server还提供了一个分布式KV管理服务,主要由KVConfigManager、TimedKVConfigManager去维护。Name Server提供的KV主要还是用于RocketMQ自身需要的一些全局性配置。分布式的KV服务比如Redis/Memcache等也能提供相似的功能,但也没有被使用。原因推测可能一是因为在消息队列场景下,没有太多这种KV需要配置,二是不想再引入依赖。

配置

Name Server的配置项不多,除了通用的NettyServerConfig之外,其余配置项都在NamesrvConfig中,

public class NamesrvConfig {
    private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
    private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
    private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
    private String productEnvName = "center";
    private boolean clusterTest = false;
    private boolean orderMessageEnable = false;
}

主要的配置项就是两个路径。Name Server中维护的KV信息发生变更时,就会持久化到磁盘。数据保存到Name Server本地的风险就是,如果Name Server所在机器硬盘损坏,那么RocketMQ集群就不是能够立即恢复的了,因此实际使用中部署多个Name Server是必要的。对于Topic、Broker等信息,Broker会定期上报,因此无需存盘处理。这部分则是可以任意恢复的数据。

RocketMQ Message结构的定义 2018-08-12 17:29:17 numerical /2018/08/12/RocketMQ-Message结构的定义

RocketMQ中消息的结构定义虽然简单,不过还是有必要了解下。

Message

RocketMQ消息定义相关的代码位于org.apache.rocketmq.common.message下,Producer发送的消息定义为Message类,其字段定义为,

topic

Message都有Topic这一属性,Producer发送指定Topic的消息,Consumer订阅Topic下的消息。通过Topic字段,Producer会获取消息投递的路由信息,决定发送给哪个Broker。

flag

网络通信层标记。在代码中可能与之相关的使用代码有,

// org.apache.rocketmq.remoting.protocal.RemotingCommand

class RemotingCommand {
    @JSONField(serialize = false)
    public boolean isOnewayRPC() {
        int bits = 1 << RPC_ONEWAY;
        return (this.flag & bits) == bits;
    }

    @JSONField(serialize = false)
    public boolean isResponseType() {
        int bits = 1 << RPC_TYPE;
        return (this.flag & bits) == bits;
    }
}

翻阅代码之后只找到上面这一出与该字段相关的逻辑,并没有特别明确该字段的作用。

body

Producer要发送的实际消息内容,以字节数组形式进行存储。Message消息有一定大小限制。

transactionId

RocketMQ 4.3.0引入的事务消息相关的事务编号。

properties

该字段为一个HashMap,存储了Message其余各项参数,比如tag、key等关键的消息属性。RocketMQ预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。当然属性的数量也是有限的,消息序列化之后的大小不能超过预设的最大消息大小。

系统内置属性定义于org.apache.rocketmq.common.message.MessageConst

对于一些关键属性,Message类提供了一组set接口来进行设置,

class Message {
    public void setTags(String tags) {...}
    public void setKeys(Collection<String> keys) {...}
    public void setDelayTimeLevel(int level) {...}
    public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {...}
    public void setBuyerId(String buyerId) {...}
}

这几个set接口对应的作用分别为为,

属性接口用途
MessageConst.PROPERTY_TAGSsetTags在消费消息时可以通过tag进行消息过滤判定
MessageConst.PROPERTY_KEYSsetKeys可以设置业务相关标识,用于消费处理判定,或消息追踪查询
MessageConst.PROPERTY_DELAY_TIME_LEVELsetDelayTimeLevel消息延迟处理级别,不同级别对应不同延迟时间
MessageConst.PROPERTY_WAIT_STORE_MSG_OKsetWaitStoreMsgOK在同步刷盘情况下是否需要等待数据落地才认为消息发送成功
`MessageConst.PROPERTY_BUYER_IDsetBuyerId没有在代码中找到使用的地方,所以暂不明白其用处

这几个字段为什么用属性定义,而不是单独用一个字段进行表示?方便之处可能在于消息数据存盘结构早早定义,一些后期添加上的字段功能为了适应之前的存储结构,以属性形式存储在一个动态字段更为方便,自然兼容。

MessageExt

对于发送方来说,上述Message的定义以足够。但对于RocketMQ的整个处理流程来说,还需要更多的字段信息用以记录一些必要内容,比如消息的id、创建时间、存储时间等等。在同package下可以找到与之相关的其余类定义。首先就是MessageExt,

字段用途
queueId记录MessageQueue编号,消息会被发送到Topic下的MessageQueue
storeSize记录消息在Broker存盘大小
queueOffset记录在ConsumeQueue中的偏移
sysFlag记录一些系统标志的开关状态,MessageSysFlag中定义了系统标识
bornTimestamp消息创建时间,在Producer发送消息时设置
storeHost记录存储该消息的Broker地址
msgId消息Id
commitLogOffset记录在Broker中存储便宜
bodyCRC消息内容CRC校验值
reconsumeTimes消息重试消费次数
preparedTransactionOffset事务详细相关字段

Message还有一个名为MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX的属性,在消息发送时由Producer生成创建。上面表格中的msgId则是消息在Broker端进行存储时通过MessageDecoder.createMessageId方法生成的,其构成为,

这个MsgId是在Broker生成的,Producer在发送消息时没有该信息,Consumer在消费消息时则能获取到该值。RocketMQ也提供了相关命令,

命令实现类描述
queryMsgByIdQueryMsgByIdSubCommand根据MsgId查询消息
RocketMQ Topic的创建与管理 2018-08-11 10:52:08 numerical /2018/08/11/RocketMQ-Topic的创建与管理

RocketMQ中Producer向Topic发送消息,Consumer订阅Topic。消息从属与Topic,因此Topic的创建管理可以认为是最基础的一个功能特性。

创建Topic

RocketMQ中存在不同的Topic创建方式,

自动创建

默认情况下,Topic不用手动创建。在Producer发送消息的时候,如果没有对应的Topic,那么这个时候会创建该Topic并投递消息。

自动创建的开关配置在BrokerConfig中,通过autoCreateTopicEnable字段进行控制。当这个开关打开之后,Broker上会默认创建一个MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC的Topic。Broker启动之后向Name Server注册时,会表明自身创建了该Topic。Producer在发送消息时会向Name Server查询这个消息的投递路径,相关的方法为DefaultMQProducerImpl.tryToFindTopicPublishInfo、MQClientInstance.updateTopicRouteInfoFromNameServer,这个时候会获取到该Topic对应的信息。之后消息会被投递到对应的Broker上。当消息抵达Broker之后,AbstractSendMessageProcessor.msgCheck函数会检查Topic是否存在,当不存在根据是否允许自动创建调用TopicConfigManager.createTopicInSendMessageMethod方法创建。

还有一些特殊Topic也是通过自动创建进行的,这个时候不通过autoCreateTopicEnable判断,比如消息消费失败之后会进入重试队列,重试队列的创建为Broker自动创建,实际创建方法为TopicConfigManager.createTopicInSendMessageBackMethod。

预先创建

实际环境中,Topic自动创建的开关是需要关闭的,否则难以管控集群的运行状态。这种情况下就需要预先创建Topic。RocketMQ中可以通过mqadmin提供的Topic相关命令进行创建。

命令的实现位于UpdateTopicSubCommand,执行,

./mqadmin updateTopic

会输出该命令的各项参数,

usage: mqadmin updateTopic [-b <arg>] [-c <arg>] [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>]
       -t <arg> [-u <arg>] [-w <arg>]
 -b,--brokerAddr <arg>       create topic to which broker
 -c,--clusterName <arg>      create topic to which cluster
 -h,--help                   Print help
 -n,--namesrvAddr <arg>      Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
 -o,--order <arg>            set topic's order(true|false
 -p,--perm <arg>             set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
 -r,--readQueueNums <arg>    set read queue nums
 -s,--hasUnitSub <arg>       has unit sub (true|false
 -t,--topic <arg>            topic name
 -u,--unit <arg>             is unit topic (true|false
 -w,--writeQueueNums <arg>   set write queue nums

可以看到Topic可以创建在指定的Cluster或指定的Broker上,同时可以配置Topic的一些相关连参数。

Topic相关命令

Topic相关的命令有,

命令实现类描述
allocateMQAllocateMQSubCommand申请创建新的MessageQueue
deleteTopicDeleteTopicSubCommand删除Topic
topicClusterListTopicClusterSubCommand列出含有该Topic的Cluster列表
topicListTopicListSubCommand列出当前集群中的所有Topic
topicRouteTopicRouteSubCommand列出Topic的路由信息
topicStatusTopicStatusSubCommand列出Topic的一些统计数据状态
updateOrderConfUpdateOrderConfSubCommand更新Topic顺序相关配置
updateTopicPermUpdateTopicPermSubCommand更新Topic的读写权限
updateTopicUpdateTopicSubCommand创建或更新Topic配置

其中的一些查询性命令在遇到问题调试的时候能够提供一定帮助,理解创建修改型命令的各项参数,有助于了解Topic相关的功能特性。

RocketMQ 本地环境搭建 2018-08-09 21:33:59 numerical /2018/08/09/RocketMQ-本地环境搭建/

准备阶段

获取RocketMQ

RocketMQ可以从Apache上下载代码或可执行包,但从GitHub上获取源码到本地可能更为方便,至少也可以及时更近新的变化。

编译RocketMQ

RocketMQ使用Maven进行包管理,从GitHub获取代码之后,命令行下进行编译,

mvn -Prelease-all -DskipTests clean install -U

将会在distribution/target/apache-rocketmq/bin目录下生成可执行的脚本。

环境配置

为了通过RocketMQ提供的启动脚本启动RocketMQ各项组件,需要先设置环境变量ROCKETMQ_HOME,将其值配置到distribution/target/apache-rocketmq目录,

export ROCKETMQ_HOME=/path/to/distribution/target/apache-rocketmq

启动服务

启动Name Server

运行mqnamesrv启动Name Server,

nohup sh mqnamesrv > ns.log 2>&1 &

启动Broker

默认配置启动

运行mqbroker启动Broker,

nohup sh mqbroker -n localhost:9876 > broker.log 2>&1 &

启动多个Broker

实际开发情况下为了测试验证多Broker的情况,有必要在本地启动多个Broker。默认配置启动会有各项冲突,因此需要制定不同的配置进行启动,

Broker需要监听端口以及读写文件数据,通过配置文件修改端口以及存储路径,可以在单机上启动多个实例。与之相关的配置文件对应的Java类在BrokerStarup中可以找到,分别有BrokerConfig、NettyServerConfig、NettyClientConfig、MessageStoreConfig。

以启动两个Broker Cluster为例,分别配置为,

# broker1.properties
brokerClusterName=Cluster1
brokerName=broker1
brokerId=0
listenPort=10911
haListenPort=10912
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
# broker2.properties
brokerClusterName=Cluster2
brokerName=broker2
brokerId=0
listenPort=10921
haListenPort=10922
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/path/to/store
storePathCommitLog=/path/to/commit

启动之时通过-c指定配置文件,

nohup sh mqbroker -n localhost:9876 -c broker1.properties > broker1.log 2>&1 &
nohup sh mqbroker -n localhost:9876 -c broker2.properties > broker2.log 2>&1 &

测试服务状态

分别启动Producer/Consumer收发消息来进行处理,需要先设置Name Server地址

export NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer
tools.cmd org.apache.rocketmq.example.quickstart.Consumer

关停服务

运行mqshutdown进行关停,

关停Name Server,

./mqshutdown namesrv

关停Broker

./mqshutdown broker

配置Intellij IDEA

为了便于阅读代码,理顺调用流程。可以使用Intellij IDEA提供的调试功能。断点调试对于阅读代码还是有很大帮助的。

Intellij IDEA工具栏中找到Run -> Edit Configurations -> add remote。Debugger mode选择Attach,Listen按理应该可行,但后续配置的时候遇到些问题,暂时先用Attach形式。

启动服务时设置添加JAVA_OPT参数,添加与Remote Debug中对应的调试端口信息,

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005
java.time的基本使用方法 2018-07-18 21:09:13 numerical /2018/07/18/javatime的基本使用方法/

JDK 8提供了新的时间日期库,相关实现位于java.time包下。这里记录下基本用法,大致列举下常用的时间日期相关操作,

常见用法

当前时间获取

LocalDateTime dateTime = LocalDateTime.now();
LocalDate date = LocalDate.now();
LocalTime time = LocalTime.now();

LocalDateTime包含日期与具体时间,LocalDate只包含日期,LocalTime只包含时间。看到这个类定义,顺带想起Python中提供的对应模块,有点接近,可以对照看一下,

now = datetime.datetime.now()
date = now.date()
time = now.time()

静态构造

LocalDateTime等类提供了一组静态函数用以构建对象,

LocalDateTime end = LocalDateTime.of(2012, 12, 21, 3, 14, 35);

通过Java的函数重载机制可以在多种精度上构建事件对象。

格式化输出与字符串解析

时间格式化通过DateTimeFormatter类来实现,默认提供了一组预定义的转化定义,也可以通过字符串来进行构建,

LocalDateTime now = LocalDateTime.now();
now.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
LocalDateTime end = LocalDateTime.parse("2012-12-21 03:14:35", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

日期时间与当前时刻的互相转化

以前获取当前时刻的毫秒数通过下面代码进行获取,

System.currentTimeMillis();

java.time提供了Instant类来获取同样的时刻点,

Instant.now().toEpochMilli();

LocalDateTimeInstant之间可以互相进行转化,

LocalDateTime now = LocalDateTime.ofInstant(Instant.now(), ZoneId.systemDefault());
now.toInstant(ZoneOffset.UTC).toEpochMilli();

Instant可以从时刻进行构建,

Instant.ofEpochMilli(System.currentTimeMillis());

时间计算

时间相关的计算主要有两种,一种为给定时间和变化量,获取运算后的时间,

LocalDateTime now = LocalDateTime.now();
now = now.plus(1, ChronoUnit.DAYS);

LocalDateTime中提供了一组plusminuswith接口用于计算。

另外一种为计算两个时间之间的差距,

LocalDateTime start = LocalDateTime.of(2018, 1, 1, 0, 0, 0);
LocalDateTime end = LocalDateTime.of(2018, 1, 2, 1, 0, 1);
Duration duration = Duration.between(start, end);
duration.toDays();
duration.toHours();
duration.toMinutes();
duration.getSeconds();
start.until(end, ChronoUnit.DAYS);
start.until(end, ChronoUnit.HOURS);
start.until(end, ChronoUnit.SECONDS);
start.until(end, ChronoUnit.HOURS);

如上两种写法都能够获得差值。通过Duration可能会更有利于后续的运算处理。

时区处理

LocalDateTime等表示的是本地时间,不带时区信息。与时间相关的是ZoneDateTime相关类,

ZonedDateTime now = ZonedDateTime.of(LocalDateTime.now(), ZoneId.of("Asia/Shanghai"));

同一时刻在不同时区下的时间展示处理,

long now = System.currentTimeMillis();
LocalDateTime start = LocalDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneId.of("Asia/Shanghai"));
LocalDateTime end = LocalDateTime.ofInstant(Instant.ofEpochMilli(now), ZoneId.of("Asia/Tokyo"));
System.out.println(Duration.between(start, end).toHours());

注意事项

java.time中提供的类基本都是线程安全的,因此可以避免掉先前SimpleDateFormat线程相关问题。在新项目中,时间处理最好都切换到新的API下。

参考

RocketMQ 架构与核心概念 2018-07-05 06:22:51 numerical /2018/07/05/RocketMQ-架构与核心概念/

RocketMQ是阿里巴巴捐献给Apache基金会的消息队列实现。从阿里云在其消息产品对比中给出的性能特性指标来看还不错,当然至于这个指标是否可信,可能还要实际测试过才好说。

架构

RocketMQ Architecture提供了RocketMQ的架构构成,

Architecture

RocketMQ由如下几部分构成,

  • Name Server
  • Broker
  • Producer
  • Consumer

Name Server

RocketMQ没有引入第三方服务依赖,消息队列内部的服务发现以及配置更新等,都借由Name Server来完成。从功能上来说,Name Server相当于一个轻量级简化版的Zookeeper,或者说提供了类似ZK的功能。

Name Server的定位是维护RocketMQ全局相关配置,提供消息路由信息,除此之外并不包含过多复杂逻辑。因为其相对轻量级,一般一组Name Server集群可以服务多组Broker集群。

Name Server Cluster是多个Name Server实例的统称,Name Server之间并无关联,互相也不同步信息。多个Name Server的存在是为了提供高可用服务,不同实例之间的数据信息同步则实际是在数据写入的时候保证的。一份配置或消息路由信息会写入所有Name Server实例中。

Broker

RocketMQ的核心逻辑是Broker。Broker是实际用于手法消息的功能单元。从RocketMQ使用者的角度来看,生产者通过接口将消息投递到Broker,消费者从Broker获取消息进行消费。RocketMQ提供了推拉结合的方式用于获取消息。

Producer

Producer为消息生产者,实际为需要RocketMQ服务的上层系统。

Consumer

Consumer为消息消费者,实际为需要RocketMQ服务的上层系统。

概念名词

在深入了解RocketMQ之前,对一些关键概念名词需要先有一个简单的认识,Core Concept上提供了一些名词解释。

Producer

Producer为消息生产者,负责创建消息发送给Broker。RocketMQ默认提供了DefaultMQProducer、TransactionMQProducer用于发送消息。

Producer Group

Producer Group是一组Producer的名称。通常来说一个业务系统可能会奉陪一个Producer Group。Producer Group后续可以用于消息发送相关的各项管理监控功能。

Consumer

Consumer是消息消费者,用于从消息队列获取消息。

Consumer Group

Consumer Group是一组Consumer的名称。相同Group下的Consumer需要有同样的订阅关系,否则消息投递的时候可能会出现一些难以排查的问题。Consumer Group同样用于分配给不同的业务系统。通过管理工具可以控制Group的消费范围。

PullConsumer

拉取型Consumer,获取消息的方式为调用Consumer接口手动从Broker获取消息,手动更新消费位点。PullConsumer使用起来相对麻烦些,但需要细粒度控制消息从何时何处开始消费的地方可以考虑使用PullConsumer。

RocketMQ默认提供了DefaultMQPullConsumer实现。

PushConsumer

推送型Consumer,从使用者的角度来看,其提供的接口像是Broker推送消息过来进行消费。其内部也还是通过定期拉取方式从Broker获取消息。

RocketMQ默认提供了DefaultMQPushConsumer实现。

Topic

一个消息都会从属与某个Topic,可以理解成消息数据的以及类别,Producer/Consumer的消息发送/消费都是基于Topic进行处理的。

Message

RocketMQ中的消息。Message必须设置Topic以及消息体,除此之外还可以配置一些自定义属性。只要不超过预定义的消息大小,自定义属性可以任意添加。

Tag

Message可以设置Tag,Tag是系统预定义的属性。Message设置了Tag之后,在消费的时候可以根据Tag进行过滤。RocketMQ提供了几种过滤方式。可以认为Tag是Message的二级类别。

Message Model

消息投递存在两种不同类别,

Clustering

Clustering模型下,一个Topic的每一条消息只会被投递到某一个Consumer上进行消费,也就是说一个Topic的消息可能上一条消息在机器A上消费,下一条消息在机器B上消费。

Broadcasting

Broadcasting模式下,一个Topic的每一条消息会被投递到每一个Consumer上进行消费。意味着Consumer在处理消息的时候需要幂等。如果后续加上消费统计数据监控的话,这种情况下消息消费的数量就会随Consumer数量而上涨。

Message Order

RocketMQ提供了不同的顺序性能力

Concurrently

默认的消息是无序消息,因此Consumer在进行消费的时候使用Concurrently模式可以有效的提升消费的吞吐量。

Orderly

当使用顺序消息之后,Consumer需要使用Orderly消费,否则顺序也是无法保证的。

代码结构

从Apache下载RocketMQ 4.2.0代码,或是从GitHub获取当前的代码,可以看到RocketMQ代码构成为下列模块,

模块功能
broker负责实现Broker核心逻辑
client提供了给应用层使用的Producer/Consumer实现
common一些通用协议或辅助代码实现
filter提供了消息过滤所需的实现
filtersrv独立的消息过滤服务实现
logappender日志处理相关实现
namesrvNameServer的代码实现
openmessagingOMS协议相关处理
remotingMQ各进程之间的网络处理部分,基于Netty实现
srvutil少量服务进程辅助类
store消息持久化等核心逻辑实现
toolsMQ提供的tools实现

所有Java代码总共在10w左右量级,核心部分代码量应该更少,有时间会具体来看各项功能的使用方法与是实现方式。

参考

Netty in Action读书笔记 2018-06-07 06:45:06 numerical /2018/06/07/Netty-in-Action读书笔记/

关于Netty的书好像就Netty in Action值得一读。最近翻看了下,对Netty的整体结构有了一个概况性了解,实际掌握Netty的话,还需要具体去看下源码以及去实现一些样例,这里还是记录下读完本书后的概括性了解。

Netty是什么

Netty一个高性能网络I/O框架。Java不是提供了NIO库,可以直接使用标准库的相关接口来实现高性能I/O,怎么还需要一个专门的库来做这样的事情呢?

阅读完之后,自己认可的结论是,Netty有下面几个优点或者说特性吧,

  • 以一致的接口封装了标准库中的IO/NIO接口
  • 封装的接口相较标准库中的更易理解使用
  • 实现了高性能的传输处理类,比如io.netty.channel.epoll,使用jni实现,比默认的nio性能更好
  • 提供了多种连接处理实现类,减少了从零实现这些功能的负担

为什么需要了解Netty呢?Netty实际已经成为了Java网络传输中的一个通用库,分布式网络环境中需要高效的数据传输实现,因此很多上层的框架都应用了Netty,所以有必要学习。一方面便于后续再学习一些上层框架,另一方面如果自己需要实现类似的功能,Netty会是一个必要的工具。

Netty的结构

Netty中的关键构成为,

  • io.netty.channel.Channel
  • io.netty.channel.ChannelHandler
  • io.netty.channel.ChannelFuture
  • io.netty.channel.EventLoop
  • io.netty.channel.EventLoopGroup
  • io.netty.bootstrap.ServerBootstrap

BootstrapEventLoopChannel是Netty的核心。Netty的对IO/NIO的抽象统一也在于这些类的实现。Netty的高性能通过EventLoop事件循环机制来保证。

使用Netty编码的思路大体为,程序之间的连接建立为ChannelChannel需要有对应的数据处理实现ChannelHandler,Netty提供的异步获取接口为ChannelFuture。这些逻辑都运行在EventLoop中。通过Bootstrap启动这些服务。

具体每一个结构的实现要点,之后具体再看源码来分析。

Netty样例

直接记录书中最一开始的服务端样例,

final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(group)
        .channel(NioServerSocketChannel.class)
        .localAddress(new InetSocketAddress(port))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(serverHandler);
                }
            }
        );
    ChannelFuture future = bootstrap.bind().sync();
    future.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully().sync();
}

上面的片段虽然简单,但也展现了提供一个Netty服务所涉及到的框架性结构。不论Netty是否高性能,单纯接口设计上就挺值得学习。Netty对连接处理的每一步都进行了良好的抽象设计,提供了一致的简洁接口。类似的设计思路,在一些上层系统中也可以借鉴。

书评

Manning出版社的in Action系列通常读完一个感觉,好像什么都说了,但好像也和没说差不多。Netty in Action翻完对整体可以有概况性了解,知道各个构件是为了什么而存在。但是对于其原理实现,书中并没有过多提及。对于Netty来说,应该也足够。这本书只是一个开始,源码阅读与代码实践是后续学习少不了的。

Java 9 Modules 2018-06-03 15:23:34 numerical /2018/06/03/Java-9-Modules/

JSR 376提议的Java模块化方式,在Java 9中发布,适时跟进一下。

目标

先来看下JSR 376的目标,可以简要概况为,

  • 让Java在不同类型的计算设备上更适配、易裁剪
  • 提高代码安全性与可维护性
  • 提升应用性能

实现

直接观察JDK代码结构,JDK 9里对原有代码进行了模块化切分。大致浏览下代码目录,可以看到Java Module信息如何定义。

module-info.java文件中定义了Module信息,比如,

module java.base {
    exports java.io;
    exports jdk.internal.jmod to 
        jdk.compiler,
        jdk.jlink;
    uses java.lang.System.LoggerFinder;
    provides java.nio.file.spi.FileSystemProvider with jdk.internal.jrtfs.JrtFileSystemProvider;
}

module java.se {
    requires transitive java.compiler;
}

module java.desktop {
    opens javax.swing.plaf.basic to jdk.jconsole;
}

在model-info.java里面引入了exportsmoduleopenopensprovidesrequiresuseswithtotransitive关键字。

module

module {module-name}用于定义当前module-info.java描述的module的名称。

exports

exports {package-name}、exports {package-name} to {module-name},用于声明当前模块下的哪些package被导出。

requires

requires {module-name}requires transitive {module-name}用于声明当前模块依赖哪些模块,transitive用于表明这种依赖是否可传递。

provides

provides {interface-name} with {class-name}用于声明当前module是一个service提供者,提供了interface-name接口定义的具体实现。

uses

uses {interface-name}声明当前模块使用了指定名称的service,当前模块为对应service的使用者。

JDK文档中可以查看java.util.ServiceLoader<S>来看具体的service相关概念与实现机制。

open

opens {package-name} to {module-name}声明当前模块将指定package开放给对应的其它模块。在之前的Java中,通过反射可以后去任意类的成员与函数,加上这个声明之后只有指定模块中的代码才可以进行相应访问。

总结

上面只是简要的看了下Java中的模块怎么定义,通过新提供的这些model-info.java中关键字的功能,可以看到除了模块化定义本身之外,之前几个概念与用法发生了相应变化。

首先是publicprotectedprivate等描述符的含义发生了变化,public也有了约束,必须模块间显示进行了声明,否则也是无法访问的。

其次是反射接口的能力,先前可以进行的操作,现在加上了opens ... to ...这样的模块定义约束。动态能力得到了控制,从接口提供角度来说,避免了不应该的暴露。

实际项目中去使用这套模块机制,大的方向还是很明确的,就是拆分模块。没有这套机制,良好的代码组织也会进行模块切分,估计会比较容易得到应用。

参考

Python性能优化的方法路径 2018-05-26 20:13:54 numerical /2018/05/26/Python性能优化的方法路径/

Python作为程序语言最被人诟病的一点在于其性能。Python作为动态语言,性能并不是它的优势,但很多时候实际也不会面临太多性能问题,所以Python的使用场景很广。但是,总会有碰到性能问题的一天,这种情况下首要的就是要在Python技术框架内来进行性能优化了。

决定是否优化

性能优化是手段,是为了解决其它一些问题。是否将性能优化作为正式开发任务,首要的就是达成共识。开发期的项目自然好说,上线后的项目就需要估计性能优化的工作量、相应风险以及评估大致的影响范围。让产品、开发、测试都意识到这项任务的必要性,而后才合适正式开始实施。

性能分析

性能优化必要步骤是分析瓶颈,首先得找到究竟慢在哪了,让后再对症下药去进行修改。Python性能分析,直接使用自带的cProfile进行处理就可以。用法也很简单,阅读官方文档中The Python Profilers 一节就足够了。

cProfile结果的可视化工具有不少,不过配合编码一起来的话,直接使用PyCharm打开最为方便。可以快速跳转每一个耗时函数对应的代码。

关注那些调用次数多的以及单次性能差的,那些大概率就是需要被优化的逻辑。

优化检查

在定位问题后就可以进行修改了。控制每次优化改动的范围,一次改动一处,然后再次Profile对比。优化的每一次改进都需要数据的支撑,这样能保证行进在正确的方向上,同时时间上也能有所掌控。

一些代码的优化并不一定好实施,因此这种渐进性的优化方式,便于降低项目整体开发风险,能够保证随时有可用的版本,且是经过了一定优化的版本。

持续监控

在进行了性能优化之后,就需要注意对性能指标进行监控了。这样能够及时的了解性能全貌,一旦再次需要进行优化,也能及早得知决策。

Python代码优化的常见方向

减少不必要的函数调用

很多时候没有必要去优化一个函数实现本身,优先考虑是否能够降低这个函数的调用次数,或是直接删除这个函数。

这是一种很常见的情况,一些功能层面已经不再需要的遗留代码占据了很多性能。如果一份代码多人经手,需求多变,反复迭代,那么这种代码是很有可能存在的。

搞清楚,然后删掉它们。

让运行时的动态计算静态化

部分运行逻辑可以先离线生成得到静态结果,运行时直接加载静态结果就可以了,这部分性能也是应该要回收的。

适当应用缓存

那些不能直接静态化的计算可能也可以缓存化。资源的创建销毁也可以根据情况判定是否应用缓存或对象池资源池一类实现方式,避免重复运算。

需要注意的是避免因为缓存带来的内存泄漏。

减少gc

Python通过引用计数与gc进行对象内存管理。对于引用计数无法处理的情况,Python会唤起gc进行处理,gc对性能有一定影响。因此代码实现的时候最好gc友好,减少代码中的引用环。

另外也可以通过设置gc参数来控制gc触发的几率,减少在高频计算时的gc触发,在空闲时刻手动触发gc。

修改明显错误的代码实现

很多性能问题都是那些非关键逻辑导致的,这些逻辑往往一眼看去就欠缺性能上的考虑,比如,

  • 不断通过for循环去进行查找
  • 错误的选择了容器,导致不断的遍历查找
  • 在property实现中含有复杂的耗时逻辑,让调用方忽视这一点
  • 在循环中添加了很多可以移到循环之外只运行一次的函数调用
  • 可以实现成事件触发的逻辑变成了轮询
  • 函数中存在大量重复调用

这类代码本就不应该存在,找出来改掉,并尽力保证未来此类代码不再进入代码库。

高频调用中改进一些基础用法

比如这里列出的一些常见用法的性能比较,在高频调用的时候就可以看出性能差异,选择更有效率的实现方式。

优化算法

部分逻辑可能可以从是实现方法层面进行优化,优化算法复杂度本身。

将部分逻辑改为原生代码实现

通过C/C++来替换掉部分逻辑也是Python优化的常见手段。有几种选择,

  • 裸写C/C++扩展
  • 使用Boost.Python实现C++扩展
  • 使用Cython
  • 使用PyPy

裸写C/C++需要处理繁琐的对象转化以及引用处理,一般会选择使用Boost.Python。使用起来方便很多。

采用Cython可以不去写C++代码,但是要写Cython脚本。根据情况,有选择使用。

PyPy相当于替换了运行时,在运行时不受掌控的情况下是不能采用的。服务端的话,可以考虑部分使用,通过进程隔离的方式解决一些依赖上的问题。

需要注意保留必要的Python实现,以便在遇到问题时可以快速切换回去。

Windows上通过Docker Toolbox配置Linux开发环境 2018-03-28 22:16:52 numerical /2018/03/28/Windows上通过Docker-Toolbox配置Linux开发环境/

获取Docker

上面两个工具,根据当前系统要求选用一个即可。Windows 10的话可能需要安装Docker for Windows

启动Docker

完成安装之后,在开始菜单点击Docker Quickstart Terminal可以进入Docker环境。

安装Linux镜像

以Debian为例来说明下,

获取指定Debian版本

docker pull debian:jessie

运行获取到的Debian Image

docker run -it --name debian debian:jessie bash

配置运行的Debian,默认拉取的基本什么内容也没有

安装vim

apt-get update
apt-get install vim

配置源,修改sources.list,用国内的源,比如网易的Debian镜像

vim /etc/apt/sources.list
deb http://mirrors.163.com/debian/ jessie main non-free contrib
deb http://mirrors.163.com/debian/ jessie-updates main non-free contrib
deb http://mirrors.163.com/debian/ jessie-backports main non-free contrib
deb-src http://mirrors.163.com/debian/ jessie main non-free contrib
deb-src http://mirrors.163.com/debian/ jessie-updates main non-free contrib
deb-src http://mirrors.163.com/debian/ jessie-backports main non-free contrib
deb http://mirrors.163.com/debian-security/ jessie/updates main non-free contrib
deb-src http://mirrors.163.com/debian-security/ jessie/updates main non-free contrib

安装所需的各项软件,例如,

基础库
apt-get install build-essential
apt-get install git
apt-get install wget
apt-get install zlibc zlib1g zlib1g-dev
apt-get install libssl-dev
apt-get install libreadline6 libreadline6-dev
从源代码安装最新Python
wget -c https://www.python.org/ftp/python/2.7.14/Python-2.7.14rc1.tgz
tar -xzvf Python-2.7.14rc1.tgz
cd Python-2.7.14rc1
./configure
make && make install
从源代码安装setuptools
wget -c https://github.com/pypa/setuptools/archive/v39.0.1.tar.gz
tar -xzvf v39.0.1.tar.gz
cd setuptools-39.0.1
python bootstrap.py
python setup.py install
从源代码安装pip
wget https://pypi.python.org/packages/c4/44/e6b8056b6c8f2bfd1445cc9990f478930d8e3459e9dbf5b8e2d2922d64d3/pip-9.0.3.tar.gz
tar -xzvf pip-9.0.3.tar.gz
cd pip-9.0.3
python setup.py install
安装supervisor
pip install supervisor
安装MongoDB
wget -c http://downloads.mongodb.org/linux/mongodb-linux-x86_64-debian81-3.2.8.tgz
tar -xzvf mongodb-linux-x86_64-debian81-3.2.8.tgz
vim ~/.bashrc
  export MONGOHOME=/path/to/mongo
  export PATH=$PATH:$MONGOHOME/bin
保存container
docker commit debian dev

启动镜像

启动的时候注意配置端口映射与路径映射,

docker run -it --name 'dev1' -p 50001:5000 -v //c/Users:/server /server/path/to/program/start.sh

start.sh里为需要启动的服务,末尾加上,

tail -f /dev/null

来让container不退出。

默认路径映射需要在c:/Users下,在5-useful-docker-tip-and-tricks-on-windows里提供了配置任意目录映射的方法。

打开Virtualbox进入虚拟机,添加共享文件夹,命名为shared,

cd /mnt/sda1/var/lib/boot2docker
vi profile
  mkdir -p /shared
  mount -t vboxsf -o uid=1000,gid=50 shared /shared

启动的时候指定,

docker run -it --name "test" -v //shared:/shared debian:jessie bash

容器中的MongoDB的数据配置如果只在容器路径中的话,在关闭之后就会丢失。因此MongoDB的数据需要写入到持久存储的地方。MongoDB数据在Docker中无法直接写入VirtualBox Shared Folder中,需要做一定处理,

启动的时候绑定目录到虚拟机目录下,

-v /var/lib/boot2docker/my-mongodb-data/:/data/db/

或者用docker volume命令创建,

docker volume create --name=mongodata
docker run -d -p 27017:27017 -v mongodata:/data/db --name=mymongo mongodb:3.3

常用命令

查看镜像

docker images

查看运行的container

docker ps -a

停止运行的container

docker stop container_id

移除镜像

docker rmi image_id

移除container

docker rm container_id

移除所有container

docker rm $(docker ps -a -q)

重启container

docker restart container_id

将image保存到文件

docker save -o /path/to/output image_id

从文件加载image

docker load -i /path/to/image
部署Sentry进行异常收集 2018-03-25 15:32:00 numerical /2018/03/25/部署Sentry进行异常收集/

Sentry是什么

Sentry是一个异常收集服务。对比简单的将异常输出到日志,通过Sentry来收集异常的好处是更加直观,且能够进一步与其它系统进行集成,比如邮件提醒、缺陷管理等。

Sentry提供了多种语言系统的客户端,使用各种语言实现的系统都可以选择Sentry来收集异常。在线上环境自己还没有正式应用过它,当前主要是将其用作开发期的异常监控服务。Sentry提供了线上托管版本,也可以选择自己进行部署。开发期的问题自然是选择在自有服务器上进行部署了。

安装Sentry

安装virtualenv

Sentry是Django实现的,自身有不少依赖库。为了避免依赖库之间的冲突,选择将Sentry安装在virtualenv环境下。

pip install virtualenv

创建对应的virtualenv环境

virtualenv ~/virtualenv/sentry
source ~/virtualenv/sentry/bin/activate

安装Sentry

在virtualenv环境下,

pip install -U sentry

安装Redis

Sentry依赖Redis,为了正常运行需要安装Redis。Mac下可以选择从redis.io下载或者从homebrew进行安装,

brew install redis

安装MySQL

Sentry是Django应用,因此数据库可以灵活选择。官方推荐PostgreSQL,不过个人觉得还是MySQL将就着用吧,毕竟资料更多也更熟悉些。还是通过homebrew进行安装。

brew install mysql

配置Sentry

创建Sentry所需的数据库

通过MySQL root账号进入刚安装的MySQL,

mysql -uroot

创建数据库,

create database sentry;

创建用户并授权,

grant all privileges on sentry.* to ’sentry@localhost’ identified by ‘password’;

创建Sentry配置文件

在shell下运行,

sentry init

默认配置文件为~/.sentry/sentry.conf.py。就直接使用默认位置的配置文件进行配置好了,修改配置文件中的内容,配置数据库连接,

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': 'sentry',
        'USER': 'sentry',
        'PASSWORD': 'password',
        'HOST': '',
        'PORT': '',
    }
}

修改web url地址,

SENTRY_URL_PREFIX = 'http://0.0.0.0:9000'

运行Sentry

在shell下分别启动Sentry、以及Sentry Worker,

sentry start
sentry celery worker -B

也可以通过supervisor管理相关进程。

参考