Rebalance&多线程实例消费(十二)

简介: Rebalance&多线程实例消费(十二)

上篇文章说了,kafka位移提交通过enable.auto.commit控制手动提交还是自动提交,手动提交又分为异步提交和同步提交,还可以指定分区进行提交,默认是提交给所有分区。手动提交可以对应不同的业务场景,当需要业务全部处理完才提交位移,则可以选择手动提交,但这时候需要做幂等性处理,因为当业务执行完毕,但系统宕机,这时候consumer重启则因为位移没提交会重复消费之前的数据。

Consumer位移管理-Kafka从入门到精通(十一)


一、Rebalance


Rebalance是什么?

它本质是一组协议,规定了consumer group如何达成一致性来分配订阅所有分区的。假设有20个consumer,需要订阅100个分区的topic,这时候就会每个consumer会平均订阅5个分区,这个过程就是rebalace。

和旧版本依托于zookeeper不同,新版本consumer使用了kafka内置一个权限的协调协议(group coordination protocol)。Kafka的某个broker会被选举为组协调者(group coordinator),他负责对组的状态进行管理,他的主要职责是当新成员到达时促进组内所有的成员重新分配,即coordinator负责rebalance。


什么时候他会触发rebalance呢?

1、组成员发生变化,比如新的consumer加入组,或者有consumer离开组,或者consumer崩溃时候触发。

2、消费组订阅的topic发生变化。

3、组订阅的topic分区发生变更。


真实应用场景中引用rebalance最常见原因违背了第一条件,特别是consumer崩溃情况,崩溃不一定是consumer进程宕机或者挂掉,当consumer无法在指定时间内完成消息处理时候,那么coordinator则会认为consumer已经崩溃,从而引发新一轮的rebalance。当group程序下业务处理逻辑过重,这时候就会导致消费超时,从而导致coordinator认为consumer挂掉,引发rebalance,这时候就要注意这些参数的配置request.timeout.ms、max.poll.interval.ms、max.poll.records等。


Rebalance分区配置?

之前提到过rebalance时group下所有consumer会一起协调共同参与分区分配,kafka新版本consumer默认提供了三种分区策略,分别是range、round-robin、sticky。

Range策略主要是基于范围思想,它将单个topic的所有分区按照顺序排列,然后把这些分区划分为固定大小的分区并且依次分给各个consumer。而round-robin策略则会把所有topic的所有分区顺序摆开,然后轮询式的分配给各个consumer。最新发布的sticky策略有效避免上诉两种策略完全无视历史分配方案缺陷,采用“有粘性”对所有consumer实例进行分配,可以最大程度的避免分配倾斜。

新版本consumer默认的分配策略是range,用户根据consumer参数partition.assignment.strategy来进行设置,另外也可以通过自定义来分配策略。


Rebalance协议:

前面说了rebalance本质就是一组协议,group与coordinator共同使用这组协议来完成group的rebalance,最新版本的kafka中提供下面五种协议来处理rebalance。

Joingroup请求:consumer请求加入组。

SyncGroup请求:group leader吧分配方案同步更新到组内所有成员中。

Heartbeat请求:consumer定期向coordinator汇报心跳表明依然存活。

LeaveGroup请求:consumer主动通知coordinator该consume即将离组。

DescribeGroup请求:查看组的所有信息,包括成员信息,协议信息,分配方案,订阅信息。该请求类型主要提供管理员使用。Coordinator不使用该请求执行rebalance。


在rebalance过程中,coordinator主要处理consumer发过来的joinGroup和syncGroup请求,当consumer主动离组时会发送leaveGroup请求给coordinator。

在成功rebalance后,组内所有consumer都需要定期向coordinator发送heartbeat请求,而每个consumer也是根据heartBeat请求的响应中是否包含rebalance_in_progress来判断当前group是否开启了新一轮的rebalance。


rebalance监听器:

在位移提交章节中,consumer默认在新版本是把位移提交到_consumer_offsets中。其实kafka也支持把位移提交到外部存储中,比如数据库。若要实现这个功能,则必须使用rebalance监听器,而使用监听器的前提是用户必须使用consumer group。如果使用独立的consumer或者直接手动分配分区,那么rebalance监听是无效的。


多线程实例消费


如前所述,kafkaConsumer是非线程安全的,他和kafkaProducer不同,后者是线程安全的,因此可以在多个线程中使用同一个kafkaProducer实例,而且这样的效率是比每个线程维护一个kafkaProducer更高。


Consumer group分为 每个线程单独维护一个kafkaConsumer,和 单kafkaConsumer+多work线程。

两者区别是,后者在全局维护一个或者多个kafkaConsumer实例执行消息获取任务。使用全局的kafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的work线程执行工作,之后work线程完成处理上报位移状态,由全局的consumer提交位移。


那么他们的优缺点呢?

每个线程维护专属consumer:优点:实现简单,速度快,因为无线程之间的交互管理,方便管理位移,易于维护分区间的消费顺序。缺点:socker连接开销大;consumer受限与topic分区,扩展性差。Broker端处理负载高(因为发往broker请求多);rebalance可能性大。

单consumer+多worker模式:优点:消息获取处理解耦;扩展性强,独立扩展consumer数量和worker。缺点:实现负载;难以维护分区内的顺序消息;处理链路变长,导致位移管理困难;worker线程异常导致数据丢失。

独立consumer


前面说的都是group consumer消费者组形式出现,group自动实行分区分配和rebalance。对于需要多个consumer共同读取某个topic来说,使用group非常方便。但有的时候用户需要精准消费某个consumer消费某个分区。

1、如果进程自己维护分区状态,那么它就可以固定消费某些分区而不用担心状态丢失问题。

2、如果进程本身已经是高可用且能够自动重启恢复错误,那么它就不需要让kafka来帮它完成错误检测和状态恢复。

以上两种情况中consumer group都无用武之地,而独立consumer更合适(standlone consumer)。

使用standalone方法就是调用kafkaConsumer.assign,前面我们订阅则是使用kafkaConsumer.subscribe。

相关文章
|
7月前
|
Java
多线程实例练习题~
多线程实例练习题~
|
7月前
线程间通信实例之轮流打印ABC
线程间通信实例之轮流打印ABC
54 0
|
7月前
|
并行计算 Python
python 多线程应用实例
python 多线程应用实例
|
安全 Java 调度
【Java|多线程与高并发】线程安全问题以及synchronized使用实例
Java多线程环境下,多个线程同时访问共享资源时可能出现的数据竞争和不一致的情况。
|
Java
多线程实例代码(demo)
多线程实例代码(demo)
91 0
|
程序员 C++ 计算机视觉
C++多线程编程和同步机制:详解和实例演示
C++中的多线程编程和同步机制使得程序员可以利用计算机的多核心来提高程序的运行效率和性能。本文将介绍多线程编程和同步机制的基本概念和使用方法。
204 0
C++多线程编程和同步机制:详解和实例演示
|
设计模式 Java 开发工具
设计模式之命令模式 Java实例讲解 + 线程池中的应用场景
设计模式之命令模式 Java实例讲解 + 线程池中的应用场景
204 0
|
存储 分布式计算 Java
Java 8 - Stream基本实例及Stream的并行处理在线程上的表现
Java 8 - Stream基本实例及Stream的并行处理在线程上的表现
249 0
|
安全 Java
手把手实例对比String、StringBuilder字符串的连接效率及StringBuilder和StringBuffer线程安全的比较...
手把手实例对比String、StringBuilder字符串的连接效率及StringBuilder和StringBuffer线程安全的比较...
186 0
|
Java 容器
【JavaEE】多线程代码实例:单例模式与阻塞队列BlockingQueue(二)
【JavaEE】多线程代码实例:单例模式与阻塞队列BlockingQueue
【JavaEE】多线程代码实例:单例模式与阻塞队列BlockingQueue(二)