2024FFA-分论坛-核心技术专场1-Flink 2.0状态管理存算分离最新进展
内容介绍
1、存算分离架构介绍
2、状态存储内核ForSt
3、工作进展&未来展望
01、存算分离架构介绍
1.1下图是Flink2.0存算分离提出背景
Flink在1.x版本中采用了存算一体的高性能架构,成功解决了众多流处理中的低延迟问题。然而,当前许多用户反映Flink的State功能使用体验不佳,主要问题集中在以下几个方面。
首先,本地磁盘成为了计算节点的瓶颈。这既体现在存储空间上的限制,也体现在I/O性能上的局限。
其次,在检查点过程中,由于状态数据庞大,上传时会导致CPU或网络出现资源尖峰问题。为了应对这种尖峰问题,往往需要预留大量资源,否则会影响流处理的实际性能。
最后,也是最为关键的问题,作业恢复速度缓慢,这成为了阻碍Flink实现云原生部署的一大障碍。在修改配置或调整并发启动时,恢复过程尤为缓慢。
存算分离的架构可以有效解决上述问题。
第一,通过将远程存储作为主存,可以避免本地端存在的限制问题。远程存储可以灵活扩展I/O性能和内存容量,且成本相对较低。
第二,检查点操作可以变得更加轻量,因为状态数据已经存储在远程,无需再进行上传过程。第三,也是较为关键的一点,启动速度可以显著提升。因为可以直接从远程存储中的文件启动,无需进行繁琐的下载过程。
1.2 Flink2.0存算分离的几大工作
第一部分主要阐述了支持状态存储于远端的过程。
第二部分指出,当状态数据直接存储于远端时,面临的最大挑战是性能可能会显著下降,这种下降幅度可能达到十倍甚至上百倍。为了应对这一挑战,我们引入了异步化的处理方法,即利用异步线程来执行I/O操作,从而有效提升系统吞吐能力。第三点则是关于实现快速的检查点及恢复机制。
1.3 Flink2.0存算分离架构介绍
在社区中,提出了许多FLIP(Flink Improvement Proposal)以改进和优化系统。首先,有一个总纲性的FLIP,它针对当前的State API提出了改动方案,旨在将同步化的State API转变为异步化。紧接着,提出了异步化的执行模型,并对State访问过程进行了优化,例如通过批处理来优化现有的State请求。
为了支持远端存储的写入过程,引入了一个全新的内核机制。由于执行已经异步化,Checkpoint的过程可能会变得更为复杂,因为需要处理异步化的相关逻辑。因此,还有一个FLIP专门针对这一场景进行了优化。
最终,所有的操作算子(operator)都将使用新的异步化State API进行实现。这样一来,对于C端用户而言,无需改动他们的代码,就可以享受到存算分离以及异步化所带来的性能提升和优势。
1.4 Flink2.0存算分离--全新State API
在Flink 1.x版本中,State API是同步的。以左上角的WordCount示例为例,程序首先读取当前的count值,然后对其进行加一操作并更新回State,最后将结果发送到下游。在这个模型中,每个并发任务都是一个单线程执行的,因此在读取当前值时,线程会被阻塞,无法处理其他任务。
为了改进这一点,我们将State API改为异步的。具体来说,发送一个异步请求(asyncValue)来获取State的值,并希望用户提供一个回调函数(callback)。当获取到结果后,用户可以在这个回调函数中执行所需的操作,例如在示例中的thenCompose部分进行加一和更新操作。更新完成后,结果会被发送到下游。这就是异步API的工作方式。
同时,我们也保留了同步API的支持,以便用户可以根据需要选择使用。然而,需要注意的是,异步API和同步API混合使用时,性能可能不是最优的,因为同步API具有阻塞特性,会等待结果返回后再继续执行。
1.5 Flink2.0存算分离--异步执行模型
使用异步API进行State访问时,其执行流程如下所述。首先,Task线程作为单线程模型的核心,持续不断地执行用户的代码。对于输入数据,它仍然采用串联的方式进行处理。然而,在执行过程中,当用户需要进行State访问时,这一操作会被抽象为一个StateRequest描述符。这个描述符包含了要执行的Action,比如查询某个Key的值或更新某个Value。
此外,用户还需提供一个callback函数。Task线程会将所有的StateRequest收集起来,并一并发送给State Executor执行层。一旦发送,Task线程会立即返回继续处理其他任务。State Executor则负责异步执行这些StateRequest。执行完成后,Executor会将结果和对应的callback打包,并放入Task线程的输入队列中。Task线程随后会按顺序执行这些callback函数。
在这个过程中,Task线程会继续接收并处理新的输入数据。由于callback的执行可能会产生新的StateRequest,因此这形成了一个循环过程。这个过程会一直持续,直到不再产生新的StateRequest和callback为止。实际上,这是用一个线程来处理当前事件的结果并执行相应的回调。
然而,在实际应用中,我们会遇到许多挑战。首先,是Record保序的问题。对于具有相同Key的Record,由于它们访问的是同一个Key State的Value,因此在顺序执行时没有问题。但是,由于这不是一个原子操作,当不同Record处于不同StateRequest执行阶段时,如果出现乱序,就可能会引发问题。因此,我们需要确保相同Key的数据按顺序处理。
其次,以Watermark为例,它作为一种特殊的输入,会触发Watermark的上升并可能激活一些Timer。问题在于,如果上游的Record还没有处理完,就收到了Watermark上升的信号,那么此时的行为应该如何定义?这是一个需要解决的问题。
最后,还有Checkpoint的处理问题。当触发Checkpoint时,可能还有一些StateRequest或callback正在执行中。这些问题在FLIP-425中都有详细的讨论和解决方案。
1.6 Flink2.0存算分离--异步保序问题
当一个Task接收到输入,如Record1、2、3时,其内部执行模型会根据Record中的Key对它们进行分类处理。对于每个Key,模型确保在同一时间内只有一个Record处于执行状态。如果后续有相同Key的Record到来,它们会被放入一个阻塞队列中等待。
对于可以立即执行的Record,它们会被发送到State Executor中进行处理。处理完成后,会触发相应的callback函数。这个callback函数可能会继续产生新的StateRequest,从而形成一个循环处理过程。
为了确定何时可以处理完所有被阻塞的Record,模型采用了一个引用计数的方式来跟踪每个Key的活跃请求数量。当引用计数降为零,即表示没有任何与该Key相关的活跃请求时,被阻塞的Record就会被从阻塞队列中取出,放入活动队列中等待执行。
这样,通过引用计数和阻塞队列的结合使用,模型能够确保每个Record都按照正确的顺序和条件得到处理。
1.7 Flink2.0存算分离--攒批执行
在State Executor中,线程被明确区分为不同的用途。具体而言,仅有一个写线程负责写入操作,而多个读线程则负责读取操作。写线程在写入时采用批量处理的方式,即先将数据累积到一定量后再进行写入。读线程在读取数据时,也会根据情况选择批量读取(MultiGet)或逐个读取(单个Get)。这种选择是自适应的,主要基于预估的IO效率来决定。
此外,State Executor还支持其他操作,如Iterator等。将读写线程分离的好处在于,读取操作大概率涉及IO操作,因此采用并行且可能阻塞的方式进行读取可以带来较大的性能收益。而写入操作则主要是内存操作,因此无需使用多个线程,以避免不必要的开销。
值得注意的是,State Executor具有自动分割Get和MultiGet请求的能力,这也是自适应的,旨在进一步优化读取性能。
1.8 Flink2.0存算分离--检查点
当前的检查点机制首先涉及的是Aligned Checkpoint。对于Task的输入通道,它会从上游接收Barrier。当所有相关的Barrier都到达Task并完成对齐后,就会触发一次Checkpoint。在这个状态下,Task实际上并未处理数据,而是与StateBackend一起,将内部所需的状态进行快照保存,这就是Aligned Checkpoint。
然而,这里存在一个问题,即input channel的性能取决于Barrier到达的速度。如果Task本身的处理速度较慢,那么Barrier的对齐时间就会相应延长。为了解决这个问题,社区引入了Unaligned Checkpoint机制。这种机制不需要等待Barrier完全对齐,而是直接将当前的输入数据(即In-flight data)作为快照的一部分,与之前的Aligned Checkpoint类似,但更加灵活。
Unaligned Checkpoint的优势在于其速度非常快,因为它不受数据处理速度的限制。然而,在引入存算分离后,问题可能变得更加复杂。无论触发哪种类型的Checkpoint,只要触发Checkpoint,就可能存在尚未完成的StateRequest或等待执行的callback。在Task与StateBackend交互的过程中,是无法进行Checkpoint的。因此,通常的做法是等待所有交互完成后,再进行Aligned或Unaligned Checkpoint。
然而,在Unaligned Checkpoint的情况下,如果仍然需要等待输入数据处理完成,那么就会降低其速度优势。为了解决这个问题,提出了FLIP-455。FLIP-455主要针对的是那些尚未开始的StateRequest,这些请求通常位于Blocking buffer中(与Active buffer相比,Blocking buffer占主导地位,而Active buffer则相对较少)。FLIP-455建议将这些尚未开始的StateRequest也纳入Checkpoint中,并在恢复时重新运行它们进行判断。这样一来,就无需等待这些请求完成,从而提高了Unaligned Checkpoint的速度。
一方面,我们提供了一套用于定义用户逻辑的API,另一方面,我们也支持将StateRequest持久化到检查点(Checkpoint,简称CP)中。首先,让我们聚焦于StateRequest持久化到CP的问题,它涉及两个方面。
一方面是关于Action的持久化。Action需要指定其所属的类别和Key,由于我们有现成的用户序列化Key ladder,因此Action的序列化相对容易。
然而,另一方面,Callback的序列化则复杂得多。虽然Callback理论上也可以被序列化,但实际操作中会遇到诸多问题。用户通常会使用lambda表达式或匿名类来定义Callback,而匿名类在跨JVM执行时可能会引发潜在问题。此外,用户定义的lambda表达式中可能隐藏着一些bug,例如除零错误或空指针异常(NullPointerException,简称NP)。在恢复过程中,由于序列化的内容会原封不动地恢复,用户无法绕过Callback的执行,因此也无法修正这些bug。这是一个相当严重的问题。
为了解决这个问题,我们采取了一种替代方案:只记录Callback的名字,在恢复时根据这个名字重新定义一个与原来功能相同的Callback,并将其重新分配给对应的Request。这就是我们所谓的声明用户逻辑的API,它实际上是在声明Callback本身。
以name function为例,用户在定义Callback时可以提供一个名字,这个名字可以省略,也可以由系统自动生成。这是一个特殊的API,它允许用户在pipeline中声明Callback,并在后续过程中将其放入。通过这种方式,我们可以支持在Unaligned Checkpoint下快速进行Checkpoint操作,同时确保Callback的正确性和可执行性。
02、状态存储内核ForSt
接下来,我们将详细介绍一个因存算分离而引入的新内核——ForSt,详细介绍目前所做的内容。
2.1下图是存算分离&嵌入式存储:ForSt
ForSt,其命名灵感源自“For Streaming DB”,其核心改动在于引入了一层Unified FileSystem Layer(统一文件管理层)。这一层相当于一个屏蔽层,它掩盖了底层复杂的存储结构,无论是本地磁盘、分布式文件系统(DFS)还是缓存(Cache),都能被统一管理和访问。因此,ForSt DB内核能够轻松地访问远端存储,并在需要时利用本地磁盘作为缓存,以提升性能。
ForSt DB具备一系列特性,如远端读写能力、批量并发读写优化、本地磁盘缓存机制以及嵌入式数据库功能等,这些都使其在处理数据流方面表现出色。它依然依赖于tm进行存活管理,并且支持Flink的某些特性,如TTL(Time To Live,生存时间)和Snapshot(快照)等。
实际上,ForSt起源于FRocksDB,这是社区中ForSt内核的一个早期版本。现在,ForSt已经发展成为一个新的项目,并在原有基础上进行了一系列改动,这些改动使得它与之前的版本有了较大的区别。ForSt的内核将持续演进,以满足不断变化的需求,并且目前已经开源,供开发者使用和改进。
2.2 ForSt:远端读写
ForSt最为关键的功能在于它支持远端读写。在ForSt的内部设计中,首先定义了一个FileSystem接口,所有的存储操作都通过这个接口来执行。这个接口具备高度的灵活性,因为ForSt内部可以针对多种不同的存储系统实现相应的FileSystem,比如HDFS、OSS、S3等。
然而,为了快速构建并验证整个系统链路,目前ForSt依托于Flink来运行。在Flink的ForSt实现中,我们充分利用了Flink已经支持的多种FileSystem实现。这样做的好处是显而易见的:Flink支持哪些文件系统,ForSt就能支持哪些文件系统,无需额外开发。因此,无论是HDFS这样的远端存储系统,还是其他Flink支持的文件系统,ForSt都能轻松应对。
此外,由于实际的FileSystem实现位于Flink端,这为后续的一些功能开发带来了便利。例如,在实现Checkpoint(检查点)和DB文件之间的共享时,我们可以更加轻松地实现轻量级的Checkpoint功能。当然,这种设计也存在一些潜在的问题。由于中间需要经过一层GNI的调用,文件访问性能可能会受到一定影响。但相对而言,这种影响主要体现在CPU资源占用上,与IO性能相比,其影响相对较小。
2.3 ForSt:批量读写
ForSt支持批量读写功能,这一特性源自其前身FRocksDB。因此,ForSt继承了FRocksDB中的一些关键接口,如用于批量写的WriteBatch接口和用于批量读的MultiGet接口。通过实际测试,我们验证了这些接口带来的显著性能提升。这些好处包括但不限于提高数据处理的吞吐量、降低延迟以及优化资源利用率。
2.4 ForSt:快速检查点
接下来,我们探讨如何实现快速检查点(Checkpoint)。快速检查点有多种实现方式,这里我们主要讨论基于incremental(增量)的方法。
首先,我们考虑一个理想的场景:CP(检查点)文件和DB(数据库)所拥有的SST文件位于相同的存储介质上,并且存放在相同的文件夹内。这是最为便捷的情况,也是系统默认的配置。在这种场景下,DB文件可以直接被CP使用,无需额外的复制或移动操作。然而,这里存在一个文件所有权(owner)的问题:对于CP而言,文件的所有者是GM;而对于DB,文件的所有者可能是TM(任务管理器)或任务的一部分。实际上,系统只需执行一个简单的文件移交操作:在创建检查点时,将所需的SST文件全部标记为CP文件。对于DB来说,如果它不再需要某个文件,也不会立即删除,而是将删除的任务交给GM。这里有一个潜在的问题:如果DB仍然需要某个文件,而GM却删除了它,会怎么办?幸运的是,由于采用了incremental Checkpoint机制,GM只会在确认下一个Checkpoint不再需要该文件后才进行删除。因此,如果下一个Checkpoint不包含某个文件,那么DB实际上已经不再需要它了。所以,在这种情况下,文件的删除意味着它不再被DB所需,只需简单地进行标记即可。
接下来是第二种场景:CP和DB虽然使用相同的存储介质,但存放在不同的文件夹内。这也是一种可能的情况,主要取决于文件管理的策略。在这种场景下,我们需要将DB上的文件复制到CP文件夹内。这个过程类似于Flink社区中的PathsCopying功能(现已更名为PathsCopying ForSt系统接口)。
最后是第三种场景:CP和DB使用完全不同的存储介质,例如DB使用本地存储,而CP使用远端存储。在这种情况下,我们只能采取慢拷贝的方式,即将文件从本地存储传输到远端存储。
2.5 ForSt:快速恢复
目前,我们的讨论聚焦于统一存储环境下的相关问题。在最佳情况下,即发生Failover(故障转移)时,系统可以直接在原地重启,无需进行额外的操作。
接下来,我们谈谈手动Restore(恢复)的情况。手动Restore指的是在停止服务后,再重新启动服务,此过程中不改变并发度,但可以调整其他配置,不涉及Rescale(重新缩放或资源调整)。根据Claim Mode(声明模式)的不同,恢复行为也会有所差异。
如果采用的是Claim Mode,那么可以直接启动服务,因为文件实际上是由GM进行管理的。在GM的管理下,如果下一个Checkpoint不再需要某个文件,该文件才会被删除。因此,在Claim Mode下,恢复服务时无需担心文件保留问题。
然而,如果采用的是No Claim Mode,情况就会复杂一些。在No Claim Mode下,之前的Checkpoint文件并不能被视为可靠的,因为它们并不归GM管理。因此,在恢复服务之前,我们需要将这些Checkpoint文件拷贝出来。这个拷贝过程同样会面临快拷贝和慢拷贝的问题,具体取决于存储介质和文件大小等因素。
接下来,我们讨论Rescale(重新缩放或资源调整)的问题。在当前的2.0 ForSt DB系统中,Rescale过程主要通过ClipDB和IngestDB两个流程来实现。
ClipDB流程相当于按需对一个数据库(DB)所需的文件进行裁剪。它根据Key group的前缀来进行裁剪,以满足特定的需求。例如,如果只需要使用到10-19号Key group的文件,那么ClipDB就会生成一个新的、只包含这些Key group文件的数据库,我们称之为DB1。
而IngestDB流程则是将多个数据库合并成一个。这个过程中,不同Key group的数据库可以被合并成一个新的数据库。这种合并的好处在于能够最小化文件写入操作。以之前的例子为例,在Rescale过程中,只有紫色的文件会被重写,而其他文件则保持不变。因此,这种Rescale方式相对较快,且对系统的影响较小。
通过这两个流程,ForSt DB系统能够高效地实现数据库的重新缩放和资源调整,以满足不断变化的需求。
03、工作进展&未来展望
3.1下图是存算分离:Preview版本与2.0进展
在十月份发布的Flink 2.0 Preview版本中,ForSt内核全面支持了所有的State API执行模型等相关功能。同时,还引入了支持SQL的Regular Join算子,该算子在next mark阶段的一个端口上已经可以进行运行测试。在读写链路上,目前的状态是已经全面完成,纯远端访问的性能也符合预期。
关于即将发布的2.0版本,将包含以下功能特性:首先,我们实现了快速检查点恢复的功能,但需要注意的是,FLIP-455中提出的对异步优化的Checkpoint功能暂不支持。其次,2.0版本将支持本地磁盘Cache功能。尽管目前系统已经支持纯远端访问,但本地磁盘的存在也可以被有效利用作为Cache。这一功能现已实现,并且对于所有常见的Operator状态算子,如Agg、Join、Rank等(约占70%)都进行了重写和优化。此外,我们还提供了一个性能保障:在本地磁盘Cache占State 50%的情况下,系统仍能保证高性能运行,这部分State数据可以顺利存放在本地磁盘上。目前,ForSt的性能已经不逊色于FRocksDB的方案。
3.2 存算分离:性能测试
我们进行了一系列测试,首先是WordCount测试。该测试的特点在于State设置得相对较大,但Key的访问是随机的,且冲突较少。在采用FRocksDB方案和纯本地方案时,我们获得了约19.2 KB/s的性能表现。
在ForSt方案中,当访问远端且未采用异步优化时,其性能约为FRocksDB方案的十分之一。然而,当我们为ForSt加上异步访问框架后,其性能显著提升,达到了FRocksDB方案约85%的水平。更进一步,当我们利用本地磁盘作为Cache,并将50%的State数据存放在本地时,ForSt的性能甚至超越了DB处理方案,展现出了卓越的性能表现。
接下来,我们来看Nexmark Q20的测试,该测试主要考察的是Regular Join的性能,其特点是Key的冲突较多。在这种场景下,异步(Async)与同步(Sync)之间的性能差距可能并不那么显著。
对于FRocksDB方案,我们有一个基准的性能值。而现在,当我们在ForSt中采用Async加Remote的流程时,其性能已经超过了FRocksDB方案的50%。更进一步,如果我们再加上本地磁盘Cache的利用,那么ForSt的性能基本上已经与FRocksDB方案持平,成本也相当。
3.3 存算分离展望:2.0版本之后
在SQL Operator方面,我们将继续完善其支持,力求达到百分之百的兼容性。同时,我们会继续优化FLIP-455提出的更快速的检查点机制,并可能考虑支持更多特性。由于已经实现了存算分离,我们接下来可能会探索如Remote Compaction(远端压缩)等过程。
3.4 Forst展望:更多流特性支持
对于ForSt内核,我们也希望增强其对流特性的支持。例如,我们将实现原生的TTL(Time To Live)支持,以替代当前通过Flink的Java代码嵌入到ForStDB中的方式。此外,我们还将引入文件级别的直接裁剪功能,以解决用户反馈的某些文件因未进行Compaction而无法删除的问题。我们将通过文件级的协调机制,实现直接删除这些文件的功能。
在状态迁移方面,我们计划支持懒加载模式。目前,状态迁移是在Resort(重新排序)过程中进行的,这可能会导致一些状态迁移的变化。而在未来,我们将允许系统直接启动,并在后续的Compaction过程中进行状态迁移。同时,我们还将优化Compaction的频率调度,以提高效率。
在检查点方面,目前的做法是直接触发一个flash操作,这可能会进一步触发Compaction。然而,对于Memtable(内存表)本身,我们并不需要每次都进行flash操作来创建检查点。因此,我们计划实现一种更优化的检查点机制,该机制将考虑时间分片,并将批处理结果按partition(分区)和时间属性进行划分。对于流处理的结果,即中间状态,它们同样具有时间属性,这意味着我们可以根据时间属性来管理和优化这些状态。
例如,对于某些在特定时间后不再访问的State,我们可以直接将其删除。因此,我们将引入时间分片的管理机制,以更好地利用流数据的访问特点。虽然点查询可能更为常见,但范围查询也是不可或缺的。因此,我们还将针对点查询和范围查询的场景进行一些优化。ForSt DB无疑会有更加长远和广阔的发展前景。