解决Flink流式任务的性能瓶颈

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 解决Flink流式任务的性能瓶颈

都说“过早进行性能优化是万恶之源”,我宁肯相信这是为了“矫枉过正”而出此惊人之语,更何况,现在的IT时代已与Donald Knuth的时代已有很大差异了。重点还是在于“过早”这个词,之所以Knuth告诫我们不要过早进行性能优化,原因在于:

  • 判断性能是否存在问题,不能太早
  • 太早做性能优化,有可能并没有弄清楚性能瓶颈在哪里


image.png

⚜ 2016年8月,我有机会在斯坦福大学小住,与朋友在计算机学院William Gates大楼讨论问题,忽然发现了Donald Knuth的办公室,于是拍下了这张照片


最近,我的团队成员正在着力于提高实时流处理任务的性能。由于客户为我们的测试环境仅提供了极度可怜的集群资源,我们需要在“螺蛳壳里做道场”,死扣性能,尽可能在方案与实现上将性能提升到极致。(顺带说,在测试时,不要奢侈地提供大量资源,反倒有可能尽早发现性能问题,从而让团队想办法解决之。)


一开始,我们想到的方案是增加Flink Streaming Job每个算子或算子链的并行度。Flink支持多个级别设置并行度,包括:

  • 环境级别:对Execution Environment进行parallelism的设置
  • 客户端级别:客户端提交Job时通过命令参数-p进行设置
  • 算子级别:调用每个算子的setParallelism()方法设置算子的并行度,在为算子设置并行度时,需要考虑它对算子链的影响。如果相邻算子的并行度不一样,两个算子就不能成为算子链。算子链可以减少不必要的线程切换,减少不必要的序列化和反序列化操作,减少延迟提高吞吐能力,因此,如果两个算子相邻,且中间没有数据的shuffle操作,应保证它们的并行度是相同的。


如果没有显式设置并行度,Flink的系统默认并行度为1。不同级别优先级不同,优先级按照高低,顺序依次为:

算子级别 -> 客户端级别 -> 环境级别 -> 系统默认级别


Flink的并行度设置并不是说越大,数据处理的效率就越高,而是需要设置合理的并行度。并行度的设置数量取决于Task Manager的数量以及slot数量。通常可以认为Task Manager部署的节点有多少核CPU,就有多少个slot。


设置了合理的并行度,就能有效地利用Worker节点的资源。但为何在实现之初,没有考虑并行度呢?原因在于引入并行度后,从上游传入的数据就会被Task Manager分配到不同的slot做并行处理,由于不同任务执行时间不同,slot的执行效率也可能不同,就可能无法保证同类数据多条数据的时序性。


为了保证同类数据的执行时序性,我们引入了Flink的keyBy算子。它能够将相同key的元素散列到一个子任务中,且没有改变原来的元素数据结构。keyBy使用的key应使用数据的主键,即ID,如此就能保证拥有相同ID值的同类数据一定执行在同一个子任务中,进行同步处理,这就保证了数据处理的时序性。时序性与并行度带来的高性能,就能鱼与熊掌兼得了。


即便如此,我们提升的性能依旧有限,毕竟受到资源的限制,我们不能盲目增大并行度。由于单条消息数据的处理逻辑非常复杂,它的处理能力已经达到我们能够优化的极限。最后,评估任务的处理能力,仅能做到每秒处理6条左右的数据,这一结果自然不能接受。一种立竿见影的手段是增加更多的资源,但我们还是想在没有更多资源支持下,看看能否竭尽所能提升性能。——这时,我们才想到去探索性能瓶颈到底在哪里?


我们开始监控实时流任务的执行,通过日志记录执行时间,在单条数据处理能力已经无法优化的情况下,发现真正的性能瓶颈不在于Flink自身,而是任务末端将处理后的数据写入到ElasticSearch这一阶段。


在执行流式处理过程中,上游一旦采集到数据,就会及时逐条处理,这也是流式处理的实时特征。根据我们的业务特征,平台在接收到上游采集的流式数据后,经过验证、清洗、转换与业务处理,会按照主题治理的要求,将处理后的数据写入到ElasticSearch。然而,这并非流任务处理的终点。数据在写入到ElasticSearch后,平台需要触发一个事件,应下游系统的要求,将上游传递的消息转换为出口消息。由于上游传递的消息不一定包含了出口消息的所有数据,在转换消息时,平台还需要查询ElasticSearch,获得包括最近更新的数据,作为组成出口消息的数据内容。


这里仍然存在时序性问题!在组成出口消息时需要查询ElasticSearch,这就要求最新的数据已经写入成功并能被检索到。由于ElasticSearch要支持全文本检索,写入数据时需要为其建立索引,也就是Lucene中的Segments,使得每次写操作的延迟相对于读操作而言要高一些。为了提升写入性能,ElasticSearch引入了in-memory buffer(内存缓冲区),提供了refresh(刷新)的三种方式:

  • 即刻刷新
  • 指定周期刷新,默认周期为1s,它也是ElasticSearch的默认值
  • 当内存缓冲区满时刷新


只有即刻刷新,才能在一条数据写入到 Elasticsearch 后,能被马上搜索到。当上游采集的数据量非常多,且采用流式方式传入时,下游ElasticSearch的逐条写入与即刻刷新机制就成为了性能瓶颈。如果采用后两种刷新机制,又会导致索引未建立,无法即时搜索到最新数据,就会导致数据不一致。换言之,在我们的场景中,选择“即刻刷新”是必然的!要解决写入瓶颈的问题,最佳做法是放弃逐条写入,改为ElasticSearch支持的批量写入,如此即可减少不必要的连接,也能减少IO的次数。


虽说上游传递的流式数据需要实时进行处理,却并未要求它必须实时写入ElasticSearch,也未要求它必须实时推送给下游系统。当然,也不能延迟太长的时间。


为了权衡写入性能和数据正确性以及一致性,可以将实时写入改造为微批量的写入,如此,既能通过批量写入提升ElasticSearch的写入性能,又能保证数据必须成功写入到ElasticSearch后再推送消息,确保数据正确性与一致性。


团队成员想到了引入Flink的窗口,具体说来,是使用Flink时间窗口中的会话窗口与滚动窗口。


会话窗口的作用是在指定窗口周期内将相同key值的数据汇聚起来,我们为不同的key分配对应的会话窗口,而窗口好似一个桶,每个桶各自装各自key值的数据:



.keyBy(new KeyById()).window(ProcessingTimeSessionWindows.withGap(Time.seconds(1)))

如此这般,就能将1秒内相同key值的数据放到相同的会话窗口中,然后,通过reduce()算子对同一会话窗口中的数据进行合并,形成状态:



.reduce(new MergeWithSameId())

这一方式虽然实现了相同key数据的合并,但由于窗口的数量太过分散,导致数据汇聚的作用并不明显,没有达到批量写入提升性能的目的。


既然已经合并了相同key的数据,我们就可以减少窗口的数量,从而让不同key值的数据也能够汇聚到同一个窗口,形成数据的集合,交由下游进行批量写入。此时,选择的窗口为滚动窗口。


虽说窗口数量需要减少,但为了更好地利用资源,最好保证窗口的数量等于并行度。通过env.getParallelism()方法可以获得当前环境的并行度,在对数据的ID(它是数据的key)进行哈希值计算后,将并行度作为因子进行取模,就能将窗口数量压缩,天然实现数据的汇聚:

// 再执行了reduce后
.keyBy(new KeyById(env.getParallelism()))
.window(TumblingProcessingTimeWindows.of(Time.second(1)))
.reduce(new CellectEntities())
...// 汇聚后写入到ElasticSearch

对比改进前后的流式任务,下图是执行未加窗口的流式任务结果:


image.png


下图是执行加窗口后的流式任务结果:


image.png


相同环境下,前者处理流式数据的频率大概为6条/秒左右,后者则达到了20条/秒左右,整体性能提升了3倍多,实现了不通过横向添加资源就完成了流式任务的性能优化,归根结底,在于我们发现了性能瓶颈,然后再对症下药,方可取得疗效。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
558 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
3月前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
180 4
|
3月前
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
146 0
|
5月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
6月前
|
SQL 关系型数据库 MySQL
如何在Dataphin中构建Flink+Paimon流式湖仓方案
当前大数据处理工业界非常重要的一个大趋势是一体化,尤其是湖仓一体架构。与过去分散的数据仓库和数据湖不同,湖仓一体架构通过将数据存储和处理融为一体,不仅提升了数据访问速度和处理效率,还简化了数据管理流程,降低了资源成本。企业可以更轻松地实现数据治理和分析,从而快速决策。paimon是国内开源的,也是最年轻的成员。 本文主要演示如何在 Dataphin 产品中构建 Flink+Paimon 的流式湖仓方案。
7935 10
如何在Dataphin中构建Flink+Paimon流式湖仓方案
|
5月前
|
监控 Cloud Native 流计算
实时计算 Flink版产品使用问题之如何查看和管理任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
消息中间件 资源调度 Kafka
实时计算 Flink版操作报错合集之提交任务后,如何解决报错:UnavailableDispatcherOperationException
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
资源调度 Java Scala
实时计算 Flink版产品使用问题之如何实现ZooKeeper抖动导致任务失败时,能从最近的检查点重新启动任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
SQL 监控 大数据
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
Serverless 应用的监控与调试问题之Flink流式数仓对于工商银行的数据链路要如何简化
|
5月前
|
Kubernetes Java 数据库连接
实时计算 Flink版产品使用问题之部署到 Kubernetes 集群时,任务过一会儿自动被取消,该如何排查
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章