EMR Spark Relational Cache的执行计划重写

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 背景EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工作原理类似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。

背景

EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工作原理类似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。为了实现高效地预计算结果复用,我们构建的预计算缓存一般都较为通用,因此对于用户query,还需进行进一步的计算方能获得最终结果。因此,如何快速地找出匹配的缓存,并构建出准确的新执行计划,就显得尤为重要。

在Hive 3.x中支持的Materialized View,利用了Apache Calcite对执行计划进行重写。考虑到Spark SQL使用Catalyst进行执行计划优化,引入Calcite太重,因此EMR Spark中的Relational Cache实现了自己的Catalyst规则,用于重写执行计划。本文将介绍执行计划重写的相关内容。

执行计划重写

准备工作

Spark会把用户查询语句进行解析,依次转化为Unresolved Logical Plan(未绑定的逻辑计划)、Resolved Logical Plan(绑定的逻辑计划)、Optimized Logical Plan(优化的逻辑计划)、Physical Plan(物理计划)。其中,未优化的逻辑计划根据用户查询语句不同,会有较大区别,而Relational Cache作为优化的一部分,放在逻辑计划优化过程中也较为合适,因此我们拿到的用户查询计划会是优化中的逻辑计划。要与优化中的逻辑计划匹配,我们选择把这个重写过程放在Spark优化器比较靠后的步骤中,同时,预先将Relational Cache的逻辑计划进行解析,获得优化后的Cache计划,减小匹配时的复杂程度。这样,我们只需匹配做完了谓词下推、谓词合并等等优化之后的两个逻辑计划。

基本过程

在匹配时,我们希望能尽可能多得匹配计算和IO操作,因此,我们对目标计划进行前序遍历,依次进行匹配,尝试找到最多的匹配节点。而在判断两个节点是否匹配时,我们采用后序遍历的方式,希望尽快发现不匹配的情况,减少计划匹配的执行时间。然后我们会根据匹配结果,对计划进行重写,包括对于Cache数据进行进一步的Filter、Project、Sort甚至Aggregate等操作,使其与匹配节点完全等价,然后更新逻辑计划节点的引用绑定,无缝替换到逻辑计划中,这样就能轻松获得最终的重写后的计划。

Join匹配

Spark中的Join都是二元操作,而实际的Join顺序可能根据一些策略会有很大区别,因此对于Join节点,必须进行特殊处理。我们会首先将逻辑计划进行处理,根据缓存计划的Join顺序进行Join重排。这一步在树状匹配之前就进行了,避免不断重复Join重排带来的时间浪费。重排后的Join可以更大概率地被我们匹配到。

为了实现Cache的通用性,根据星型数据模型的特点,我们引入了Record Preserve的概念。这和传统数据库中的Primary Key/Foreign Key的关系较为类似,当有主键的表与非空外键指向的表在外键上进行Join时,记录的条数不会变化,不会膨胀某条记录,也不会丢失某条记录。PK/FK的语意在大数据处理框架中经常缺失,我们引入了新的DDL让用户自定义Record Preserve Join的关系。当用户定义A Inner Join B是对于A表Record Preserve时,我们也会把A Inner Join B和A的关系匹配起来。有了PK/FK的帮助,我们能匹配上的情况大大增加了,一个Relational Cache可以被更多看似区别巨大的查询共享,这可以很好的为用户节约额外的存储开销和预计算开销。

Aggregate匹配

一般的Aggregate匹配较为简单,而Spark支持的Grouping Set操作,会构建出Expand逻辑计划节点,相当于把一条记录转为多条,使用Grouping ID进行标记。由于Expand的子节点是所有Grouping的情况共用的,这里我们只对子节点进行一次匹配,再分别进行上面的Grouping属性和Aggregate属性的匹配。主要是验证目标聚合所需的属性或者聚合函数都能从某个Grouping ID对应的聚合结果中计算出来,比如粗粒度的Sum可以对细粒度的Sum进行二次Sum求和,而粗粒度的Count对细粒度的Count也应通过二次Sum求和,粗粒度的Average无法仅从细粒度的Average中还原出来等等。

计划重写

找出匹配的逻辑计划之后,就是重写逻辑计划的过程。对于无需二次聚合的逻辑计划,直接根据缓存数据的schema,从缓存数据的Relation中选择所需列,根据条件过滤后,进行后续操作。如果还需二次聚合,选择所需列时需保留外部要用的所有列,以及聚合时需要的列,还有聚合函数需要的数据。二次聚合的聚合函数需要根据实际情况进行重写,确保能使用Relational Cache中已经初步聚合的结果。这里面需要根据聚合的语意判断是否能够二次聚合。如果时Grouping Set的聚合,二次聚合之前还需选择正确的Grouping ID进行过滤。经过二次聚合后,步骤大体和普通的重写一致,只需替换到目标计划中即可。

结果

我们以一个例子来具体说明逻辑计划的重写结果。Star Schema Benchmark(论文链接)是星型模型数据分析的一个标准Benchmark,其结构定义如图所示:

ssb_schema

我们构建Relational Cache的SQL语句如下:

SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUM
FROM supplier, p_lineorder, dates, customer, part
WHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkey
GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))

我们从中选出一条查询作为示例。具体查询语句:

select c_city, s_city, d_year, sum(lo_revenue) as revenue
    from customer, lineorder, supplier, dates
    where lo_custkey = c_custkey
        and lo_suppkey = s_suppkey
        and lo_orderdate = d_datekey
        and c_nation = 'UNITED KINGDOM'
        and (c_city='UNITED KI1' or c_city='UNITED KI5')
        and (s_city='UNITED KI1' or s_city='UNITED KI5')
        and s_nation = 'UNITED KINGDOM'
        and d_yearmonth = 'Dec1997'
    group by c_city, s_city, d_year
    order by d_year asc, revenue desc

原始逻辑计划如下所示:

Sort [d_year#39 ASC NULLS FIRST, revenue#0L DESC NULLS LAST], true
+- Aggregate [c_city#6, s_city#31, d_year#39], [c_city#6, s_city#31, d_year#39, sum(lo_revenue#23L) AS revenue#0L]
   +- Project [c_city#6, lo_revenue#23L, s_city#31, d_year#39]
      +- Join Inner, (lo_orderdate#16 = d_datekey#35)
         :- Project [c_city#6, lo_orderdate#16, lo_revenue#23L, s_city#31]
         :  +- Join Inner, (lo_suppkey#15 = s_suppkey#28)
         :     :- Project [c_city#6, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
         :     :  +- Join Inner, (lo_custkey#13 = c_custkey#3)
         :     :     :- Project [c_custkey#3, c_city#6]
         :     :     :  +- Filter (((isnotnull(c_nation#7) && (c_nation#7 = UNITED KINGDOM)) && ((c_city#6 = UNITED KI1) || (c_city#6 = UNITED KI5))) && isnotnull(c_custkey#3))
         :     :     :     +- HiveTableRelation `ssb`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#3, c_name#4, c_address#5, c_city#6, c_nation#7, c_region#8, c_phone#9, c_mktsegment#10]
         :     :     +- Project [lo_custkey#13, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
         :     :        +- Filter ((isnotnull(lo_custkey#13) && isnotnull(lo_suppkey#15)) && isnotnull(lo_orderdate#16))
         :     :           +- HiveTableRelation `ssb`.`lineorder`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [lo_orderkey#11L, lo_linenumber#12L, lo_custkey#13, lo_partkey#14, lo_suppkey#15, lo_orderdate#16, lo_orderpriotity#17, lo_shippriotity#18, lo_quantity#19L, lo_extendedprice#20L, lo_ordtotalprice#21L, lo_discount#22L, lo_revenue#23L, lo_supplycost#24L, lo_tax#25L, lo_commitdate#26, lo_shipmode#27]
         :     +- Project [s_suppkey#28, s_city#31]
         :        +- Filter (((isnotnull(s_nation#32) && ((s_city#31 = UNITED KI1) || (s_city#31 = UNITED KI5))) && (s_nation#32 = UNITED KINGDOM)) && isnotnull(s_suppkey#28))
         :           +- HiveTableRelation `ssb`.`supplier`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [s_suppkey#28, s_name#29, s_address#30, s_city#31, s_nation#32, s_region#33, s_phone#34]
         +- Project [d_datekey#35, d_year#39]
            +- Filter ((isnotnull(d_yearmonth#41) && (d_yearmonth#41 = Dec1997)) && isnotnull(d_datekey#35))
               +- HiveTableRelation `ssb`.`dates`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [d_datekey#35, d_date#36, d_dayofweek#37, d_month#38, d_year#39, d_yearmonthnum#40, d_yearmonth#41, d_daynuminweek#42, d_daynuminmonth#43, d_daynuminyear#44, d_monthnuminyear#45, d_weeknuminyear#46, d_sellingseason#47, d_lastdayinweekfl#48, d_lastdayinmonthfl#49, d_holidayfl#50, d_weekdayfl#51]

重写后的一个逻辑计划如下:

Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
   +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
      +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet

由此可见,执行计划大大简化,我们可以做到亚秒级响应用户的命中查询。

进一步优化

在实际测试过程中,我们发现当多个Relational Cache存在时,匹配时间线性增长明显。由于我们在metastore中存储的是Cache的SQL语句,取SQL语句和再次解析的时间都不容小觑,这就使得匹配过程明显增长,背离了我们追求亚秒级响应的初衷。因此我们在Spark中构建了逻辑计划缓存,将解析过的Relational Cache的计划缓存在内存中,每个Relational Cache只缓存一份,计划本身占用空间有限,因此我们可以缓存住几乎所有的Relational Cache的优化后的逻辑计划,从而在第一次查询之后,所有查询都不再收到取SQL语句和再次解析的延迟困扰。经过这样的优化,匹配时间大幅减少到100ms的量级。

总结与思考

Relational Cache实现了一种基于Cache的优化方案,让Spark SQL能够用于即时查询的场景下,满足用户对海量数据秒级查询的需求。通过对用户查询的动态改写,可以大大提高缓存的利用率,扩展缓存的命中场景,有效提高查询性能。现有方案也有很多可优化的地方,比如重复的回溯遍历时间复杂度较高,不如在逻辑计划节点内部更新维护可匹配的信息。考虑到对Spark的侵入性,我们暂时选择了现有方案,后续根据实际的使用情况,还会进一步优化我们的逻辑计划重写过程。而重写的逻辑计划还涉及到基于不同的Relational Cache Plan会有不同的重写方式,在这些重写结果中如何根据执行代价选择最优的重写方案,将会在后续文章中进行揭秘,敬请期待!

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
153 2
|
3月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
163 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
3月前
|
存储 缓存 分布式计算
Spark cache()与unpersist()使用位置
Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行
54 9
|
5月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
248 8
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
219 0
|
4月前
|
分布式计算 大数据 MaxCompute
EMR Remote Shuffle Service实践问题之阿里云RSS的开源计划内容如何解决
EMR Remote Shuffle Service实践问题之阿里云RSS的开源计划内容如何解决
|
4月前
|
分布式计算 测试技术 调度
EMR Remote Shuffle Service实践问题之集群中落地阿里云RSS如何解决
EMR Remote Shuffle Service实践问题之集群中落地阿里云RSS如何解决
|
2月前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
本文介绍了阿里云EMR StarRocks在数据湖分析领域的应用,涵盖StarRocks的数据湖能力、如何构建基于Paimon的实时湖仓、StarRocks与Paimon的最新进展及未来规划。文章强调了StarRocks在极速统一、简单易用方面的优势,以及在数据湖分析加速、湖仓分层建模、冷热融合及全链路ETL等场景的应用。
303 8
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
|
2月前
|
SQL 存储 缓存
降本60% ,阿里云 EMR StarRocks 全新发布存算分离版本
阿里云 EMR Serverless StarRocks 现已推出全新存算分离版本,该版本不仅基于开源 StarRocks 进行了全面优化,实现了存储与计算解耦架构,还在性能、弹性伸缩以及多计算组隔离能力方面取得了显著进展。
317 6
|
2月前
|
SQL 存储 缓存
阿里云EMR StarRocks X Paimon创建 Streaming Lakehouse
讲师焦明烨介绍了StarRocks的数据湖能力,如何使用阿里云EMR StarRocks构建基于Paimon的极速实时湖仓,StarRocks与Paimon的最新进展及未来规划。
136 3