EMR Spark Relational Cache的执行计划重写

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 作者:王道远,花名健身, 阿里巴巴计算平台EMR技术专家。 背景 EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。

作者:王道远,花名健身, 阿里巴巴计算平台EMR技术专家。

背景

EMR Spark提供的Relational Cache功能,可以通过对数据模型进行预计算和高效地存储,加速Spark SQL,为客户实现利用Spark SQL对海量数据进行即时查询的目的。Relational Cache的工作原理类似物化视图,在用户提交SQL语句时对语句进行分析,并选出可用的预计算结果来加速查询。为了实现高效地预计算结果复用,我们构建的预计算缓存一般都较为通用,因此对于用户query,还需进行进一步的计算方能获得最终结果。因此,如何快速地找出匹配的缓存,并构建出准确的新执行计划,就显得尤为重要。

在Hive 3.x中支持的Materialized View,利用了Apache Calcite对执行计划进行重写。考虑到Spark SQL使用Catalyst进行执行计划优化,引入Calcite太重,因此EMR Spark中的Relational Cache实现了自己的Catalyst规则,用于重写执行计划。本文将介绍执行计划重写的相关内容。

执行计划重写

准备工作
Spark会把用户查询语句进行解析,依次转化为Unresolved Logical Plan(未绑定的逻辑计划)、Resolved Logical Plan(绑定的逻辑计划)、Optimized Logical Plan(优化的逻辑计划)、Physical Plan(物理计划)。其中,未优化的逻辑计划根据用户查询语句不同,会有较大区别,而Relational Cache作为优化的一部分,放在逻辑计划优化过程中也较为合适,因此我们拿到的用户查询计划会是优化中的逻辑计划。要与优化中的逻辑计划匹配,我们选择把这个重写过程放在Spark优化器比较靠后的步骤中,同时,预先将Relational Cache的逻辑计划进行解析,获得优化后的Cache计划,减小匹配时的复杂程度。这样,我们只需匹配做完了谓词下推、谓词合并等等优化之后的两个逻辑计划。

基本过程
在匹配时,我们希望能尽可能多得匹配计算和IO操作,因此,我们对目标计划进行前序遍历,依次进行匹配,尝试找到最多的匹配节点。而在判断两个节点是否匹配时,我们采用后序遍历的方式,希望尽快发现不匹配的情况,减少计划匹配的执行时间。然后我们会根据匹配结果,对计划进行重写,包括对于Cache数据进行进一步的Filter、Project、Sort甚至Aggregate等操作,使其与匹配节点完全等价,然后更新逻辑计划节点的引用绑定,无缝替换到逻辑计划中,这样就能轻松获得最终的重写后的计划。

Join匹配
Spark中的Join都是二元操作,而实际的Join顺序可能根据一些策略会有很大区别,因此对于Join节点,必须进行特殊处理。我们会首先将逻辑计划进行处理,根据缓存计划的Join顺序进行Join重排。这一步在树状匹配之前就进行了,避免不断重复Join重排带来的时间浪费。重排后的Join可以更大概率地被我们匹配到。

为了实现Cache的通用性,根据星型数据模型的特点,我们引入了Record Preserve的概念。这和传统数据库中的Primary Key/Foreign Key的关系较为类似,当有主键的表与非空外键指向的表在外键上进行Join时,记录的条数不会变化,不会膨胀某条记录,也不会丢失某条记录。PK/FK的语意在大数据处理框架中经常缺失,我们引入了新的DDL让用户自定义Record Preserve Join的关系。当用户定义A Inner Join B是对于A表Record Preserve时,我们也会把A Inner Join B和A的关系匹配起来。有了PK/FK的帮助,我们能匹配上的情况大大增加了,一个Relational Cache可以被更多看似区别巨大的查询共享,这可以很好的为用户节约额外的存储开销和预计算开销。

Aggregate匹配
一般的Aggregate匹配较为简单,而Spark支持的Grouping Set操作,会构建出Expand逻辑计划节点,相当于把一条记录转为多条,使用Grouping ID进行标记。由于Expand的子节点是所有Grouping的情况共用的,这里我们只对子节点进行一次匹配,再分别进行上面的Grouping属性和Aggregate属性的匹配。主要是验证目标聚合所需的属性或者聚合函数都能从某个Grouping ID对应的聚合结果中计算出来,比如粗粒度的Sum可以对细粒度的Sum进行二次Sum求和,而粗粒度的Count对细粒度的Count也应通过二次Sum求和,粗粒度的Average无法仅从细粒度的Average中还原出来等等。

计划重写
找出匹配的逻辑计划之后,就是重写逻辑计划的过程。对于无需二次聚合的逻辑计划,直接根据缓存数据的schema,从缓存数据的Relation中选择所需列,根据条件过滤后,进行后续操作。如果还需二次聚合,选择所需列时需保留外部要用的所有列,以及聚合时需要的列,还有聚合函数需要的数据。二次聚合的聚合函数需要根据实际情况进行重写,确保能使用Relational Cache中已经初步聚合的结果。这里面需要根据聚合的语意判断是否能够二次聚合。如果时Grouping Set的聚合,二次聚合之前还需选择正确的Grouping ID进行过滤。经过二次聚合后,步骤大体和普通的重写一致,只需替换到目标计划中即可。

结果
我们以一个例子来具体说明逻辑计划的重写结果。Star Schema Benchmark(论文链接)是星型模型数据分析的一个标准Benchmark,其结构定义如图所示:

image

我们构建Relational Cache的SQL语句如下:

SELECT GROUPING_ID() AS grouping_id, lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum, SUM(lo_revenue) AS lo_revenue_SUM, SUM(lo_supplycost) AS lo_supplycost_SUM, SUM(V_REVENUE) AS V_REVENUE_SUM
FROM supplier, p_lineorder, dates, customer, part
WHERE lo_orderdate = d_datekey AND lo_custkey = c_custkey AND lo_suppkey = s_suppkey AND lo_partkey = p_partkey
GROUP BY lo_discount, s_city, c_city, p_category, d_year, lo_quantity, d_weeknuminyear, s_nation, s_region, p_mfgr, c_region, d_yearmonth, p_brand, c_nation, d_yearmonthnum GROUPING SETS ((d_year, d_weeknuminyear, lo_discount, lo_quantity), (d_year, lo_discount, lo_quantity), (lo_discount, lo_quantity), (d_yearmonthnum, lo_discount, lo_quantity), (d_year, p_category, p_brand, s_region), (d_year, p_category, s_region), (d_year, s_region), (d_year, s_region, c_region, s_nation, c_nation), (d_year, s_city, c_city, s_nation, c_nation), (d_year, s_city, c_city), (d_year, d_yearmonth, s_city, c_city), (d_year, s_region, c_region, c_nation, p_mfgr), (d_year, s_region, s_nation, c_region, p_mfgr, p_category), (d_year, s_nation, s_city, c_region, p_brand, p_category, p_brand), (d_year, s_nation, s_city, c_region, p_brand, p_category), (d_year, s_nation, s_city, c_region, p_category, p_brand))

我们从中选出一条查询作为示例。具体查询语句:

select c_city, s_city, d_year, sum(lo_revenue) as revenue
    from customer, lineorder, supplier, dates
    where lo_custkey = c_custkey
        and lo_suppkey = s_suppkey
        and lo_orderdate = d_datekey
        and c_nation = 'UNITED KINGDOM'
        and (c_city='UNITED KI1' or c_city='UNITED KI5')
        and (s_city='UNITED KI1' or s_city='UNITED KI5')
        and s_nation = 'UNITED KINGDOM'
        and d_yearmonth = 'Dec1997'
    group by c_city, s_city, d_year
    order by d_year asc, revenue desc

原始逻辑计划如下所示:

Sort [d_year#39 ASC NULLS FIRST, revenue#0L DESC NULLS LAST], true
+- Aggregate [c_city#6, s_city#31, d_year#39], [c_city#6, s_city#31, d_year#39, sum(lo_revenue#23L) AS revenue#0L]
   +- Project [c_city#6, lo_revenue#23L, s_city#31, d_year#39]
      +- Join Inner, (lo_orderdate#16 = d_datekey#35)
         :- Project [c_city#6, lo_orderdate#16, lo_revenue#23L, s_city#31]
         :  +- Join Inner, (lo_suppkey#15 = s_suppkey#28)
         :     :- Project [c_city#6, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
         :     :  +- Join Inner, (lo_custkey#13 = c_custkey#3)
         :     :     :- Project [c_custkey#3, c_city#6]
         :     :     :  +- Filter (((isnotnull(c_nation#7) && (c_nation#7 = UNITED KINGDOM)) && ((c_city#6 = UNITED KI1) || (c_city#6 = UNITED KI5))) && isnotnull(c_custkey#3))
         :     :     :     +- HiveTableRelation `ssb`.`customer`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c_custkey#3, c_name#4, c_address#5, c_city#6, c_nation#7, c_region#8, c_phone#9, c_mktsegment#10]
         :     :     +- Project [lo_custkey#13, lo_suppkey#15, lo_orderdate#16, lo_revenue#23L]
         :     :        +- Filter ((isnotnull(lo_custkey#13) && isnotnull(lo_suppkey#15)) && isnotnull(lo_orderdate#16))
         :     :           +- HiveTableRelation `ssb`.`lineorder`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [lo_orderkey#11L, lo_linenumber#12L, lo_custkey#13, lo_partkey#14, lo_suppkey#15, lo_orderdate#16, lo_orderpriotity#17, lo_shippriotity#18, lo_quantity#19L, lo_extendedprice#20L, lo_ordtotalprice#21L, lo_discount#22L, lo_revenue#23L, lo_supplycost#24L, lo_tax#25L, lo_commitdate#26, lo_shipmode#27]
         :     +- Project [s_suppkey#28, s_city#31]
         :        +- Filter (((isnotnull(s_nation#32) && ((s_city#31 = UNITED KI1) || (s_city#31 = UNITED KI5))) && (s_nation#32 = UNITED KINGDOM)) && isnotnull(s_suppkey#28))
         :           +- HiveTableRelation `ssb`.`supplier`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [s_suppkey#28, s_name#29, s_address#30, s_city#31, s_nation#32, s_region#33, s_phone#34]
         +- Project [d_datekey#35, d_year#39]
            +- Filter ((isnotnull(d_yearmonth#41) && (d_yearmonth#41 = Dec1997)) && isnotnull(d_datekey#35))
               +- HiveTableRelation `ssb`.`dates`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [d_datekey#35, d_date#36, d_dayofweek#37, d_month#38, d_year#39, d_yearmonthnum#40, d_yearmonth#41, d_daynuminweek#42, d_daynuminmonth#43, d_daynuminyear#44, d_monthnuminyear#45, d_weeknuminyear#46, d_sellingseason#47, d_lastdayinweekfl#48, d_lastdayinmonthfl#49, d_holidayfl#50, d_weekdayfl#51]

重写后的一个逻辑计划如下:

Sort [d_year#47 ASC NULLS FIRST, revenue#558L DESC NULLS LAST], true
+- Aggregate [c_city#22, s_city#39, d_year#47], [c_city#22, s_city#39, d_year#47, sum(cast(lo_revenue_SUM#773L as bigint)) AS revenue#558L]
   +- Filter ((((((((isnotnull(s_nation#40) && ((s_city#39 = UNITED KI1) || (s_city#39 = UNITED KI5))) && (s_nation#40 = UNITED KINGDOM)) && isnotnull(d_yearmonth#49)) && (d_yearmonth#49 = Dec1997)) && isnotnull(c_nation#23)) && (c_nation#23 = UNITED KINGDOM)) && ((c_city#22 = UNITED KI1) || (c_city#22 = UNITED KI5))) && (grouping_id#662 = 19322))
      +- Relation[grouping_id#662,lo_discount#759,s_city#39,c_city#22,p_category#762,lo_quantity#763,d_weeknuminyear#764,s_nation#40,s_region#766,p_mfgr#767,c_region#768,p_brand1#769,c_nation#23,d_yearmonthnum#771,d_yearmonth#49,lo_revenue_SUM#773L,lo_supplycost_SUM#774L,V_REVENUE_SUM#775L,d_year#47] parquet

由此可见,执行计划大大简化,我们可以做到亚秒级响应用户的命中查询。

进一步优化

在实际测试过程中,我们发现当多个Relational Cache存在时,匹配时间线性增长明显。由于我们在metastore中存储的是Cache的SQL语句,取SQL语句和再次解析的时间都不容小觑,这就使得匹配过程明显增长,背离了我们追求亚秒级响应的初衷。因此我们在Spark中构建了逻辑计划缓存,将解析过的Relational Cache的计划缓存在内存中,每个Relational Cache只缓存一份,计划本身占用空间有限,因此我们可以缓存住几乎所有的Relational Cache的优化后的逻辑计划,从而在第一次查询之后,所有查询都不再收到取SQL语句和再次解析的延迟困扰。经过这样的优化,匹配时间大幅减少到100ms的量级。

总结与思考

Relational Cache实现了一种基于Cache的优化方案,让Spark SQL能够用于即时查询的场景下,满足用户对海量数据秒级查询的需求。通过对用户查询的动态改写,可以大大提高缓存的利用率,扩展缓存的命中场景,有效提高查询性能。现有方案也有很多可优化的地方,比如重复的回溯遍历时间复杂度较高,不如在逻辑计划节点内部更新维护可匹配的信息。考虑到对Spark的侵入性,我们暂时选择了现有方案,后续根据实际的使用情况,还会进一步优化我们的逻辑计划重写过程。而重写的逻辑计划还涉及到基于不同的Relational Cache Plan会有不同的重写方式,在这些重写结果中如何根据执行代价选择最优的重写方案,将会在后续文章中进行揭秘,敬请期待!

image

相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
3月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
217 2
|
4月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
192 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
4月前
|
存储 缓存 分布式计算
Spark cache()与unpersist()使用位置
Spark在执行过程中是懒加载模式,RDD转换仅仅是构建DAG描述而不执行,只有遇到action算子才会真正的运行
64 9
|
6月前
|
弹性计算 分布式计算 运维
迟来的EMR Serverless Spark评测报告
本文是一篇关于阿里云EMR Serverless Spark产品评测的文章,作者分享了使用体验和理解。EMR Serverless Spark是阿里云提供的全托管、一站式的Spark数据计算平台,简化了大数据处理流程,让用户专注于数据分析。文章提到了产品的主要优势,如快速启动、弹性伸缩、高资源利用率和低成本。
282 8
|
5月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
251 0
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
202 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
85 0
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
59 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
117 0
|
2月前
|
SQL 机器学习/深度学习 分布式计算
Spark快速上手:揭秘大数据处理的高效秘密,让你轻松应对海量数据
【10月更文挑战第25天】本文全面介绍了大数据处理框架 Spark,涵盖其基本概念、安装配置、编程模型及实际应用。Spark 是一个高效的分布式计算平台,支持批处理、实时流处理、SQL 查询和机器学习等任务。通过详细的技术综述和示例代码,帮助读者快速掌握 Spark 的核心技能。
128 6