SparkSQL Catalyst解析

简介: Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

Catalyst Optimizer是SparkSQL的核心组件(查询优化器),它负责将SQL语句转换成物理执行计划,Catalyst的优劣决定了SQL执行的性能。

查询优化器是一个SQL引擎的核心,开源常用的有Apache Calcite(很多开源组件都通过引入Calcite来实现查询优化,如Hive/Phoenix/Drill等),另外一个是orca(HAWQ/GreenPlum中使用)。

关系代数是查询优化器的理论基础。常见的查询优化技术:查询重用(ReuseSubquery/ReuseExchange等)/RBO/CBO等。

SparkSQL执行流程

sql1

SparkSQL中对一条SQL语句的处理过程如上图所示:
1) SqlParser将SQL语句解析成一个逻辑执行计划(未解析)
2) Analyzer利用HiveMeta中表/列等信息,对逻辑执行计划进行解析(如表/列是否存在等)
3) SparkOptimizer利用Rule Based(基于经验规则RBO)/Cost Based(基于代价CBO)的优化方法,对逻辑执行计划进行优化(如谓词下推/JoinReorder)
4) SparkPlanner将逻辑执行计划转换成物理执行计划(如Filter -> FilterExec),
同时从某些逻辑算子的多种物理算子实现中根据RBO/CBO选择其中一个合适的物理算子(如Join的多个实现BroadcastJoin/SortMergeJoin/HashJoin中选择一个实现)
5) PrepareForExecution是执行物理执行计划之前做的一些事情,比如ReuseExchange/WholeStageCodegen的处理等等
6) 最终在SparkCore中执行该物理执行计划。

接下来介绍Catalyst中的核心模块SparkOptimizer/SparkPlanner.

SparkOptimizer

使用已有的规则对逻辑执行计划进行优化,该过程是基于经验/启发式的优化方法,得到优化过的逻辑执行计划。

444

如上图所示,Optimizer中有很多Batch,每个Batch中包含1个或多个Rule,Batch的另外一个属性是迭代次数(Once/FixPoint默认100次),每个Batch内部Rule有前后执行顺序,Batch之间也是按照顺序来执行的。目前Optimizer中有60多个Rule。
备注: 从Rule看JoinReorder在这个过程就已经处理了。

SparkPlanner

参考: https://issues.apache.org/jira/browse/SPARK-1251
SparkPlanner将逻辑执行计划转换成物理执行计划,即将逻辑执行计划树中的逻辑节点转换成物理节点,如Join转换成HashJoinExec/SortMergeJoinExec...,Filter转成FilterExec等

666

Spark的Stragety有8个:

  • DataSourceV2Strategy
  • FileSourceStrategy
  • DataSourceStrategy
  • SpecialLimits
  • Aggregation
  • JoinSelection
  • InMemoryScans
  • BasicOperators

上述很多Stragety都是基于规则的策略。
JoinSelection用到了相关的统计信息来选择将Join转换为BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,属于CBO基于代价的策略。

PrepareForExecution

在执行之前,对物理执行计划做一些处理,这些处理都是基于规则的,包括

  • PlanSubqueries
  • EnsureRequirements
  • CollapseCodegenStages
  • ReuseExchange
  • ReuseSubquery

经过上述步骤之后生成的最终物理执行计划提交到Spark执行。

CBO(基于代价)实现

CBO的实现有三个步骤如下,可以大致了解一下:

1. 统计信息采集

Optimizer/Planner中CBO(基于代价)的优化需要采集统计信息,包括表维度和列维度。

//包含表/列
case class Statistics(
    sizeInBytes: BigInt,
    rowCount: Option[BigInt] = None,
    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
    hints: HintInfo = HintInfo())

//列
case class ColumnStat(
    distinctCount: BigInt,
    min: Option[Any],
    max: Option[Any],
    nullCount: BigInt,
    avgLen: Long,
    maxLen: Long,
    histogram: Option[Histogram] = None)

上面结构体用来存储统计信息,可以看出:
表维度: 大小/条数
列维度: NDV/min/max/Null/平均长度/最大长度/直方图

上述信息需要提前使用Analyze命令进行采集

// 采集表维度的统计信息,NOSCAN表示不扫描表(即只有表大小信息,不采集表条数信息)
ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
COMPUTE STATISTICS [NOSCAN];

// 采集列信息
// 若spark.sql.statistics.histogram.enabled设置为true,则会采集直方图信息
// 采集直方图信息需要额外一次的表扫描
// 使用的是等高直方图
// 只支持IntegralType/DoubleType/DecimalType/FloatType/DateType/TimestampType的列采集直方图
ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;

2.估算算子统计信息

逻辑执行计划树中只有叶子节点(表)有实际的统计信息(通过Analyze获取), 逻辑执行计划树中非叶子节点会根据子节点信息以及估算方法获取本节点的统计信息。

/**
 * Returns the estimated statistics for the current logical plan node. Under the hood, this
 * method caches the return value, which is computed based on the configuration passed in the
 * first time. If the configuration changes, the cache can be invalidated by calling
 * [[invalidateStatsCache()]].
 */
def stats: Statistics = statsCache.getOrElse {
  if (conf.cboEnabled) {
    statsCache = Option(BasicStatsPlanVisitor.visit(self))
  } else {
    statsCache = Option(SizeInBytesOnlyStatsPlanVisitor.visit(self))
  }
  statsCache.get
}

def visit(p: LogicalPlan): T = p match {
    case p: Aggregate => visitAggregate(p)
    case p: Distinct => visitDistinct(p)
    case p: Except => visitExcept(p)
    case p: Expand => visitExpand(p)
    case p: Filter => visitFilter(p)
    case p: Generate => visitGenerate(p)
    case p: GlobalLimit => visitGlobalLimit(p)
    case p: Intersect => visitIntersect(p)
    case p: Join => visitJoin(p)
    case p: LocalLimit => visitLocalLimit(p)
    case p: Pivot => visitPivot(p)
    case p: Project => visitProject(p)
    case p: Repartition => visitRepartition(p)
    case p: RepartitionByExpression => visitRepartitionByExpr(p)
    case p: ResolvedHint => visitHint(p)
    case p: Sample => visitSample(p)
    case p: ScriptTransformation => visitScriptTransform(p)
    case p: Union => visitUnion(p)
    case p: Window => visitWindow(p)
    case p: LogicalPlan => default(p)
  }

每个算子都有自己的预估方法
CBO打开/关闭,有些算子的预估方法不一样,如AggregateEstimation/FilterEstimation/JoinEstimation/ProjectEstimation,其它算子CBO打开/关闭使用一套预估方法。

3.基于统计信息的优化

统计信息越准确,基于统计信息的优化更准确,从目前代码看只有下面三种场景使用到了统计信息。

JoinReorder

动态规划

//代价函数
//weight可以通过参数控制spark.sql.cbo.joinReorder.card.weight,默认0.7
//根据行数/大小来计算代价
cost = rows * weight + size * (1 - weight)

// 比较两种Join的代价大小
def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
      if (other.planCost.card == 0 || other.planCost.size == 0) {
        false
      } else {
        val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
        val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
        relativeRows * conf.joinReorderCardWeight +
          relativeSize * (1 - conf.joinReorderCardWeight) < 1
      }
}
JoinSelection

根据Join两个子节点的统计信息,判断使用BroadcastHashJoinExec还是ShuffledHashJoinExec还是SortMergeJoinExec,比如其中一个表(size)很小则可以使用BroadcastHashJoinExec。

StarSchemaDetection

探测星型模型,判断一个列是否是表的主键(因为SparkSQL不支持主键设置)

/**
 * Determines if a column referenced by a base table access is a primary key.
 * A column is a PK if it is not nullable and has unique values.
 * To determine if a column has unique values in the absence of informational
 * RI constraints, the number of distinct values is compared to the total
 * number of rows in the table. If their relative difference
 * is within the expected limits (i.e. 2 * spark.sql.statistics.ndv.maxError based
 * on TPC-DS data results), the column is assumed to have unique values.
 */
  private def isUnique(
      column: Attribute,
      plan: LogicalPlan): Boolean = plan match {
    case PhysicalOperation(_, _, t: LeafNode) =>
      val leafCol = findLeafNodeCol(column, plan)
      leafCol match {
        case Some(col) if t.outputSet.contains(col) =>
          val stats = t.stats
          stats.rowCount match {
            case Some(rowCount) if rowCount >= 0 =>
              if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
                val colStats = stats.attributeStats.get(col)
                if (colStats.get.nullCount > 0) {
                  false
                } else {
                  val distinctCount = colStats.get.distinctCount
                  val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
                  // ndvMaxErr adjusted based on TPCDS 1TB data results
                  relDiff <= conf.ndvMaxError * 2
                }
              } else {
                false
              }
            case None => false
          }
        case None => false
      }
    case _ => false
  }

image

目录
相关文章
|
SQL 分布式计算 大数据
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(一)
294 0
|
SQL 分布式计算 算法
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
大数据-97 Spark 集群 SparkSQL 原理详细解析 Broadcast Shuffle SQL解析过程(二)
242 0
|
SQL 分布式计算 数据库
SparkSQL的解析详解
  SparkSQL继承自Hive的接口,由于hive是基于MapReduce进行计算的,在计算过程中大量的中间数据要落地于磁盘,从而消耗了大量的I/O,降低了运行的效率,从而基于内存运算的SparkSQL应运而生。
1165 0
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
365 2
|
9月前
|
算法 测试技术 C语言
深入理解HTTP/2:nghttp2库源码解析及客户端实现示例
通过解析nghttp2库的源码和实现一个简单的HTTP/2客户端示例,本文详细介绍了HTTP/2的关键特性和nghttp2的核心实现。了解这些内容可以帮助开发者更好地理解HTTP/2协议,提高Web应用的性能和用户体验。对于实际开发中的应用,可以根据需要进一步优化和扩展代码,以满足具体需求。
904 29
|
9月前
|
前端开发 数据安全/隐私保护 CDN
二次元聚合短视频解析去水印系统源码
二次元聚合短视频解析去水印系统源码
381 4
|
9月前
|
JavaScript 算法 前端开发
JS数组操作方法全景图,全网最全构建完整知识网络!js数组操作方法全集(实现筛选转换、随机排序洗牌算法、复杂数据处理统计等情景详解,附大量源码和易错点解析)
这些方法提供了对数组的全面操作,包括搜索、遍历、转换和聚合等。通过分为原地操作方法、非原地操作方法和其他方法便于您理解和记忆,并熟悉他们各自的使用方法与使用范围。详细的案例与进阶使用,方便您理解数组操作的底层原理。链式调用的几个案例,让您玩转数组操作。 只有锻炼思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
|
9月前
|
移动开发 前端开发 JavaScript
从入门到精通:H5游戏源码开发技术全解析与未来趋势洞察
H5游戏凭借其跨平台、易传播和开发成本低的优势,近年来发展迅猛。接下来,让我们深入了解 H5 游戏源码开发的技术教程以及未来的发展趋势。
|
9月前
|
存储 前端开发 JavaScript
在线教育网课系统源码开发指南:功能设计与技术实现深度解析
在线教育网课系统是近年来发展迅猛的教育形式的核心载体,具备用户管理、课程管理、教学互动、学习评估等功能。本文从功能和技术两方面解析其源码开发,涵盖前端(HTML5、CSS3、JavaScript等)、后端(Java、Python等)、流媒体及云计算技术,并强调安全性、稳定性和用户体验的重要性。
|
10月前
|
机器学习/深度学习 自然语言处理 算法
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
生成式 AI 大语言模型(LLMs)核心算法及源码解析:预训练篇
2639 1

推荐镜像

更多
  • DNS