Flink State - Backend Improvements and Evolution in 2021

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 李钰 (绝顶)、唐云 (茶干) 在 FFA 2021 核心技术专场的分享

摘要:本文整理自 ASF Member、Apache Flink & HBase PMC、阿里巴巴资深技术专家李钰 (绝顶),Apache Flink Committer、阿里巴巴技术专家唐云 (茶干) 在 Flink Forward Asia 2021 核心技术专场的演讲。主要内容包括:

  1. State Backend Improvement
  2. Snapshot Improvement
  3. Future Work

FFA 2021 直播回放 & 演讲 PDF 下载

一、State Backend improvement

在过去一年中,Flink 社区 state-backend 模块有了很大的发展。在 1.13 版本之前,用户对于状态相关算子的性能缺乏监控手段,也没有很好的办法可以获悉状态读写操作的延迟。

img

我们引入了状态访问的延时监控,原理就是在每次状态访问前后,使用 system.nowTime 去统计访问延时,然后将其存储在一个 histgram 类型的指标中,监控功能打开对其性能的影响,尤其是 state 访问的性能影响是比较小的。

img

关于状态访问延迟监控相关的配置,需要特别强调的是采样间隔和保留历史数据这两个配置,采样间隔越小,数据结果就越准确,但是对日常访问的性能影响也稍大一些;历史数据保留的个数越多,数据结果越精确,但是内存占用也会稍大一些。

img

在 Flink 1.14 版本中,我们终于将 RocksDB 从 5.17 升级到 6.20 版本,除了 RocksDB 自身若干 bug 的修复,新版 RocksDB 也增加了一些特性,可以用于 Flink 1.14 和 1.15 中。首先它支持了 ARM 平台,可以保证 Flink 作业能够在 arm 基础上运行,其次提供了更细粒度的 WriteBuffer 内存管控,提升内存管控的稳定性。此外,还提供了 deleteRange 接口,为之后在扩容场景下的性能提升带来非常大的帮助。

img

随着云原生的愈发流行,通过 K8s 调入在容器环境中运行 Flink 作业已经成为越来越多厂商的选择,这其中不可避免需要考虑受限资源如何稳定运行,尤其是内存使用方面的管控。而诞生在 2010 年度的 RocksDB 在这方面的能力先天有些不足,Flink 1.10 才首次引入了内存管控。在过去的一年中, RocksDB 在内存管控方面又有了一些进步和改善。

首先回顾一下 RocksDB 内存方面的问题,谈这个问题之前要了解 Flink 是如何使用 state 和 RocksDB 的。

  • Flink 每声明一个 state,都会对应 RocksDB 中的一个 column family,column family 是 RocksDB 中独立的内存分配,它们之间通过物理资源来隔离;
  • 其次,Flink 并不限制用户在一个 operator 内声明的 state 数目,所以它也没有限制 column family 数目;
  • 最后,Flink 在 slot-sharing 机制下一个 slot 内可以存在多个包含 keyed state 的 operator。

基于以上三个原因,即使不考虑 RocksDB 自身在内存管理上的限制,理论上来说 Flink 的使用方式就有可能导致不受限的内存使用。

img

上图定义了一个 SQL 类的多个 RocksDB 的实例,共享了一个 writeBuffer manager 及其对应的 block cache,其中管理多个 writeBuffer 的 manager 将它所申请的内存在 block cache 中进行记账,数据相关的 block 在 block cache 中进行缓存,缓存包括数据相关的 data block,索引相关的 index block 和过滤用的 filter block,可以简单地理解成写缓存和读缓存。

由此可见,writeBuffer manager 与 block cache 协同工作的方式就是 manager 在 block cache 中进行记账。

img

manager 在 buffer 申请流程后,会以 io blocks 为基本单位在 block cache 中进行内存升级。默认 io block 是单个 writeBuffer 的 1/8,writeBuffer 的配置是 64Mb,所以 io block 的 size 是 8Mb,而这 8Mb 内存申请会再次拆成若干 dummy entry,分配到 Block 若干的 shard 中。需要特别说明的一点是,Flink 升级 RocksDB 之后,dummy entry 的最小单元降到了 256KB,降低了内存申请超额的概率。

因为 RocksDB 本身的设计是为多线程考虑的,所以在一个 cache 中会存在多个 shard,所以它的内存申请就会比较复杂。

img

WriteBuffer manager 内部实现产生的内存中,可变的 WriteBuffer 何时转化为不可变的 WriteBuffer。Immutable table 刷到磁盘上的过程中,默认 mutable writebuffer 的使用量是有上限的,达到上限之后就会提前 flush 这些 WriteBuffer。这会导致一个问题,即使写入的数据量不大,一旦申请 arena block,尤其是 arena block 比较多的情况下,就会提前触发 member table flush 的问题。从用户角度来说,会发现本地存在大量很小的 SST 文件,整体的读写性能也很差,因此 Flink 社区专门对此做了 arena block size 的配置校验功能。

img

目前 RocksDB 自身存在内存管控的不足和限制,所以需要在特定场景下预留一部分对外内存给 RocksDB 超额使用。对照上图 Flink process 内存模型,可以看到需要在 jvm-overhead 上对内存进行适当的保留,防止 RocksDB 超用。左边的表格展示了 jvm-overhead 相关的默认配置值,如果想要将 jvm-overhead 配置成 512Mb,只要将 mini 和 max 都配置成 512Mb 即可。

img

在内存有限的场景下,data block,index Block 以及 fliter blocks 是存在竞争问题的。上图的 Block 实例是按照实际大小进行绘制的,以 256Mb 文件的 SST 举例,其中 index block 大约是 0.5Mb,fliter block 大概是 5Mb,data block 一般是 4KB-64KB,可以看到 block 的竞争会导致大量的换入换出,极大影响读性能。

img

为了上述问题,我们将 RocksDB 的 partition-index 和 partition-filter 功能进行封装,优化了内存受限情况下的性能。就是将索引 index 和过滤 filter 进行分层存储,从而可以在有限内存中尽可能存储数据 block,减少磁盘的读取概率,从而提升整体性能。

img

除了关于稳定性相关的改善,Flink 还着重重构了 state 相关的 API,对新手的理解会更友好。

以前的 API 是混合了状态读写的 statebackend 和负责容错备份的 checkpoint 两者的概念。以 MemoryStatebackend 和 FsStateBackend 为例,二者在状态读写、访问对象方面是完全相同的,区别仅在于容错备份,所以初学者很容易混淆其中的区别。

img

上图展示了更新之后的 Flink 状态读写和容错点查 API 与更新前的区别。

img

新版里我们将状态访问与容错备份进行了分别的设置,上图是新版 API 与旧版 API 的对照表格。可以看到,MemoryStatebackend 和 FsStateBackend 负责状态读写的都是 HashMaoStateBackend 的状态存储。

二者的最大区别就是在 checkpoint 容错方面,一个是对应全内存的 ManagercCheckpointStorage,而另一个对应的是基于文件的 FileSystemSCheckpointStorage。相信通过对 API 的重构,能够给开发者以更深刻的理解。

二、Snashot Improvement

img

SavePoint 本身是与 state-backend 结耦的,并不局限于是通过什么样的 state-backend 实现。然而以前的 Flink 版本中,不同的 state-backend 的 SavePoint 格式是不同的,但是在新版 Flink 中,社区统一了相关的 SavePoint 格式,对于同样的作业可以在不丢失状态的情况下无缝切换 state-backend。

img

此外,社区还进一步增强了 unaligned checkpoint 的稳定性。将 channel 中的 buffer 作为 in-flight 数据,看作 operator state 的一部分进行提前持久化,避免 barrier 对齐的时间。

img

此外,在新版 Flink 中,社区支持了传统的 aligned 与 unaligned 之间的自动切换,只要设置一个全局的超时时间,Flink 的 checkpoint 达到阈值之后,就会自动的进行切换,相信这个功能的引入也可以进一步帮助开发者获得更好的 checkpoint 性能。

三、Future Work

img

未来,我们会进一步提高 RocksDB backend 的生产易用性。一方面我们会将 RocksDB 内部一些关键的性能指标,例如 block cache 命中率等添加到标准监控指标中,从而可以更加方便地对 RocksDB 的性能进行调优。另一方面,我们计划将 RocksDB 的日志文件重定向到 TM 日志目录下或者 TM 日志中,能够更方便地查看 RocksDB 日志信息,来定位问题和调优。

其次我们会进一步梳理明确 Flink 的快照语义,目前在 Flink 中有三种形态的快照,分别是 checkpoint,savepoint 和 retained checkpoint 。

  • 其中 checkpoint 是系统快照,其数据生命周期完全由 Flink 框架控制,用于在异常发生时进行 fail over,一旦作业停止,将被自动删除;
  • savepoint 负责统一格式的数据备份,其生命周期与 Flink 作业解耦,完全由用户控制,可以用来实现 Flink 作业的版本升级、跨集群迁移、state-backend 的切换等需求;
  • 而 retained checkpoint 的语义和生命周期目前都比较模糊,它可以独立于 Flink 作业生命周期之外存在,但当基于它恢复并且打开增量快照的时候,新作业的 checkpoint 会依赖其中的数据,从而导致用户很难判断何时可以安全地将其删除。

img

为了解决这一问题,社区提出了 FLIP-193,要求使用者基于 retained checkpoint 启动作业的时候,声明是采用 claim 还是 no-claim 模式。

  • 如果采用 claim 模式,则该 retained checkpoint 的数据生命周期完全由新作业掌控,即随着数据 Compaction 的发生,当新快照不再依赖于 retained checkpoint 当中的数据时,新作业可以将其安全删除;
  • 而如果采用 no-claim 模式,则新作业不能修改 retained checkpoint 的数据,这意味着新作业在第一次快照时需要做物理拷贝,不能引用 retained checkpoint 当中的数据。这样,在需要的时候可以随时手动将 retained checkpoint 的删除,而不需要担心影响基于其恢复的作业。

此外,后续我们计划对用户控制的快照赋予更清晰的语义,引入 native format 的 savepoint 的概念来替代 retained checkpoint。

img

最后介绍一下正在进行中的 FLIP-158 的工作。它引入了 Changelog based state backend 来实现更快速平稳的增量快照,相当于引入了一种基于 log 打点方式的快照。相比于目前已有的 snapshot 稳健的增量快照机制,它有更短的快照间隔,但同时会牺牲一些状态数据处理延时。这其实就是在延迟和容错之间的取舍和均衡。


FFA 2021 直播回放 & 演讲 PDF 下载

更多 Flink 相关技术问题,可扫码加入社区钉钉交流群
第一时间获取最新技术文章和社区动态,请关注公众号~

image.png

活动推荐

阿里云基于 Apache Flink 构建的企业级产品-实时计算Flink版现开启活动:
99 元试用 实时计算Flink版(包年包月、10CU)即有机会获得 Flink 独家定制卫衣;另包 3 个月及以上还有 85 折优惠!
了解活动详情:https://www.aliyun.com/product/bigdata/sc

image.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
SQL Java API
flink问题之state过期设置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
433 0
|
7月前
|
SQL 消息中间件 分布式数据库
Flink问题之State 0点清除如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
128 0
|
2月前
|
SQL 消息中间件 分布式计算
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析
72 5
|
2月前
|
存储 SQL 分布式计算
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
大数据-127 - Flink State 04篇 状态原理和原理剖析:状态存储 Part2
21 0
|
2月前
|
存储 消息中间件 大数据
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
大数据-126 - Flink State 03篇 状态原理和原理剖析:状态存储 Part1
67 0
|
2月前
|
存储 SQL 分布式计算
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态
46 0
|
4月前
|
消息中间件 应用服务中间件 API
Flink四大基石——3.State
Flink四大基石——3.State
56 1
|
4月前
|
SQL 流计算
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
Flink SQL 在快手实践问题之由于meta信息变化导致的state向前兼容问题如何解决
51 1
|
7月前
|
SQL 分布式数据库 Apache
Flink问题之实现state定时输出如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
7月前
|
存储 消息中间件 资源调度
Flink state 详解
Flink state 详解
72 0

相关产品

  • 实时计算 Flink版