浅谈Flink批处理优化器之Join优化

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数: join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.

跟传统的关系型数据库类似,Flink提供了优化器“hint”(提示)以告诉优化器选择一些执行策略。目前优化提示主要针对批处理中的连接(join)。在批处理中共有三个跟连接有关的转换函数:

  • join:默认为等值连接(Equi-join),维基百科将其归类为内连接(inner join)的一种 https://en.wikipedia.org/wiki/Join_(SQL)
  • outerJoin:外连接,具体细分为left-outer join、right-outer join、full-outer join;
  • cross:交叉连接,求两个数据集的笛卡尔积;

完全展开之后共有五种,这也符合ANSI-standard SQL对连接种类的划分。

下文当我们提及“join”时,主要指equi-join,而当我们想表达outer-join时,我们会直接使用“外连接”,当我们想泛指时,我们将使用“连接”这个词。

常用来实现连接的算法有:hash join、sort-merge join以及nested loop join,下面我们对这三种算法进行简单介绍。首先,当基于hash算法实现连接时,通常划分为两个阶段:

  1. build:为参与连接的两个数据集中较小的数据集准备好哈希表,哈希表中的记录包含着连接的属性以及它对应的行。因为哈希表是通过对连接属性应用一个哈希函数来访问的,因此通过它将比扫描初始的数据集更快地发现给定的连接属性对应的行;
  2. probe:一旦哈希表构建完成,会扫描更大的数据集并通过从更小的数据集匹配哈希表以发现相关的行;

而使用sort-merge算法实现连接时,通常也划分为两个阶段:

  1. sort:对两个数据集在它们的连接键属性上进行排序;
  2. merge:合并排过序的数据集;

nested loop实现连接相对更容易理解,它使用两层嵌套循环分别作用于两个参与连接的数据集。

在Flink的DataSet API中,hash和sort-merge算法都可被选择用于实现join和outerJoin,而nested loop只用于实现cross join。

通过上面的介绍,我们得知当选择hash算法来实现连接时,需要确定以哪个输入端作为build端,哪个输入端作为probe端,这是影响其执行效率的因素之一(因为通常选择数据量较小的数据集作为build端)。因此,以hash算法来实现连接时,而不同的选择显然对应着不同的运算符描述器,列举如下:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • HashLeftOuterJoinBuildFirstDescriptor
  • HashLeftOuterJoinBuildSecondDescriptor
  • HashRightOuterJoinBuildFirstDescriptor
  • HashRightOuterJoinBuildSecondDescriptor
  • HashFullOuterJoinBuildFirstDescriptor
  • HashFullOuterJoinBuildSecondDescriptor

而当以sort-merge算法来实现连接时,不会区分输入端的特殊职责,也就不存在build阶段和probe阶段,因此运算符描述器只有如下四种:

  • SortMergeInnerJoinDescriptor:
  • SortMergeLeftOuterJoinDescriptor:
  • SortMergeRightOuterJoinDescriptor:
  • SortMergeFullOuterJoinDescriptor:

以上这么多运算符描述器,主要是为它们设置不同的执行策略(DriverStrategy),不同的执行策略直接导致了不同的执行成本。

为了理清算法跟参与连接的输入端的关系,Flink将它们区分成两种不同策略的:本地策略以及传输(ship)策略。其中传输策略表示如何移动两个输入端中的数据使得它们具备连接的条件;本地策略则指两个已在本地的输入端数据集所执行的连接算法。

我们来解释一下这两种策略,假设有两个待连接的数据集(R和S)。传输策略有如下两种:

  • Broadcast-Forward strategy (BF):该策略会将一个完整的数据集,比如R,广播到数据集S的每一个分区上,而数据集S的所有数据则一直处于本地,无需网络传输;
  • Repartition-Repartition strategy (RR):以相同的分区函数以及用于连接的键属性分区两个数据集R、S;

正如上面已经提及的,本地策略也即连接的实现算法也有两种:

  • Sort-Merge-Join strategy (SM):首先对两个输入端的数据集在它们的连接键属性上进行排序(排序阶段),然后合并排过序的数据集(合并阶段);
  • Hybrid-Hash-Join strategy (HH):分为构建阶段和探索阶段;

在不指定“Hint”的情况下,Flink在进行批处理优化时会根据成本自动选择传输策略以及本地策略。优化器的一个关键特征是它会根据已经存在的数据属性来进行推理。就连接运算而言,如果某一个输入端的数据量远小于另一输入端,Flink会倾向于选择BF传输策略,将较小的输入端广播给较大的输入端的每一个分区,并在本地策略中选择HH且以较小的输入端作为HH的构建端;如果优化器得知某个(或两个)输入端已排好序,那么生成的候选计划将不再重分区该输入端,此时它更倾向于选择RR传输策略以及SM本地策略。

除了优化器的自动选择,当用户对数据集非常了解的情况下,Flink定义了JoinHint允许用户为join(inner join)指定连接策略给予优化器提示。JoinHint提供了人为选择连接策略的灵活性,其使用方式有两种,一种是直接指定两个输入端的大小:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result1 = input1.joinWithTiny(input2)    //提示优化器第二个数据集比第一个数据集小得多
        .where(0)
        .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
    result2 = input1.joinWithHuge(input2)    //提示优化器第二个数据集比第一个数据集大得多
        .where(0)
        .equalTo(0);

另一种是直接指定连接策略:

DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");

当前有如下的这些策略可供选择:

  • OPTIMIZER_CHOOSES:将选择权交予Flink优化器,相当于没有给提示;
  • BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
  • BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
  • REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
  • REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
  • REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

对应到优化器中,JoinHint被用来指定创建何种运算符描述器,由于JoinHint只适应于join,所以它只对应如下这些运算符描述器:

  • HashJoinBuildFirstProperties
  • HashJoinBuildSecondProperties
  • SortMergeInnerJoinDescriptor

因此,如果用户给出了JoinHint,则数据属性(其实这里主要是DriverStrategy)会通过以上三种运算符描述器来提供:

joinHint = joinHint == null ? JoinHint.OPTIMIZER_CHOOSES : joinHint;

switch (joinHint) {
    case BROADCAST_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
        break;
    case BROADCAST_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
        break;
    case REPARTITION_HASH_FIRST:
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_HASH_SECOND:
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
        break;
    case REPARTITION_SORT_MERGE:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
        break;
    case OPTIMIZER_CHOOSES:
        list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
        list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
        list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
        break;
    default:
        throw new CompilerException("Unrecognized join hint: " + joinHint);
}

由代码段可见,当将选择权交给优化器时,它会将三种运算符描述器都作为数据属性,供后续生成候选计划时再剔除。

除了针对join的提示外,Flink还提供了针对求交叉连接的提示CrossHint,该提示主要是针对输入端的数据量大小。使用示例如下:

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple4<Integer, String, Integer, String>>
    udfResult = input1.crossWithTiny(input2)        //提示第二个数据集非常小
    // apply any Cross function (or projection)
    .with(new MyCrosser());

DataSet<Tuple3<Integer, Integer, String>>
    projectResult = input1.crossWithHuge(input2)    //提示第二个数据集非常大
    // apply a projection (or any Cross function)
    .projectFirst(0,1).projectSecond(1);

不同于Join提示,Cross提示被表述为不同的API。从代码层面上来看,CrossHint有三个枚举值:

  • OPTIMIZER_CHOOSES:将选择权交给Flink优化器;
  • FIRST_IS_SMALL:第一个输入端小于第二个输入端;
  • SECOND_IS_SMALL:第二个输入端数据量小于第一个输入端;

在创建相关运算符描述器CrossHint被用来指定特定的构造参数,比如是允许第一个输入端广播还是第二个输入端广播。交叉连接的实现算法为nested-loop,关于运算符描述器,考虑到以哪个数据集作为内、外层循环以及以阻塞模型还是流模型来处理这两个因素,有四种实现:

  • CrossBlockOuterFirstDescriptor:以第二个输入端作为内循环,第一个输入端作为外循环且以阻塞形式处理;
  • CrossBlockOuterSecondDescriptor:以第一个输入端作为内循环,第二个输入端作为外循环且以阻塞形式处理;
  • CrossStreamOuterFirstDescriptor:以第二个输入端作为内循环,第一个输入端作为外循环且以流模型处理;
  • CrossStreamOuterSecondDescriptor:以第一个输入端作为内循环,第二个输入端作为外循环且以流模型处理;

且需要注意的是,不同的处理模型,哪个输入端作为内外循环是相反的:

else if (hint == CrossHint.SECOND_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterSecondDescriptor(false, true));
    list.add(new CrossStreamOuterFirstDescriptor(false, true));
    this.dataProperties = list;
}
else if (hint == CrossHint.FIRST_IS_SMALL) {
    ArrayList<OperatorDescriptorDual> list = new ArrayList<OperatorDescriptorDual>();
    list.add(new CrossBlockOuterFirstDescriptor(true, false));
    list.add(new CrossStreamOuterSecondDescriptor(true, false));
    this.dataProperties = list;
}

但广播哪个输入端是一致的。


原文发布时间为:2017-04-24

本文作者:vinoYang

本文来自云栖社区合作伙伴CSDN博客,了解相关信息可以关注CSDN博客。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
26天前
|
Java 流计算
利用java8 的 CompletableFuture 优化 Flink 程序
本文探讨了Flink使用avatorscript脚本语言时遇到的性能瓶颈,并通过CompletableFuture优化代码,显著提升了Flink的QPS。文中详细介绍了avatorscript的使用方法,包括自定义函数、从Map中取值、使用Java工具类及AviatorScript函数等,帮助读者更好地理解和应用avatorscript。
利用java8 的 CompletableFuture 优化 Flink 程序
|
21天前
|
存储 缓存 监控
Flink如何优化?需要注意哪些方面?
【10月更文挑战第10天】Flink如何优化?需要注意哪些方面?
44 6
|
3月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之使用StarRocks作为Lookup Join的表是否合适
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
3月前
|
存储 监控 Oracle
实时计算 Flink版产品使用问题之如何解决双流Join导致的状态膨胀和资源压力问题
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
Java 关系型数据库 MySQL
实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
3月前
|
SQL 资源调度 流计算
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
慢sql治理问题之在 Flink 中, userjar 分发问题如何优化
|
3月前
|
监控 搜索推荐 数据挖掘
Flink流处理与批处理大揭秘:实时与离线,一文让你彻底解锁!
【8月更文挑战第24天】Apache Flink 是一款开源框架,擅长流处理与批处理。流处理专攻实时数据流,支持无限数据流及事件驱动应用,实现数据的连续输入与实时处理。批处理则聚焦于静态数据集,进行一次性处理。两者差异体现在处理方式与应用场景:流处理适合实时性要求高的场景(例如实时监控),而批处理更适用于离线数据分析任务(如数据挖掘)。通过提供的示例代码,读者可以直观理解两种模式的不同之处及其实际应用。
163 0
|
3月前
|
SQL Oracle 数据处理
实时计算 Flink版产品使用问题之如何优化数据读取速度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
4月前
|
SQL 算法 数据库
SQL优化器原理 - Join重排
保证等价性:不同的Join顺序可能产生相同的结果集,但执行成本可能不同。因此,在重排Join顺序时,必须确保结果集的等价性。