一、前言:SQL从提交到运行
SQL代码提交到ODPS上后,会经过一段时间的运行,得到最终的运行结果。SQL优化,就是对这个运行过程进行优化,主要表现在:缩短运行时间、减小运行消耗成本。在正式进行任务优化之前,可以先了解ODPS上SQL从提交到运行的整个链路。
先看看我们熟悉的ODPS,作为阿里巴巴自主研发的大数据处理平台,ODPS提供海量数据的计算处理和分析服务,使用户不用了解数据计算存储细节,而可以直接进行数据查询和处理。下图简单介绍了ODPS的组成及ODPS上提交作业和运行作业流程。
MaxCompute(ODPS)四大组成部分:
ODPS上提交作业和运行作业流程图:
二、优化第一步,先看logview
Logview查看:
进行SQL任务优化,读懂logview是关键,可以在D2开发平台下面的面板点击链接键查看logview。
Logview展示了Fuxi Job Dag图、Fuxi Task和Fuxi Instance等信息。他们的关系如下:一个SQL任务可以由多个Fuxi Job构成,一个Fuxi Job可以由多个Fuxi Task构成,一个Fuxi Task可以由多个Fuxi Instance构成。在logview面板的下方可以查看每个Fuxi Instance的明细情况。
ODPS中的Fuxi Task一般是有四种字母:M\R\J\C开头,分别代表四种任务类型。Fuxi Job Dag图中的Fuxi Task以这四种字母开头,后面连接的序号一般代表该Task和其依赖的直接上游Task编号。
在Fuxi Job Dag图中,可以直观看到各Fuxi Task的Instance个数,如下图所示左边代表运行中的(0),中间表示已经运行完成的(86),右边表示该Task总实例数(86)。R/W分别代表读取Read和写入Write。
Logview定位:
定位问题的一般步骤:
找到运行时长较长的Fuxi Instance后,可以通过:
1、查看Input output判断是否有数据膨胀。
2、通过Long-Tails、Data-Skews看是否有长尾、数据倾斜。
3、根据StdOut编码定位具体的sql片段:StdOut 可以看到实例中的各算子, DAG 图中双击运行时间最长的 Fuxi Task,展示出算子图,点击 StdOut 中对应的具体算子,即可定位具体的sql片段。
Logview效果查询:
左边面板Latency看运行时长,上方Summary面板中通过resource cost查耗费,换算公式:耗费= resource cost cpu / 1440 * 1.52元。
Logview 实用小技巧补充:
1、五彩的热力图
在Fuxi Job Dag图中,看板默认展示是Progress Chart,可以点击切换成Task Time Heat Chart,根据热力图找到运行时间最长的 Task(橙色),此时,Dag 图中线的粗细也能直观表示数据输入输出量大小。
2、神奇的Latency Chart
Latency Chart 直观展示了Task中各实例运行情况,可以这样理解:蓝线的宽度代表各实例的运行时长,越宽说明运行时间越长,图中的毛刺一般就是最长实例之一。高度越高代表该实例越晚开始运行,斜率越高表示资源压力越大。可以结合Job Dag中Task方框的开始结束时间和这里的Latency Chart图综合判断是否是资源获取耗时太久。截图中的Latency Chart图斜率较高,说明对应时间段资源获取压力较大。
3、具体的算子(Operator)
Fuxi Task 在逻辑上可以被拆分成多个算子。双击主面板 DAG 图中的某个 Task,可以看到算子详情,其中橙色代表耗时占比。(如果是 C 开头命名的 Task 就没有算子信息,正如上文提到的,该类 Task 表示动态调度控制,不涉及计算)
常见的算子有:
其中,TableScan 和StreamLineWrite 一般耗时会较长。
4、信息量大的 Summary
在主面板中切换到Summary,可以看到各 Task 的执行情况。
由于Task中的Instance是并发进行的,所以Task的执行时间并不等于各实例运行时间之和,一般会比最长实例运行时间长。Task的输入输出数据量一般会等于其各实例的输入输出数据量之和。
举个小栗子实战:
看完上述 Logview 小技巧后,我们举个栗子快速定位 SQL 运行慢的原因:
看 DAG 图知道是J4_1_2_3 和R5_4 运行时间长。分别双击点开看算子详情:
J4_1_2_3 部分,发现是在 Expand 和 HashAgg 之后的StreamLineWrite4 数据写入环节耗时较长(看橙色占比):
R5_4 部分,StreamLineRead4 耗时较长:
说明可能是代码中存在的 GROUPING SETS 中维度交叉组合过多,Expand 阶段数据规模膨胀,Hash聚合count(distinct)消耗一定时间,且输出较多数据量。导致J4_1_2_3 中的写入和R5_4 的读取都耗时较长,可以考虑减少一些没必要的维度交叉组合。
三、SQL运行慢的常见原因
1、资源紧张,任务优先级比较低
点击logview看Fuxi Job Tag图(如下),如果每个方框(也就是Fuxi Task)都是白色,说明实例还没运行,这个时候可以看面板左边的Queue Length,显示计算资源排队队列个数。同时点进去Waiting Queue,可以查看当前所有作业快照,通过不同维度(项目、配额等)聚合来看资源情况:一般绿色代表资源情况较好,黄色次之,红色更次。发现资源紧张后,也可以联系相应运维同学协助解决。
Priority代表优先级,和运维中心实例DAG图中的优先级数字是9-n的关系,截图这里数字越大优先级越低。可以通过参数设置(set odps.instance.priority=0;)来提升开发环境任务的优先级。
2、参数设置不合理
ODPS系统常用的参数设置:
通过map split块大小设置来控制Map Instance个数(set odps.sql.mapper.split.size=256;),一般会结合:
(set odps.sql.mapper.merge.limit.size=64;)一起使用,后者可以使每个Map Instance读入文件更均匀。
通过(set odps.sql.joiner.instances=-1; set odps.sql.reducer.instances=-1;)分别设置Join Task Instance和Reduce Task Instance数量,当每个task运行时间较长且没有发生长尾时可以考虑增加实例数。
通过:
(set odps.sql.groupby.skewindata=true/false;)开启Group by优化,在数据倾斜情况下可以使用。
参数设置不合理也会导致任务运行缓慢。比如当Instance并行度设置过大,会使得资源等待时间较长,并行度设置过小,任务执行时间又可能过长。实际操作中可以通过二分法在开发环境中测试Instance的合适大小。
3、SQL语句不当
SQL关键语句执行顺序:
其中from、select在解析过程进行,where分区字段在解析阶段直接裁剪,非分区字段在map阶段过滤。因此在写SQL语句时,避免写select *,充分利用where限制分区,添加条件限制数据量等。当表数据量很大时,可以考虑先将where条件写进子查询内,再和其他表进行join。
4、数据倾斜
数据倾斜是SQL优化中常见的情况,是指在MapReduce模型中大量value值集中在少部分reducer中处理的情况,由于少部分reducer处理的数据量过大,从而延长了整个任务执行的时间。出现数据倾斜的原因一般有以下几种:
1、join中关联出现热点key(相关reducer耗时较长);
2、join关联字段中空值过多(处理空值的reducer耗时超过平均值);
3、group by中出现热点key,某些键值数据量过多(比如在shop_id+order_id表中计算各商户的订单量,热门商户就可能是热点key);
4、join中count distinct中特殊值过多;
可以用odps中的mapjoin hint语法、手动切分热点、设置skewjoin参数(set odps.sql.skewjoin=true;)等来对数据倾斜进行处理。
5、数据计算量大
当数据量非常大时,ODPS需要更多的计算资源来并行处理数据。如果没有适当调整资源分配(如增加实例数、使用更高级别的实例类型),任务可能会因为资源不足而运行缓慢甚至失败。此时可以看下是否能一开始就对数据进行一些裁剪。比较经典的场景是根据业务理解添加适当的where条件、提前group by去重等来限制数据量,比如流量表限制log来源等,可以有效减少数据读取量。
ads应用表中,当cube的维度很多时,各个维度的枚举值组合起来,进行count(distinct)计算时耗费会比较高。此时可以先从业务上进行优化,比如裁剪一些用不到的维度组合。如果裁剪之后还是跑的很慢,且groupby与distinct字段值都均匀时,可以考虑用group by代替count distinct。
四、SQL任务优化实战
1、大表join小表:用Mapjoin Hint处理
在odps中,可以通过显式指定Mapjoin Hint提示,添加(/*+ MAPJOIN(小表1,小表2,小表3) */)语句来提升SQL运行效率。开启Mapjoin后,odps会在map阶段将小表的数据加载在内存中,在处理大表的每一行记录时,直接使用本地的小表数据进行join操作,大大提高了join的效率。
举个栗子:
通过如下SQL计算城代超会数:
select ds ,count(ditsinct user_id) as 城代超会数 from ( SELECT /*+ MAPJOIN(c) */ b.ds ,b.user_id FROM table_b b INNER JOIN table_c c ON b.district_id = c.district_id ) group by ds ;
其中b表是一张每天分区有几亿数据的大表,c表是维表每天数据几百万,在b表和c表join的时候,通过增加/*+ MAPJOIN(c) */语句,可以显著提升运行效率。
在用了mapjoin hint后,在join之前先对小表进行了reduce处理,随后的join节省了几个小时。然而,MapJoin也有其适用条件和限制,比如小表的大小需要适中,不能太大,否则会占用过多的内存资源,影响集群性能。可以通过参数设置(set odps.sql.mapjoin.memory.max=2048;)来增加小表储存内存。
MapJoin优化适用于左连接、右连接、内连接,不可用于全连接,应用在左连接时,大表必须为左表;应用在右连接时,大表必须为右表。
2、用双重group by代替count(distinct)
当数据量大且count distinct对象key的分布比较均匀时,可以用双重group by代替count distinct。
举个栗子:
select ds,count(distinct item_id) as item_cnt from item_table group by ds;
改写后:
select ds,count(item_id) as item_cnt from (select ds,item_id from item_table group by ds,item_id ) group by ds;
在ads应用层常见的cube表中,当数据量较大且去重计算对象键值分布较均匀时,可将要去重计算的对象如user_id等放入grouping_sets中,先去重再count,以提升运行效率。
代码举例:
3、进阶版本,处理多重count distinct:
当代码中count distnct对象有多个时,可以将聚合对象分开,再将最后数据union all或者join起来,但当聚合对象过多时,join过程可能会耗时较长。此时可以考虑用TRANS_COLS函数,一行变多行进行计算。
函数的语法使用如下:
select TRANS_COLS(参数1,列1,列2,...,聚合对象1,聚合对象2) AS (idx,列1,列2,...,新聚合对象)
其中参数1表示相关的维度数。
举个栗子,假设有3个维度:
city_id,client_code,is_new_user,计算对象分别是user_id和imeisi,此时语句可以写成:
select TRANS_COLS(3,city_id,client_code,is_new_user,user_id,imeisi) AS (idx,city_id,client_code,is_new_user,visitor_id);
在该SQL语句作用下,原来的3行数据就变成了6行,其中vistior_id代表新的聚合对象。
优化实战:
某SQL中核心代码:count distinct较多且条件复杂,且有两个聚合对象:user_id、imeisi:
使用TRANS_COLS函数后的代码:
写入TRANS_COLS函数后,聚合对象由多个变成1个,此时再用2中双重group by代替count distinct的方法,SQL跑起来耗时和耗费均减少。
在该例子中,写入TRANS_COLS函数后的运行成本减少将近一半。
4、临时表较大,进行拆分
当表里有复杂临时表耗时较长时,可以根据业务特性或者需要将临时表进行拆分,将大段资源消耗拆分成多段,避免因为资源等待而耗时。
5、合理采用UDF函数
用户自定义函数(UDF,User Defined Function)可以帮助我们实现特定的数据处理能力,有一些适当的UDF函数可以显著帮助我们节省运行时长。比如函数:rb_build_or_agg,针对用户去重计算相关指标优化效果显著。
但当UDF函数涉及大量循环或者没有充分利用其并行处理能力时,可能会使SQL任务整体运行时长增加或者资源消耗增大。SQL任务中有bi_udf:bi_get_date、bi_date_format等相关函数时,可以替换成内置函数。详情参考BI_UDF更换治理优化实战相关文章。
6、Hash分桶优化
在ODPS中,当涉及大数据集的join操作时,使用Hash分桶(Bucket)技术也可以优化数据处理性能。Hash分桶可以通过将数据预处理分配到不同的“桶”中,使得具有相同或相似连接键的数据尽可能地落在同一个桶里,从而减少后续连接操作时的数据扫描范围,提升处理效率。
具体语句是:
alter table table_xxx clustered by(column_name) sorted by(column_name) into 2108 buckets;
这里buckets的数量并不是越多越好,而是要根据查询性能和数据量大小权衡选择,可以在开发环境上观测运行效果来试验最合适的分桶数。
五、结语
在大数据处理和分析场景下,数据治理不可或缺。本文阐述了业务实战中常见慢SQL场景和优化方式,欢迎大家交流,最后感谢公司和团队里各位大佬的指导!
来源 | 阿里云开发者公众号
作者 | 钟悦