开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :待待深度探索 Flink SQL(二)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10041
待待深度探索 Flink SQL(二)
三. Blink Planner
1.blink planner 过程
Table API&SQL 解析验证:
在 Flink 1.9 中,Table API 进行了大量的重构,引入了一套新的 Operation,这套 Operation 主要是用来描述任务的 Logic Tree。
当 SQL 传输进来后,首先会去做 SQL 解析,SQL 解析完成之后,会得到 SqlNode Tree(抽象语法树),然后会紧接着去做 Validate(验证),验证时会去访问 FunctionManger 和 CatalogManger。
FunctionManger 主要是查询用户定义的 UDF,以及检查 UDF 是否合法。
CatalogManger 主要是检查这个 Table 或者 Database 是否存在,如果验证都通过,就会生成一个 Operation DAG(有向无环图)。
从这一步可以看出,Table API 和 SQL 在 Flink 中最终都会转化为统一的结构,即 Operation DAG。
生成RelNode:
Operation DAG 会被转化为 RelNode(关系表达式) DAG。
优化:
优化器会对 RelNode 做各种优化,优化器的输入是各种优化的规则,以及各种统计信息。
当前,在 Blink Planner 里面,绝大部分的优化规则,Stream 和 Batch 是共享的。
差异在于,对 Batch 而言,它没有 state 的概念,而对于 Stream 而言,它是不支持 sort 的,所以目前 Blink Planner 中,还是运行了两套独立的规则集(Rule Set),然后定义了两套独立的 Physical Rel:BatchPhysical Rel 和 StreamPhysical Rel。
优化器优化的结果,就是具体的 Physical Rel DAG。
转化:
得到 Physical Rel Dag 后,继续会转化为 ExecNode,通过名字可以看出,ExecNode 已经属于执行层的概念了,但是这个执行层是 Blink 的执行层,在 ExecNode 中,会进行大量的 CodeGen 的操作,还有非 Code 的 Operator 操作。
最后,将 ExecNode 转化为 Transformation DAG。
生成可执行 Job Graph:
得到 Transformation DAG 后,最终会被转化成 Job Graph,完成 SQL 或者 Table API 的解析。
通过这个图,可以看出SQL/Table API如何在flink中流转,最终变为可执行的Graph。从上图可以很清楚的看到,解析的过程涉及到了三层:Table API/SQL,Blink Planner,Runtime。
2. Blink Planner 改进及优化
Blink Planner 功能方面改进主要包含如下几个方面:
l 更完整的 SQL 语法支持:例如,IN,EXISTS,NOT EXISTS,subquery,完整的 Over 语句,Group Sets 等。而且已经跑通了所有的 TPCH,TPCDS 这两个测试集,性能还非常不错。
l 提供了更丰富,高效的算子。
l 提供了非常完善的 cost 模型,同时能够对接 Catalog 中的统计信息,使 cost 根据统计信息得到更优的执行plan。
l 支持 join reorder。
l shuffle service:对 Batch 而言,Blink Planner 还支持 shuffle service,这对 Batch 作业的稳定性有非常大的帮助。
如果遇到 Batch 作业失败,通过 shuffle service 能够很快的进行恢复。
性能方面,主要包括以下部分:
l 分段优化。
l Sub-Plan Reuse。
l 更丰富的优化 Rule:共一百多个 Rule ,并且绝大多数 Rule 是 Stream 和 Batch 共享的。
l 更高效的数据结构 BinaryRow:能够节省序列化和反序列化的操作。
l mini-batch 支持(仅 Stream):节省 state 的访问的操作。
l 节省多余的 Shuffle 和 Sort(Batch 模式):两个算子之间,如果已经按 A 做 Shuffle,紧接着他下的下游也是需要按 A Shuffle 的数据,那中间的这一层 Shuffle,就可以省略,这样就可以省很多网络的开销,Sort 的情况也是类似。
Sort 和 Shuffle 如果在整个计算里面是占大头,对整个性能是有很大的提升的。
l 深入性能优化及实践
示例 5
create view MyView as select word, count(1) as freq from SourceTable group by word;
insert into SinkTable1 select * from MyView where freq >10;
insert into SinkTable2 select count(word) as freq2, freq from MyView group by freq;
首先创建一个 view,view 执行的是 word count 的例子,从 source 表读数据,然后 group by 每一个 word,看这个word的词频,把刚才的 view 做一个 filter,结果写到sink表1。
对每个词频做排序,相同词频下,word 出现的次数写到 sink 表2。
上面的这几个 SQL,转化为 RelNode DAG,大致图形如下:
解析:
最下层的 Scan 是扫描 source 表,然后 group by一个word 做 count1。
左边的一个分支对应的是 sink 1。先是做了一个 filter,然后结果写到 sink 1里面。
右面的分支是得出的结果按 freq 求 group by,然后求出 word count 的结果,写到sink 2。
但是在 flink planner 里面(上图),每一个 sink 做优化的时候都是单独进行的,即它是执行一个 query 的时候,flink 就要做一次优化。
在执行 sink 2 的时候,也做一个优化。
可以看到,old planner 只是简单的从 Sink 出发,反向的遍历到 Source,从而形成两个独立的执行链路,从上图也可以清楚的看到,Scan 和第一层 Aggregate 是有重复计算的。
在 Blink Planner 中,经过优化层之后,会生成如下执行层的 DAG:
Blink Planner 不是在每次调用 insert into 的时候就开始优化,而是先将所有的 insert into 操作缓存起来,等到执行前才进行优化,这样就可以看到完整的执行图,可以知道哪些部分是重复计算的。
Blink Planner 通过寻找可以优化的最大公共子图,找到这些重复计算的部分。
经过优化后,Blink Planner 会将最大公共子图的部分当做一个临时表,供其他部分直接使用。
这样,上面的图可以分为三部分:
① 最大公共子图部分(临时表)
② 临时表与 Filter 和 SinkTable1 优化
③ 临时表与第二个 Aggregate 和 SinkTable 2 优化
Blink Planner 其实是通过声明的 View 找到最大公共子图的,因此在开发过程中,如果需要复用某段逻辑,就将其定义为 View,这样就可以充分利用 Blink Planner 的分段优化功能,减少重复计算。
当然,当前的优化也不是最完美的,因为提前对图进行了切割,可能会导致一些优化丢失,今后会持续地对这部分算法进行改进。
总结一下,Blink Planner 的分段优化,其实解的是多 Sink 优化问题(DAG 优化),单 Sink 不是分段优化关心的问题,单 Sink 可以在所有节点上优化,不需要分段。
l
Sub-Plan Reuse
insert into SinkTabl
e
select freq from (select word, count(1) as freq from SourceTable group by word) t where word like 'T%'
union all
select count(word) as freq2 from (select word, count(1) as freq from SourceTable group by word) t group by freq;
这个示例的 SQL 和分段优化的 SQL 其实是类似的,不同的是,没有将结果 Sink 到两个 Table 里面,而是将结果 Union 起来,Sink 到一个结果表里面。
下面看一下转化为 RelNode 的 DAG 图:
从上图可以看出,Scan 和第一层的 Aggregate 也是有重复计算的,Blink Planner 其实也会将其找出来,变成下面的图:
Sub-Plan 优化的启用,有两个相关的配置:
·
table.optimizer.reuse-sub-plan-enabled
(默认开启)
·
table.optimizer.reuse-source-enabled
(默认开启)
这两个配置,默认都是开启的,用户可以根据自己的需求进行关闭。
这里主要说明一下 table.optimizer.reuse-source-enabled 这个参数。
在 Batch 模式下,join 操作可能会导致死锁,具体场景是在执行 hash-join 或者 nested-loop-join 时一定是先读 build 端,然后再读 probe 端,如果启用 reuse-source-enabled,当数据源是同一个 Source 的时候,Source 的数据会同时发送给 build 和 probe 端。
这时候,build 端的数据将不会被消费,导致 join 操作无法完成,整个 join 就被卡住了。
为了解决死锁问题,Blink Planner 会先将 probe 端的数据落盘,这样 build 端读数据的操作才会正常,等 build 端的数据全部读完之后,再从磁盘中拉取 probe 端的数据,从而解决死锁问题。
但是,落盘会有额外的开销,会多一次写的操作;有时候,读两次 Source 的开销,可能比一次写的操作更快,这时候,可以关闭 reuse-source,性能会更好。
当然,如果读两次 Source 的开销,远大于一次落盘的开销,可以保持 reuse-source 开启。
需要说明的是,Stream 模式是不存在死锁问题的,因为 Stream 模式 join 不会有选边的问题。
总结而言,sub-plan reuse 解的问题是优化结果的子图复用问题,它和分段优化类似,但他们是一个互补的过程。
■ Agg 分类优化
Blink 中的 Aggregate 有以下四类:
· group agg
例如:select count(a) from t group by b
为了同其他agg进行区分,query是最常见的group by语句。
· over agg
例如:select count(a) over (partition by b order by c) from t
有 over 的关键字。
· window agg
例如:select count(a) from t group by tumble(ts, interval '10' second), b
有常见的 window 属性。
· table agg
· 例如
tEnv.scan('t').groupBy('a').flatAggregate(flatAggFunc('b' as ('c', 'd')))
只能在 table API 里面引入,允许用户自定义 agg 行为。不像 group agg 多条进,一条出。它可以允许用户多条进多条出。
l
Local/Global Agg
主要是为了减少网络 Shuffle 数据。
对于 agg function 可以做 merge 这样的 agg 可以用 local globle 来解。
(
s
um(a)->local
sum
(a)+globle sum (local result)
Globle sum 是输入式 local sum 的结果。
Distinct Agg 进行优化,主要是对 SQL 语句进行改写,达到优化的目的。
但 Batch 模式和 Stream 模式解决的问题是不同的:
· Stream 模式下,主要是解决热点问题,因为 Stream 需要将所有的输入数据放在 State 里面,如果数据有热点,State 操作会很频繁,这将影响性能。
· Batch 模式下的 distinct Agg,需要先做 distinct,再做 agg,逻辑上需要两步才能实现,直接实现 Distinct Agg 开销太大。
流里面为什么不需要分开?因为流里面有 state 。在访问 state 的时候天然的可以通过 key 去做一层 distinct。在流里面到 distinct agg 里面来做,但是 batch 里面是不能做到的。
要运用 Local/Global 的优化,必要条件如下:
① Aggregate 的所有 Agg Function 都是 mergeable 的,每个 Aggregate 需要实现 merge 方法,例如 SUM,COUNT,AVG,这些都是可以分多阶段完成,最终将结果合并;但是求中位数,计算 95% 这种类似的问题,无法拆分为多阶段,因此,无法运用 Local/Global 的优化。
② table.optimizer.agg-phase-strategy 设置为 AUTO 或者 TWO_PHASE。AUTO表示不强制指定一阶段或二阶段,而是让框架来决定。
③ Stream 模式下,mini-batch 开启 ;Batch 模式下 AUTO 会根据 cost 模型加上统计数据,选择是否进行 Local/Global 优化。绝大多数情况下会选用local globle优化。
看下面一个范例
Query:
select count(*) from t group by color
没有优化的情况下,下面的这个 Aggregate 会产生 10 次的 Shuffle 操作。
使用 Local/Global 优化后,加了一层,这时候 local 算子会和前面的算子在本地先进行预聚合,然后再进行 Shuffle 操作。
整个 Shuffle 的数据剩下 6 条。在 Stream 模式下,Blink 其实会以 mini-batch 的维度对结果进行预聚合,然后将结果发送给 Global Agg 进行汇总。
为什么在流里面开启mini-batch,是因为local agg的聚合是以mini-batch为单位去做的,会去对mini-batch中的所有数据做一次提前的聚合,然后把聚合的结果发送给最终的globle agg。
l distinct agg
① Batch下,强制改写:
第一层求 distinct 值和非 distinct agg function 的值,第二层 层求 distinct agg function 的值。
select color, count(distinct id), count(*) from t group by color
手工改写成:
select color, count(id), min(cnt) from ( select color, id, count(*) filter (where $e=2) as cnt from (
select color, id, 1 as $e from t --for distinct id union all
select color, null as id, 2 as $e from t -- for count(*)
) group by color, id, $e ) group by color
基本思路是:count distinct 和普通的 count 是没法同时计算的。先做 distinct。
这里改写的思路是一份数据分成两部分。一部分是给 distinct 用,另外一部分给count* 用。
用红色标出来的:第一部分:Select color id 加 as $e。是为了给 count distinct 用的还是给普通的 count*用的,$e1 是给 distinct id 用的。
第二部分:$e2 表示给 count* 用的。因为在 count* 这边是 group by color 。所以说对第二位的 id 统一把它置为一个同样的值。这时候才能保证结果的正确性。
再来看 union 后的结果,把它改写成一个普通的 query,求 count*,但是 count*是有条件的,只允许统计下面这一条数据做 count*。下面一层对这个结果选择直接输出的 agg 例如 min 或者 max 都可以。
对于 count distinct,因为已经在上层做过了,所以在这一层只需要做 count id 即可。
红字部分用 expand 算子表示。
接下来看整条数据的图:
② Stream 下,必要条件:
必须是支持的agg
function:avg/count/min/max/su um/first_value/last_value/ concat_agg/single_value
table.optimizer.distinct-agg.split.enabled开启(默认关闭)
select
color,count(distinct
id),count(*)
from
t
group
by
color
手工改写为:
select color, sum(dcnt), sum(cnt) from ( select color, count(distinct id) as dcnt, count(*) as cnt from t group by color, mod(hash_code(id), 1024) ) group by color
基本思路是:将 count distinct 的 id 做一个 Shuffle,这样能够把原始的 Shuffle 的数据做一个打散。
改写前:
上层数据是输入的数据。白色部分比较多,白色部分为热点。所有白色的热点有可能落到同一个agg里面,会导致这一agg会出现热点。
改写后,逻辑图就会变为下面这样,热点数据被打散到多个中间节点上:
需要注意的是,示例 5 的 SQL 中 mod(hash_code(id),1024)中的这个 1024 为打散的维度,这个值建议保持原样或者设置大一些,设置太小产生的效果可能不好。
四.Q&A
配置文件哪里指定 blink planner 或 flink planner?
答:目前还没发通过配置文件指定。应该通过代码显示指定或者在environment city 里面不指定,而是提供的 flink jar 或者 blink 的 jar 指定。