Apache Hudi与Apache Flink更好地集成,最新方案了解下?

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Hudi与Apache Flink更好地集成,最新方案了解下?

1. 现有架构

现有Flink写Hudi架构如下

现有的架构存在如下瓶颈

InstantGeneratorOperator并发度为1,将限制高吞吐的消费,因为所有的split都将会打到一个线程内,网络IO会有很大压力;WriteProcessOperator算子根据分区处理输入数据,在单个分区处理,BUCKET逐一写入,磁盘IO也会有很大压力;通过checkpoint缓存数据,但checkpoint应该比较轻量级并且不应该有一些IO操作;FlinkHoodieIndex对per-job模式有效,不适用于其他Flink作业;

2. 改进方案

2.1 步骤1:移除并发度为1的算子

解决第一个瓶颈。

可以通过为写入算子实现一个算子协调器WriteOperatorCoordinator来避免使用并行度为1的算子InstantGeneratorOperator,协调器会基于checkpoint开始新的提交。

2.1.1 工作流

写方法首先会将数据缓存为一批HoodieRecord

当Flink checkpoint开始时,开始写一批数据,当一批数据写成功后,方法会通知StreamWriteOperaorCoordinator成功写入;

2.1.2 Exactly-once语义

通过缓存checkpoint之间的数据来实现exactly-once语义,算子协调器在触发checkpoint时会在Hoodie的timeline上创建一个新的instant,协调器总是会在其算子之前开始checkpoint,所以当方法开始checkpoint时,已经存在了REQUESTED HoodieInstant

方法处理线程开始阻塞数据缓存,然后checkpoint线程开始刷出之前缓存的数据,当刷出成功后,线程不再阻塞并且开始为新一轮的checkpoint缓存数据。

因为checkpoint失败会触发写回滚,实现了exactly-once语义。

2.1.3 容错

算子协调器在生成新的instant时会检查上一个instant的合法性,如果写入失败会进行回滚处理,算子协调器在提交写入状态时会进行多次重试以减少提交状态的失败概率。

注意:需要按照分区字段对输入数据进行分区以避免不同的线程写入相同的FileGroup,一般场景下时间字段为分区字段,所以sink task非常可能会有IO瓶颈,更灵活的方式是根据FileGroupId进行数据shuffle(步骤2解决)。

2.2 步骤2:更灵活的写入线程

解决第二个瓶颈。

对于每一个分区,WriteProcessOperator处理所有的逻辑,包括index/bucket/数据写入:

索引INSERT/UPDATE记录;使用PARTITIONER确定每条记录的Bucket(FileID)逐一写Bucket

第三步的单线程处理是瓶颈所在。为解决这个瓶颈,将WriteProcessOperator划分为FileIdAssignerBucketWriter

2.2.1 FileIdAssigner

FileIdAssigner对每条记录处理如下

BucketAssigner为每条记录创建一个分区写入Profile,其是分配BucketID(Partition Path+FileID)的关键;查找索引以确定记录是否为UPDATE,如果记录是UPDATE,那么关联已有的fileID,如果是INSERT,根据配置的文件大小确定fileID;向下游发送带有fileID的记录;

FileIdAssigner的输出记录可以通过fileID进一步shuffle到BucketWriter

2.2.2 BucketWriter

BucketWriter的输入为HoodieRecord,然后逐一写Bucket;

第二步需要重构已有的Flink客户端(HoodieFlinkWriteClient),当前代码中HoodieFlinkWriteClient将处理步骤二中的所有的任务,这种模式适用于Spark,但对Flink不太合适,对于Flink而言,需要做一些重构(移除index/bucket)以便让client更轻量级,专注于数据写入。

2.3 步骤3:Mini-batch模式写

解决第三个瓶颈。

BucketWriterCoordinator开始时会开始一个新的instant(不同于步骤1和步骤2中从新的checkpoint开始)新的checkpoint开始时,BucketWriter会阻塞并且刷出缓存数据,有异步线程消费缓存数据(在第一个版本中是在#processElement方法中刷出数据)并刷出。对于BucketWriteCoordinator,如果checkpoint的数据写入成功(获取一个checkpoint成功通知),检查并提交INFLIGHT状态的instant,同时还是新的instant。

2.3.1 Exactly-once语义

为提高吞吐,当checkpoint线程开始刷出缓存数据时,处理线程不再阻塞数据的缓存。当checkpoint失败触发回滚操作时,会有一些重复的数据,但是在UPSERT操作下语义依然正确。

当支持一条条记录写入而非一批记录时,可以支持Exactly-Once语义。

2.3.2 容错

在进行checkpoint时,不再阻塞数据缓存,因此很可能有一个mini-batch缓存刷出,当checkpoint失败时,会重新消费之前的缓存数据,会重复写入该缓存数据。

当checkpoint完成时,协调器检查并提交上一次instant,同时开始新的instant。当发生错误时,将会回滚写入的数据,这意味着一个Hoodie Instant可能会跨不同的checkpoint。如果一个checkpoint超时,那么下一次checkpoint将会刷出剩余的缓存数据。

2.4 步骤四:新的索引

解决第四个瓶颈。

新的索引基于BloomFilter索引,其步骤如下

从state中查找一条记录是否为UPDATE,如果为INSERT,不做任何处理;如果记录是UPDATE,使用BloomFilter索引查找候选文件,查找这些文件并且将所有的index信息放入状态;

当所有文件都被加载后,则可标识为纯状态模式,后面可以仅仅只查询状态即可。

新的索引可适用于不同的Flink作业写入;

3. 兼容性

算子协调器在Flink 1.11引入,为兼容低于1.11版本,需要添加一个不使用算子协调器的pipeline

input operator => the instant generator => fileID assigner => bucket writer => commit sink

其中使用了instant generator替换协调器。

注意该pipeline无法使用mini-batch模式,因为没有组件协调mini-batch,也无法控制算子checkpoint的通知顺序,所以无法在checkpoint完成后开始新的instant。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
344 33
The Past, Present and Future of Apache Flink
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
205 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
948 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
144 3
|
2月前
|
消息中间件 Java Kafka
什么是Apache Kafka?如何将其与Spring Boot集成?
什么是Apache Kafka?如何将其与Spring Boot集成?
91 5
|
2月前
|
消息中间件 Java Kafka
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
Spring Boot 与 Apache Kafka 集成详解:构建高效消息驱动应用
72 1
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
88 1
|
3月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
275 0
|
SQL 架构师 API
《Apache Flink 知其然,知其所以然》系列视频课程
# 课程简介 目前在我的公众号新推出了《Apache Flink 知其然,知其所以然》的系列视频课程。在内容上会先对Flink整体架构和所适用的场景做一个基础介绍,让你对Flink有一个整体的认识!然后对核心概念进行详细介绍,让你深入了解流计算中一些核心术语的含义,然后对Flink 各个层面的API,如 SQL/Table&DataStreamAPI/PythonAPI 进行详细的介绍,以及
1380 0
《Apache Flink 知其然,知其所以然》系列视频课程
|
5月前
|
存储 消息中间件 Java
Apache Flink 实践问题之原生TM UI日志问题如何解决
Apache Flink 实践问题之原生TM UI日志问题如何解决
55 1

热门文章

最新文章

推荐镜像

更多