SOFAJRaft-RheaKV MULTI-RAFT-GROUP 实现分析 | SOFAJRaft 实现原理

简介: SOFAStack (Scalable Open Financial Architecture Stack)是蚂蚁金服自主研发的金融级分布式架构,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。

SOFAStack (Scalable Open Financial Architecture Stack)是>蚂蚁金服自主研发的金融级分布式架构,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。

image.png

SOFAJRaft 是一个基于 Raft 一致性算法的生产级高性能 Java 实现,支持 MULTI-RAFT-GROUP,适用于高负载低延迟的场景。

本文为《剖析 | SOFAJRaft 实现原理》第五篇,本篇作者袖扣,来自蚂蚁金服。

《剖析 | SOFAJRaft 实现原理》系列由 SOFA 团队和源码爱好者们出品,项目代号:,文章尾部有参与方式,欢迎同样对源码热情的你加入。

SOFAJRaft:

https://github.com/alipay/sofa-jraft

前言

RheaKV 是首个以 JRaft 为基础实现的一个原生支持分布式的嵌入式键值(key、value)数据库,现在本文将从 RheaKV 是如何利用 MULTI-RAFT-GROUP 的方式实现 RheaKV 的高性能及容量的可扩展性的,从而进行全面的源码、实例剖析。

MULTI-RAFT-GROUP

通过对 Raft 协议的描述我们知道:用户在对一组 Raft 系统进行更新操作时必须先经过 Leader,再由 Leader 同步给大多数 Follower。而在实际运用中,一组 Raft 的 Leader 往往存在单点的流量瓶颈,流量高便无法承载,同时每个节点都是全量数据,所以会受到节点的存储限制而导致容量瓶颈,无法扩展。

MULTI-RAFT-GROUP 正是通过把整个数据从横向做切分,分为多个 Region 来解决磁盘瓶颈,然后每个 Region 都对应有独立的 Leader 和一个或多个 Follower 的 Raft 组进行横向扩展,此时系统便有多个写入的节点,从而分担写入压力,图如下:

image.png

此时磁盘及 I/O 瓶颈解决了,那多个 Raft Group 是如何协作的呢,我们接着往下看。

选举与复制

RheaKV 主要由 3 个角色组成:PlacementDriver(以下成为 PD) 、Store、Region。由于 RheaKV 支持多组 Raft,所以比单组场景多出一个 PD 角色,用来调度以及收集每个 Store 及 Region 的基础信息。

image.png

PlacementDriver

PD 负责整个集群的管理调度、Region ID 生成等。此组件非必须的,如果不使用 PD,设置 PlacementDriverOptions 的 fake 属性为 true 即可。PD 一般通过 Region 的心跳返回信息进行对 Region 调度,Region 处理完后,PD 则会在下一个心跳返回中收到 Region 的变更信息来更新路由及状态表。

Store

通常一个 Node 负责一个 Store,Store 可以被看作是 Region 的容器,里面存储着多个分片数据。Store 会向 PD 主动上报 StoreHeartbeatRequest 心跳,心跳交由 PD 的 handleStoreHeartbeat 处理,里面包含该 Store 的基本信息,比如,包含多少 Region,有哪些 Region 的 Leader 在该 Store 等。

Region

Region 是数据存储、搬迁的最小单元,对应的是 Store 里某个实际的数据区间。每个 Region 会有多个副本,每个副本存储在不同的 Store,一起组成一个Raft Group。Region 中的 Leader 会向 PD 主动上报 RegionHeartbeatRequest 心跳,交由 PD 的 handleRegionHeartbeat 处理,而 PD 是通过 Region的 Epoch 感知 Region 是否有变化。

RegionRouteTable 路由表组件

Muti-Raft-Group 的多 Region 是通过 RegionRouteTable 路由表组件进行管理的,可通过 addOrUpdateRegion、removeRegion 进行添加、更新、移除 Region,也包括 Region 的拆分。目前暂时还未实现 Region 的聚合,后面会考虑实现。

分区逻辑与算法 Shard

image.png

“让每组 Raft 负责一部分数据。”

数据分区或者分片算法通常就是 Range 和 Hash,RheaKV 是通过 Range 进行数据分片的,分成一个个 Raft Group,也称为 Region。这里为何要设计成 Range 呢?原因是 Range 切分是按照对 Key 进行字节排序后再做每段每段切分,像类似 scan 等操作对相近 key 的查询会尽可能集中在某个 Region,这个是 Hash 无法支持的,就算遇到单个 Region 的拆分也会更好处理一些,只用修改部分元数据,不会涉及到大范围的数据挪动。

当然 Range 也会有一个问题那就是,可能会存在某个 Region 被频繁操作成为热点 Region。不过也有一些优化方案,比如 PD 调度热点 Region 到更空闲的机器上,或者提供 Follower 分担读的压力等。

Region 和 RegionEpoch 结构如下:

image.png

Region.id:为 Region 的唯一标识,通过 PD 全局唯一分配。

Region.startKey、Region.endKey:这个表示的是 Region 的 key 的区间范围 [startKey, endKey),特别值得注意的是针对最开始 Region 的 startKey,和最后 Region 的 endKey 都为空。

Region.regionEpoch:当 Region 添加和删除 Peer,或者 split 等,此时 regionEpoch 就会发生变化,其中 confVer 会在配置修改后递增,version 则是每次有 split 、merge(还未实现)等操作时递增。

Region.peers:peers 则指的是当前 Region 所包含的节点信息,Peer.id 也是由 PD 全局分配的,Peer.storeId 代表的是 Peer 当前所处的 Store。

读与写 Read / Write

由于数据被拆分到不同 Region 上,所以在进行多 key 的读、写、更新操作时需要操作多个 Region,这时操作前我们需要得到具体的 Region,然后再单独对不同 Region 进行操作。我们以在多 Region上 scan 操作为例, 目标是返回某个 key 区间的所有数据:

  1. 我们首先看 scan 方法的核心调用方法 internalScan 的异步实现:

例如:

com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#scan(byte[], byte[], boolean, boolean)

image.png

很容易看到,在调用 scan 首先让 PD Client 通过 RegionRouteTable.findRegionsByKeyRange 检索 startKey、endKey 所覆盖的 Region,最后返回的可能为多个 Region,具体 Region 覆盖检索方法如下:

image.png

检索相关变量定义如下:

image.png

我们可以看到整个 RheaKV 的 range 路由表是通过 TreeMap 的进行存储的,正呼应我们前面讲过所有的 key 是通过对应字节进行排序存储。对应的 Value 为该 Region 的 RegionId,随后我们通过 Region 路由 regionTable 查出即可。

现在我们得到 scan 覆盖到的所有 Region:List 在循环查询中我们看到有一个“retryCause -> {}”的 Lambda 表达式很容易看出这里是加持异常重试处理,后面我们会讲到,接下来会通过 internalRegionScan 查询每个 Region 的结果。具体源码如下:

image.png

这里也同样有一个重试处理,可以看到代码中根据当前是否为 Region 节点来决定是本机查询还是通过 RPC 进行查询,如果是本机则调用 rawKVStore.scan() 进行本地直接查询,反之通过 rheaKVRpcService 进行 RPC 远程节点查询。最后每个 Region 查询都返回为一个 future,通过 FutureHelper.joinList 工具类 CompletableFuture.allOf 异步并发返回结果 List。

  1. 我们再看看写入具体流程。相比 scan 读,put 写相对比较简单,只需要针对 key 计算出对应 Region 再进行存储即可,我们可以看一个异步 put 的例子。

例如:

com.alipay.sofa.jraft.rhea.client.DefaultRheaKVStore#put(java.lang.String, byte[])

image.png

我们可以发现 put 基础方法是支持 batch 的,即可成批提交。如未使用 batch 即直接提交,具体逻辑如下:

image.png

通过 pdClient 查询对应存储的 Region,并且通过 regionId 拿到 RegionEngine,再通过对应存储引擎 KVStore 进行 put,整个过程同样支持重试机制。我们再回过去看看 batch 的实现,很容易发现利用到了 Disruptor 的 RingBuffer 环形缓冲区,无锁队列为性能提供了保障,代码现场如下:

image.png

Split / Merge

  1. 什么时候 Region 会拆分?

前面我们有讲过,PD 会在 Region 的 heartBeat 里面对 Region 进行调度,当某个 Region 里的 keys 数量超过预设阀值,我们即可对该 Region 进行拆分,Store 的状态机 KVStoreStateMachine 即收到拆分消息进行拆分处理。具体拆分源码如下:

KVStoreStateMachine.doSplit 源码如下:
image.png

StoreEngine.doSplit 源码如下:

image.png

我们可以轻易的看到从原始 parentRegion 切分成 region 和 pRegion,并重设了 startKey、endKey 和版本号,并添加到 RegionEngineTable 注册到 RegionKVService,同时调用 pdClient.getRegionRouteTable().splitRegion() 方法进行更新存储在 PD 的 Region 路由表。

  1. 什么时候需要对 Region 进行合并?

既然数据过多需要进行拆分,那 Region 进行合并那就肯定是 2 个或者多个连续的 Region 数据量明显小于绝大多数 Region 容量则我们可以对其进行合并。这一块后面会考虑实现。

RegionKVService 结构及实现分析

StoreEngine

通过上面我们知道,一个 Store 即为一个节点,里面包含着一个或者多个 RegionEngine,一个 StoreEngine 通常通过 PlacementDriverClient 对 PD 进行调用,同时拥有 StoreEngineOptions 配置项,里面配置着存储引擎和节点相关配置。

我们以默认的 DefaultRheaKVStore 加载 StoreEngine 为例,DefaultRheaKVStore 实现了 RheaKVStore 接口的基础功能,从最开始 init 方法,根据 RheaKVStoreOptions 加载了 pdClient 实例,随后加载 storeEngine。

在 StoreEngine 启动的时候,首先会去加载对应的 StoreEngineOptions 配置,构建对应的 Store 配置,并且生成一致性读的线程池 readIndexExecutor、快照线程池 snapshotExecutor、RPC 的线程池 cliRpcExecutor、Raft 的 RPC 线程池 raftRpcExecutor,以及存储 RPC 线程池 kvRpcExecutor、心跳发送器 HearteatSender 等,如果打开代码,我们还能看到 metricsReportPeriod,打开配置可以进行性能指标监控。

在 DefaultRheaKVStore 加载完所有工序之后,便可使用 get、set、scan 等操作,还包含对应同步、异步操作。

在这个过程中里面的 StoreEngine 会记录着 regionKVServiceTable、regionEngineTable,它们分别掌握着具体每个不同的 Region 存储的操作功能,对应的 key 即为 RegionId。

RegionEngine

每个在 Store 里的 Region 副本中,RegionEngine 则是一个执行单元。它里面记录着关联着的 StoreEngine 信息以及对应的 Region 信息。由于它也是一个选举节点,所以也包含着对应状态机 KVStoreStateMachine,以及对应的 RaftGroupService,并启动里面的 RpcServer 进行选举同步。

这个里面有个 transferLeadershipTo 方法,这个可被调用用于平衡当前节点分区的 Leader,避免压力重叠。

DefaultRegionKVService 是 RegionKVService 的默认实现类,主要处理对 Region 的具体操作。

RheaKV FailoverClosure 解读

需要特别讲到的是,在具体的 RheaKV 操作时,FailoverClosure 担任着比较重要的角色,也给整个系统增加了一定的容错性。假如在一次 scan 操作中,如果跨 Store 需要多节点 scan 数据的时候,任何网络抖动都会造成数据不完整或者失败情况,所以允许一定次数的重试有利于提高系统的可用性,但是重试次数不宜过高,如果出现网络堵塞,多次 timeout 级别失败会给系统带来额外的压力。这里只需要在 DefaultRheaKVStore 中,进行配置 failoverRetries 设置次数即可。

RheaKV PD 之 PlacementDriverClient

PlacementDriverClient 接口主要由 AbstractPlacementDriverClient 实现,然后 FakePlacementDriverClient、RemotePlacementDriverClient 为主要功能。FakePlacementDriverClient 是当系统不需要 PD 的时候进行 PD 对象的模拟,这里主要讲到 RemotePlacementDriverClient。

  • RemotePlacementDriverClient 通过PlacementDriverOptions 进行加载,并根据基础配置刷新路由表;
  • RemotePlacementDriverClient 承担着对路由表RegionRouteTable 的管控,例如获取Store、路由、Leader节点信息等;
  • RemotePlacementDriverClient 还包含着 CliService,通过 CliService 外部可对复制节点进行操作运维,如 addReplica、removeReplica、transferLeader。

总结

由于很多传统存储中间件并不原生支持分布式,所以一直少有体感,Raft 协议是一套比较比较好理解的共识协议,SOFAJRaft 通俗易懂是一个非常好的代码和工程范例,同时 RheaKV 也是一套非常轻量化支持多存储结构可分片的嵌入式数据库。写一篇代码分析文章也是一个学习和进步的过程,由此我们也可以窥探到了一些数据库的基础实现,祝愿社区能在 SOFAJRaft / RheaKV 基础上构建更加灵活和自治理的系统和应用。

目录
相关文章
|
运维 监控 Java
探索Elasticsearch在Java环境下的全文检索应用实践
【6月更文挑战第30天】在大数据背景下,Elasticsearch作为分布式搜索分析引擎,因其扩展性和易用性备受青睐。本文指导在Java环境中集成Elasticsearch,涉及安装配置、使用RestHighLevelClient连接、索引与文档操作,如创建索引、插入文档及全文检索查询。此外,还讨论了高级查询、性能优化和故障排查,帮助开发者高效处理非结构化数据全文检索。
408 0
|
JSON JavaScript 数据可视化
D3 不到20行代码就能实现世界地图的绘制
每到农历年末,相信很多小伙伴和本作者一样,都忍不住会去看江苏卫视的一档脑力比拼节目《最强大脑》,尽管上一季最强大脑喷点确实很多,但依旧没有减弱"追剧"的热情。今年最强大脑(第5季)的赛制有很大的变化,挑战的人数从百人大战,到最强30脑,再到现从第三场的一对一PK,确实与以往有了很大的不同。此外,今年更加强调了选手在生活中的光环,例如本文要引用的一场比赛就是最近一期来自清华的孙勇与北京的陈泽坤的一场以地图投影为背景的比赛,孙勇就是顶着2016安徽省高考理科状元的光环来的。今年没了叨叨魏,节目的流程显得自然了很多。好了,不扯了,来、来、来来来!我们开始说本文要讲的主题--地图。
1916 0
D3 不到20行代码就能实现世界地图的绘制
|
5月前
|
XML 人工智能 定位技术
如何让AI更懂你?掌握提示词与上下文工程的核心思维
本文深入解析与大型语言模型交互的核心技巧,涵盖提示词(Prompt)、提示词工程与上下文工程三大关键概念,助你从AI用户进阶为高效引导者,全面提升AI应用能力。
|
11月前
|
人工智能 自然语言处理 搜索推荐
阿里云携手叫叫,共创儿童学习AI新体验
阿里云携手叫叫,共创儿童学习AI新体验
|
存储 关系型数据库 MySQL
MySQL数据库碎片化:隐患与解决策略
UUID作为主键可能导致MySQL存储碎片,影响性能。频繁的DML操作、字段长度变化和非顺序插入(如UUID)都会造成碎片。碎片增加磁盘I/O,降低查询效率,浪费空间,影响备份速度。建议使用自增ID,固定长度字段,并适时运行OPTIMIZE TABLE来减少碎片。
|
存储 缓存 人工智能
深度解析CPFS 在 LLM 场景下的高性能存储技术
本文深入探讨了CPFS在大语言模型(LLM)训练中的端到端性能优化策略,涵盖计算端缓存加速、智能网卡加速、数据并行访问及数据流优化等方面。重点分析了大模型对存储系统的挑战,包括计算规模扩大、算力多样性及数据集增长带来的压力。通过分布式P2P读缓存、IO加速、高性能存算通路技术以及智能数据管理等手段,显著提升了存储系统的吞吐量和响应速度,有效提高了GPU利用率,降低了延迟,从而加速了大模型的训练进程。总结了CPFS在AI训练场景中的创新与优化实践,为未来大模型发展提供了有力支持。
|
机器学习/深度学习 数据采集 人工智能
探索AI驱动的自动化测试新纪元###
本文旨在探讨人工智能如何革新软件测试领域,通过AI技术提升测试效率、精准度和覆盖范围。在智能算法的支持下,自动化测试不再局限于简单的脚本回放,而是能够模拟复杂场景、预测潜在缺陷,并实现自我学习与优化。我们正步入一个测试更加主动、灵活且高效的新时代,本文将深入剖析这一变革的核心驱动力及其对未来软件开发的影响。 ###
|
Java 编译器 C语言
【C/C++】 switch-case 详解/全面总结
关于 C语言/C++ 中,switch-case 的尽量详细和全面的解释与总结
4886 0
|
SQL 存储 缓存
大厂 5 年实时数据开发经验总结,Flink SQL 看这篇就够了!
大厂 5 年实时数据开发经验总结,Flink SQL 看这篇就够了!
501 58
|
数据安全/隐私保护 开发者 Docker
国内docker公开镜像站的关闭!别急,docker_image_pusher 使用Github Action将国外的Docker镜像转存到阿里云私有仓库
通过使用 docker_image_pusher 这样的开源项目,我们能够轻松地解决国内访问 Docker 镜像拉取速度慢及拉去失败的问题,同时保证了镜像的稳定性和安全性。利用 Github Action 的自动化功能,使得这一过程更加简单和高效。
3146 2