从rocketmq到kafka

在基于了解或掌握其他同类MQ的基础知识上,怎么比较快速的掌握kafka的核心设计,确保在使用的过程中做到心中有数,做到知其然并知其所以然?本篇文章主要是笔者在已有的rmq的基础上学习kafka的思路以及过程的总结。

ps、rmq指RocketMQ

ps、文章写着写着发现有点长,应该挺乱了……

ps、因为是学习笔记,所以就这样吧,随便看看……

疑问

带着问题去学习新的技能,也许会更贴近自己原有的知识储备,也能更好的把新知识纳入自己原有的知识体系并加以补充或者延展,形成更完整的知识脉络。基于原有的rmq的知识体系,在提前梳理了几个相关的,并且浅显的问题,主要是两个方面的内容,一类是MQ模型中生产者客户端的设计与消费者客户端的设计,一类是服务端的总体架构设计。

服务端的总体架构设计

  • Kafka的部署架构模型
  • 日志文件如何设计,如何支持高并发的读写
  • 集群中元数据的同步,以及分区,副本,isr,asr等核心概念

客户端的总体架构设计

  • 生产者客户端的架构设计,同步、异步、重试,顺序等等,如何做负载均衡,流控,
  • 消费者客户端的架构设计,消费者如何做负载均衡,如何做重平衡,如何支持重消费,延时消费,死信队列等(不一定都有这些功能)

服务端

请弱化,甚至忘记kafka中broker的概念。

相信很多人不管是在面试中,还是在做MQ选型时,都会遇到几个问题,比如Kafka在超过2k的topic时性能会急剧下降,但是rmq在超过2k的topic时性能不存大规模下降,比如Kafka是一个分布式消息队列中间件,而rmq更像一个单机版消息队列中间件等。这些问题的背后,正是两个消息中间件在架构设计上的差异性所导致的,各有优劣势,我们更多的关注设计思路。先看这几个问题。

为什么topic多了,kafka的性能会大规模下降?

在rmq服务端写入时,完全是基于commit log 做log append,避免了磁盘的随机读写,再配合零拷贝等技术特性,成为了MQ的高并发利器。而由于RMQ的全量日志都维护在commit log,这也是其余kafka的一个架构设计上的区别。相信初步了解过kafka的同学,都应该知道其设计理念中关于分区与副本的概念,一个topic在集群中存在多个分区,一个分区在集群中存在多个副本,不同的topic之间分区是互不关联的,当单机维护超过2k的topic时,意味着单机存在2k多个分区,即便topic内日志采用log append,那么在高并发写入刷盘时,磁头在这些分区的副本文件上移来移去,性能自然会随之下降,看起来像是‘随机读写’。

kafka是一个分布式消息队列,而rmq则更像是一个单机版消息队列?

这个说法是在一个中间件爱好群里看到大家在讨论时聊到的,感觉相当有意思,这种看法背后又是怎么的逻辑呢?首先,网上能找到的二者大量的对比都是基于单机的对比,集群对比很少。从分区+副本的思路来看,kafka的部署架构看起来是多个broker组成集群,但是内部运转逻辑是分区维度的多副本间高可用,即topic在多个broker之间做高可用的保证,而副本间的运转逻辑是基于zookeeper的ZAB机制。反观rmq最开始的架构确实主从架构,看起来更简单,但是可用性的保证上完全不一样,由于所有的topic都在主节点上,主节点挂了整个集群就运转不下去了,因为只有主可以支持写,所以rmq推荐使用双主架构,后来才引入raft协议支持选举,但依旧是基于broker的选举。二者最大的区别在于,集群中某个节点挂机对于整个集群的影响程度不同,毫无疑问,rmq显得更重。同样的多节点集群中,每个kafka broker都在提供读写能力,因为不同的topic的副本散落在各个broker中,而每个topic的leader副本也会分散在整个集群中,而rmq则不同,所以理论上kafka集群能提供的吞吐量应该会比rmq更高。

从前两个问题,提到了几个很核心的概念,包括分区,副本,而这也是kafka最核心设计内容。kafka的分区这个设计很有意思(关于kafka分区),kafka的集群是一个整体,对于topic而言,分区个数相当于多少个可读写节点,一个分区下存在多个副本组成一个分布式可选主的‘集群’。

从rocketmq到kafka
image.png

如上图,在一个kafka集群中,部署了三个服务端节点,在topic-a创建时,创建了2个分区,3个副本,在这个部署下,提供读写能力的只有broker1节点上1分区的副本,broker2节点上2分区的副本。对于部署节点broker而言并无主次之分,分区与分区间相互独立,分区内副本间组成集群为topic-a : partation-1提供服务。topic 与 分区 可以看做是逻辑概念,副本为物理概念。所以,前文提到弱化broker的概念就在于,它是基于分区提供服务,这个与rmq的设定完全不同,也许是先入为主的关系,又或者在rmq架构中broker的设定更像是mysql的主从设定,rmq的broker理解起来更简单。

那么什么是isr, asr? 在说这个之前,先说说对于一条消息而言,kafka理论上应该如何在兼顾一定性能的情况下获取更高的可靠性?请求写入分区1的leader副本,就能保证数据一定不丢失吗?如果此时leader节点宕机发生选举,由于follower节点还没同步leader数据,那是不是一段时间内的数据就丢失了呢?那为了更高的可靠性,是不是可以选择等所有副本都同步到当前消息才算本次写入成功?follower节点的数据时从leader节点复制而来(此处会抽象一个很常见的水位高低的概念,但是还没详细了解,暂时忽略),那如果follower节点的数据跟leader节点的数据很接近的话,那么复制会很快完成,但是如果某个follower节点的数据落后leader的节点很多,等待完全同步需要更长的时间,毫无疑问将会引发灾难性的结果。那么,有没有一种相对均衡,可接受的方案,比如只等待落后leader节点数据量较低的follower节点成功复制就算成功?技术方案的选择往往都是取舍,特别是多副本间的数据一致性的问题。

isr集合,俗称副本同步集。kafka并非是根据副本间数据复制的偏移量来计算集合,而是根据数据同步的时间间隔(参数为**[replica.lag.time.max.ms](http://replica.lag.time.max.ms/)**,默认为10s),将相同分区中leader与follower之间同步消息的时间间隔不超过设置的阔值的副本放入isr集合,而asr则表示所有副本集合。

从rocketmq到kafka
image.png

有了isr集合,那么副本数据间的一定程度的一致性,可以转为只要写入isr成功就算成功,但是就算这样就可以了吗?如果leader副本宕机了集群要重新选举,选出了一个落后的follower副本,那数据还是照样丢了,kafka是不是要确保一个非isr集群的副本不能参加选举?其次,如果isr长期只有一个节点,那是不是风险依旧很高?鉴于这些问题,另外提供两个参数以供解决(参数unclean.leader.election.enable, 设置为false表示非isr节点不参与选举,参考文章。参数min.insync.replicas,最小同步副本个数,既isr集合大小,推荐设置asr -1 ,具体视副本数大小设定 ),推荐查阅的isr文章。在兼顾kafka的性能与可靠性间取舍,通过生产者端的acks参数来设定。

在聊kafka的isr集合时,让笔者想到rmq的几个参数,客户端的同步/异步发送,服务端的同步刷盘,异步刷盘,同步复制,异步复制,由于后期rmq基于raft协议做集群的选举,并不知道是否还有其他的副本间数据一致性的方案,并且也有数年未翻过rmq的源码,细节了解不多,所以不好下定论,但是从颗粒度的设计上无疑rmq会更粗糙一些,理解上也更加简单,但是,个人认为kafka的设计相对更高级,在生产环境中使用会更加放心。

以上大致也谈了kafka集群的部署,以及isr合集,分区副本等概念,还未谈及kafka服务端的日志的设计,从网上大致了解了一番,感觉跟rmq的设计有相似之处(rmq的设计理念很多也是借鉴了kafka),这块内容待后续补充吧。

生产者客户端

不得不说,kafka的生产者客户端设计相当精致。(还未翻阅消费者可短的设计,或许还有惊喜)

前文带着问题来学习时提到客户端中生产者我们需要关注的内容,包括同步异步,负载均衡,流控等的内容。生产者的本质是什么,无疑是如何将消息正确的投递到服务端。由于目前笔者使用的是Java,所以直接翻阅了Java语言实现的客户端程序(spring-kafka-2.1.7, kafka-client-1.0.1),其他语言实现的客户端程序可能存在一定差异。先提供几篇学习资料的传送门:初识kafka producer、Kafka消息发送流程、Sender线程详解

先了解几个比较核心的接口

Partitioner // 分区器,默认的分区器实现:DefaultPartitioner,用于负载均衡,没有指定key的情况下默认轮训算法
ProducerInterceptors // 钩子函数,主要提供onSend,onAcknowledgement两个方法,大致上用于处理消息发送前发送后的钩子函数,但是更详细的用法请参考注释
Callback // 请求完成时的回调函数,主要用于同步控制,但是更详细的用法请参考注释
KafkaProducer // 生产者客户端
TopicPartition // topic分区
RecordAccumulator // 消息累加器,kafka 批量发送
    Deque // 默认是ArragDeque, 双端队列。消息累加器中核心的数据结构,元素为ProducerBatch。
    ProducerBatch // 消息批次,一次发给服务端的批次消息
    MemoryRecordsBuilder // 消息批次存储器
Sender // 发送器
KafkaClient // kafka 客户端,默认实现NetworkClient,提供了异步的请求响应的网络IO

不得不提的几个参数

acks 重点参数,默认为1
    0表示不关注服务端真实结果,安全性极低,性能最高
    1 表示leader写入成功即可,安全性较高,性能较好
    -1 或者 all 表示isr集合全部写入成功即可,安全性最高,性能最低
retries 重试次数,建议配置Integer.MAX_VALUE
batch.size 默认16k, 消息缓冲区
linger.ms 这个参数最有意思,可以理解为在内存中最大可驻留时间,超过这个时间不管内存中驻留的消息大小都会发出去,容易让人想到tcp nagle算法
max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数(默认5)。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。注意:设置此参数是为了避免消息乱序

流程图(看起来画了个寂寞)

从rocketmq到kafka
image.png

其中笔者觉得有几个点是需要重点关注的,如下:

1、分区器是重点需要关注

分区器的逻辑相对简单,但是却要关注,因为它很重要,它在做选择分区的活。

2、acks=0的逻辑(其他的逻辑需要原来服务端的响应,可以看看NetworkClient#handleCompletedReceives)

// 看Selector#pollSelectionKeys
void pollSelectionKeys(Set selectionKeys,
                           boolean isImmediatelyConnected,
                           long currentTimeNanos){
// ....
                if (channel.ready() && key.isWritable()) {
                    Send send = null;
                    try {
                        send = channel.write(); // write
                    } catch (Exception e) {
                        sendFailed = true;
                        throw e;
                    }
                    if (send != null) {
                        this.completedSends.add(send); // 把send放到completedSends列表中
                        this.sensors.recordBytesSent(channel.id(), send.size());
                    }
                }

//....
}

// 如上图,NetworkClient#handleCompletedSends,此处是调用完poll方法后立刻调用
private void handleCompletedSends(List responses, long now) {
        // if no response is expected then when the send is completed, return it
        for (Send send : this.selector.completedSends()) {
            InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
            if (!request.expectResponse) {
                this.inFlightRequests.completeLastSent(send.destination());// 清理最前一个inFlightRequest
                responses.add(request.completed(null, now)); // response为空
            }
        }
    }

// 如上图,NetworkClient#handleCompletedSends
private void completeResponses(List responses) {
        for (ClientResponse response : responses) {
            try {
                response.onComplete(); // 调用callback,该callback引用来自于InFlightRequest,引用是Sender#handleProduceResponse
// 内部最终就是调用的Thunks列表的回调对象,处理KafkaProducer的回调逻辑
            } catch (Exception e) {
                log.error("Uncaught error in request completion:", e);
            }
        }
    }

3、RecordAccumulator#ready 中判断了什么

// Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
// partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
// partition batches.  大致解释一下,获取一组可发送的分区列表,最早
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
        Set readyNodes = new HashSet();
        long nextReadyCheckDelayMs = Long.MAX_VALUE;
        Set unknownLeaderTopics = new HashSet();

        boolean exhausted = this.free.queued() > 0;// 阻塞在等待内存分配的线程数量,大于0表示内存已满
        for (Map.Entry> entry : this.batches.entrySet()) { // 所有的消息
            TopicPartition part = entry.getKey();
            Deque deque = entry.getValue();

            Node leader = cluster.leaderFor(part); // 循环leader节点
            synchronized (deque) { // 加锁
                if (leader == null && !deque.isEmpty()) {
                    // This is a partition for which leader is not known, but messages are available to send.
                    // Note that entries are currently not removed from batches when deque is empty.
                    unknownLeaderTopics.add(part.topic()); // leader节点未知
                } else if (!readyNodes.contains(leader) && !muted.contains(part)) { 
// 不在已准备好的leader节点列表 并且 不再muted列表(muted列表跟max.in.flight.requests.per.connection,用于顺序控制)
                    ProducerBatch batch = deque.peekFirst(); // 取队列中的首节点
                    if (batch != null) {
                        long waitedTimeMs = batch.waitedTimeMs(nowMs); // 节点已等待的时间
                        boolean backingOff = batch.attempts() > 0 && waitedTimeMs  1 || batch.isFull(); // 队列是否满了,队列大于1说明肯定有一个满了,batch.isFull说明当前的批次已满
                        boolean expired = waitedTimeMs >= timeToWaitMs; // 是否过期,已等待时间 >= 等待时间
                        boolean sendable = full || expired || exhausted || closed || flushInProgress(); // flushInProgress表示调用了flush 
                        if (sendable && !backingOff) {
                            readyNodes.add(leader); // 可发送,并且非可回退
                        } else {
                            long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); // 计算下一次检查是否ready的延时时间,只是一个估算值
                            // Note that this results in a conservative estimate since an un-sendable partition may have
                            // a leader that will later be found to have sendable data. However, this is good enough
                            // since we'll just wake up and then sleep again for the remaining time.
                            nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                        }
                    }
                }
            }
        }

        return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
    }

4、回调里究竟做了什么?


private void handleProduceResponse(ClientResponse response, Map batches, long now) {
        RequestHeader requestHeader = response.requestHeader();
        int correlationId = requestHeader.correlationId();
        if (response.wasDisconnected()) { // 连接问题
            for (ProducerBatch batch : batches.values())
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now); // 网络异常
        } else if (response.versionMismatch() != null) { // 版本不匹配
            for (ProducerBatch batch : batches.values())
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now); // 不支持版本
        } else {
            if (response.hasResponse()) { // 有响应值
                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                for (Map.Entry entry : produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    ProducerBatch batch = batches.get(tp);
                    completeBatch(batch, partResp, correlationId, now);
                }
            } else {
                // this is the acks = 0 case, just complete all requests
                for (ProducerBatch batch : batches.values()) { // 没有响应值,认为acks = 0
                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now);
                }
            }
        }
    }

// Complete or retry the given batch of records.
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now) {
        Errors error = response.error;

        if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
                (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
            // ... 消息批次量太大(batch.size 配置的可能过大)
            this.accumulator.splitAndReenqueue(batch); // 切割后重新入队
            this.accumulator.deallocate(batch); // 释放原来的
            this.sensors.recordBatchSplit();
        } else if (error != Errors.NONE) { // 其他错误
            if (canRetry(batch, response)) { // 是否可重试,判断attempts 

5、流控问题

private boolean canSendRequest(String node) {
   return connectionStates.isReady(node) && selector.isChannelReady(node) && inFlightRequests.canSendMore(node);
}

public boolean canSendMore(String node) {
        Deque queue = requests.get(node); // 阶段的已发送队列
        return queue == null || queue.isEmpty() || // 还未 有队列,或者队列已清空
               (queue.peekFirst().send.completed() && queue.size() 

应该算初步掌握Kafka Producer,至少日常使用应该是没问题了,前文提到同步、异步、流控、负载均衡等问题都应该有了了解,其他的细节有时间有遇到问题再单独补充吧~

消费客户端

消费端的核心流程一般有三个:1、元数据同步过来后的负载均衡,消费端剔除,reblance等。2、消息的poll流程。3、offset commit流程。先提供几篇学习资料:线程拉取模型,核心参数,重平衡

基本上看完了消费者客户端的核心流程,总体感觉相对复杂。相比rmq而言,印象中,rmq的rebalance的设计更简单一些,有一条线程去做reblance,有一条线程去做poll,线程间相互隔离,relbance的流程不会阻塞poll流程。并且由于是客户端均分负载,每个客户端只负责从ns中拿到元数据,然后客户端按照算法本地均分,结果并不会反馈给服务端,既rmq的客户端如果采用算法不一样,就会有风险。

kafka的reblance流程在poll流程中的,原本想画一下流程图,但是感觉设计的较复杂,就省了只简单说核心流程。

1、kafka的reblance中需要引入一个协调器Coordinator,客户端从本地缓存的元数据中随机找到一个可用的leader节点,向其发送寻找协调器的节点。主要向服务端带了一个grouopId的数据去找到合适的协调器,寻找协调器的过程中是超时阻塞的。

2、找到协调器后,判断是否需要加入总的分配分组(仅看选择了自动分配的订阅类型的)。判断上一次分配的元数据跟当前的元数据是否发生变更,判断已加入的主题列表与当前的主题列表是否发生变更,已经是否需要rejoin的标记位。

3、确认需要加入分组后,如果是自动分配分区的场景下,需要拉去最新的元数据,之后进入reblance阶段。

4、reblance分为两个阶段,一个是prepare阶段,主要做包括(自动的)同步本地的最新commit,触发reblance钩子函数,重置分组订阅关系,还需要停掉当前的心跳(防止它影响分配的过程)。另外一个就是加入阶段,向服务端发起请求后等待结果异步回调,回调中会判断如果当前的节点是leader节点,则执行分配逻辑,并将分配逻辑发给服务端。如果当前的节点是follower节点,则向服务端发送空的分配结果,并等待服务端回调最终的分配结果。等服务端返回最终结果都是阻塞状态。

5、执行协调器分配完成。

从中可以理解,分配的逻辑在客户端中的某一个节点,客户端分配完成后将结果返回给服务端,服务端再分发给各个节点,确保整个客户端集群的分配算法是统一的。触发reblance之后,如果集群中的节点发生变更,会怎么触发第二次reblance?在客户端父服务端的交互中,比如通过心跳上报时,服务端返回Errors.REBALANCE_IN_PROGRESS,客户端重置本地needRejoin标记位,等待下一次发起poll时进行第二次reblance。同理,提交偏移量时会返回需要reblance。

kafka的poll流程并不复杂,除开relbance的流程后,基本上就是按照分配的分区节点构建poll request。

  • 因为构建poll reuqest需要确定分区的偏移量,所以在poll前需要判断是否都有了偏移量,缺失的部分需要发起偏移量同步的请求。
  • 查找到所有的已分配的分区的偏移量后,开始从内存中查找上一次已经拉取下来的数据completedFetches,如果有数据,则返回客户端进行消费。
  • 内存中没有待处理的数据,则构建请求fetcher.sendFetches(),构建好的请求写入到unset中,构建的回调会将结果写入completedFetches
  • 将unset中的数据写入socket buffer中,激活channel的写事件,触发nio事件,处理连接、读,写等事件。
  • 执行process completed actions下的事件处理,主要是已发出去的,以及收到的数据处理,其他的就是连接,版本等问题了。(数据读取后调用回调,写入completedFetches
  • 判断是否需要加入分组,需要的话返回空列表,否则执行2,从内存中序列化出数据
  • 执行poll完成

关于offset commit,主要是自动提交,手动提交。

  • 手动提交,支持同步与异步。
  • 自动提交offset在coordinator,也就是本地拉取的数据,消费完了等下下一次拉取前进行commit。发起commit后,回调后更新offset
  • 提交ack时,会更新本地的分区对应的offset,如果一个批次拉取中有某一个消息消费失败了(有可能抛出的RuntimeException被外部KafkaMessageListenerContainer所catch的并且没有做ack),后续成功了,最新的offset提交会覆盖,所以消息消费失败后(没有ack)一般情况下无法被重试。如果抛出的并非是RuntimeException,则会继续往外层抛,最终被catch后打印,重新进入拉取消息,那么在消费失败之前消费成功的消息可以正常的被commit ,重新拉取消息时会同步本地offset,所以可以拉取到还未消费的消息。细节可以查阅KafkaMessageListenerContainer#doInvokeRecordListener

本文章来源于互联网,如有侵权,请联系删除!原文地址:从rocketmq到kafka