摘要:本文整理自知乎大数据架构负责人贾承昆,在 Flink Forward Asia 2022 平台建设专场的分享。本篇内容主要分为四个部分:
- 背景介绍
- 实时计算平台 2.0
- 其他改进
- 未来规划
一、背景介绍
1.1 发展历程
知乎早在 2018 年就引入了 Flink 作为公司推荐的实时计算引擎,Flink 组件的版本也从早期的 1.6 一路升级到如今的 1.13。发展至今,目前 Flink 已经成为知乎内部最重要的组件之一,每天处理 PB 级的数据。
2018 年,上线了第一版 Flink 计算平台 Skytree,当时的主要目标是支持一些 Jar 包作业的运行。2019 年,在 Flink 批作业处理上进行了一些尝试,落地了 TiDB2Hive 等数据同步场景。2021 年,我们对整个 Flink 实时计算平台进行了重构,升级到了 2.0,主要目标是支持 Flink SQL、加速用户业务的开发效率。今年我们对平台进行了升级,支持了 UDF 统一 Jar 包管理等功能。
整个公司支撑了大概 700+的实时作业,集群规模大概有 1.3wCores+50TB 内存。业务情况比较丰富,覆盖了数据同步、实时数据仓库、算法特征、反作弊、风控、搜索等业务场景。
知乎在 2016 年就完成了所有业务的容器化改造,在 K8s 方面有比较多的积累。所以我们在第一版做 Skytree 的时候,就选择了基于 K8s 来构建整个 Flink 实时计算平台。但当时 Flink 1.6 还不支持原生的 Native K8s,于是我们在平台层自己完成了整个 Flink Deployment 的构建,再通过实时计算平台 Skytree1.0,我们支撑了业务对于 Jar 的运维部署的需求,并满足了一些数据同步、实时数仓等业务场景。
在 Skytree 1.0 时,我们也遇到了非常多的问题,主要表现在以下几个方面。
- Flink Jar 的模式开发成本较高,用户需要深入理解 Flink 的 API,且经常会遇到一些依赖冲突的问题。这样成本和门槛导致用户在使用新技术时,会有很多顾虑,效率也比较低下。
- 平台第一版基于 K8s 来构建,整个 Deployment 生命周期管理由平台层来完成。这部分的逻辑非常复杂,且我们经常发现,作业实际的状态和平台管理的状态不一致,这就会导致很多问题,维护成本也会越来越高。尤其在 Flink 1.7-1.8 之后,支持了原生的 K8s,这一部分就变成了技术债务。
- 开发流程不规范,导致平台的管理成本高。比如一些员工离职的时候,没有很好的进行代码交接,导致很多实时作业的代码无法重现。
基于以上问题,我们重新规划并设计了实时计算平台 2.0 Mipha。选择这个名字是因为当时团队的同事比较喜欢玩塞尔达,于是选择了塞尔达游戏中的一个人物来进行平台的命名。
二、实时计算平台 2.0
第一,在 Flink 1.11 以后,社区开源版本对 SQL 的支持已经非常好了。SQL 的语法能更好的支撑业务去快速满足一些需求。于是我们将 SQL 的支持作为 2.0 平台的核心目标,加速开发的效率。
第二,在 SQL 之上,我们需要构建一套完善的数据源管理系统,满足 Flink 去连接各种异构数据源的需求。
第三,需要解决 1.0 阶段,我们没有解决好的开发流程的管理问题。
Mipha 平台整体分为三层,从上到下分别是平台层、引擎层、计算资源层。
平台层是 Mipha 提供给用户的通用能力,包括作业管理、监控报警、包管理、adhoc 查询、日志采集等等。引擎层可以分成两个部分,SQL Gateway 和 Mipha MetaStore。是我们构建的一个统一元数据管理服务。计算资源层就是我们的实际资源,也就是 K8s。
SQL Gateway 由 Ververcia 公司开源,提供了一个简单 Flink SQL Session 管理和 SQL 提交的服务,已经被很多公司采纳。Flink SQL Gateway 提供了一个 REST 接口,让我们将 SQL 转化为 Flink 作业并运行在集群中。SQL Gateway 提供了 Sesison 管理机制,管理不同的 SQL 请求,当一个 Session 关闭时,会释放和该 Session 相关的所有资源以及关闭作业。
然后 Catalog 和其他 SQL 引擎一样,负责存储所有 Table 的元数据,提供数据的 Schema 和位置的描述。另外,集群 SQL Gateway 支持将作业提交到 K8s/Yarn,以满足我们多样化的调度需求。
目前,元数据管理的问题在 Flink SQL Gateway 中没有一个成熟的解决方案。当用户的 Session 关闭时,用户临时的表会随着 Session 一起回收,如果要重新提交作业,就必须重新提交建表语句。这个流程的使用体验对用户来说非常不友好,因此我们希望将 Flink 的元数据进行持久化。用户只需构建一次 Table,就可以永久保存下来,之后可以被反复使用以及和其他作业复用。
另外,需要解决非常多的元数据管理,除了常用的 Hive、Kafka 之外,还有公司内部的大量存储,比如 Redis、ClickHouse、Doris 等存储系统。Flink 官方目前推荐的做法是将数据源和映射表等信息存储到 Hive Metastore,但是这样会有以下问题:
Hive Metastore 的映射表会存储非常多的底层资源连接信息。Hive Metastore 本身在权限管理中做的并不完善,会造成敏感信息的泄露。
映射表的方式需要我们对每一种数据源都建立映射表。Hive Metastore 中会有大量描述本身已经具有 Schema 的存储,这样会造成元数据的重复和一致性的问题。
基于以上原因,我们决定自己开发一个统一的元数据管理 Mipha MetaStore,来解决异构元数据的管理问题。
下面简单介绍一下 Mipha MetaStore 的设计思路。首先我们将存储系统分成两种类型,本身已经具备 Schema 的和本身不具备 Schema 的。前者比如 Doris、ClickHouse、MySQL,后者比如 Kafka、Redis。
对于本身已经具备 Schema 的存储系统,我们倾向于不去维护 Schema,而是利用存储本身的 Schema 进行动态映射,这样可以最大程度的解决 Schema 不一致的问题,减少因 DDL 产生的维护成本。
对于本身不具备 Schema 的存储系统,我们会解析用户的建表语句,将解析后的 Schema 结构化的存储在 Mipha MetaStore 中,并对外提供统一的 API 供 Flink SQL Gateway 调用。当用户的一个 SQL 提交进来时,首先会经过逻辑执行计划和物理执行计划。在过程中我们会加载 Mipha MetaStore 中的元数据,对 SQL 进行统一的解释。
通过这个设计,首先,用户可以在统一的入口进行建表、删表的 DDL 等管理操作。其次,每一个新建的表都可以被多个作业重复使用,保证了持久性。通过隔离的 MetaStore 和我们自研的权限设计,可以避免连接信息被没有意义的泄露。
除了元数据管理,平台还提供了 UDF 的功能,来满足用户自定义的需求,扩展 SQL 的能力边界。
首先我们在平台上提供了 UDF 的注册入口,用户可以注册声明自己的函数定义。除此之外,UDF 还复用了 Jar 包任务的管理模式,和我们的 Git 仓库和 CRM 进行了打通。
Flink 提供了三种部署模式,第一种是 PerJob 模式,适用于 on Yarn 模式时 Jar 包作业的执行。第二种是Session 模式,它已被大部分公司采用。第三种是 Application 模式,它可以简单的理解为 PerJob 模式 on K8s。它是目前社区主推的一种方部署方式。
Application 模式的好处是:
作业的生命周期会和集群的生命周期保持一致。当作业结束或者取消时,整个集群也会随之关闭。这样的设计做到了单作业、单集群,最大程度上保证作业的隔离。
简化了作业的生命周期管理,我们可以专注管理作业,不用关心集群的状态。
基于以上优点,我们选择 Application 模式作为平台的统一部署模式。但在当前情况下,Application 模式只能应用于Jar包作业,不能使用于 SQL 作业。于是我们做了一些改造。
当用户提交一个 SQL 任务时,我们会先对 SQL 任务进行编译,将编译后的 JobGraph 持久化在一个外部存储中。然后通过一个引导的 Jar 包作业对 JobGraph 进行动态加载,加载后再交由 GM 执行。
通过这个设计,我们将 SQL 和 Jar 任务的提交模式统一成了 Application 模式,基于这种模式,我们也在积极的测试社区目前主推的弹性伸缩能力,相信后续会取得不错的结果。
在之前第一版平台时,用户会手动将 Jar 包上传到平台,然后进行作业的发布和部署。但我们并不知道 Jar 包从何而来,没有和公司统一的代码管理进行打通,会产生以下问题:
难以追踪代码变更,出现问题不能快速判断,是由用户的代码变更引起的,还是平台本身引起的。
排查需要找用户沟通要代码,确认代码位置,效率低。
员工离职交接流程问题,导致任务代码无法复现,导致很多管理问题。
于是我们决定对整个开发流程进行梳理,按照一个更标准的方式进行 Flink 作业的部署。
改进后的开发流程:为了实现该目标,我们和公司的 CI 团队合作,扩展了 CI 系统的能力。并基于系统开发了一个新的包管理服务 Kosmos。Kosmos 服务负责存储。首先,将 CICD 的思路引入到了 Flink 平台的发布过程中,我们要求所有的作业源代码都必须托管在 Git 仓库。用户创建作业时会先绑定一个 Git 仓库,并接入公司统一的 CI 系统。通过 Merge Request 或Push操作触发 CI 构建,并生成可用于发布的 Flink Jar 包。然后,这个 Jar 包会被上传到我们构建的统一包管理服务 Kosmos。cosmo 服务负责存储用户发布的任意 Jar 包系统,并提供仓库路径和包语义。Mipha 平台通过 Kosmos 服务获取每一个仓库的所有历史版本,用户可以选择其中一个版本进行发布和回滚。
上图是平台界面。这种方式可以让我们追踪到用户的每一次 commit,以及 commit 对应的 MMR。除了入口,用户不允许直接上传 Jar 包发布和运行作业。
通过这种方式也避免了离职时的交接问题。同时当用户反馈问题时,我们可以迅速定位到用户的源代码,并协助用户排查定位问题,大幅提高开发和运维效率。
实时计算平台 2.0 总结:
- 第一,在架构上实现一个现代的实时计算平台,并且具备良好的扩展能力。
- 第二,通过对 SQL 的支持,用户在十分钟左右就可以上线开发一个简单的 SQL 作业,大幅提高了作业的开发效率。平台支撑的用户也从 1.0 的几十个变成了现在的上百个,覆盖的业务场景和业务方也变得更多了。
- 第三,平台的设计是面向社区的,我们和社区的所有能力都具备非常好的兼容性。另外,我们几乎可以支持社区所有的 Connector 和 Format,并能低成本的将新的 Connector 引入到平台中,和社区一同前进。
三、其他改进
关于实时计算平台做的其他改进,主要分为三部分。分别是 CDC 平台化、易用性提升、运维优化。
3.1 Flink CDC 平台化
知乎早期曾尝试用 cudo+Maxwell+ SparkStreaming 的方式来构建实时 ODS 数据同步方案,但遇到了很多问题。
比如全量同步和增量同步阶段会涉及到不同的技术栈,且它们需要恰好编排任务执行的顺序,每一个环节都有可能造成数据丢失,而且我们确实遇到了对应的问题。
Flink CDC 它能很好的将全量同步和增量订阅结合在一起,极大的简化了实时数据同步过程中的数据链路,尽可能降低数据出现不一致的概率。基于此原因,我们选择了 Flink CDC 作为实时数据同步的解决方案。
这个方案近两年在社区也非常受欢迎,具体的原理已经有很多人分享过了,这里不再赘述。接下来介绍一下我们基于 Flink CDC 做平台化的一些思考。
首先,我们将一个 Flink CDC 换成一个完整的数据库存库,它需要订阅和解析数据库的 Changelog,并产生对应的数据。我们希望当 CDC 的实例变多时,对 DB 没有影响。因此我们尽可能让一个实例只对应一个 CDC 实例,不进行重复订阅。
其次,CDC 订阅产生的数据会包含全量数据+增量数据,这部分数据可能会被用于多个消费场景,比如做同步和特征计算。我们希望尽可能去复用这部分数据,不要重复采集。
基于上述考虑,我们设计了 Flink CDC+Upsert Kafka 方案,来构建整个 CDC 的平台化。我们开发了一个 Flink CDC 的 Jar 包,每一个 Jar 包会对应唯一一个数据库实例。然后将实例中所有的表全量数据+增量数据分别采集,并按照表的名字分发到下游不同的 Kafka Topic 中。这里选择 Upsert Kafka,主要有以下几个考虑:
首先,它可以比较简单的完成 Changelog 事件的模拟,方便下游进行消费。其次,通过Upsert Kafka可以对每一行数据构建一个唯一的 Key,通过 Kafka 的 compaction 功能,可以在既保存全量数据的情况下,又可以保证整个 Kafka 的存储不会随着时间无限膨胀,定期 compaction 对 Key 的数据进行合并。下游的消费方只能感受到 Upsert Kafka 的 Topic,从 Topic 的头部开始消费,一直消费到最新的数据,就可以完成增量数据+全量数据的构建。
目前,我们已经将此方案应用于 Doris 和 Iceberg 的数据同步场景中,并取得了不错的效果。
3.2 易用性提升
另外,由于我们的算法和业务场景中大量使用了 Protobuf 格式,它的格式不同于 arl 或 Josn,它需要一些静态的类进行解析。Flink 在最新版本之前是不原生支持 Protobuf 格式的,因此我们需要进行一些开发,来让 Flink 支持 Protobuf 的解析,扩展SQL的使用场景。基于此原因,我们开发了自己的 Protobuf Format。在开发 Format 之前,一共有三种方案来实现该功能。
- 方案一,通过用户的 IDL 文件,构建 dynamic message 对数据进行序列化和反序列化。但经过各种测试,该方案的性能开销,对比原生的 Protobuf 解析有 60%以上的性能损失,因此最终我们放弃了该方案。
- 方案二,通过用户的 IDL 文件,在作业启动时,对该 IDL 进行动态编译,将编译后的 Jar 包注入到容器的运行环境中,来满足 Format 的解析。但它对于编译的环境和用户的编译脚本有一定要求,这要求平台需要做非常多的事情,将整个流程串起来,因此最终我们也放弃了该方案。
- 方案三,可以由业务预先编译好 Jar 包,并通过i语法将 Jar 包动态的引入到平台中,来完成静态 PB 的解析。这个方案比较友好,但需要用户在使用该 Format 前对自己的 IDL 进行一次编译。最终完成的 Protobuf 格式如图所示。用户只需要定义它的 messageClass 以及将它的 Jar 包引入到平台中,就可以完成 Format 的支持。
由于 Protobuf 经常用于嵌套结构的表示,我们在构建对应的 Flink Table 和 Hive Table 时,经常难以完成SQL 的构建。所以我们提供了一个辅助的建表语句,用户可以通过 IDL 文件,一键生成 Flink Table 和 Hive Table 的建表语句,大幅简化了用户在使用 Protobuf 过程中的一些困扰。
3.3 运维优化
第一部分是动态 Jar 包加载。由于我们采用了 on K8s 的方案,整个容器运行环境的镜像基本不可变。同时我们也希望尽可能维护稳定的镜像版本,不希望频繁修改。但业务需求灵活多变,我们总会需要把一些动态的依赖加载进去,因此我们设计的一个方案,能够在运行时动态载入一些 Jar 包。
整个方案的架构图如上图所示。我们采用了 JuiceFS S3 Gateway 对动态 Jar 包进行代理,然后在 docker 的 entrypoint 中注入一个 hook,该 hook 会读取环境变量,并下载环境变量中声明的 Jar 包,存放到指定位置。通过这个方案,我们将 Connector、Plugins 和平台 patch 的代码通过动态 Jar 包下载的方式,注入到业务的作业容器中,避免了对镜像的频繁修改,同时又保留了平台的灵活性。这一部分内容我们已经在JuiceFS 开发社区分享过了,感兴趣的朋友可以去搜索一下。
第二部分是资源超卖。在 K8s 部署时,我们发现整个集群的资源瓶颈不是内存而是 CPU。作业在大部分时候 CPU 都有大量的浪费,基于此原因,我们对 CPU 进行了超卖。也就是说用户在平台上申请一个核心时,并不会真实给到一个核心的资源,而是通过超卖系数控制,让它使用一个更小的 CPU 申请量。
同时,由于需要满足业务流量的波峰波谷需求,我们会给予该 CPU 一定的弹性能力。比如用户申请一个核心,实际对应物理上的 0.3 个 Core,以及我们会给它配置一个弹性策略,让它能真正使用到 1 个 Core 的资源。通过这个设计,提高了整个集群 CPU 的利用率,降低了资源浪费的情况。
第三部分是日志采集和持久化。最开始在构建平台时,并没有意识到需要对作业运行时的日志进行采集。随着线上问题逐渐暴露,我们发现定位一些复杂问题时,需要 TaskManager 运行时的全量日志,才能更快的定位到问题。基于此需求,我们设计了一个在 K8s 上的日志采集方案。主要分为以下几部分。
首先,我们会部署一个 Demo-Set 的 FileBeat,在每一个 Flink 的计算节点中。FileBeat 会订阅每一个 Pod 的日志,并将该日志写入到 Kafka 中。然后我们会再去消费 Kafka 的数据,将日志持久化在 Hive 表五秒中。
对于已经在运行的作业,平台会提供 JobManager 链接直接查看该日志。对于已经完成或者取消失败的日志,我们可以通过 Hive 表中的持久化日志数据进行查询。通过这样的方式,在业务遇到线上问题时,我们可以优先考虑恢复作业,而不用考虑问题的定位和排查。事后再通过日志做深入分析,还原当时出现问题时的场景。
为了方便用户使用,平台的监控和报警全部是自动化进行的。平台会预先设置一些报警规则,比如 CPU 利用率过高、作业存活报警、消费延迟报警。这些报警在用户作业创建时就会自动创建,不需要用户额外配置,并且和公司的报警平台做了集成。同时,为了满足业务的个性化需求,平台也支持业务通过平台配置自定义报警规则。
第四部分是逻辑资源隔离和计费。以上问题全部解决后,我们发现了一些问题,比如集群资源经常出现不足;新的业务场景需要大量的计算资源,但平台没有预留已有的任务;重启时资源已被其他任务占用,导致作业出现启动不了的情况。所以业务希望我们设计一个资源隔离的方案。
由于我们在 K8s 上不能按照队列进行拆分,于是我们设计了一个偏软性的资源隔离方案。我们会给每一个项目组分配一个软性的 quota,当用户实际的资源消耗超过 quota 时,我们会禁止该用户再去提交作业。所有项目组的 quota 加起来等于整个集群的总资源量。同时,我们会按照该 quota 对用户进行计费,用户必须为他预留的资源买单。
通过这种方式,我们既做到了资源隔离,也可以督促业务优化作业,降低资源消耗。
四、未来规划
未来的规划主要体现在以下几方面:
- 在稳定性上,我们将继续优化调试和开发的效率;我们将改进 SQL 状态恢复;我们正在开发任务的自动诊断功能,加速任务出现问题时的定位效率。
- 在业务上,我们将会支持弹性扩缩容,提供更好用的数据建模工具。在算法方面我们和算法团队合作,正在开发平台化的算法特征计算能力。
活动推荐
时间:7 月 29 日下午 13:00-18:30
地点:北京朝阳区望京凯悦酒店
线下报名:http://hdxu.cn/DO7OG
扫下方图片直达线上直播间:
更多内容
活动推荐
阿里云基于 Apache Flink 构建的企业级产品-实时计算 Flink 版现开启活动:
0 元试用 实时计算 Flink 版(5000CU*小时,3 个月内)
了解活动详情:https://free.aliyun.com/?pipCode=sc