EMR Spark Relational Cache 利用数据预组织加速查询

本文涉及的产品
EMR Serverless StarRocks,5000CU*H 48000GB*H
简介: 在利用Relational Cache进行查询优化时,我们需要通过预计算,存储大量数据。而在查询时,我们真正需要读取的数据量也许并不大。为了能让查询实现秒级响应,这就涉及到优化从大量数据中快速定位所需数据的场景。

Relational Cache相关文章链接:

使用Relational Cache加速EMR Spark数据分析
使用EMR Spark Relational Cache跨集群同步数据
EMR Spark Relational Cache的执行计划重写
EMR Spark Relational Cache如何支持雪花模型中的关联匹配

背景

在利用Relational Cache进行查询优化时,我们需要通过预计算,存储大量数据。而在查询时,我们真正需要读取的数据量也许并不大。为了能让查询实现秒级响应,这就涉及到优化从大量数据中快速定位所需数据的场景。本文介绍在EMR Spark Relational Cache中,我们如何针对这种场景进行了优化。

存储格式

在数据存储格式上,我们默认选择Spark社区支持最好的Parquet格式。Parquet是一种列式存储格式,我们可以很方便地利用列式存储格式进行字段裁剪。另外,Parquet的每个数据文件由多个Row Group组成,同时在每个数据文件的footer中记录了各个Row Group的统计信息,如最大值、最小值等。这些统计信息可以在读取数据时减少实际的IO开销。事实上,在现在的Spark版本中,我们可以看到Catalyst优化器已经把可以下推的一些过滤条件下推到了Parquet reader,利用Parquet文件的统计信息过滤真正需要读取的Row Group,从而实现减少IO量,加速查询时间的效果。这也是列存格式基本都支持的功能。

对于Relational Cache而言,有很多过滤条件时确定已知的。我们直接利用这一特性,将确定的查询条件下推到Parquet reader里,由Parquet reader完成对Row Group的选择。由于实际要读取的数据量占总数据量的比重往往很小,这种过滤的实际效果还是比较好的。

image

数据分区

对于Spark Relational Cache来说,由于构建Cube时会使用到Expand算子,我们需要引入Grouping ID来区分不同的grouping set。在大部分后续的查询中,我们往往只需要其中一个Grouping ID所对应的数据。因此,Grouping ID成了一个天然的数据分区选择。在Hive/Spark等大数据分析引擎中,数据分区是对于结构化数据,将其中一个或多个字段的具体值作为目录,分目录存放文件的一种常见做法。当我们确定要选择某Grouping ID对应的数据时,我们只需读取对应目录中的数据即可。这种做法可以直接忽略Grouping ID不匹配的文件,从而大大减少启动的总task数量,减少Spark的任务调度开销。

文件索引

当总数据量较大时,存储的文件数也会比较多。此时即使我们通过Parquet的footer可以获得较好的过滤效果,我们还是要启动一些task去读取这些footer。在Spark实际的实现中,往往需要与文件数量的量级相当的task去进行footer读取。在集群计算资源有限时,调度这些任务就显得比较浪费时间。为了能进一步减少Spark作业的调度开销,进一步提高执行效率,我们实现了文件索引来优化这种场景。

文件索引就类似于独立的footer。我们提前收集每个文件中各字段数据的最大最小值,并存储在一个隐藏的数据表中。这样,我们只需要读取一个单独的表就可以从文件层面对需要处理的文件做一个初步的过滤。这是一个单独的stage,由于一个文件只对应这个隐藏表中的一条记录,因此读取隐藏表所需的task数量要远远小于读取所有数据文件footer的开销。后续stage的任务数量也因此可以大大减少。在Relational Cache的访问场景下,整体加速效果非常明显。

数据排序

为了能实现高效的数据准备过程,不论是在Parquet文件的footer还是在我们实现的文件索引中,都是主要依靠最大值和最小值的信息来过滤数据。那么在极端场景下,光靠这些统计信息可能会完全没有过滤的效果。举个例子,如果某个key的所有数据文件、所有Row Group的最大值和最小值都等于全局最大值和最小值时,对这个key的过滤就完全无效了。这样,我们会自然而然的想到对数据进行排序。

但是,传统的数据排序还有一个问题。在数据库中,当我们对多个字段进行排序时,往往字段之间具有主次关系,这就导致排序字段序列中,排在最前面的字段有很好的过滤效果,而排得靠后的字段因为数据分散,往往过滤效果越来越差。这就需要我们找到更好的排序方法,能够兼顾到多个字段的数据过滤效果。

这里涉及到一个空间填充曲线的概念。我们可以把数据想像成一个有限空间,如何将数据进行排序和分块,能够使得每一块的最值都只是在一个不大的范围内,从而让文件索引获得较好的过滤效果呢?我们选择了Z-Order曲线对多维数据空间进行排序,这样可以保证每列都有较为均衡的过滤效果。下图是二维空间中Z-Order曲线的示意图。

image

不过我们也要注意到,随着排序列的增加,单列的过滤效果将会越来越差。因此在实际运用中,我们也要对排序列进行取舍,才能获得最佳的整体效果。

小结

本文介绍了EMR Spark的Relational Cache如何从数据量较大的Cube中快速提取出所需数据加速查询的原理。通过列式存储、文件索引、Z-Order等技术,我们可以快速过滤数据,大大减少实际发生的IO数据量,避免IO瓶颈的出现,从而优化整体查询性能。

参考

  1. Apache Parquet
  2. Processing Petabytes of Data in Seconds with Databricks Delta
  3. Z-Order Curve
相关实践学习
基于EMR Serverless StarRocks一键玩转世界杯
基于StarRocks构建极速统一OLAP平台
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
2月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
52 3
|
2月前
|
SQL 分布式计算 Serverless
EMR Serverless Spark:一站式全托管湖仓分析利器
本文根据2024云栖大会阿里云 EMR 团队负责人李钰(绝顶) 演讲实录整理而成
153 2
|
3月前
|
SQL 分布式计算 Serverless
阿里云 EMR Serverless Spark 版正式开启商业化
阿里云 EMR Serverless Spark 版正式开启商业化,内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验!
163 3
阿里云 EMR Serverless Spark 版正式开启商业化
|
4月前
|
存储 安全 API
阿里云EMR数据湖文件系统问题之JindoFS元数据查询和修改请求的问题如何解决
阿里云EMR数据湖文件系统问题之JindoFS元数据查询和修改请求的问题如何解决
|
4月前
|
分布式计算 Serverless 数据处理
EMR Serverless Spark 实践教程 | 通过 Apache Airflow 使用 Livy Operator 提交任务
Apache Airflow 是一个强大的工作流程自动化和调度工具,它允许开发者编排、计划和监控数据管道的执行。EMR Serverless Spark 为处理大规模数据处理任务提供了一个无服务器计算环境。本文为您介绍如何通过 Apache Airflow 的 Livy Operator 实现自动化地向 EMR Serverless Spark 提交任务,以实现任务调度和执行的自动化,帮助您更有效地管理数据处理任务。
219 0
|
SQL 分布式计算 Spark
钉钉群直播【Spark Relational Cache 原理和实践】
主要介绍Relational Cache/物化视图的历史和背景,以及EMR Spark基于Relational Cache加速Spark查询的技术方案,及如何通过基于Relational Cache的数据预计算和预组织,使用Spark支持亚秒级响应的交互式分析使用场景。
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
108 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
68 0
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
44 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
98 0