facebook Presto SQL分析引擎——本质上和spark无异,分解stage,task,MR计算

简介:

Presto 是由 Facebook 开源的大数据分布式 SQL 查询引擎,适用于交互式分析查询,可支持众多的数据源,包括 HDFS,RDBMS,KAFKA 等,而且提供了非常友好的接口开发数据源连接器。

介绍

Presto是一个运行在多台服务器上的分布式系统。 完整安装包括一个coordinator和多个worker。 由客户端提交查询,从Presto命令行CLI提交到coordinator。 coordinator进行解析,分析并执行查询计划,然后分发处理队列到worker。

Presto Installation Overview

完全基于内存的并行计算

查询的并行执行流程

Presto SQL的执行流程如下图所示

  1. Cli通过HTTP协议提交SQL查询之后,查询请求封装成一个SqlQueryExecution对象交给Coordinator的SqlQueryManager#queryExecutor线程池去执行
  2. 每个SqlQueryExecution线程(图中Q-X线程)启动后对查询请求的SQL进行语法解析和优化并最终生成多个Stage的SqlStageExecution任务,每个SqlStageExecution任务仍然交给同样的线程池去执行
  3. 每个SqlStageExecution线程(图中S-X线程)启动后每个Stage的任务按PlanDistribution属性构造一个或者多个RemoteTask通过HTTP协议分配给远端的Worker节点执行
  4. Worker节点接收到RemoteTask请求之后,启动一个SqlTaskExecution线程(图中T-X线程)将这个任务的每个Split包装成一个PrioritizedSplitRunner任务(图中SR-X)交给Worker节点的TaskExecutor#executor线程池去执行

 查询执行流程

上面的执行计划实际执行效果如下图所示。

  1. Coordinator通过HTTP协议调用Worker节点的 /v1/task 接口将执行计划分配给所有Worker节点(图中蓝色箭头)
  2. SubPlan1的每个节点读取一个Split的数据并过滤后将数据分发给每个SubPlan0节点进行Join操作和Partial Aggr操作
  3. SubPlan1的每个节点计算完成后按GroupBy Key的Hash值将数据分发到不同的SubPlan2节点
  4. 所有SubPlan2节点计算完成后将数据分发到SubPlan3节点
  5. SubPlan3节点计算完成后通知Coordinator结束查询,并将数据发送给Coordinator

 执行计划计算流程

源数据的并行读取

在上面的执行计划中SubPlan1和SubPlan0都是Source节点,其实它们读取HDFS文件数据的方式就是调用的HDFS InputSplit API,然后每个InputSplit分配一个Worker节点去执行,每个Worker节点分配的InputSplit数目上限是参数可配置的,Config中的query.max-pending-splits-per-node参数配置,默认是100。

分布式的Hash聚合

上面的执行计划在SubPlan0中会进行一次Partial的聚合计算,计算每个Worker节点读取的部分数据的部分聚合结果,然后SubPlan0的输出会按照group by字段的Hash值分配不同的计算节点,最后SubPlan3合并所有结果并输出

流水线

数据模型

Presto中处理的最小数据单元是一个Page对象,Page对象的数据结构如下图所示。一个Page对象包含多个Block对象,每个Block对象是一个字节数组,存储一个字段的若干行。多个Block横切的一行是真实的一行数据。一个Page最大1MB,最多16*1024行数据。

 数据模型

节点内部流水线计算

下图是一个Worker节点内部的计算流程图,左侧是任务的执行流程图。

Worker节点将最细粒度的任务封装成一个PrioritizedSplitRunner对象,放入pending split优先级队列中。每个

Worker节点启动一定数目的线程进行计算,线程数task.shard.max-threads=availableProcessors() * 4,在config中配置。

每个空闲的线程从队列中取出一个PrioritizedSplitRunner对象执行,如果执行完成一个周期,超过最大执行时间1秒钟,判断任务是否执行完成,如果完成,从allSplits队列中删除,如果没有,则放回pendingSplits队列中。

每个任务的执行流程如下图右侧,依次遍历所有Operator,尝试从上一个Operator取一个Page对象,如果取得的Page不为空,交给下一个Operator执行。

 节点内部流水线计算

节点间流水线计算

下图是ExchangeOperator的执行流程图,ExchangeOperator为每一个Split启动一个HttpPageBufferClient对象,主动向上一个Stage的Worker节点拉数据,数据的最小单位也是一个Page对象,取到数据后放入Pages队列中

 节点间流水线计算

本地化计算

Presto在选择Source任务计算节点的时候,对于每一个Split,按下面的策略选择一些minCandidates

  1. 优先选择与Split同一个Host的Worker节点
  2. 如果节点不够优先选择与Split同一个Rack的Worker节点
  3. 如果节点还不够随机选择其他Rack的节点

对于所有Candidate节点,选择assignedSplits最少的节点。

动态编译执行计划

Presto会将执行计划中的ScanFilterAndProjectOperator和FilterAndProjectOperator动态编译为Byte Code,并交给JIT去编译为native代码。Presto也使用了Google Guava提供的LoadingCache缓存生成的Byte Code。

 动态编译执行计划

 动态编译执行计划

上面的两段代码片段中,第一段为没有动态编译前的代码,第二段代码为动态编译生成的Byte Code反编译之后还原的优化代
码,我们看到这里采用了循环展开的优化方法。

循环展开最常用来降低循环开销,为具有多个功能单元的处理器提供指令级并行。也有利于指令流水线的调度。

小心使用内存和数据结构

使用Slice进行内存操作,Slice使用Unsafe#copyMemory实现了高效的内存拷贝,Slice仓库参考:https://github.com/airlift/slice

Facebook工程师在另一篇介绍ORCFile优化的文章中也提到使用Slice将ORCFile的写性能提高了20%~30%,参考:https://code.facebook.com/posts/229861827208629/scaling-the-facebook-data-warehouse-to-300-pb/

类BlinkDB的近似查询

为了加快avg、count distinct、percentile等聚合函数的查询速度,Presto团队与BlinkDB作者之一Sameer Agarwal合作引入了一些近似查询函数approx_avg、approx_distinct、approx_percentile。approx_distinct使用HyperLogLog Counting算法实现。

GC控制

Presto团队在使用hotspot java7时发现了一个JIT的BUG,当代码缓存快要达到上限时,JIT可能会停止工作,从而无法将使用频率高的代码动态编译为native代码。

Presto团队使用了一个比较Hack的方法去解决这个问题,增加一个线程在代码缓存达到70%以上时进行显式GC,使得已经加载的Class从perm中移除,避免JIT无法正常工作的BUG。

 










本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/bonelee/p/6616504.html,如需转载请自行联系原作者


相关文章
|
3月前
|
SQL 存储 缓存
SQL计算班级语文平均分:详细步骤与技巧
在数据库管理和分析中,经常需要计算某个班级在特定科目上的平均分
|
4月前
|
SQL 存储 并行计算
Lindorm Ganos 一条 SQL 计算轨迹
Lindorm Ganos 针对轨迹距离计算场景提供了内置函数 ST_Length_Rows,结合原生时空二级索引和时空聚合计算下推技术,能够高效过滤数据并并行执行运算任务。该方案通过主键索引和时空索引快速过滤数据,并利用多Region并行计算轨迹点距离,适用于车联网等场景。具体步骤包括根据车辆识别代码和时间戳过滤数据、范围过滤轨迹点以及并行计算距离。使用限制包括只支持点类型列聚合运算及表中轨迹点需按顺序排列等。测试结果显示,Lindorm Ganos 在不同数据量下均能实现秒级响应。
38 3
|
5月前
|
SQL
慢sql治理问题之 Task 数量分布不均的问题你们是如何优化的
慢sql治理问题之 Task 数量分布不均的问题你们是如何优化的
慢sql治理问题之 Task 数量分布不均的问题你们是如何优化的
|
5月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
72 1
|
5月前
|
SQL 流计算
慢sql治理问题之下游 Task 频繁请求 JobMaster 导致 RPC 超时的问题你们是如何解决的
慢sql治理问题之下游 Task 频繁请求 JobMaster 导致 RPC 超时的问题你们是如何解决的
|
5月前
|
SQL 数据挖掘 数据库
SQL计算班级语文平均分:详细步骤与技巧
在数据库管理中,经常需要统计和查询各种汇总信息,如班级某科目的平均分
|
7月前
|
SQL 分布式计算 大数据
MaxCompute产品使用问题之odps sql 底层计算框架是MR吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
6月前
|
分布式计算 Apache Spark
|
8月前
|
分布式计算 DataWorks 大数据
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
MaxCompute操作报错合集之大数据计算的MaxCompute Spark引擎无法读取到表,是什么原因
|
8月前
|
机器学习/深度学习 分布式计算 数据处理
Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
【5月更文挑战第2天】Spark是一个基于内存的通用数据处理引擎,可以进行大规模数据处理和分析
158 3