与维护Consumer offset的方式类似,脱离ZK之后的Kafka集群将元数据视为日志,保存在一个内置的Topic中,且该Topic只有一个Partition。
元数据日志的消息格式与普通消息没有太大不同,但必须携带Leader的纪元值(即之前的Controller epoch):
Record => Offset LeaderEpoch ControlType Key Value Timestamp
这样,Follower以拉模式复制Leader日志,就相当于以Consumer角色消费元数据Topic,符合Kafka原生的语义。
那么在KRaft协议中,是如何维护哪些元数据日志已经提交——即已经成功复制到多数的Follower节点上的呢?Kafka仍然借用了原生副本机制中的概念——high watermark(HW,高水位线)保证日志不会丢失,HW的示意图如下。
状态机说明:
要让所有节点达成一致性的状态,大部分都是基于复制状态机来实现的(Replicated state machine)
简单来说就是:初始相同的状态+相同的输入过程=相同的结束状态,这个其实也好理解,就类似于一对双胞胎,出生时候就长得一样,然后吃的喝的用的穿的都一样,你自然很难分辨。其中最重要的就是一定要注意中间的相同输入过程,各个不同节点要以相同且确定性的函数来处理输入,而不要引入一个不确定的值。使用replicated log来实现每个节点都顺序的写入客户端请求,然后顺序的处理客户端请求,最终就一定能够达到最终一致性。
状态机安全性保证:
在安全性方面,KRaft与传统Raft的选举安全性、领导者只追加、日志匹配和领导者完全性保证都是几乎相同的。下面只简单看看状态机安全性是如何保证的,仍然举论文中的极端例子:
在时刻a,节点S1是Leader,epoch=2的日志只复制给了S2就崩溃了。
在时刻b,S5被选举为Leader,epoch=3的日志还没来得及复制,也崩溃了。
在时刻c,S1又被选举为Leader,继续复制日志,将epoch=2的日志给了S3。此时该日志复制给了多数节点,但还未提交。
在时刻d,S1又崩溃,并且S5重新被选举为领导者,将epoch=3的日志复制给S0~S4。
此时日志与新Leader S5的日志发生了冲突,如果按上图中d1的方式处理,消息2就会丢失。传统Raft协议的处理方式是:在Leader任期开始时,立刻提交一条空的日志,所以上图中时刻c的情况不会发生,而是如同d2一样先提交epoch=4的日志,连带提交epoch=2的日志。
与传统Raft不同,KRaft附加了一个较强的约束:当新的Leader被选举出来,但还没有成功提交属于它的epoch的日志时,不会向前推进HW。也就是说,即使上图中时刻c的情况发生了,消息2也被视为没有成功提交,所以按照d1方式处理是安全的。
日志格式说明:
所有节点持久化保存在本地的日志,大概就是类似于这个样子:
上图显示,共有八条日志数据,其中已经提交了7条,提交的日志都将通过状态机持久化到本地磁盘当中,防止宕机。
日志复制的保证机制
如果两个节点不同的日志文件当中存储着相同的索引和任期号,那么他们所存储的命令是相同的。(原因:leader最多在一个任期里的一个日志索引位置创建一条日志条目,日志条目所在的日志位置从来不会改变)。
如果不同日志中两个条目有着相同的索引和任期号,那么他们之前的所有条目都是一样的(原因:每次RPC发送附加日志时,leader会把这条日志前面的日志下标和任期号一起发送给follower,如果follower发现和自己的日志不匹配,那么就拒绝接受这条日志,这个称之为一致性检查)
日志的不正常情况
一般情况下,Leader和Followers的日志保持一致,因此Append Entries一致性检查通常不会失败。然而,Leader崩溃可能会导致日志不一致:旧的Leader可能没有完全复制完日志中的所有条目。
下图阐述了一些Followers可能和新的Leader日志不同的情况。一个Follower可能会丢失掉Leader上的一些条目,也有可能包含一些Leader没有的条目,也有可能两者都会发生。丢失的或者多出来的条目可能会持续多个任期。
如何保证日志的正常复制
如果出现了上述leader宕机,导致follower与leader日志不一致的情况,那么就需要进行处理,保证follower上的日志与leader上的日志保持一致,leader通过强制follower复制它的日志来处理不一致的问题,follower与leader不一致的日志会被强制覆盖。leader为了最大程度的保证日志的一致性,且保证日志最大量,leader会寻找follower与他日志一致的地方,然后覆盖follower之后的所有日志条目,从而实现日志数据的一致性。
具体的操作就是:leader会从后往前不断对比,每次Append Entries失败后尝试前一个日志条目,直到成功找到每个Follower的日志一致的位置点,然后向该Follower所在位置之后的条目进行覆盖。
详细过程如下:
Leader维护了每个Follower节点下一次要接收的日志的索引,即nextIndex。
Leader选举成功后将所有Follower的nextIndex设置为自己的最后一个日志条目+1。
Leader将数据推送给Follower,如果Follower验证失败(nextIndex不匹配),则在下一次推送日志时缩小nextIndex,直到nextIndex验证通过。
总结一下就是:当leader和follower日志冲突的时候,leader将校验 follower最后一条日志是否和leader匹配,如果不匹配,将递减查询,直到匹配,匹配后,删除冲突的日志。这样就实现了主从日志的一致性。
(二)Raft协议算法代码实现
前面我们已经大致了解了Raft协议算法的实现原理,如果我们要自己实现一个Raft协议的算法,其实就是将我们讲到的理论知识给翻译成为代码的过程,具体的开发需要考虑的细节比较多,代码量肯定也比较大,好在有人已经实现了Raft协议的算法了,我们可以直接拿过来使用。
创建maven工程并导入jar包地址如下:
<dependencies> <dependency> <groupId>com.github.wenweihu86.raft</groupId> <artifactId>raft-java-core</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>com.github.wenweihu86.rpc</groupId> <artifactId>rpc-java</artifactId> <version>1.8.0</version> </dependency> <dependency> <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> <version>5.1.4</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
定义Server端代码实现:
public class Server1 { public static void main(String[] args) { // parse args // peers, format is "host:port:serverId,host2:port2:serverId2" //localhost:16010:1,localhost:16020:2,localhost:16030:3 localhost:16010:1 String servers = "localhost:16010:1,localhost:16020:2,localhost:16030:3"; // local server RaftMessage.Server localServer = parseServer("localhost:16010:1"); String[] splitArray = servers.split(","); List<RaftMessage.Server> serverList = new ArrayList<>(); for (String serverString : splitArray) { RaftMessage.Server server = parseServer(serverString); serverList.add(server); } // 初始化RPCServer RPCServer server = new RPCServer(localServer.getEndPoint().getPort()); // 设置Raft选项,比如: // just for test snapshot RaftOptions raftOptions = new RaftOptions(); /* raftOptions.setSnapshotMinLogSize(10 * 1024); raftOptions.setSnapshotPeriodSeconds(30); raftOptions.setMaxSegmentFileSize(1024 * 1024);*/ // 应用状态机 ExampleStateMachine stateMachine = new ExampleStateMachine(raftOptions.getDataDir()); // 初始化RaftNode RaftNode raftNode = new RaftNode(raftOptions, serverList, localServer, stateMachine); raftNode.getLeaderId(); // 注册Raft节点之间相互调用的服务 RaftConsensusService raftConsensusService = new RaftConsensusServiceImpl(raftNode); server.registerService(raftConsensusService); // 注册给Client调用的Raft服务 RaftClientService raftClientService = new RaftClientServiceImpl(raftNode); server.registerService(raftClientService); // 注册应用自己提供的服务 ExampleService exampleService = new ExampleServiceImpl(raftNode, stateMachine); server.registerService(exampleService); // 启动RPCServer,初始化Raft节点 server.start(); raftNode.init(); } private static RaftMessage.Server parseServer(String serverString) { String[] splitServer = serverString.split(":"); String host = splitServer[0]; Integer port = Integer.parseInt(splitServer[1]); Integer serverId = Integer.parseInt(splitServer[2]); RaftMessage.EndPoint endPoint = RaftMessage.EndPoint.newBuilder() .setHost(host).setPort(port).build(); RaftMessage.Server.Builder serverBuilder = RaftMessage.Server.newBuilder(); RaftMessage.Server server = serverBuilder.setServerId(serverId).setEndPoint(endPoint).build(); return server; }}
定义客户端代码实现如下:
public class ClientMain { public static void main(String[] args) { // parse args String ipPorts = args[0]; String key = args[1]; String value = null; if (args.length > 2) { value = args[2]; } // init rpc client RPCClient rpcClient = new RPCClient(ipPorts); ExampleService exampleService = RPCProxy.getProxy(rpcClient, ExampleService.class); final JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace(); // set if (value != null) { ExampleMessage.SetRequest setRequest = ExampleMessage.SetRequest.newBuilder() .setKey(key).setValue(value).build(); ExampleMessage.SetResponse setResponse = exampleService.set(setRequest); try { System.out.printf("set request, key=%s value=%s response=%s\n", key, value, printer.print(setResponse)); } catch (Exception ex) { ex.printStackTrace(); } } else { // get ExampleMessage.GetRequest getRequest = ExampleMessage.GetRequest.newBuilder().setKey(key).build(); ExampleMessage.GetResponse getResponse = exampleService.get(getRequest); try { String value1 = getResponse.getValue(); System.out.println(value1); System.out.printf("get request, key=%s, response=%s\n", key, printer.print(getResponse)); } catch (Exception ex) { ex.printStackTrace(); } } rpcClient.stop(); }}
先启动服务端,然后启动客户端,就可以将实现客户端向服务端发送消息,并且服务端会向三台机器进行保存消息了。
五、Kafka常见问题
(一)消息队列模型知道吗?Kafka是怎么做到支持这两种模型的?
对于传统的消息队列系统支持两个模型:
点对点:也就是消息只能被一个消费者消费,消费完后消息删除。
发布订阅:相当于广播模式,消息可以被所有消费者消费。
kafka其实就是通过Consumer Group同时支持了这两个模型。如果说所有消费者都属于一个Group,消息只能被同一个Group内的一个消费者消费,那就是点对点模式。如果每个消费者都是一个单独的Group,那么就是发布订阅模式。
(二)说说Kafka通信过程原理吗?
首先kafka broker启动的时候,会去向Zookeeper注册自己的ID(创建临时节点),这个ID可以配置也可以自动生成,同时会去订阅Zookeeper的brokers/ids路径,当有新的broker加入或者退出时,可以得到当前所有broker信。
生产者启动的时候会指定bootstrap.servers,通过指定的broker地址,Kafka就会和这些broker创建TCP连接(通常我们不用配置所有的broker服务器地址,否则kafka会和配置的所有broker都建立TCP连接)
随便连接到任何一台broker之后,然后再发送请求获取元数据信息(包含有哪些主题、主题都有哪些分区、分区有哪些副本,分区的Leader副本等信息)
接着就会创建和所有broker的TCP连接。
之后就是发送消息的过程。
消费者和生产者一样,也会指定bootstrap.servers属性,然后选择一台broker创建TCP连接,发送请求找到协调者所在的broker。
然后再和协调者broker创建TCP连接,获取元数据。
根据分区Leader节点所在的broker节点,和这些broker分别创建连接。
最后开始消费消息。
(三)发送消息时如何选择分区的?
主要有两种方式:
轮询,按照顺序消息依次发送到不同的分区。
随机,随机发送到某个分区。
如果消息指定key,那么会根据消息的key进行hash,然后对partition分区数量取模,决定落在哪个分区上,所以,对于相同key的消息来说,总是会发送到同一个分区上,也是我们常说的消息分区有序性。
很常见的场景就是我们希望下单、支付消息有顺序,这样以订单ID作为key发送消息就达到了分区有序性的目的。
如果没有指定key,会执行默认的轮询负载均衡策略,比如第一条消息落在P0,第二条消息落在P1,然后第三条又在P1。
除此之外,对于一些特定的业务场景和需求,还可以通过实现Partitioner接口,重写configure和partition方法来达到自定义分区的效果。
(四)为什么需要分区?有什么好处?
这个问题很简单,如果说不分区的话,我们发消息写数据都只能保存到一个节点上,这样的话就算这个服务器节点性能再好最终也支撑不住。
实际上分布式系统都面临这个问题,要么收到消息之后进行数据切分,要么提前切分,kafka正是选择了前者,通过分区可以把数据均匀地分布到不同的节点。
分区带来了负载均衡和横向扩展的能力。
发送消息时可以根据分区的数量落在不同的Kafka服务器节点上,提升了并发写消息的性能,消费消息的时候又和消费者绑定了关系,可以从不同节点的不同分区消费消息,提高了读消息的能力。
另外一个就是分区又引入了副本,冗余的副本保证了Kafka的高可用和高持久性。
(五)详细说说消费者组和消费者重平衡?
Kafka中的消费者组订阅topic主题的消息,一般来说消费者的数量最好要和所有主题分区的数量保持一致最好(举例子用一个主题,实际上当然是可以订阅多个主题)。
当消费者数量小于分区数量的时候,那么必然会有一个消费者消费多个分区的消息。
而消费者数量超过分区的数量的时候,那么必然会有消费者没有分区可以消费。
所以,消费者组的好处一方面在上面说到过,可以支持多种消息模型,另外的话根据消费者和分区的消费关系,支撑横向扩容伸缩。
当我们知道消费者如何消费分区的时候,就显然会有一个问题出现了,消费者消费的分区是怎么分配的,有先加入的消费者时候怎么办?
旧版本的重平衡过程主要通过ZK监听器的方式来触发,每个消费者客户端自己去执行分区分配算法。
新版本则是通过协调者来完成,每一次新的消费者加入都会发送请求给协调者去获取分区的分配,这个分区分配的算法逻辑由协调者来完成。
而重平衡Rebalance就是指的有新消费者加入的情况,比如刚开始我们只有消费者A在消费消息,过了一段时间消费者B和C加入了,这时候分区就需要重新分配,这就是重平衡,也可以叫做再平衡,但是重平衡的过程和我们的GC时候STW很像,会导致整个消费群组停止工作,重平衡期间都无法消息消息。
另外,发生重平衡并不是只有这一种情况,因为消费者和分区总数是存在绑定关系的,上面也说了,消费者数量最好和所有主题的分区总数一样。
那只要消费者数量、主题数量(比如用的正则订阅的主题)、分区数量任何一个发生改变,都会触发重平衡。
下面说说重平衡的过程。
重平衡的机制依赖消费者和协调者之间的心跳来维持,消费者会有一个独立的线程去定时发送心跳给协调者,这个可以通过参数heartbeat.interval.ms来控制发送心跳的间隔时间。
每个消费者第一次加入组的时候都会向协调者发送JoinGroup请求,第一个发送这个请求的消费者会成为“群主”,协调者会返回组成员列表给群主。
群主执行分区分配策略,然后把分配结果通过SyncGroup请求发送给协调者,协调者收到分区分配结果。
其他组内成员也向协调者发送SyncGroup,协调者把每个消费者的分区分配分别响应给他们。
(六)具体讲讲分区分配策略?
主要有3种分配策略
Range
对分区进行排序,排序越靠前的分区能够分配到更多的分区。
比如有3个分区,消费者A排序更靠前,所以能够分配到P0\P1两个分区,消费者B就只能分配到一个P2。
如果是4个分区的话,那么他们会刚好都是分配到2个。
但是这个分配策略会有点小问题,他是根据主题进行分配,所以如果消费者组订阅了多个主题,那就有可能导致分区分配不均衡。
比如下图中两个主题的P0\P1都被分配给了A,这样A有4个分区,而B只有2个,如果这样的主题数量越多,那么不均衡就越严重。
RoundRobin
也就是我们常说的轮询了,这个就比较简单了,不画图你也能很容易理解。
这个会根据所有的主题进行轮询分配,不会出现Range那种主题越多可能导致分区分配不均衡的问题。
P0->A,P1->B,P1->A。。。以此类推
Sticky
这个从字面看来意思就是粘性策略,大概是这个意思。主要考虑的是在分配均衡的前提下,让分区的分配更小的改动。
比如之前P0\P1分配给消费者A,那么下一次尽量还是分配给A。
这样的好处就是连接可以复用,要消费消息总是要和broker去连接的,如果能够保持上一次分配的分区的话,那么就不用频繁的销毁创建连接了。
(七)如何保证消息可靠性?
- 生产者发送消息丢失
kafka支持3种方式发送消息,这也是常规的3种方式,发送后不管结果、同步发送、异步发送,基本上所有的消息队列都是这样玩的。
发送并忘记,直接调用发送send方法,不管结果,虽然可以开启自动重试,但是肯定会有消息丢失的可能。
同步发送,同步发送返回Future对象,我们可以知道发送结果,然后进行处理。
异步发送,发送消息,同时指定一个回调函数,根据结果进行相应的处理。
为了保险起见,一般我们都会使用异步发送带有回调的方式进行发送消息,再设置参数为发送消息失败不停地重试。
acks=all,这个参数有可以配置0|1|all。
0表示生产者写入消息不管服务器的响应,可能消息还在网络缓冲区,服务器根本没有收到消息,当然会丢失消息。
1表示至少有一个副本收到消息才认为成功,一个副本那肯定就是集群的Leader副本了,但是如果刚好Leader副本所在的节点挂了,Follower没有同步这条消息,消息仍然丢失了。
配置all的话表示所有ISR都写入成功才算成功,那除非所有ISR里的副本全挂了,消息才会丢失。
retries=N,设置一个非常大的值,可以让生产者发送消息失败后不停重试
Kafka 自身消息丢失。
kafka因为消息写入是通过PageCache异步写入磁盘的,因此仍然存在丢失消息的可能。
因此针对kafka自身丢失的可能设置参数:
replication.factor=N,设置一个比较大的值,保证至少有2个或者以上的副本。
min.insync.replicas=N,代表消息如何才能被认为是写入成功,设置大于1的数,保证至少写入1个或者以上的副本才算写入消息成功。
unclean.leader.election.enable=false,这个设置意味着没有完全同步的分区副本不能成为Leader副本,如果是true的话,那些没有完全同步Leader的副本成为Leader之后,就会有消息丢失的风险。
消费者消息丢失
消费者丢失的可能就比较简单,关闭自动提交位移即可,改为业务处理成功手动提交。
因为重平衡发生的时候,消费者会去读取上一次提交的偏移量,自动提交默认是每5秒一次,这会导致重复消费或者丢失消息。
enable.auto.commit=false,设置为手动提交。
还有一个参数我们可能也需要考虑进去的:
auto.offset.reset=earliest,这个参数代表没有偏移量可以提交或者broker上不存在偏移量的时候,消费者如何处理。earliest代表从分区的开始位置读取,可能会重复读取消息,但是不会丢失,消费方一般我们肯定要自己保证幂等,另外一种latest表示从分区末尾读取,那就会有概率丢失消息。
综合这几个参数设置,我们就能保证消息不会丢失,保证了可靠性。
(八)聊聊副本和它的同步原理吧?
Kafka副本的之前提到过,分为Leader副本和Follower副本,也就是主副本和从副本,和其他的比如Mysql不一样的是,Kafka中只有Leader副本会对外提供服务,Follower副本只是单纯地和Leader保持数据同步,作为数据冗余容灾的作用。
在Kafka中我们把所有副本的集合统称为AR(Assigned Replicas),和Leader副本保持同步的副本集合称为ISR(InSyncReplicas)。
ISR是一个动态的集合,维持这个集合会通过replica.lag.time.max.ms参数来控制,这个代表落后Leader副本的最长时间,默认值10秒,所以只要Follower副本没有落后Leader副本超过10秒以上,就可以认为是和Leader同步的(简单可以认为就是同步时间差)。
另外还有两个关键的概念用于副本之间的同步:
HW(High Watermark):高水位,也叫做复制点,表示副本间同步的位置。如下图所示,04绿色表示已经提交的消息,这些消息已经在副本之间进行同步,消费者可以看见这些消息并且进行消费,46黄色的则是表示未提交的消息,可能还没有在副本间同步,这些消息对于消费者是不可见的。
LEO(Log End Offset):下一条待写入消息的位移
副本间同步的过程依赖的就是HW和LEO的更新,以他们的值变化来演示副本同步消息的过程,绿色表示Leader副本,黄色表示Follower副本。
首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。
此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。
之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。
(九)Kafka为什么快?
主要是3个方面:
顺序IO
kafka写消息到分区采用追加的方式,也就是顺序写入磁盘,不是随机写入,这个速度比普通的随机IO快非常多,几乎可以和网络IO的速度相媲美。
Page Cache和零拷贝
kafka在写入消息数据的时候通过mmap内存映射的方式,不是真正立刻写入磁盘,而是利用操作系统的文件缓存PageCache异步写入,提高了写入消息的性能,另外在消费消息的时候又通过sendfile实现了零拷贝。
批量处理和压缩
Kafka在发送消息的时候不是一条条的发送的,而是会把多条消息合并成一个批次进行处理发送,消费消息也是一个道理,一次拉取一批次的消息进行消费。
并且Producer、Broker、Consumer都使用了优化后的压缩算法,发送和消息消息使用压缩节省了网络传输的开销,Broker存储使用压缩则降低了磁盘存储的空间。
参考资料:
1.《深入理解Kafka:核心设计实践原理》
2.状态机程序设计套路
3.raft算法源码
4.https://www.bbsmax.com/A/QW5Y3kaBzm/
总结
认为就是同步时间差)。
另外还有两个关键的概念用于副本之间的同步:
HW(High Watermark):高水位,也叫做复制点,表示副本间同步的位置。如下图所示,04绿色表示已经提交的消息,这些消息已经在副本之间进行同步,消费者可以看见这些消息并且进行消费,46黄色的则是表示未提交的消息,可能还没有在副本间同步,这些消息对于消费者是不可见的。
LEO(Log End Offset):下一条待写入消息的位移
[外链图片转存中…(img-xSblaHHK-1650934432034)]
副本间同步的过程依赖的就是HW和LEO的更新,以他们的值变化来演示副本同步消息的过程,绿色表示Leader副本,黄色表示Follower副本。
首先,生产者不停地向Leader写入数据,这时候Leader的LEO可能已经达到了10,但是HW依然是0,两个Follower向Leader请求同步数据,他们的值都是0。
[外链图片转存中…(img-pLogNcDZ-1650934432035)]
此时,Follower再次向Leader拉取数据,这时候Leader会更新自己的HW值,取Follower中的最小的LEO值来更新。
[外链图片转存中…(img-hV0shtYL-1650934432035)]
之后,Leader响应自己的HW给Follower,Follower更新自己的HW值,因为又拉取到了消息,所以再次更新LEO,流程以此类推。
[外链图片转存中…(img-xdjmej3f-1650934432036)]
(九)Kafka为什么快?
主要是3个方面:
顺序IO
kafka写消息到分区采用追加的方式,也就是顺序写入磁盘,不是随机写入,这个速度比普通的随机IO快非常多,几乎可以和网络IO的速度相媲美。
Page Cache和零拷贝
kafka在写入消息数据的时候通过mmap内存映射的方式,不是真正立刻写入磁盘,而是利用操作系统的文件缓存PageCache异步写入,提高了写入消息的性能,另外在消费消息的时候又通过sendfile实现了零拷贝。
批量处理和压缩
Kafka在发送消息的时候不是一条条的发送的,而是会把多条消息合并成一个批次进行处理发送,消费消息也是一个道理,一次拉取一批次的消息进行消费。
并且Producer、Broker、Consumer都使用了优化后的压缩算法,发送和消息消息使用压缩节省了网络传输的开销,Broker存储使用压缩则降低了磁盘存储的空间。
参考资料:
1.《深入理解Kafka:核心设计实践原理》
2.状态机程序设计套路
3.raft算法源码
4.https://www.bbsmax.com/A/QW5Y3kaBzm/
总结
如果这篇文章对您有所帮助,或者有所启发的话,求一键三连:点赞、转发、在看,您的支持是我坚持写作最大的动力。