kafka的实现细节
一、Topic和Partition
1、partition是topic的物理分区,而topic是parttion的逻辑总称,topic是有很多的partition组成的。
2、在kafka中的每一条消息都有一个topic。一般来说在我们应用中产生不同类型的数据,都可以设置不同的主题。一个主题一般会有多个消息的订阅者。当生产者发布消息到某个主题时,订阅了这个主图的消费者都可以接收到生产者写入的新消息。
3、kafka为每个主题维护了分布式的分区(partition)日志文件,每个partition在kafka存储层面是append log。任何发布到此partition的消息都会被追加到log文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是我们的offset,offset是一个long型的数字,我们通过这个offset可以确定一条在该partition下的唯一消息。在partition下面是保证了有序性,但是在topic下面没有保证有序性。
在上图中在我们的生产者会决定发送到哪个Partition。
如果没有Key值则进行轮询发送。
如果有Key值,对Key值进行Hash,然后对分区数量取余,保证了同一个Key值的会被路由到同一个分区(因为日志里面可以进行日志策略的),如果想队列的强顺序一致性,可以让所有的消息都设置为同一个Key。
不定义key的话性能会低,因为在轮询的过程中,明明在分区中有这样的一个key存在的话,而在轮询到2分区的时候,就把这个key存进去了,这时就存重复了,重复就浪费资源了。
二、消费模型
消息由生产者发送到kafka集群后,会被消费者消费。一般来说我们的消费模型有两种:推送模型(psuh)和拉取模型(pull)
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。比如当我们把已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。如果采用push,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。
Kafka采取拉取模型(poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。
三、网络模型(NIO的性能比BIO的性能要高)
3.1 KafkaClient --单线程Selector
单线程模式适用于并发链接数小,逻辑简单,数据量小。
在kafka中,consumer和producer都是使用的上面的单线程模式。这种模式不适合kafka的服务端,在服务端中请求处理过程比较复杂,会造成线程阻塞,一旦出现后续请求就会无法处理,会造成大量请求超时,引起雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。
3.2 Kafka--server -- 多线程Selector
在kafka服务端采用的是多线程的Selector模型,Acceptor运行在一个单独的线程中,对于读取操作的线程池中的线程都会在selector注册read事件,负责服务端读取请求的逻辑。成功读取后,将请求放入message queue共享队列中。然后在写线程池中,取出这个请求,对其进行逻辑处理,即使某个请求线程阻塞了,还有后续的县城从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,由于注册了OP_WIRTE事件,所以还需要对其发送响应。
四、高可靠分布式存储模型
在kafka中保证高可靠模型的依靠的副本机制,有了副本机制之后,就算机器宕机也不会发生数据丢失。
4.1高性能的日志存储:
①、假如说日志文件无限的去增大,假如说里面可以存储1百万,1亿条消息的时候,想象下它的清理是有多难,怎么知道消息哪些读了,哪些消息没有读,所以在kafka中引入了高性能的日志存储,就是说所有的读和写都会将被日志存储起来,里面使用了稀疏的一种存储方式,每个日志文件被分成多个sagement。
②、kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下,假设有1000条消息,每个LogSegment大小为100,下面展现了900-1000的索引和Log:里面采用了二分法的查找方式:
1-100:代表索引,1235-2103:代表值 901-1000:代表Offset
由于kafka消息数据太大,如果全部建立索引,即占了空间又增加了耗时,所以kafka选择了稀疏索引的方式,这样的话索引可以直接进入内存,加快偏查询速度。
简单介绍一下如何读取数据,如果我们要读取第911条数据首先第一步,找到他是属于哪一段的,根据二分法查找到他属于的文件,找到0000900.index和00000900.log之后,然后去index中去查找 (911-900) =11这个索引或者小于11最近的索引,在这里通过二分法我们找到了索引是[10,1367]然后我们通过这条索引的物理位置1367,开始往后找,直到找到911条数据。
上面讲的是如何要找某个offset的流程,但是我们大多数时候并不需要查找某个offset,只需要按照顺序读即可,而在顺序读中,操作系统会对内存和磁盘之间添加page cahe,也就是我们平常见到的预读操作,所以我们的顺序读操作时速度很快。但是kafka有个问题,如果分区过多,那么日志分段也会很多,写的时候由于是批量写,其实就会变成随机写了,随机I/O这个时候对性能影响很大。所以一般来说Kafka不能有太多的partition(默认有50个分区)。针对这一点,RocketMQ把所有的日志都写在一个文件里面,就能变成顺序写,通过一定优化,读也能接近于顺序读。
日志策略
日志保留策略
无论消费者是否已经消费了消息,kafka都会一直保存这些消息,但并不会像数据库那样长期保存。为了避免磁盘被占满,kafka会配置响应的保留策略(retention policy),以实现周期性地删除陈旧的消息kafka有两种“保留策略”:
- 根据消息保留的时间,当消息在kafka中保存的时间超过了指定时间,就可以被删除;
- 根据topic存储的数据大小,当topic所占的日志文件大小大于一个阀值,则可以开始删除最旧的消息
日志压缩策略
在很多场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。我们可以开启日志压缩功能,kafka定期将相同key的消息进行合并,只保留最新的value值
4.2 副本机制
Kafka的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫Reblance),kafka每个主题的每个分区都有一个主副本以及0个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
比如说现在有一个副本和zk失去了连接,这个副本将会直接被剔除掉。因为zk是一个中心化的思想。在Kafka中并不是所有的副本都能被拿来替代主副本,所以在kafka的leader节点中维护着一个ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:
节点必须和ZK保持连接
在同步的过程中这个副本不能落后主副本太多
另外还有个AR(Assigned Replicas)用来标识副本的全集,OSR用来表示由于落后被剔除的副本集合,所以公式如下:ISR = leader + 没有落后太多的副本; AR = OSR+ ISR;
这里先要说下两个名词:HW(high watermark)是consumer能够看到的此partition的位置,LEO( log end offset)是每个partition的log最后一条Message的位置。HW能保证leader所在的broker失效,该消息仍然可以从新选举的leader中获取,不会造成消息丢失,因为要同步消息。
当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:
1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
-1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(其他节点都和zk断开连接,或者都没追上),这样就变成了acks=1的情况。
副本数据同步细节(HW和LEO)
上面的图中,总共是有四个步骤的:
①、当HW与LEO完全一致的话,代表的是Leader中的数据被Follower全部给同步好了,还没有新的数据写入的情况下,这时follower会处于阻塞的状态,等待leader有新的数据写入。
LEO:就是最后一条数据的偏移量
②、当leader写入两条数据的时候,这时阻塞将会被打开,然后follower会向leader同步这两条消息。意思就是说LEO和HW相同的位置时才可以正常消费的。
③、follower节点将Leader节点中的数据已经同步了第四条数据,但是第五条的数据没有同步完成,这时也leader也不会对外提供消费的。
④、当follower都同步完成之后,又继续阻塞等待。
4.3 数据操作
为避免broker挂后造成数据丢失,kafka实现了高可用方式。
- 基于partition实现Replica。并与zookeeper配合实现Leader的选举。
- 通过算法,将partition的Leader与Fellowers分散于不同的broker。
replica实现
在“brokers的物理结构”中,replication有多个follewers,分散于不同的brokers。通过增量日志实现。
这里有个同步机制和确认的机制
partition的log记录是顺序的,通过server.properties中log.retention.hours参数定义日志保留时长,过期则删除。新写入的message append记录在partition中。
为提升效率
- follewers会在message未写入log时,读到message则将ACK发送给Leader,因此只能保证存在Replica,不能保证数据一定持久化了。
- 批量复制
ISR(副本同步队列)
ISR是In-Sync Replicate 记录与Leader保持同步的列表。
维护的是有资格的follower节点
- 副本的所有节点都必须要和zookeeper保持连接状态
- 副本的最后一条消息的offset和leader副本的最后一条消息的offset之间的差值不能超过指定的阀值,这个阀值是可以设置的(replica.lag.max.messages)
4.4 leader 选举(Leader Election )
判断Replica活着,(1)与zk有心跳通讯;(2)与Leader通讯及时。两者有一不满足,fellower都会从ISR中移除。
选举算法
一般的leader选举算法,有Majority Vote/Zab/Raft/PacificA。kafka采用的即PacificA,kafka维护多个ISR,但不不像Majorty Vote算法,限制最少的2N+1节点和N+1以上投票。
即使只有1个follewer,也可完成Leader选举。
选举过程
五、Kafka的高吐量的因素
- 顺序写的方式存储数据 ;
- 批量发送: 在异步发送模式中。kafka允许进行批量发送,也就是先讲消息缓存到内存中,然后一次请求批量发送出去。这样减少了磁盘频繁io以及网络IO造成的性能瓶颈batch.size 每批次发送的数据大小 linger.ms 间隔时间
- 零拷贝:消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不懂的通过socket发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤
在上面的图中,内核空间属于系统的,用户空间属于硬盘的。NIC:属于磁盘控制器
DMA:代表硬盘的拷贝。
上面的图的步骤如下:这里会有两次cpu的拷贝,还有两次硬盘的拷贝,很耗时间
1、操作系统将数据从磁盘读入到内核空间的页缓存
2、应用程序将数据从内核空间读入到用户空间缓存中
3、应用程序将数据写回到内核空间到socket缓存中
4、操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出
通过“零拷贝”技术可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数