SOLILOQUIZE ZooKeeper IntelliJ IDEA中配置开发环境 2019-05-22 00:54:39 numerical /2019/05/22/ZooKeeper-IntelliJ-IDEA中配置开发环境

最近需要在ZooKeeper某个旧版上进行些改造,旧版构建工具是Ant、Ivy,和当前流行的略有差异,因此这里进行记录。

检出分支

从Github获取ZooKeeper源码之后,需要切换到目标版本分支。ZooKeeper发布都有打tag,因此可以从tag中获取,

git tag -l

找到对应目标tag之后,

git checkout -b branch_name tag_name

打开工程目录

在IntelliJ IDEA中打开ZooKeeper目录。默认情况下IDEA无法识别工程,

设置Project SDK

在菜单栏中选择“File - Project Structure - Project”,设置Project SDK。JDK版本和语言级别都设置为1.8。

同时设置Project compile output输出目录。

设置Sources Root

ZooKeeper代码主体为Java,将“src/java/main”目录设置为Sources Root。设置之后,该目录下的Java代码被IDEA识别了,但会发现缺少依赖包。

获取依赖包

命令行下运行,

ant

运行之后会自动生成部分代码。将自动生成的代码目录“src/java/generated”也设置为Sources Root。

同时依赖包夜被下载到了本地,在菜单栏中选择“File - Project Structure - Libraries”,增加Java Libraries。添加之代码中就可以识别出依赖包中的类了。

触发编译

完成上述步骤之后可以在IDEA中触发Build操作,如无意外则会顺利执行。

发布打包

在IDEA中完成修改之后,最后的打包操作还是可以通过命令行的ant来触发。生成的包会位于build目录下。

Netty EventLoop机制 2019-05-01 14:23:21 numerical /2019/05/01/Netty-EventLoop机制

Netty服务端启动之后,每有一个客户端连接进入,就会创建一个新的Channnel对象以表示两端之间的连接。后续两者之间的数据传输读写就都发生在Channel之上。不同Channel之间的处理,肯定是并发的。同一个Channel之上的处理,则有一定顺序。但何时会有数据对Netty服务端来说是未知的,

Netty是一个异步化处理框架,Channel创建之后,从Channel中读写数据都可能异步化,这种能力通过Channel上绑定的EventLoop来提供。EventLoop即事件循环,在网络I/O处理上,数据的到来时机无法控制,服务端获取到数据之后需要进行相应处理,这种处理对应到Netty中则是触发ChannelHandler中的方法或事件,而这些方法调用或是事件相应的实际运行都在EventLoop当中。

Netty是如何保障这一点的,就需要来具体看一下EventLoop的相关实现了。

EventLoop在代码层面是什么

EventLoop首先是Netty中的接口定义,其继承关系如图,

从接口定义上可以发现EventLoop是一个Executor,且是一个保证执行顺序的OrderedEventExecutor。向EventLoop中submit的任务最终会按顺序执行。保证按顺序执行,那么EventLoop

Netty异步处理框架 异步的实现EventLoop EventLoop&EventLoopGroup ServerBootstrap在构建的时候需要传递

DefaultEventLoop实现,SingleThreadEventLoop

类继承关系

一个Channel只会有一个EventLoop,一个EventLoop可能绑定到多个Channel上 一个EventLoop背后只会有一个线程

为什么异步,异步的来源于IO事件本身的异步性, 收到io时间后交由eventloop进行处理。

channelhandler是作为逻辑实现的重点,如何保证与eventloop之间的关联 通过channel获取到eventloop,判定当前线程是否与eventloop线程相同,相同则执行,否则则放入队列

Channel 与EventLoop 绑定

Executor 与 EventExecutor 的关系 ThreadExecutorMap通过ThreadLocal维系了两者之间的关联,在Runnable任务中获取eventExector时能获取到原先的EventExecutor

MultithreadEventExecutorGroup

channel 与 EventLoopGroup 之间关系

EventLoopGroup创建多个EventLoop,每个EventLoop与Channel关系

Netty Channel接口与创建流程 2019-05-01 07:46:47 numerical /2019/05/01/Netty-Channel接口与创建流程

Netty中的Channel可以简单理解成连接通道。当有新连接创建时,也意味着有新Channel对象被创建。不同Channel实现与底层的传输处理方式相关,Netty提供了多种Channel实现,典型的有,

  • NioSocketChannel
  • NioServerSocketChannel
  • OioSocketChannel
  • OioServerSocketChannel
  • EpollSocketChannel
  • EpollServerSocketChannel

在Netty封装之后,不论底层用什么机制去处理,是用Java IO/NIO库实现还是和平台Native实现相关,都不再特别需要去关注。对于上层使用方来说,看到的就都只有Channel。

Channel接口

状态信息,

  • isOpen(),是否开启
  • isRegistered(),是否注册到EventLoop
  • isActive(),是否活跃
  • isWritable(),是否可写

Channel自身的生命周期内,状态变化路径大体是,

Registered -> Active -> Inactive -> Unregistered

配置信息,

  • config(),Channel上可以配置底层传输的相关参数

地址信息,

  • localAddress(),本地监听的地址
  • remoteAddress(),远程访问的来源地址

关联的EventLoop,

  • eventLoop(),每一个Channel只会与一个EventLoop相绑定

关联的ChannelPipeline,

  • pipeline(),ChannelPipeline中的ChannelHandler是实际各种功能逻辑的实现者

Channel接口继承了AttributeMap接口,可以向Channel中进行Attribute的读写。Channel的I/O操作都是异步的,具体功能逻辑由一个或多个ChannelHandler组合而成,ChannelHandler可以获取到关联的Channel对象,因此不同ChannelHandler之间可以通过Channel上的Attribute进行一定意义上的数据传递。从这个视角来看,Channel也可以认为是多个ChannelHandler之间公共数据的一个暂存地。

在使用Netty去进行开发时,对于Channel,实际可能只是在应用启动之初进行选择,选择具体的Channel实现,而后就基本无需再特别关注了。

Channel创建过程

对于服务端来说,以Nio连接处理为例,

  • 在ServerBootstrap中传入制定Channel类型,NioServerSocketChannel.class
  • 在ServerBootstrap.bind调用过程中,会在基类AbstractBootstrap的initAndRegister方法中进行创建,具体是通过ReflectiveChannelFactory.newChannel根据Channel的类型进行反射创建。
  • 在服务端启动之后,客户端连接进入时,则在NioServerSocketChannel.doReadMessages方法创建NioSocketChannel。

Channel创建完毕之后,在I/O处理过程中,会按照流程触发Register、Active操作,通过Channel相关接口可以获取到对应状态的变化结果。

一般来说,Channel的创建流程是不太需要去关心的。通过Channel接口在ChannelPipeline处理链路中去读取或改变Channel相关状态即可。

Netty 典型服务端代码结构 2019-05-01 07:07:56 numerical /2019/05/01/Netty-典型服务端代码结构

最近重新对Netty产生了深入了解学习的兴趣。一方面因为很多关联项目内都应用了Netty,加深对Netty的了解无疑会增加对那些项目的掌握度。另一方面,也想尝试应用Netty来实现一个简单的异步化网关。简单翻阅了Zuul2、Reactive-Netty等项目的代码实现,在网络这一层的处理看上去也没多么复杂,代码也就那样。不过刚好可以结合着这些项目代码来学习实现相关功能,加深对Netty的认知。

首先来看下Netty应用的整体结构。不仅从Zuul2等项目中,在Netty自身提供的例子中也很容易就可以发现。Netty应用的结构是相当类似的,

  • 选择EventLoopGroup实现
  • 通过ChannelInitializer添加ChannelHandler到ChannelPipeline
  • 通过Bootstrap串联起各模块
  • 自主控制的部分主要在于各种参数,以及ChannelHandler的选择与自定义

用简单的例子来分析,例如,

try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .localAddress(new InetSocketAddress(port))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new DiscardServerHandler());
            }
        });
    ChannelFuture future = bootstrap.bind().sync();
    future.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully().sync();
}

上述代码已经包含了Netty应用的各要件。真实场景下的代码,首先是将各种参数选择配置化,可以通过配置文件或配置下发的形式进行控制。ChannelInitializer的实现肯定会独立,现实场景的ChannelHandler往往由多个构成,构成逻辑也会比较复杂。Netty内置了不少Handler实现,业务自身在内置实现之外,往往也要实现不少ChannelHandler。有的可能是用于进行数据流转解析,有的是用于实现具体的业务逻辑。

Netty中的Channel、ChannelPipeline、ChannelHandler、EventLoop、EventLoopGroup、Bootstrap等概念定义与目的究竟如何,各自的内部实现与处理流程如何就是后续要一个一个去进行分析的。虽然现在还没有一个很清楚的概念,但若是这些都理清的话,那么单纯使用Netty就不会是什么困难的问题。

使用Druid进行简单SQL分析 2019-04-30 13:10:03 numerical /2019/04/30/使用Druid进行简单SQL分析

Druid SQL模块除了用于进行SQL重写之外,最近发现了一个新的应用场景,即用来做一些简单SQL的分析预警。对于简单SQL,根据语句中部分传入参数值,进行预警通知,例如,

select * from foobar order by id limit 10000 offset 0;

语句中limit参数可能传入一个很大的数值,如果能进行分析,那么就可以做到预警。

获取到上述limit对应的值并不困难,在之前提及SQL重写问题时就处理过limit offset的问题。针对这个问题,解决思路也是类似的,即在遍历SQL解析之后的语法树时获取目标值。

处理limit offset值之外,能够预想到的一些可能分析有,

  • 某具体字段对应的参数值
  • IN子句元素数量

当然,SQL语句本身可能包含复杂的运算或是函数调用,对于这类场景暂时就是无能为力的。但把能做的事情做了,多少也还是有用的。

为了实现上述目标,需要自定义Visitor进行数据获取,获取到数据之后采取怎样的行动策略就很方便了。

public class AnalyseVisitor extends MySqlOutputVisitor {

    private List<Pair<String, String>> cmpInfo = new ArrayList<>();
    private List<Pair<String, String>> inInfo = new ArrayList<>();
    private String limit;
    private String offset;

    public AnalyseVisitor(Appendable appender) {
        super(appender);
    }

    public boolean visit(SQLBinaryOpExpr expr) {
        if (isIdentifier(expr.getLeft()) && isValue(expr.getRight())) {
            cmpInfo.add(Pair.of(expr.getLeft().toString(), expr.getRight().toString()));
        }
        return super.visit(expr);
    }

    public boolean visit(SQLInListExpr expr) {
        if (isIdentifier(expr.getExpr())) {
            inInfo.add(Pair.of(expr.getExpr().toString(), String.valueOf(expr.getTargetList().size())));
        }
        return super.visit(expr);
    }

    public boolean visit(SQLLimit expr) {
        limit = expr.getRowCount().toString();
        if (expr.getOffset() != null) {
            offset = expr.getOffset().toString();
        }
        return super.visit(expr);
    }

    private boolean isIdentifier(SQLExpr expr) {
        return expr.getClass() == SQLIdentifierExpr.class;
    }

    private boolean isValue(SQLExpr expr) {
        Class<?> clazz = expr.getClass();
        return clazz == SQLVariantRefExpr.class
                || clazz == SQLIntegerExpr.class
                || clazz == SQLNumberExpr.class
                || clazz == SQLCharExpr.class
                || clazz == SQLBooleanExpr.class;
    }
}

重载visitor函数,针对不同参数分别进行处理。上述示例选了SQLBinaryOpExpr、SQLInListExpr、SQLLimit。如果有更复杂的场景,那么也是类似根据情况去进行定义。

之后再使用上述Vistor,就可以获取到想要的数据了,例如,

String sql = "select * from foobar where id = 1 and value in (1, 2, 3) order by id limit 1000 offset 0";
AnalyseVisitor visitor = new AnalyseVisitor(new StringBuilder());
SQLUtils.parseSingleMysqlStatement(sql).accept(visitor);
// [(id,1)]
System.out.println((visitor).getCmpInfo());
// [(value,3)]
System.out.println((visitor).getInInfo());
// 1000
System.out.println((visitor).getLimit());
// 0
System.out.println((visitor).getOffset());

可以看出,至少在这个SQL例子中是获取到了目标数据的。再复杂一些的SQL,Visitor可能也需要再进一步去完善,覆盖遗漏的一些条件处理。不过整体方向上应该差不多。如果能在运行时进行这样的处理,不论是预警还是变换或是统计等等行动就都有可能去实施了。

关于服务限流的一些思考 2019-04-27 08:40:46 numerical /2019/04/27/关于服务限流的一些思考

流量控制在单体应用的时候也同样存在,在微服务化之后需要进行流控的场景进一步增加了。

典型的微服务架构如下,网关接收请求,路由RPC到后端应用,应用之间直接通过RPC互相调用,限流可以应用的地方就是在各入口处,例如网关接收到的HTTP请求,服务提供的RPC调用。

Flow Control

策略

限流策略有不少,典型的有,

  • 单机QPS限流。统计单进程内的QPS,在进程内进行运算,无需引入外部依赖。
  • 全局QPS限流。统计相同接口在集群多台机器上的全局QPS统计,需要引入外部依赖,如Redis,进行QPS计数统计。
  • 全局高频限流。计算模型同全局QPS,但关注接口关联的参数,例如一个用户单位时间内访问一定次数,或是特定资源参数单位时间内访问一定次数。

问题

单机QPS限流

易失效或过敏感

随着应用集群规模的增长,单机QPS适用性会逐渐降低。机器规模增长,则单机分配到的请求必然降低。单机QPS阈值设置大了可能没效果,设小了又过于敏感。如果调度策略略微不均或是部分节点故障被剔除之后,均可能造成一些设置了低阈值的单机限流被触发。

全局QPS限流

外部依赖降低了稳定性

全局QPS在触发上不会那么敏感,但是为了计算全局数据,则必然需要引入外部依赖。常见的就是引入类似Redis一样的KV存储去维护滑动窗口内的QPS统计。外部依赖的增加无疑降低了服务的可用性。在依赖服务异常时可能会造成限流功能不可用或是触发更为严重的问题。限流场景下的计算天然就是大流量的,对依赖服务的压力显而易见,因此这并不是一个可以忽略或是低概率发生的问题。

统计精度与性能压力的权衡

如果每来一个请求都很精确的进行全局计算,那么外部依赖的压力会很高。如果进行一定的预取以及本地计算,则容易造成统计不准确。这一点在集群规模很大的情况下易出现。

限流阈值的维护性问题

限流的目标是什么

个人理解是在系统能够提供的最大允许范围内尽可能提供服务,拒绝掉系统承载能力之外的请求以保证服务稳定性。

按照这个目标,那么限流阈值配置成多少就不是一个易于回答的问题。应用的承载能力在代码变更或是部署调整时都可能产生较大的变化。这类变化往往不能在事前被维护人员意识到,或者是单纯遗忘了存在着的限流配置,更常见的是初始设置成什么样的阈值也难以去度量。

如何获得合理的QPS阈值

一种思路是通过压测去对系统进行真实的度量,最后将结果关联到降级配置。这种方式的问题在于压测模型与线上真是运行环境不一定相同,但接口的压测不能说明问题,混合接口压测又难以真实反应实际流量场景。

另一种思路是通过梳理各应用监控数据,从当前系统高峰期的QPS统计,通过预定义的计算公式来计算预期的限流QPS值。通常是根据高峰期的水位情况进行一定的放大。这种方式的问题在于系统性能拐点未知,单纯的预测不一定准确。

是否可以基于系统运行反馈自动设置QPS阈值

单纯的自动设置,是很容易的。例如可以采集每一个API在每日高峰期全集群的QPS数值,乘以一定系数进行设置。但是这个值的合理性很难保证,系数可能保守可能乐观,同样会导致限流策略的无效。

如果先按照上述方式获得了一个限流值,随后根据系统运行情况去调整会如何呢?假定初始QPS阈值设高了,在限流未触发时,应用集群负载出现了问题。此时通过监控巡检去发现这一问题,反向调低QPS阈值。反过来当初始阈值低了,那么在限流触发情况下,应用负载还是正常,那莪这个时候去自动调大阈值。

看上去可以,但去实施时会发现难以动手。应用可能对外提供多个接口,可能只部分接口有限流配置。即便观测到了负载与预期不符,但与预期的差异可能是别的原因造成的,也可能是未配置限流接口带来的。此时调整已有接口的阈值就都是无用的。

思考

思考上面的问题,准确的阈值是无法通过非实时方式获取到的,如果想实时获取,那么必然要基于应用的运行状况反馈。单机器的反馈是最准确的。因此更合理的限流方式可能需要回归到单机上。

应用可能提供多个接口,任何一个接口都可能是问题所在导致负载问题。从应用的运行数据反馈中也难以准确的获取究竟是哪一个接口造成的影响。可能是单个异常接口,也可能是每个接口都贡献了一份。也就是说,更有效的限流方式是应用级的,因此不是基于QPS去进行限流,而是基于应用的负载状况去进行限流。

微信的这篇论文Overload Control for Scaling WeChat Microservices描述的就是应用级别的负载保护策略。论文中应用通过RPC队列平均等待时间这一指标去判定应用是否处在过载状态,如果过载则按请求优先级逐步限流,如果正常则逐步恢复。如果微信的系统真实采用了论文中的方案,那么自然这个策略就肯定有效。单纯进行分析,也不难发现上述策略的合理性。

实际应用中可能是多种限流策略的组合。对于特定请求进行高频限流保护,对于可以稳定梳理维护的接口通过全局QPS保护,之后再通过上述应用级限流策略进行整体兜底。多方组合,才有助于提升系统稳定性。

关于开发规范的一些想法 2019-04-20 08:05:16 numerical /2019/04/17/关于开发规范的一些想法

最近协助整理组内的开发规范,整理思考了一段时间之后梳理出了一个初版,自己也来总结下这期间的一些想法。

规范到底需不需要

有规范或是没规范,这个应该不难得出结论。有比没有好,有了规范即使没有执行也不过就是和没有规范一样,所以还是要有。

规范的好处在于明确一个方向,“取乎其上,得乎其中”,也是在面临一些问题判断时的抉择依据。

规范的目标

提升专业性,提升质量,避免可能重复出现的低级问题,提升全局效率。

规范要做到什么粒度上

之前对所谓规范不置可否的原因在于很多规范是不易落地或是过于麻烦的。如果想让规范能够去实施,那么按照个人的理解,规范必须简明。不要在细节以及容易产生个人偏好的问题上去纠结。抓大放小,只关心核心问题。

规范结果而不是过程

如果在项目之初就有一份成熟的规范,那么可能可以更有针对性的去把控。但实际往往都是在一定时间后,根据现实遭遇的一些问题而整理出规范,在这种情境下,规范关键点的输出结果会比规范如何去做的流程会更容易被接受或实施。

规范的来源

一是参考能了解到的优秀团队的做法,二是调研周边团队实际落地了的做法,三是从已有未明确化的工作流程中梳理。

规范要达成共识

对于一些强势管理风格的团队,可能自上而下强制推行。但如果不采取那么生硬的方式,那么规范必须要在团队内达成共识。

首先是目标的共识,其次是方式的共识,再次是推进手段的共识。

否则定义的规范再多也只是空文,甚至带来比较不好的负面抵触情绪。

不符合规范的要去调整,而非容忍

对于一些遗留项目的处理也是规范能否落地的关键,例外不是不能有,但一定需要是必要的。否则例外多了,例外就成了常态,规范反倒成了例外。

规范的推进与监督

推进需要落实到具体的任务中,设置改进的时间点并进行追踪。改进的工作要计入工作计划中。

能与工具自动化结合的尽量结合,不能的也定期抽查检验。

规范的迭代

规范需要是活的,需要根据项目的实际运行情况来进行调整与进化。

Druid SQL重写遇到的一个问题 2019-04-14 02:50:48 numerical /2019/04/14/Druid-SQL重写遇到的一个问题

Druid在SQL格式化输出时,使用MySQL格式输出,会将limit n offset m修改为limit m, n

select * from foobar where id > 1 limit 10 offset 0;

会变成,

select * from foobar where id > 1 limit 0, 10;

在limit offset中传入具体值时,这种改写不会带来问题,但是如果传入的是占位通配符,则对应参数的顺序就进行了调整,

select * from foobar where id > 1 limit ? offset ?;

会变成,

select * from foobar where id > 1 limit ?, ?;

这个时候通过PreparedStatement进行参数设置的时候,参数就会传反。

为了应对这种情况,需要重写格式化输出部分,将limit子句的格式化输出方式进行调整,

public class CustomSqlVisitor extends MySqlOutputVisitor {

    public CustomSqlVisitor(Appendable appender) {
        super(appender);
    }

    public boolean visit(SQLLimit x) {
        this.print0(this.ucase ? "LIMIT " : "limit ");
        SQLExpr rowCount = x.getRowCount();
        this.printExpr(rowCount);
        SQLExpr offset = x.getOffset();
        if (offset != null) {
            this.print0(this.ucase ? " OFFSET " : " offset ");
            this.printExpr(offset);
        }
        return false;
    }
}

最后再实现类似SQLUtils.toSQLString方法,将其中的Visitor实现替换成上面自定义的CustomSqlVisitor,如此limit重写问题就能得到解决,

public static String toSQLString(SQLObject sqlObject, String dbType, SQLUtils.FormatOption option) {
    StringBuilder out = new StringBuilder();
    SQLASTOutputVisitor visitor = new CustomSqlVisitor(out);
    if (option == null) {
        option = DEFAULT_FORMAT_OPTION;
    }

    visitor.setUppCase(option.isUppCase());
    visitor.setPrettyFormat(option.isPrettyFormat());
    visitor.setParameterized(option.isParameterized());
    visitor.setFeatures(option.features);
    sqlObject.accept(visitor);
    return out.toString();
}
Druid SQL的一些使用场景 2019-03-15 13:46:46 numerical /2019/03/15/Druid-SQL的一些使用场景

最近一些场景下需要对SQL进行处理,搜寻了一通发现Druid中提供的SQL处理能力可以满足需求。

Druid本身的功能特性并不止于SQL处理,但如果项目中在数据库层已经有提供类似功能的框架,一般就不太会去变化,不过Druid的SQL处理部分没有那么重的依赖,可以单纯使用SQL这部分相关功能。

SQL处理的一些典型场景有,

SQL监控统计

例如统一不同SQL语句的平均耗时、最大耗时等等统计类的需求。

在进行统计之前需要对SQL进行归一化处理,将一些变量统一进行替换,如此才能对SQL进行归类。

Druid提供了一个快捷的方法,

public class ParameterizedOutputVisitorUtils {
    public static String parameterize(String sql, String dbType);
}

// 测试执行
ParameterizedOutputVisitorUtils.parameterize("select * from foobar where id > 1", JdbcConstants.MYSQL);

// 输出
SELECT *
FROM foobar
WHERE id > ?

SQL动态拦截

拦截具体SQL

这个场景与上面类似,同样也许要将SQL归一化后处理。而后可以与预设的拦截规则进行匹配判断,将制定的SQL语句进行拦截。

拦截表操作

在某些情况下,可能需要紧急禁止对表的操作访问,也可以通过Druid进行处理。这种情况下需要获取SQL语句中的表名,

public static List<String> getRelatedTable(String sql) {
    SQLStatement statement = SQLUtils.parseStatements(sql, JdbcConstants.MYSQL).get(0);
    if (statement instanceof SQLInsertStatement) {
        return Arrays.asList(((SQLInsertStatement) statement).getTableName().getSimpleName());
    } else if (statement instanceof SQLUpdateStatement) {
        return Arrays.asList(((SQLUpdateStatement) statement).getTableName().getSimpleName());
    } else if (statement instanceof SQLDeleteStatement) {
        return Arrays.asList(((SQLDeleteStatement) statement).getTableName().getSimpleName());
    } else {
        MySqlSchemaStatVisitor visitor = new MySqlSchemaStatVisitor();
        statement.accept(visitor);
        Map<TableStat.Name, TableStat> info = visitor.getTables();
        return info.keySet().stream().map(TableStat.Name::getName).collect(Collectors.toList());
    }
}

表映射修改(影子表)

在处理数据表隔离时可能会使用影子表方案,根据环境不同将SQL路由到不同的表中,对此Druid提供了快捷操作接口,

public class SQLUtils {
    public static String refactor(String sql, String dbType, Map<String, String> tableMapping);
}

例如,

Map<String, String> mapping = new HashMap<>();
mapping.put("foobar", "foobar_shadow");
System.out.println(SQLUtils.refactor("select * from foobar where id > 1";, JdbcConstants.MYSQL, mapping));

// 输出

SELECT *
FROM foobar_shadow
WHERE id > 1

条件变更

在一些情况下对SQL的查询条件可能需要统一进行增加条件处理,对此也有直接可用的接口,

public class SQLUtils {
    public static String addCondition(String sql, String condition, String dbType);
}

System.out.println(SQLUtils.addCondition("select * from foobar", "id > 1", JdbcConstants.MYSQL));

// 输出

SELECT *
FROM foobar
WHERE id > 1
Windows 10应用安装与开发环境配置 2019-02-26 13:54:13 numerical /2019/02/26/Windows-10应用安装与开发环境配置

笔记本切换到小米Pro之后,重新开始使用Windows。因此这里来重新梳理下Windows上必备的一些软件安装以及开发环境配置。

常用软件

浏览器

Windows 10自带的Microsoft Edge、Internet Explorer在当前的网络环境下并不好用,因此还是需要另外安装浏览器。选择Firefox而不是Chome仅仅是个人偏好。

Chrome也是需要安装的,但不设置为默认浏览器,仅仅用于Firefox不兼容情况下的替代方案。

输入法

Bing输入法已经不再维护更新了,其特性可能也整合进了系统自带的微软拼音输入法,但微软拼音用着不习惯。搜狗拼音如果不是附带那么多杂七杂八的内容,也是可以考虑的。

通讯工具

除了公司内部IM之外,只安装了微信。目前也可以触及所有需要联系的人了。

阅读工具

PC主要用于PDF阅读,书以及论文等,所以Adobe Reader还是不能少的。

下载工具

延续Mac下的使用习惯,继续使用Transmission。

效率工具

Visual Studio Code基本能够满足所有简单文本编辑的需求了,插件也很丰富。日常Markdown文本编写等等都可以用它搞定

偶尔文档编写需要脑图,因此需要XMind。

影音娱乐

习惯了网易云音乐,PC上歌曲下载后易于整理,虽然部分下载变成了不方便的私有格式。

Windows上的播放器选择余地很多,同样延续Mac下的使用习惯,继续使用MPV。

开发工具

当前主要使用Java,兼用Python,因此只安装了IDEA。IDEA的插件也很丰富,其它一些功能开发的需求大都能通过插件得到满足。

cmder是Windows上的一款替代命令行工具,可以与Linux子系统进行集成,默认启动的窗口可以直接进入到Linux子系统中。

开发环境

Linux Sub System

原本切换回Windows带来的最大影响在于Windows下没有好用的命令行工具,无法便利地使用Linux/Unix Shell命令。不过Windows 10自带了Linux Sub System,开启这项功能后将能够在Windows下无缝使用Linux,因此启动这一功能是第一步。

笔记本预装的是中文版的系统,因此启动该功能的操作路径大体是,

  • 控制面板 -> 程序 -> 启用或关闭Windows功能 -> 适用于Windows的Linux子系统

开启之后需要从Windows Store下载安装适配的Linux发行版,选择了Ubuntu LTS版本。

环境配置

程序语言以及工具库的安装这里就不再列上了,反正需要什么安装什么。但因为使用了Linux子系统,一些地方还是需要注意。

对于语言来说,例如JDK、Python基本为了满足日常的需求,在Windows、Linux下都需要分别安装。但是一些可以夸平台的库就不需要存在双份,例如,

Maven用于Java的包管理,如果Windows、Linux下同时存在,缓存的本地仓库数据就会双份,同时Windows下使用IDE去编译与Linux下命令行去操作会产生差异,因此这类工具需要放在可以被共享访问的目录,两个系统使用同一份。

另外一个要点在于换行符。Windows、Linux的换行符不同,导致同一份文件在两个系统内看到的改动状态不同,因此需要设置常用的编辑工具,将换行符进行统一,统一到Linux换行符下。对于IDEA来说在命令菜单中操作,对于Visual Studio Code来说需要设置files.eol参数。

面向错误编程 2018-12-29 22:50:44 numerical /2018/12/29/面向错误编程

真正对外提供服务的代码需要关心的除了核心业务逻辑之外,就是如何处理错误了。即便是单一服务也存在着不少外部依赖,微服务之后种种依赖项就更多了。只保证核心业务逻辑实现正常是不够的,更需要细致去考虑如何应对各种错误。自己并没有想到有什么一劳永逸的办法,但认识到这一问题具备这样的意识总是没错的。

逃不开的“墨菲定律”,凡是可能出现问题的都会出现问题。如果编码的时候就隐约感觉到某部分可能出问题,那大概率一定会发生,但更多的问题是编码时候没有意识到的。如果可以无脑的按照某种思路步骤一步步去考虑应对,相对来说系统的可用性可能可以得到改善。

主动处理外部依赖异常

首先要解决外部依赖问题,不管是数据库、缓存还是下游RPC,代码调用的这些服务都可能异常或者超时。这些依赖调用地方都要做好异常处理,有网络请求的都需要设置超时时间,避免被拖垮。

外部依赖的破坏性测试

不论是手动触发真正的依赖故障,还是依托故障注入的手段。对于外部依赖处理需要通过破坏性测试去进行验证。如果已经有方便的框架流程可以自动跑当然好,没有的话也需要手动去进行验证。只有实际测试过才能够验证上面异常处理的正确性。

预埋依赖隔离开关

很多时候除了对依赖异常进行处理,还可以考虑预留开关在其出现问题的时候进行手动隔离。依赖隔离肯定是有损的,但有损肯定也比服务不可用强得多。

尽可能多的使用配置下发机制

有时候触发问题是因为一些配置项参数设置不合理,比如不合理的超时时间、不合理的重试次数等,能下发的配置最好都进行下发。这样有问题的时候可能可以通过配置变更救回。

关键路径的单元测试覆盖

单元测试不要去追求全局覆盖率,但是关键路径还是要去完善的。否则功能本身的正确性无从验证,后续代码变更就更提心吊胆了。核心功能要覆盖测试,一些兜底处理策略也要覆盖测试,否则预留的后手可能根本不能生效。

关键路径包含哪些呢?放在可用性层面的话,那就是一些开关控制逻辑,资源初始化与清理逻辑,配置变更下发响应逻辑。

性能测试

性能测试往往也是能够发现问题的。比如线程池参数设置不合理,高并发访问很容易导致线程池资源耗尽。性能测试利于发现此类问题。性能测试最好能自动化、常态化。

灰度上线验证

代码上线最好也是一步步来,时间不紧急的话,小规模灰度看效果,再逐步放开。有的时候过于乐观自信容易导致问题,还是稳妥一点比较好。

总结

写bug虽然免不了,但要把bug的影响范围控制住。在微服务场景中,除了业务功能之外,还要注意保护服务自身、避免拖累其它服务。如上这些手段策略一定程度上可以帮助实现这一目标,当然最为重要的还是编码本身了。如何写出正确的代码,那又是另外的问题了。

微服务的一些理解 2018-12-15 08:54:06 numerical /2018/12/15/微服务的一些理解

微服务这一概念之前只是简单了解没有直接体会,近距离观察一段时间之后有了一些看法。

优点

易于多团队协作

将单一服务切分到微服务之后,容易与实际中的团队组织结构相对应。以服务为粒度,可以交由不同团队进行维护,彼此之间通过RPC等通信协议进行协作。在业务内容与人员都达到一定规模的场景下,这样的切分实际能够划定不同团队之间的工作边界,使得团队之间的协作不那么耦合。

风险隔离,提升稳定性

单一系统遇到问题的时候很容易造成整体服务不可用,拆分微服务之后,很多问题只会在局部服务中出现,问题的影响范围可以得到控制,提升了系统整体的稳定性与可靠性。

便于定向优化

服务拆分之后,局部的性能问题可以有更多的手段与策略进行优化。优化的成本与风险都会相较单一服务低。堆机器这种粗暴方案也能更有针对性。

降低技术升级的心智负担

如上所说单独的服务即使遇到问题,影响的范围也相对可控,因此更利于进行一些技术升级改造。当然,实际上的优化升级往往都是很漫长的过程,但那更多是愿不愿意改的问题,而不是敢不敢改的问题。

降低成本

当系统的复杂性达到一定量级之后,微服务化不仅仅从资源层面可以让硬件资源针对部分服务进行适配,提升资源利用率。另一方面也可以降低系统维护复杂度,微服务自身的复杂度与系统整体的复杂度相比减轻了很多,对于维护的人员来说,可以筛减掉很多无需关心的内容。

缺点

问题排查变得复杂

原先可以在一处解决的问题,变成了一系列RPC调用。原本可能就是代码逻辑问题,现在则可能是RPC下游的服务可用性问题,RPC调用的网络问题。在问题排查定位上,需要关注的因素会更多。

服务运行环境变得复杂

单一服务很容易进行部署,微服务之后服务之间的依赖关系使得微服务的部署变得复杂。不同环境、不同版本所需部署的微服务都有各自的需求。

沟通协调增加

微服务之间的依赖关系,使得在某些服务升级变更的时候需要沟通上下游,增加了沟通协调成本。

响应时间增加

不同服务之间的RPC调用肯定比本地的函数调用慢,网络调用开销的增加是不可避免的。如果服务链路过长,那么受到的影响就更明显了。

关键点

服务切分合理性

微服务的关键问题是如何进行划分,如何控制服务的粒度。服务切分得不好会带来后续一系列问题。如何切分这件事情和业务场景相关,也和实际人员的经验能力相关。

服务依赖可视化

微服务之间的依赖关系需要以简明直接的方式进行展现,这样有利于在服务切分不合理的时候第一时间发现。

调用链路追踪

微服务之间的调用链路一定需要进行追踪,否则问题排查是无从进行的。

监控预警

当微服务数量达到一定量级,局部出现问题总是无可避免的,这种情况下完善的监控预警就显得尤为重要了。否则在遇到问题时容易抓瞎。

代码框架与公共服务统一

微服务化之后也会继续出现新的业务逻辑与场景,这就意味着会继续微服务化。那么代码层面的一致性与底层框架公共服务的可用性、易用性就变得很重要了。

程序语言的统一

理论上微服务之后服务之间只要保证协议即可,使用什么语言都可以。但实际上需要进行控制,多语言的成本太高,完全没有必要。

RocketMQ os.sh参数说明 2018-09-02 22:15:11 numerical /2018/09/02/RocketMQ-ossh参数说明

RocketMQ在消息读写上的处理是其性能的关键,重度依赖底层操作系统的特性。os.sh中提供了一组默认的操作系统参数配置,这里一条条来分析下。

命令

sudo sysctl -w vm.extra_free_kbytes=2000000

这个参数应该也是用来控制空闲内存大小的。但是很多发行版没有这个参数,一般根据部署环境看是否支持,不支持的话不进行设置应该也没关系。

sudo sysctl -w vm.min_free_kbytes=1000000

设置系统需要保留的最小内存大小。当系统内存小于该数值时,则不再进行内存分配。这个命令默认被注释掉,根据文档看,如果设置了不恰当的值,比如比实际内存大,则系统可能直接就会崩溃掉。实际数值应该根据RocketMQ部署机器的内存进行计算,经验数值大概是机器内存的5% - 10%。

这个数值设置的过高,则内存浪费。若设置的过低,那么在内存消耗将近时,RocketMQ的Page Cache写入操作可能会很慢,导致服务不可用。

sudo sysctl -w vm.overcommit_memory=1

控制是否允许内存overcommit。设为1,则是允许。当应用申请内存时,系统都会认为存在足够的内存,准许申请。

sudo sysctl -w vm.drop_caches=1

设置为1会释放page cache。这个操作是一次性的,在执行该命令时释放page cache,没有持续性作用。

sudo sysctl -w vm.zone_reclaim_mode=0

该配置用于控制zone内存使用完之后的处理策略,设为0则禁止zone reclaim。对于大量使用缓存的应用来说,一般都需要禁止掉。

sudo sysctl -w vm.max_map_count=655360

Rocket使用MMAP映射文件到内存,因此需要设置映射文件的数量,避免MMAP操作失败。

sudo sysctl -w vm.dirty_background_ratio=50

设置内存中的脏数据占比,当超过该值时,内核后台线程将数据脏数据刷入磁盘。

sudo sysctl -w vm.dirty_ratio=50

设置内存的脏数据占比,当超过该至时,应用进程会主动将数据刷入磁盘。

sudo sysctl -w vm.dirty_writeback_centisecs=360000

内核线程会定期启动将内存中的旧数据刷入磁盘,通过此参数可以控制启动间隔。

sudo sysctl -w vm.page-cluster=3

设置一次读操作会加载几个数据页。

sudo sysctl -w vm.swappiness=1

vm.swappiness用于控制使用系统swap空间比例,一般设置为0是禁止使用swap,设为1估计是某些系统不支持设置为0.

RocketMQ的数据存储基于MMAP,大量使用内存,因此需要禁止swap,否则性能会急剧下降。

echo 'ulimit -n 655350' >> /etc/profile

设置可以打开的文件符号数。RocketMQ的存储都是基于文件的,因此稍稍设大默认值。

echo '* hard nofile 655350' >> /etc/security/limits.conf

设置一个进程能够打开的文件数。

echo 'deadline' > /sys/block/${DISK}/queue/scheduler

RocketMQ使用MMAP映射文件到内存,在存在新消息的时候都是追加顺序写,投递消息的时候则是根据Offset从CommitLog进行随机读取,Deadline调度方法会在调度时间内合并随机读为书序读,因此对RocketMQ性能有帮助。

总结

RocketMQ的存储模型以来在MMAP纸上,因此os.sh大部分参数控制都与内存管理相关。这些参数对Broker的性能有明显影响。默认提供的数值可以作为一个参考,实际还是可以根据这些参数的定义与实际机器配置来测试得到更优的配置。

参考

RocketMQ 延时消息实现 2018-09-02 13:35:47 numerical /2018/09/02/RocketMQ-延时消息实现

RocketMQ提供了延时消息实现,不过这个延时是一定级别的延迟,默认在MessageStoreConfig.messageDelayLevel定义,

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

这里来看下RocketMQ实现这一功能的相关实现。

消息存储

延时消息的存储过程与一般消息存在差异,在CommitLog.putMessage中对其做了特殊处理。

设置了Delay Level的消息,在存盘之前Broker会修改Topic为延时Topic,SCHEDULE_TOPIC_XXXX,同时备份原Topic信息到消息属性当中。而后则按正常消息存储流程进行处理。不同的Delay Level会对应到不同的MessageQueue中。

消息投递

因为修改了Topic,所以数据存盘之后Consumer并不能消费到消息。在消费之前需要将消息重新投递回初始的Topic,再经由正常消费逻辑进行处理。

重新投递这一逻辑在ScheduleMessageService中实现。从代码来看,延时的处理策略不复杂。

ScheduleMessageService中通过一个Timer来延时触发投递消息检查。每一个Delay Level对应一个DeliverDelayedMessageTimerTask。当对应Task被触发时会去检查当前Delay Level对应的MessageQueue中待处理的消息时间是否到达,如果没有满足条件则再次定时检查,如果延时时间满足,则恢复Message原Topic,并通过DefaultMessageStore.putMessage重新进入正常消息处理流程。

定时消息

RocketMQ没有提供定时消息功能,如何在MQ中实现支持任意延迟的消息对这个问题进行了分析讨论,个人觉得描述的比较清楚。

如果后续考虑在RocketMQ上实现定时消息功能的话,可以按照类似的思路进行实施。

RocketMQ 消息消费失败的处理策略 2018-09-02 10:24:39 numerical /2018/09/02/RocketMQ-消息消费失败的处理策略

Consumer从Broker拉取到消息之后进行消费,但是消费并不一定都是顺利的,不可避免会遇到一些异常情况,这种情况下RocketMQ提供了怎样的处理机制?以PushConsumer为例来看下与之相关的操作。

Consumer消费异常处理流程

在Consumer使用的时候需要注册MessageListener,对于PushConsumer来说需要注册MessageListenerConcurrently,其中消费消息的接口会返回处理状态,分别是,

  • ConsumeConcurrentlyStatus.CONSUME_SUCCESS,消费成功
  • ConsumeConcurrentlyStatus.RECONSUME_LATER,推迟消费

MessageListener是在ConsumeMessageConcurrentlyService中被调用的,可以看到上述两个状态会分别映射到CMResult定义的枚举值,

  • CMResult.CR_SUCCESS,消费成功
  • CMResult.CR_LATER,推迟消费
  • CMResult.CR_ROLLBACK,事务消息回滚
  • CMResult.CR_COMMIT,事务消息投递
  • CMResult.CR_THROW_EXCEPTION,消费过程异常
  • CMResult.CR_RETURN_NULL,消费结果状态为null

消息消费的结果会在ConsumeMessageConcurrentlyService.processConsumeResult中进行处理。

从代码看返回ConsumeConcurrentlyStatus.RECONSUME_LATER状态之后的处理策略是将该组消息发送回Broker,等待后续消息。发送回的消息会设置重试Topic,重试Topic命名为:"%RETRY%" + Consumer组名。原先实际的Topic会暂存到消息属性当中,以及设置delayLevel以及reconsumeTimes。

Consumer消费的时候可以设置consumeMessageBatchMaxSize来控制传入MessageLisenter的消息数量,这里的失败处理策略是,其中只要有一条消息消费失败就认为全部失败,这一批消息都会发送回Broker。因此consumeMessageBatchMaxSize这个值的设置需要注意,否则容易出现消息重复消费问题。

Broker消费失败消息处理流程

Broker端对应的处理位于SendMessageProcessor.consumerSendMsgBack方法中。对于Consumer发送失败返回的消息,Broker会将其放入重试Topic中。

重试消息的重新投递逻辑与延迟消息一致,等待delayLevel对应的延时一到,Broker会尝试重新进行投递处理。

DelayLevel对应的延时级别是固定的,RocketMQ对应的配置为MessageStoreConfig.messageDelayLevel,默认的级别为,

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

鉴于RocketMQ的实现机制,可以去调整每一个级别对应的时间,但可以看出一是时间精度不够细,二是级别为固定级别。

自定义消费失败处理

Consumer对消费失败可以进行一定程度的介入,默认失败后的级别设置为msg.getReconsumeTimes() + 1,如果Consumer明确知道未来可以成功消费的时间,那么就可以主动去设置重试次数与重试级别来进行控制。

顺序消息非顺序消息的差异

在消息失败处理上,顺序消息与非顺序消息是有明显差异的。对于顺序消息来说,如果消费失败后将其延迟消费,那么顺序性实际就被破坏掉了。

所以顺序消息消费失败的话,消息消费不会再推进,知道失败的消息消费成功为止。

死信

RocketMQ中的消息无法无限次重新消费,当然了,手动修改重试次数是可以的,不介入的话不行。当重试次数超过所有延迟级别之后。消息会进入死信,死信Topic的命名为:%DLQ% + Consumer组名。

进入死信之后的消息肯定不会再投递了,不过可以通过接口去查询当前RocketMQ中私信队列的消息。如果在上层实现自有命令,那么可以将消息从死信中移出并重新投递。

RocketMQ 顺序消息实现 2018-08-29 22:17:27 numerical /2018/08/29/RocketMQ-顺序消息实现

RocketMQ提供了“顺序消息”功能。在分析其用法与实现之前,需要先了解下这个功能本身的定义与约束。

功能定义

支持顺序消息即代表消息队列支持按照消息的顺序投递给消费方。RocketMQ是怎么来支持这种顺序性的?

RocketMQ中顺序消息的顺序定义为到达Broker的先后顺序,而不是消息自身的逻辑顺序。因此需要Producer将顺序相关的消息投递到同一个MessageQueue中,这样通过RocketMQ默认的消息处理与存储机制来保证消息投递的顺序性。

因此,使用者需要注意发送顺序消息需要有明确的先后顺序。如果同一个Producer发送顺序消息,那么自然发送顺序回合消息到达Broker顺序一致,如果是不同机器上的多个Producer发送,如果时间很临近,那么逻辑上的顺序性是不能够保证的。

使用方法

了解基本定义与约束之后,直接看下作为调用方通过怎样的接口来使用顺序消息,

发送顺序消息

前面说了顺序消息需要发送到同一个MessageQueue中,因此Producer在发送消息的时候需要指定MessageQueue,一般会选择通过实现MessageQueueSelector来进行分发,

String topic = "Topic";
DefaultMQProducer producer = new DefaultMQProducer("Producer");
producer.start();
for (int i = 0; i < 10; i++) {
    Message msg = new Message(topic, String.format("msg %d", i).getBytes());
    producer.send(msg, new MessageQueueSelector() {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg) {
            return list.get(0);
        }        
    }, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
        }

        @Override
        public void onException(Throwable throwable) {
        }
    });
}

消费消息

顺序消息的消费方也需要一定处理,通过注册MessageListenerConcurrently的实现来消费顺序消息,

String topic = "Topic";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe(topic, "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt msg : list) {
            System.out.println(msg);
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

内部实现

Producer的相关逻辑很简单,在发送消息之前通过MessageQueueSelector的接口来选择MessageQueue,而后再进行正常的消息发送流程就可以了,简单的调用流程为,

DefaultMQProducer.send
DefaultMQProducerImpl.send
DefaultMQProducerImpl.sendSelectImpl
DefaultMQProducerImpl.sendKernelImpl

相对复杂的处理在Consumer端。对于普通消息与顺序消息,Consumer分别通过ConsumeMessageConcurrentlyService、ConsumeMessageOrderlyService来进行处理。

先来推测下,因为存在多个Consumer,不同Consumer之间消费消息的顺序如何进行保证呢?

BROADCASTING模式

广播模式下,每一条消息都会发送给所有Consumer,因此Consumer之间不需要进行协调。每个Consumer按顺序消费消息就可以了。

CLUSTERING模式

如果是CLUSTERING模式,一个消息只会被投递到一个Consumer,怎么实现?

从ConsumeMessageOrderlyService代码来看,它会定期向Broker发送lockBatchMQ操作,尝试对MessageQueue进行加锁。而后每次消费消息的时候创建ConsumeRequest进行异步消费,开始消费之前先判定本地是否获取了ProcessQueue的锁,

this.processQueue.isLocked() && !this.processQueue.isLockExpired())

ConsumeMessageOrderlyService中通过定时任务不断去尝试进行加锁,如果Broker返回成功,则设置本地ProcessQueue的相应状态。Broker端也会记录锁状态,锁存在一个超时机制,默认超时为RebalanceLocalManager.REBALANCE_LOCK_MAX_LIVE_TIME

协调完不同Consumer之后,Consumer内部是通过多线程去进行消费的,因此ConsumeMessage的处理中会对MessageQueue进行锁粒度控制,不同MessageQueue并行消费。

总结

RocketMQ的顺序消息实现的并不复杂,但提供的这个特性有一定约束前提在。此外通过ConsumeMessageOrderlyService可以发现,为了保证消费有序,引入了一个分布式锁的概念实现,顺序消息的响应吞吐比普通消息会差很多。

RocketMQ Consumer注意点 2018-08-28 23:15:01 numerical /2018/08/28/RocketMQ-Consumer注意点

RocketMQ中提供了DefaultMQPushConsumer、DefaultMQPullConsumer,这里主要分析下DefaultMQPushConsumer实现中一些需要注意的地方。

消息获取

DefaultMQPushConsumer对外层暴露的接口像是推送模式,但实际阅读代码就会发现,其消息获取逻辑也还是由Consumer向Broker去进行拉取而获得的。

PullMessageService是用于拉取消息的独立线程,在Consumer启动时该线程会启动。PullMessageService实例位于MQClientInstance中,一个进程中的多个Consumer Group会共享同一个PullMessageService。

PullMessageService会一直发起拉取消息请求,如果顺利拉取到消息,将会将消息存放到ProcessQueue中等待消费。不过预先拉取的消息存在一定限制,

  • DefaultMQPushConsumer.pullThresholdForQueue,限制预取的消息条数,默认1000条
  • DefaultMQPushConsumer.pullThresholdSizeForQueue,限制预取的消息大小,默认100MB

预取缓存在本地是基于MessageQueue进行区分的,因此同一个Consumer订阅不同Topic之间不会产生互相影响。

均衡处理

RocketMQ Consumer实现中包含了对消息队列消费的负载均衡处理。RebalancePushImpl、RebalancePullImpl分别对应Push、Pull下的均衡处理。

RebalanceService是Consumer中的一个独立线程,按照固定等待间隔,持续触发RebalanceImpl的doRebalance接口,进行均衡处理。

具体均衡逻辑操作在ReplaceImpl.rebalanceByTopic中,均衡的处理也是基于MessageQueue。均衡的目的在于将Topic下的MessageQueue按照策略分配给不同的Consumer实例进行消费处理。

RocketMQ中提供的MessageQueue分配策略实现有,

  • AllocateMachineRoomNearby,根据机房进行分配
  • AllocateMessageQueueAveragely,均分哈希策略
  • AllocateMessageQueueAveragelyByCircle,环形哈希策略
  • AllocateMessageQueueByConfig,根据配置进行分配
  • AllocateMessageQueueByMachineRoom,机房哈希
  • AllocateMessageQueueConsistentHash,一致性哈希

默认的策略是AllocateMessageQueueAveragely。

从这个均衡处理可以看到一个MessageQueue只会有一个Consumer进行消费。因此在实际运行环境中,Topic下面MessageQueue的数量值得关注,MessageQueue数量少而Consumer多的话,很多Consumer会消费不到消息,造成性能损失。

OffsetStore

OffsetStore这个概念用于表达Consumer消费位点的存储。不同消费模式下游不同的OffsetStore实现,

  • LocalFileOffsetStore,存储在本地,对应BROADCASTING消费模式
  • RemoteBrokerOffsetStore,存储在Broker,对应CLUSTERING消费模式

CLUSTERING模式下一条消息只会发送给一个Consumer,因此记录在Broker。BROADCASTING则是发送给所有Consumer,因此记录在各Consumer端。BROADCASTING模式需要注意一些,因为本地增加了一个有状态的记录文件,如果文件丢失则会再重新消费。

RocketMQ Consumer使用方法 2018-08-27 22:23:20 numerical /2018/08/27/RocketMQ-Consumer使用方法

Consumer也是RocketMQ中的很重要一环,RocketMQ提供了Consumer相关接口,Consumer在上层系统调用。RocketMQ的一些功能特性需要Consumer配合才能真正有效,因此Consumer的使用方法是很有必要去了解的。

消费模式

在使用层面上,RocketMQ提供了Push/Pull两种方式让Consumer消费数据。

DefaultMQPushConsumer

先来看下Push方式下的Consumer使用,RocketMQ提供了DefaultPushConsumer,实现,

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SampleGroup");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("SampleTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

在Push模式下,调用方注册MessageListener,消息的消费逻辑在MessageListner的接口中被异步调用。

DefaultMQPullConsumer

RocketMQ中的DefaultMQPullConsumer是默认的Pull方式实现,

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("SampleGroup");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.start();

Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("SampleToic");
for (MessageQueue mq : mqs) {
    while (true) {
        try {
            long offset = consumer.fetchConsumeOffset(mq, false);
            PullResult pullResult = consumer.pull(mq, "*", offset, MAX_PULL_NUM);
            if (pullResult.getPullStatus() != PullStatus.FOUND) {
                break;
            }
            for (MessageExt msg : pullResult.getMsgFoundList()) {
                System.out.println("Msg pulled");
            }
            consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            consumer.getDefaultMQPullConsumerImpl().updateConsumeOffsetToBroker(mq, pullResult.getNextBeginOffset(), true);
        } catch (Exception e) {
            logger.error("handlePull", e);
            break;
        }
    }
}

单纯从上面的使用方式来看,Pull方式用起来略微繁琐一些。两者的差异有,

  • Push方式下消息消费逻辑时被异步调用的,Pull方式下是在调用方线程直接调用
  • Push方式消息消费有专门的线程池进行处理,处理效率更高
  • Push方式下消费进度信息自动进行了记录,Pull方式下需要手动调用相关接口更新消费偏移
  • Push方式不能细粒度控制,Pull方式可以自主选择拉取消息的时机以及处理方式
  • Push方式下消息消费失败重试走默认流程,Pull方式可以根据需求主动控制是重新拉取、等待还是放弃

各项参数

消费模式

当存在多个Consumer时,消息消费还可以通过设置消费模式,RocketMQ默认为CLUSTERING模式,

// Consumer
consumer.setMessageModel(MessageModel.BROADCASTING);
定义描述
MessageModel.BROADCASTING广播模式, 一个ConsumerGroup的每一个Consumer都会收到消息
MessageModel.CLUSTERING集群模式, 一个ConsumerGroup下只有一个订阅者会收到消息

消费起点

上面样例代码中需要注意setConsumeFromWhere这个调用,这个方法设置了Consumer从个消息开始消费,ConsumeFramWhere定义了支持的几种方式,

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
} 
定义描述
CONSUME_FROM_LAST_OFFSET只消费Consumer启动之后新来的消息
CONSUME_FROM_FIRST_OFFSET消费当前已经存在的所有消息
CONSUME_FROM_TIMESTAMP消费某个时间之后的消息

顺序消费/并行消费

Consumer在消费消息的时候需要通过registerMessageListener设置消息处理函数,存在两种方式,

定义描述
MessageListenerOrderly顺序消费消息
MessageListenerConcurrently并行消费消息

非顺序消息的话都应该使用MessageListenerConcurrently。顺序消息只能使用MessageListenerOrderly,如果顺序消息下用错了MessageListener,实际消费到的消息是没有顺序可言的。

消息过滤

Consumer通过subscribe方法订阅消息topic,subscribe订阅时可以进行过滤,存在几种过滤定义方式,

根据表达式过滤tag,表达式支持||操作,例如tag1 || tag2 || tag3,以及*

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String subExpression) throws MQClientException {...}
}

提供MessageFilter的实现类进行过滤,

public class DefaultMQPushConsumer {
    public void subscribe(String topic, String fullClassName, String filterClassSource) throws MQClientException {...}
}

通过MessageSelector进行过滤,MessageSelector支持两种表达式,一是SQL92,另外则是tag过滤,

public class DefaultMQPushConsumer {
    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {...}
}

如无特别复杂需求的话,使用Tag过滤最好,ConsumeQueue的条目中记录了Tag的Hash值,可以更高效的进行过滤。

RocketMQ CommitLog刷盘机制 2018-08-25 17:50:35 numerical /2018/08/25/RocketMQ-CommitLog刷盘机制

CommitLog的存储结构之前已经分析了,那么就具体来看下Broker是怎么将数据落地写入到磁盘的。

线程模型

在消息写入上,Broker的实现可以认为是“并行处理、单一写入”。

Broker与Producer之间通过Netty维持长连接。Producer发送过来的消息在经过处理之后会交由BrokerController.sendMessageExecutor线程组进行处理。不过在最终写入CommitLog内存数据的时候进行了一步加锁处理。在写入数据之前,需要申请CommitLog.putMessageLock锁。内存中CommitLog写入完成后会释放锁,之后的步骤也是并发处理的。

从代码上可以看到,消息在进入Broker之后经过的调用流程不长,不考虑数据持久化的情况下,在性能上影响最大的就是加锁那一步的处理。

同步刷盘/异步刷盘

通过MessageStoreConfig.flushDiskType可以配置Broker刷盘方式。存在两种刷盘方式,同步刷盘与异步刷盘。

CommitLog.putMessage结束的最后调用CommitLog.handleDiskFlush方法,此方法中会根据刷盘策略,分别进行同步、异步处理。实际代码逻辑中,同步、异步方式,写入文件都是在独立的FlushCommitLogService中实现的。两者的差别主要在同步模式下,原线程进行了等待处理,异步模式下,原线程只是进行通知而不进行等待。

MappedFile

Broker收到的消息都是存储在CommitLog中,CommitLog会对应一个个独立的数据文件。I/O操作一般都是最为耗时的,RocketMQ为了提升I/O效率,使用了NIO中提供的FileChannel.map方法,将文件以mmap形式映射到内存中。

Broker收到消息之后会将消息写入MappedFile的mappedByteBuffer当中,而后再由刷盘实现将内存中变更的数据刷入磁盘当中。在异步刷盘模式下,因为刷盘操作位于独立线程,因此可以获得很好的性能。

TransientStorePool

与数据存储相关的还有TransientStorePool这一实现。通过MessageStoreConfig.transientStorePoolEnable可以进行配置,不过只在异步刷盘模式下生效。

TransientStorePool与MappedFile在数据处理上的差异在什么地方呢?分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。

那么与直接使用MappedByteBuffer相比差别在什么地方呢?修改MappedByteBuffer实际会将数据写入文件对应的Page Cache中,而TransientStorePool方案下写入的则为纯粹的内存。因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。

方案选择

同步/异步方式下,能够明显感知到的是Broker的处理性能。同步的优点则是能够保证不丢消息。所以同步/异步的选择就是在性能与消息可靠性上做选择。TransientStorePool能够进一步提升性能,但也增加了异常情况下的消息丢失数量。所以具体根据需求场景去进行选择吧。

通常情况下,消息队列本身就没有被看做是一个特别稳定的消息源,因此异步的选择更为常见。

RocketMQ Broker数据处理 2018-08-25 12:58:46 numerical /2018/08/25/RocketMQ-Broker数据处理

Producer发送的消息会进入Broker,在Broker正确处理完消息之后,Consumer才能够获取到消息进行消费。RocketMQ消息处理能力的关键在于Broker相关逻辑的实现。如果能够理清楚Broker的内在细节,那么可以说是真正掌握了RocketMQ的用法。这里尝试对Broker进行分析吧。

Broker结构

Broker的结构从整体上来看其实与Name Server差异不大,基本都是Startup + Controller形式,Controller中再封装大量成员组件,具体逻辑由这些成员组件来进行完成。Broker与Producer/Consumer/NameServer之间都是通过Netty维持长连接。

Broker的整体工作流程比较简单,Broker收到来自Producer的消息,将消息持久化,而后对其创建索引与ConsumeQueue。Consumer发送数据请求到Broker拉取数据。Broker定期注册自身信息到Name Server,同时接受Name Server发送过来的一些请求命令。

Broker数据

Broker维护核心数据都是和消息直接相关,分别是CommitLog、ConsumeQueue、IndexFile,这三个数据存储流程大致可以简化为,

CommitLog

数据写入CommitLog是第一步,CommitLog写入之后通过dispatch形式去写入ConsumeQueue与IndexFile。CommitLog写入能力实际就是Broker处理消息的能力。

消息进入CommitLog的路径很明确,考虑正常消息情况,Netty层收到RequestCode.SEND_MESSAGE、RequestCode.SEND_MESSAGE_V2之后会将其交由SendMessageProcessor处理,会经由DefaultMessageStore.putMessage方法到达CommitLog.putMessage。消息格式持久化写入逻辑在DefaultAppendMessageCallback.doAppend中。这个方法中会将Message对象转化成固定格式的Byte二进制形式进行存储。消息存储格式,

RocketMQ中,不同Topic的消息都会写入同一个CommitLog文件中,CommitLog存放路径位于MessageStoreConfig.storePathCommitLog中。单个CommitLog文件大小限制为MessageStoreConfig.mapedFileSizeCommitLog。

ConsumeQueue

ConsumeQueue对应MessageQueue,记录了每一个Queue中每一条消息在CommitLog中的存储偏移,Consumer能够消费到的消息一定是写入ConsumeQueue之后的消息。其每一条记录的存储格式为,

ConsumeQueue在持久化机制上与CommitLog类似,其路径大小也在MessageStoreConfig中进行配置。

IndexFile

在消息发送消费逻辑下IndexFile倒不是必需品,IndexFile顾名思义就是索引文件,这个索引是针对消息中的Keys进行构建的。其整体结构概念类似文件化了的哈希表,查询添加方式类似哈希表的连链地址法。IndexFile的作用就是给定Topic以及Key查询消息。主要应用场景也是在数据统计以及问题排查上。

单个IndexFile的大小是固定的,其计算方式为,

IndexHeader.INDEX_HEADER_SIZE + (MessageStoreConfig.maxHashSlotNum * 4) + (MessageStoreConfig.maxIndexNum * 20)

其结构为,

总结

CommitLog、ConsumeQueue、IndexFile是Broke人中维护的主要数据,这里暂时只是介绍了其格式与大体用途,后续会再来具体分析其写入过程逻辑。

Broker中还有什么其它数据吗?其实可以去Broker存储目录下去观察,RocketMQ的数据存储都是基于文件的,所以可以直接通过文件系统上存在的内容来进行了解。