Kafka消费者——从 Kafka读取数据

简介: 应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 。 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法。

应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 。 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法。如果不先理解 这些概念,就难以理解如何使用消费者 API。所以我们接下来先解释这些重要的概念,然 后再举几个例子,横示如何使用消费者 API 实现不同的应用程序。

消费者和消费者群组

假设我们有一个应用程序需要从-个 Kafka主题读取消息井验证这些消息,然后再把它们 保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息 井保存结果。过了 一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速 度,这个时候该怎么办?如果只使用单个消费者处理消息,应用程序会远跟不上消息生成 的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的 主题 写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者 接收主题一部分分区的消息。

假设主题 T1 有 4 个分区,我们创建了消费者 C1 ,它是群组 G1 里唯 一 的消费者,我们用 它订阅主题 T1。消费者 Cl1将收到主题 T1全部 4个分区的消息,如图 4-1 所示。

img_9fd4d8006b364d156dd7e5408c40a668.png

如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我 假设消费者 C1接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,如图 4-2 所示。

img_9a2f2c724ca95922286e938b08e53961.png

如果群组 G1 有 4 个消费者,那么每个消费者可以分配到 一个分区,如图 4-3 所示。

img_9b65bf8d9f4e7715b358067182732246.png

如果我们往群组里添加更多的消费者,超过主题的分区数量,那么多出的消费者就会被闲置,不会接收到任何消息。

往群组里增加消费者是横向伸缩消费能力的主要方式。 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要性意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。

除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上, Kafka 设计的主要目标之一 ,就是要让 Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息, 而不只是其中的 一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka消费者和消费者群组并不会对性能造成负面影响。

在上面的例子里,如果新增一个只包含一个消费者的群组 G2,那么这个消费者将从主题 T1 上接收所有的消息,与群组 G1 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 G1 那样,如图 4-5 所示。总的来说,群组 G2 还是会接收到所有消息,不管有没有其他群组存在。

简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组, 然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。

img_338d32973ff025951bff3718585960dd.png

消费者群组和分区再均衡

我们已经从上一个小节了解到,群组里的消费者共同读取主题的分区。一个新的消费者加 入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时 , 比如管理员添加了新的分区,会发生分区重分配。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者), 不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另 一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存 ,在它重新恢复状态之前会拖慢应用程序。我们将在本章讨论如何进行安全的再均衡,以及如何避免不必要的再均衡。

消费者通过向被指派为 群组协调器的 broker (不同的群组可以有不同的协调器)发送 心跳 来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,井停止读取消息,群组协调器(broker)会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。在本章的后续部分,我们将讨论一些用于控制发送心跳频率和会话过期时间的配置参数,以及如何根据实际需要来配置这些参数 。

分配分区是怎样的一个过程

当消费者要加入群组时,它会向群组协调器发送 一 个 JoinGroup 请求。第 一 个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列 表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的), 并负责给每一个消费者分配分区。它使用 一个实现了 PartitionAssignor接口的类来决定哪些分 区应该被分配给哪个消费者 。

Kafka 内置了两种分配策略,在后面的配置参数小节我们将深入讨论。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群 主知道群组 里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。

创建 Kafka消费者

在读取消息之前,需要先创建 一个 KafkaConsumer对象 。 创建 KafkaConsumer 对象与创建 KafkaProducer对象非常相似——把想要传给消费者的属性放在 Properties 对象里。本章 后续部分会深入讨论所有的属性。在这里,我们只需要使用 3个必要的属性: bootstrap.servers、 key.deserializer、 value.deserializer。

下面代码演示了如何创建一个KafkaConsumer对象:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092, broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

deserializer使用指定的类(反序列化器)把字节数组转成 Java对象。

group.id指定了KafkaConsumer 属于哪一个消费者群组。
group.id不是必需的,不过我们现在姑且认为它是必需的。它指定了 KafkaConsumer 属于哪一个消费者群组。创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见。

订阅主题

创建好消费者之后,下一步可以开始订阅主题了。subscribe()方法接受一个主题列表作为参数

consumer.subscribe(Collections.singletonList("customerCountries"));

在这里我们创建了一个包含单个元素的列表,主题的名字叫作“customerCountries”,我们也可以在调用subscribe()方法时传入一个正则表达式,正则表达式可以匹配多个主题如果有人创建了新的主题,并且主题名与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题时很常见的做法。

要订阅所有test相关的主题,可以这样做:consumer.subscribe("test.*");

轮询

消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题 ,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据, 开发者只需要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分如下所示 :

img_89a85ff93050ee91618cde2184a7b300.png

轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator, 然后加入群组,接受分配的分区。 如果发生了再均衡,整个过程也是在轮询期间进行的。当然 ,心跳也是从轮询里发迭出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。

线程安全

在同一个群组中,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用Java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上。Confluent的博客(https://www.confluent.io/blog/)上有一个教程介绍如何处理这种情况。


消费者的配置

到目前为止,我们学习了如何使用消费者 API,不过只介绍了几个配置属’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,不过有一些参数与消费 者的性能和可用性有很大关系。接下来介绍这些重要的属性。

1. fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。 broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。

2. fetch.max.wait.ms

我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms则用于指定 broker的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。 如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms被设 为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回所有可用的数据 , 就看哪个条件先得到满足。

3. max.parition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也 就是说, KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。如果一个主题有 20个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因 为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。 max.parition.fetch.bytes 的值必须比 broker能够接收的最大消息的字节数(通过 max.message.size属 性配置 )大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。 消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况, 可以把 max.parition.fetch.bytes 值改小 ,或者延长会话过期时间。

4. session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者 。该属性与 heartbeat.interval.ms紧密相关。heartbeat.interval.ms 指定了poll()方法向协调器 发送心跳的频 率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以, 一般需要同时修改这两个属性, heartbeat.interval.ms 必须比 session.timeout.ms 小, 一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 ls。 把 session.timeout.ms 值设 得比默认值小,可以更快地检测和恢 复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡 ,不过检测节点崩溃需要更长的时间。

5. auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认值是latest, 意 思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者 启动之 后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从 起始位置读取分区的记录。

6. enable.auto.commit

我们稍后将介绍 几种 不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.mls 属性来控制提交的频率。

7. partition.assignment.strategy

我们知道,分区会被分配给群组里的消费者。 PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。 Kafka 有两个默认的分配策略 。

  • Range

该策略会把主题的若干个连续的分区分配给消费者。假设悄费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,井且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和 分区 1,而消费者 C2 分配到这两个主题 的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。

  • RoundRobin

该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2分配分区,那么消费者 C1 将分到主题 T1 的分区 0和分区 2以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 l 以及主题T2 的分区 0和分区 2。一般 来说,如果所有消费者都订阅相同的主题(这种情况很常见), RoundRobin策略会给所 有消费者分配相同数量 的分区(或最多就差一个分区)。

可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 这个类实现了 Range策略,不过也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定 义策略,在这种情况下 , partition.assignment.strategy 属性的值就是自定义类的名字。

8. client.id

该属性可以是任意字符串 , broker用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。

9. max.poll.records

该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者或消费者与 broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有 比较高的延迟和比较低的带宽 。

目录
相关文章
|
1月前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
70 2
|
2月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
4月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
139 62
|
4月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
134 58
|
2月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
42 1
|
2月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
52 1
|
4月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
317 9
|
4月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
109 3
|
4月前
|
图形学 C# 开发者
全面掌握Unity游戏开发核心技术:C#脚本编程从入门到精通——详解生命周期方法、事件处理与面向对象设计,助你打造高效稳定的互动娱乐体验
【8月更文挑战第31天】Unity 是一款强大的游戏开发平台,支持多种编程语言,其中 C# 最为常用。本文介绍 C# 在 Unity 中的应用,涵盖脚本生命周期、常用函数、事件处理及面向对象编程等核心概念。通过具体示例,展示如何编写有效的 C# 脚本,包括 Start、Update 和 LateUpdate 等生命周期方法,以及碰撞检测和类继承等高级技巧,帮助开发者掌握 Unity 脚本编程基础,提升游戏开发效率。
92 0
|
4月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
153 0