Kafka如何保证数据的可靠性&Kafka集群

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Kafka如何保证数据的可靠性&Kafka集群

正文


一、Kafka数据存储方式


名词解释


Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群


Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发


message: Kafka中最基本的传递对象。


Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列


       每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。        


Replica:副本,为实现备份的功能,保证集群中的某个节点发生故障时,该节点上的 Partition 数据不丢失,且 Kafka 仍然能够继续工作,Kafka 提供了副本机制,一个 Topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower。


Segment:partition物理上由多个segment组成,每个Segment存着message信息


Producer : 生产者,生产message发送到topic


Consumer : 消费者,订阅topic并消费message, consumer作为一个线程来消费


Consumer Group:消费者组,一个Consumer Group包含多个consumer


Offset:偏移量,理解为消息partition中的索引即可


分区分步示意图


创建一个6个分区,3个副本的topic


    @Bean
    public NewTopic myTopic() {
        return new NewTopic("my-topic-partition", 6, (short) 3);
    }


通过ZKtools可知几个分区分步如下。


   partition1:{"controller_epoch":4,"leader":1,"version":1,"leader_epoch":0,"isr":[1,3,2]}

   partition2:{"controller_epoch":4,"leader":2,"version":1,"leader_epoch":0,"isr":[2,1,3]}

   partition3:{"controller_epoch":4,"leader":3,"version":1,"leader_epoch":0,"isr":[3,2,1]}

   partition4:{"controller_epoch":4,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,3]}

   partition5:{"controller_epoch":4,"leader":2,"version":1,"leader_epoch":0,"isr":[2,3,1]}

   partition6:{"controller_epoch":4,"leader":3,"version":1,"leader_epoch":0,"isr":[3,1,2]}


其中controller_epoch表示的是当前的kafka控制器,leader表示当前分区的leader副本所在的broker的id编号,version表示版本号(当前半本固定位1),leader_epoch表示当前分区的leader纪元,isr表示变更后的isr列表(后面解释什么ISR)。


222.png


由图可见,每一个Broker都冗余了每个分区的数据。我们称为副本机制。这样有以下优点


提供数据冗余:即使有Broker宕机,系统依然能够继续运转不会丢失数据,因而增加了整体可用性以及数据持久性。

提供高伸缩性:支持横向扩展,能够通过添加机器的方式来提升读的性能,进而提高读操作吞吐量。

改善数据局部性:允许将数据放入与用户地理位置相近的地方,从而降低系统延时。


Kafka数据存放


在一个分区中,会将一个大的分区拆分n多个不同小segment文件 ,每个segment文件 存放我们该分区日志消息。在每个segment中会有.index、.log。在默认的情况下,每个segment文件容量最大是为1073741824KB(1024MB),如果超过的情况下依次内推,产生一个新的segment文件,可以通过修改配置文件log.segment.bytes=1073741824修改。


00000000000000000000.index-----消息偏移量索引文件


00000000000000000000.log-----消息持久化内容


333.png


如上图假如第一个分区存放的offset到1000,那么下一个文件的命名从上一个offset位置结束的位置开始。


如下图是文件存储的样子


555.jpg


总结: 每个分区是由多个segment组成,每个segment由多个index和多个log文件组成,并且是按照一定的顺序存放数据的。


命名规则


每个segment file也有自己的命名规则,每个名字有20个字符,不够用0填充,每个名字从0开始命名,下一个segment file文件的名字就是,上一个segment file中最后一条消息的索引值。在.index文件中,存储的是key-value格式的,key代表在.log中按顺序开始顺序消费的offset值,value代表该消息的物理消息存放位置。但是在.index中不是对每条消息都做记录,它是每隔一些消息记录一次(稀疏索引),避免占用太多内存。即使消息不在index记录中,在已有的记录中查找,范围也大大缩小了。


如何查看Kafka日志和index文件


#index
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-689fb31d544a/my-topic-partition-1/00000000000000000000.index
#log
./bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /kafka/kafka-logs-689fb31d544a/my-topic-partition-1/00000000000000000000.log


Index文件


111.png


Log文件


555.jpg


Kafka如何通过offset查找到Message


111.png


首先根据二分查找法找到对应的segment文件。

通过二分查找找到对应的.index索引文件中position的值。

通过稀疏索引在log文件中查找对应的message信息。


小结:


1、topic是逻辑概念,partition是物理概念。

2、.log文件存放实际数据,生产者的数据都会追加到.log文件中。

3、为防止.log文件过大导致数据定位效率低下,kafka采取了分片(segment)和稀疏索引机制,将partition分为多个segment,分别进行索引。

4、.index文件存储大量的索引信息,.log文件存储大量的数据,索引文件中的元数据指向对应数据文件中Message的物理偏移地址。


二、Kafka如何确保数据不丢失


生产者的ack机制


向 Kafka 写数据时,producers 设置 ack 是否提交完成。


0:不等待broker返回确认消息,效率高可能丢失数据。

1:leader副本保存成功返回,当leader还没有将数据同步到Follwer宕机,存在丢失数据的可能性。

-1:(all): 所有副本都保存成功返回 设置 “ack = all” 并不能保证所有的副本都写入了消息。

注意:默认情况下,当 acks = all 时,只要 ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置acks = all时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。


消费者的offset commit


消费者通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,会接着上次的offset进行消费。kafka并不像其他消息队列,消费完消息之后,会将数据从队列中删除,而是维护了一个日志文件,通过时间和储存大小进行日志删除策略。默认情况下每隔 5分钟(log.retention.check.interval.ms=300000)会检测一次是否有日志文件需要删除。日志文件会保留log.retention.hours=168小时(7天),当日志文件超过(log.retention.bytes=1073741824)1024MB(与时间保留策略独立)都会进行删除。如果offset没有提交,程序提交之后,会从上次消费的位置继续消费,有可能存在重复消费的情况。


Offset Reset 三种模式


earliest(最早):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest(最新的):当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none(没有):topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

broker的副本 机制


每个broker中的partition我们一般都会设置有replication(副本)的个数,生产者写入的时候首先根据分发策略(有partition按partition,有key按key,都没有轮询)写入到leader中,follower(副本)再跟leader同步数据,这样有了备份,也可以保证消息数据的不丢失。


三、Kafka可以支持高吞吐量的原因


1、顺序读写:基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。(Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升 。)


2、Page Cache:为了优化读写性能,kafka利用了操作系统本身的page cache,就是利用操作系统自身的内存而不是JVM空间内存,这样做的好处是:


        a:避免Object消耗:如果是使用java堆,java对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。


        b:避免GC问题:随着JVM中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在GC问题。


通过操作系统的page cache,kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。


3、零拷贝:(不使用的时候,数据在内核空间和用户空间之间穿梭了两次),使用零拷贝技术后避免了这种拷贝。通过这种 “零拷贝” 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。


4、分区分段+索引:topic 中的数据是按照一个一个的partition即分区存储到不同broker节点的,每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的,这也非常符合分布式系统分区分桶的设计思想。kafka的message消息实际上是分布式存储在一个一个segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件.这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据处理的并行度。


5、批量读写:Kafka数据读写也是批量的而不是单条的。在向Kafka写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为10MB/S,一次性传输10MB的消息比传输1KB的消息10000万次显然要快得多。


6、批量压缩:


在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。


如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩

Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩

Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议

    Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优;读取数据的时候配合sendfile直接暴力输出。


四、Kafka选举策略


什么是ISR


       简单来说,分区中的所有副本统称为 AR (Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成 ISR (In Sync Replicas)。 ISR 集合是 AR 集合的一个子集。消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。 同步期间,follow副本相对于leader副本而言会有一定程度的滞后。 “一定程度同步“ 是指可忍受的滞后范围,这个范围可以通过参数进行配置。 于leader副本同步滞后过多的副本(不包括leader副本)将组成 OSR (Out-of-Sync Replied)由此可见,AR = ISR + OSR。 正常情况下,所有的follower副本都应该与leader 副本保持 一定程度的同步,即AR=ISR,OSR集合为空。


什么是LEO、LSO、HW、LW


111.png


LEO:LEO是Log End Offset的缩写,它表示了当前日志文件中下一条待写入消息的offset。上图的LEO分别是8、6、9


LSO:Log Stable Offset。这是 Kafka 事务的概念。如果你没有使用到事务,那么这个 值不存在(其实也不是不存在,只是设置成一个无意义的值)。该值控制了事务型消费 者能够看到的消息范围。就是消费者只能消费到事务被提交的消息。


HW:分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW, 对消费者而言只能消费HW之前的消息,HW之后的消息消费者是消费不到的。


LW:Low Watermark的缩写,俗称“低水位”,代表AR集合中最小的logStartOffset值(日志起始位移值)。上图中的LW都是从1开始的。


数据更新过程


更新记录进入主副本节点处理,为该记录分配Sn(Serial Number),然后将该记录插入prepare list,该list上的记录按照sn有序排列;

主副本节点将携带sn的记录发往从节点,从节点同样将该记录插入到prepare list;

一旦主节点收到所有从节点的响应,确定该记录已经被正确写入所有的从节点,那就将commit list向前移动,并将这些消息应用到主节点的状态机;

主节点提交后即可给客户端返回响应,同时向所有从节点发送消息,告诉从节点可以提交刚刚写入的记录了。 所有的读需要全部发往主节点,这是因为客户端来读时,主节点有可能尚未将commit消息发送至从,因此,如果读从节点可能会无法获取最新数据。


Follwer同步数据


首先,Follower 发送 FetchRequest 请求给 Leader。 接着,Leader 会读取底层日志文件中的消 息数据,再更新它内存中的 Follower 副本的 LEO 值,更新为 FetchRequest 请求中的 fetchOffset 值。 最后,尝试更新分区高水位值(HW )。Follower 接收到 FETCH 响应之后,会把 消息写入到底层日志,接着更新 LEO 和 HW 值。


Kafaka的复制机制不是完全的同步复制,也不是单纯的异步复制,事实上, 同步复制要求所有能工作的Follower副本都复制完,这条消息才会被确认为成功提交, 这种复制方式影响了性能。而在异步复制的情况下, follower副本异步地从leader副本中复制数据, 数据只要被leader副本写入就被认为已经成功提交。在这种情况下,如果follower副本都没有复制完而落后于leader副本, 如果突然leader副本宕机,则会造成数据丢失。Kafka正是使用这种ISR的方式有效的权衡了数据可靠性与性能之间的关系。


分区 Leader故障转移&选举策略


Kafka会选择一个 broker 作为 “controller”节点。 controller 节点负责 检测 brokers 级别故障,并负责在 broker 故障的情况下更改这个故障 Broker 中的 partition 的 leadership 。 这种方式可以批量的通知主从关系的变化,使得对于拥有大量partition 的broker ,选举过程的代价更低并且速度更快。 如果 controller 节点挂了,其他存活的 broker 都可能成为新的 controller 节点。


Kafka的选举策略大致分一下 几种情况


OfflinePartition Leader 选举:每当有分区上线时,就需要执行 Leader 选举。 所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区 Leader 选举场景。

ReassignPartition Leader 选举:当你手动运行 kafka-reassign-partitions 命令,或者是调用 Admin 的 alterPartitionReassignments 方法执行分区副本重分配时, 可能触发此类选举。假设原来的 AR 是[1,2,3],Leader 是 1,当执行副本重分配后,副本集 合 AR 被设置成[4,5,6],显然, Leader 必须要变更,此时会发生 Reassign Partition Leader 选举。

PreferredReplicaPartition Leader 选举:当你手动运行 kafka-preferred-replica- election 命令,或自动触发了 Preferred Leader 选举时,该类策略被激活。 所谓的 Preferred Leader,指的是 AR 中的第一个副本。比如 AR 是[3,2,1],那么, Preferred Leader 就是 3。

ControlledShutdownPartition Leader 选举:当 Broker 正常关闭时,该 Broker 上 的所有 Leader 副本都会下线,因此,需要为受影响的分区执行相应的 Leader 选举。


这 4 类选举策略的大致思想是类似的,即从 AR 中挑选首个在 ISR 中的副本,作为新 Leader。


参考:


https://blog.csdn.net/sillyzhangye/article/details/86181345


https://blog.csdn.net/qq_26838315/article/details/106883256


https://www.cnblogs.com/18800105616a/p/13863254.html


相关文章
|
10天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
34 4
|
29天前
|
消息中间件 监控 数据可视化
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
大数据-79 Kafka 集群模式 集群监控方案 JavaAPI获取集群指标 可视化监控集群方案: jconsole、Kafka Eagle
47 2
|
7天前
|
消息中间件 存储 Prometheus
Kafka集群如何配置高可用性
Kafka集群如何配置高可用性
|
22天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
29天前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
59 6
|
29天前
|
消息中间件 SQL 分布式计算
大数据-74 Kafka 高级特性 稳定性 - 控制器、可靠性 副本复制、失效副本、副本滞后 多图一篇详解
大数据-74 Kafka 高级特性 稳定性 - 控制器、可靠性 副本复制、失效副本、副本滞后 多图一篇详解
21 2
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
41 1
|
3月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
121 0
|
3月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
消息中间件 监控 Kafka
实时计算 Flink版产品使用问题之处理Kafka数据顺序时,怎么确保事件的顺序性
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。