Flink / Scala - DataSet Transformations 常用转换函数详解

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: ​上一篇文章讲到了 Flink 如何获取数据生成 DataSet,这篇文章主要讨论 DataSet 后续支持的 Transform 转换函数。相较于 Spark,Flink 提供了更多的 API 和更灵活的写法与实现。

一.引言

上一篇文章讲到了 Flink 如何获取数据生成 DataSet,这篇文章主要讨论 DataSet 后续支持的 Transform 转换函数。相较于 Spark,Flink 提供了更多的 API 和更灵活的写法与实现。

image.gif编辑

Tips :

下述示例均以该 env 为基础实现

import org.apache.flink.api.scala.ExecutionEnvironment
  def main(args: Array[String]): Unit = {
    //获取flink执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //导入隐式转换
    import org.apache.flink.api.scala._
  }

image.gif

二.常见 Transformation

1.Map

Map 转换在数据集的每个元素上应用用户定义的 Map 函数。它实现了一对一的映射,即函数必须恰好返回一个元素。下述方法将原始字符都增加一个 '_test' 后缀并输出 。

val data: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    data.map(x => x + "_test").print()

image.gif

2.FlatMap

FlatMap 转换在数据集的每个元素上应用用户定义的映射函数。这个map函数的变体可以为每个输入元素返回任意多个结果元素(包括不返回)。下述方法会获得 1,2,2,3 四个元素组成的 DataSet。

val textLines: DataSet[String] = env.fromElements("1-2", "2-3")
    textLines.flatMap(x => x.split("-")).print()

image.gif

3.Filter

Filter转换对数据集的每个元素应用一个用户定义的过滤器函数,并且只保留那些函数返回true的元素。

val textLines: DataSet[String] = env.fromElements("1-2", "2-3")
    textLines.filter(_.startsWith("1")).print()

image.gif

4.MapPartition

MapPartition 在一个函数调用中转换一个并行分区。map-partition 函数以 Iterable 形式获取分区,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和之前的操作。和 Spark 类似,也可以为每个 partition 启动一个 Client 连接从而达到减少连接数增加可复用性。

val data: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    data.mapPartition(partition => {
      // Do SomeThing
      partition.map(x => {
        x
      })
    }).print()

image.gif

三.Transformations on Grouped DataSet

reduce操作可以对分组的数据集进行操作。指定用于分组的键可以通过多种方式实现:

key expressions - 关键表达式

a key-selector function - 一个密钥选择器函数

one or more field position keys (Tuple DataSet only) - 一个或多个字段位置键(仅限元组数据集)

Case Class fields (Case Classes only) - 案例类字段(仅案例类)

1.Reduce

关键表达式指定数据集的每个元素的一个或多个字段。每个键表达式要么是公共字段的名称,要么是getter方法的名称。'.' 可以用于向下获取对象。关键字表达式“*”选择所有字段。下面的代码展示了使用 reduce 实现基本的 wordCount。

val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2))
    wordCountWords.groupBy(_._1).reduce { (w1, w2) => {
      (w1._1, w1._2 + w2._2)
    }
    }.print()

image.gif

这里 groupBy 可以使用 _._1 代表使用第一列进行分组,也可以通过数字,例如使用 groupBy(0) 代表使用第一列分组,如果内部采用 case Class 例如 Word(name: String, count: Int),也可以使用 groupBy("name") 方法进行 groupBy,这里使用方式十分灵活。除此之外,groupBy 支持同时根据多个字段进行分组,下述方法同时使用 name + age 两个字段进行分组。

case class Student(name: String, age: Int, var score: Double)
    val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60))
    wordCountWordsOnClass.groupBy("name", "age").reduce { (st1, st2) => {
      st1.score = st1.score + st2.score
      st1
    }
    }.print()

image.gif

2.ReduceGroup

应用于分组数据集的groureduce转换为每个组调用用户定义的group-reduce函数。这个函数与Reduce的区别在于,用户定义的函数可以一次获得整个组。该函数在组的所有元素上使用Iterable调用,可以返回任意数量的结果元素。

val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2))
    wordCountWords.groupBy(0).reduceGroup {
      in => {
        in.toSet
      }
    }.print()

image.gif

reduceGroup 和 reduce 都将相同 key 的元素进行聚合,差别在于 reduce 是一个一个元素聚合,即上面的 reduce{ (st1, st2) },经过多轮 1对1 的聚合得到最终一个元素,reduceGroup 一次直接获得该 key 下的全部元素组成的 iterator,操作更加灵活,但是存在内存的隐患,例如该 key 下的大量数据,这里直接调用 iterator.toArray 就会内存溢出。

3.SortGroup

group-reduce函数使用Iterable访问组中的元素。可选地,Iterable可以按照指定的顺序分发组中的元素。在很多情况下,这有助于降低用户自定义group-reduce函数的复杂度,提高其效率。例如选取 TopK,或者取最后的 TokK 元素,对 group 函数进行预排序就很方便,下面例子为每一个分数找到名字字母排序最靠前的同学。

val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2))
    wordCountWords.groupBy(1).sortGroup(0, Order.ASCENDING).reduceGroup {
      in =>
        var prev: (String, Int) = null
        for (t <- in) {
          if (prev == null || prev != t)
            prev = t
        }
        prev
    }.print()

image.gif

4.CombineGroup

与reduce函数相反,group-reduce函数不是隐式组合的。为了使group-reduce函数可组合,它必须实现GroupCombineFunction接口。下述方法继承 GroupCombineFunction,复写 combine 方法实现了和上面类似的 reduce 操作,将相同 key 的元素最终聚合为1个。

val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2))
    class MyCombinableGroup
      extends GroupCombineFunction[(String, Int), (String, Int)] {
      override def combine(
                            in: java.lang.Iterable[(String, Int)],
                            out: Collector[(String, Int)]): Unit = {
        val r: (String, Int) =
          in.iterator.asScala.reduce((a, b) => (a._1, a._2 + b._2))
        // emit tuple with key and sum
        out.collect(r)
      }
    }
    wordCountWords.groupBy(0).combineGroup(new MyCombinableGroup).print()

image.gif

GroupCombine 变换是组合函数 GroupReduceFunction中组合步骤的广义形式。上述示例同样可以继承 GroupReduceFunction 类实现。下面方法同时继承 GroupReduceFunction 和 GroupCombineFunction 并实现 reduce 和 combine 方法,二者的结果是一致的。

val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2))
    class MyCombinableGroupReducer
      extends GroupReduceFunction[(String, Int), String]
        with GroupCombineFunction[(String, Int), (String, Int)] {
      override def reduce(
                           in: java.lang.Iterable[(String, Int)],
                           out: Collector[String]): Unit = {
        val r: (String, Int) =
          in.iterator.asScala.reduce((a, b) => (a._1, a._2 + b._2))
        // concat key and sum and emit
        out.collect(r._1 + "-" + r._2)
      }
      override def combine(
                            in: java.lang.Iterable[(String, Int)],
                            out: Collector[(String, Int)]): Unit = {
        val r: (String, Int) =
          in.iterator.asScala.reduce((a, b) => (a._1, a._2 + b._2))
        // emit tuple with key and sum
        out.collect(r)
      }
    }
    wordCountWords.groupBy(1).reduceGroup(new MyCombinableGroupReducer).print()
    wordCountWords.groupBy(1).combineGroup(new MyCombinableGroupReducer).print()

image.gif

5.Group Summary

除去 sortGroup 外,上面介绍了 Reduce,GroupReduce,CombineReduce ,三者都实现了聚合处理,看上去功能是一致的,但其实三者都有不同的侧重点。

Reduce : 一个一个组合数据,灵活性强但整体性差,资源要求低

CombineGroup :先将一部分数据聚合,再将各部分数据统一聚合得到最终结果,类似分治

ReduceGroup:获取全部数据,整体性强但是可能有资源问题

CombineGroup 其实就是 Reduce 和 ReduceGroup 的折中,因此使用 CombineGroup 对数据进行聚合统计时,其结果可能并非是最终的真实结果,需要再加一个 ReduceGroup 才能得到最终结果。

val input: DataSet[String] = [..] // The words received as input
val combinedWords: DataSet[(String, Int)] = input
  .groupBy(0)
  .combineGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var count = 0
        for (word <- words) {
            key = word
            count += 1
        }
        out.collect((key, count))
}
val output: DataSet[(String, Int)] = combinedWords
  .groupBy(0)
  .reduceGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var sum = 0
        for ((word, sum) <- words) {
            key = word
            sum += count
        }
        out.collect((key, sum))
}

image.gif

四.Aggregate

1.Sum / Min / Max

上述三个函数为内置的聚合函数,注意 Aggregate 转换只能应用于元组数据集,并且仅支持用于分组的字段位置键,内置函数需要 import 引入。

import org.apache.flink.api.java.aggregation.Aggregations.{MIN, SUM}
    // 按字段1聚合,对聚合内容的字段0求和并且对字段二求最小值
    val input: DataSet[(Int, String, Double)] = env.fromElements((1, "A", 2D), (2, "B", 1D), (0, "C", 5D), (0, "C", 3D))
    input.groupBy(1).aggregate(SUM, 0).and(MIN, 2).print()

image.gif

2.MinBy / MaxBy

MinBy (MaxBy)转换为每组元组选择一个元组。选中的元组是一个或多个指定字段值为最小(最大值)的元组。用于比较的字段必须是有效的关键字段,即可比性。如果多个元组具有最小(最大)字段值,则返回这些元组中的任意一个元组。下述示例根据字段 1 分组,并根据字段 0,2 排序,取最小的元祖。

val input: DataSet[(Int, String, Double)] = env.fromElements((1, "A", 2D), (2, "B", 1D), (0, "C", 5D), (0, "C", 3D))
    val output: DataSet[(Int, String, Double)] = input
      .groupBy(1)  // group DataSet on second field
      .minBy(0, 2) // select tuple with minimum values for first and third field.
    output.print()

image.gif

3.Distinct

Distinct转换计算源数据集的不同元素的数据集。下面的代码从数据集中删除所有重复的元素。

A.使用 Tuple 去重

val input: DataSet[(Int, String, Double)] = env.fromElements((1, "A", 2D), (2, "B", 1D), (0, "C", 5D), (0, "C", 3D))
    // distinct 先遇到的保留,后遇到的舍弃
    input.distinct().print()
    // 多字段去重
    input.distinct(0,1).print()
    // 指定条件去重
    input.distinct {x => Math.abs(x._1) > 0}.print()

image.gif

B.使用 CaseClass 去重

case class Student(name: String, age: Int, var score: Double)
    val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60))
    // 基础去重
    wordCountWordsOnClass.distinct().print()
    // 多字段去重
    wordCountWordsOnClass.distinct("name", "age").print()
    // 全字段去重
    wordCountWordsOnClass.distinct("_").print()

image.gif

五.Join

Join转换将两个数据集连接成一个数据集。两个数据集的元素被连接在一个或多个键上,这些键可以被指定 :

a key expression - 一个关键表达式

a key-selector function - 一个密钥选择器函数

one or more field position keys (Tuple DataSet only) - 一个或多个字段位置键(仅限元组数据集)。

Case Class Fields - Case类字段

1.Default Join

默认的Join转换生成一个新的具有两个字段的元组数据集。每个元组在第一个元组字段中保存第一个输入数据集的一个联接元素,在第二个字段中保存第二个输入数据集的一个匹配元素。

val joinData1: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "D"))
    val joinData2: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "E"))
    joinData1.join(joinData2).where(_._1).equalTo(_._1).print()

image.gif

image.gif编辑

2.Join With Function

Join转换还可以调用用户定义的连接函数来处理连接元组。连接函数接收第一个输入数据集的一个元素和第二个输入数据集的一个元素,并恰好返回一个元素。

case class Student(name: String, age: Int, var score: Double)
    val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60))    
    val weights: DataSet[(String, Double)] = env.fromElements(("A", 20), ("B", 30))
    wordCountWordsOnClass.join(weights).where("name").equalTo(0) {
      (words, weight) => (words.name, words.score * weight._2)
    }.print()

image.gif

3.Join with Flat-Join Function

与Map和FlatMap类似,FlatJoin的行为方式与Join相同,但它可以返回(收集)、零个、一个或多个元素,而不是返回一个元素。通过 Collector 和 if 控制输出的条件与个数。

case class Student(name: String, age: Int, var score: Double)
    val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60))    
    val weights: DataSet[(String, Double)] = env.fromElements(("A", 20), ("B", 30))
    wordCountWordsOnClass.join(weights).where("name").equalTo(0) {
      (words, weight, out: Collector[String]) =>
        if (words.score > 50) out.collect(words.name + "\t" + words.score * weight._2)
    }.print()

image.gif

4.Join with DataSet Size Hint

为了指导 Flink Join 优化器选择正确的执行策略,你可以提示要连接的数据集的大小。

case class Student(name: String, age: Int, var score: Double)
    val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60))     
    // hint that the second DataSet is very small or large 提示右表大小 为了指导优化器选择正确的执行策略,你可以提示要连接的数据集的大小
    wordCountWordsOnClass.joinWithTiny(weights).where("name").equalTo(0) {
      (words, weight) => (words.name, words.score * weight._2)
    }.print()
    wordCountWordsOnClass.joinWithHuge(weights).where("name").equalTo(0) {
      (words, weight) => (words.name, words.score * weight._2)
    }.print()

image.gif

5.Join Algorithm Hints

Flink运行时可以以各种方式执行连接。在不同的情况下,每一种可能的方法都优于其他方法。系统尝试自动选择一种合理的方式,但如果您想强制执行连接的特定方式,则允许您手动选择策略。

case class Student(name: String, age: Int, var score: Double)
    val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60))     
    wordCountWordsOnClass.join(weights, JoinHint.BROADCAST_HASH_FIRST).where("name").equalTo(0) {
      (words, weight) => (words.name, words.score * weight._2)
    }.print()

image.gif

这里引入了 JoinHint 选择 join 的连接方式,这里和 Hive 的 mapJoin 思想很类似,Hive 中使用 hive.auto.convert.join.nonconditionaltask.size 参数控制小表的大小,默认25M,日常使用中最好不要超过1G。Flink 共包含以下几种模式:

OPTIMIZER_CHOOSES 不做提示,系统自动选择
BROADCAST_HASH_FIRST 广播 input1,适用于 input1 很小
BROADCAST_HASH_SECOND 广播 input2,适用于 input2 很小
REPARTITION_HASH_FIRST shuffle 每个输入,用 input1 构建 hash 映射,适用于 input1 很小
REPARTITION_HASH_SECOND shuffle 每个输入,用 input2 构建 hash 映射,适用于 input2 很小
REPARTITION_SORT_MERGE shuffle 每个输入并排序。适用于 input1 或 input2 已排序

六.Outer Join

OuterJoin转换在两个数据集上执行左、右或完全外部连接。外部连接类似于常规(内部)连接,它创建的所有元素对的键值相等。此外,如果在另一边没有找到匹配的键,则“外部”一边的记录(如果是完整的,则为左、右或两者)将被保留。将匹配的一对元素(或一个元素和一个空值作为另一个输入)赋给一个JoinFunction,将这对元素转换为单个元素,或者赋给一个FlatJoinFunction,将这对元素转换为任意多个(包括没有)元素。两个数据集的元素被连接在一个或多个键上,这些键可以被指定:

a key expression - 一个关键表达式

a key-selector function - 一个密钥选择器函数

one or more field position keys (Tuple DataSet only) - 一个或多个字段位置键(仅限元组数据集)。

Case Class Fields - Case类字段

1.OuterJoin with Function

OuterJoin转换调用用户定义的连接函数来处理连接元组。连接函数接收第一个输入数据集的一个元素和第二个输入数据集的一个元素,并恰好返回一个元素。根据外部连接的类型(左、右、全),连接函数的两个输入元素中的一个可以为空。

val joinData1: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "D"))
    val joinData2: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "E"))
     joinData1.leftOuterJoin(joinData2).where(0).equalTo(0) {
      (o1, o2) => (o1._1, if (o1._2 == "C") -1 else o1._1)
    }.print()

image.gif

2.OuterJoin with Alogrithm Hints

和 common join 类似,Flink运行时可以以各种方式执行外部连接。在不同的情况下,每一种可能的方法都优于其他方法。系统会尝试自动选择一种合理的方式,但如果您想强制执行外部连接的特定方式,则允许您手动选择策略。

val joinData1: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "D"))
    val joinData2: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "E"))
    joinData1.leftOuterJoin(joinData2, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0) {
      (o1, o2) => (o1._1, if (o1._2 == "C") -1 else o1._1)
    }.print()

image.gif

OuterJoin 有以下限制:

LeftouterJoin 支持

    • OPTIMIZER_CHOOSES
    • BROADCAST_HASH_SECOND
    • REPARTITION_HASH_SECOND
    • REPARTITION_SORT_MERGE

    RightOuterJoin 支持

      • OPTIMIZER_CHOOSES
      • BROADCAST_HASH_FIRST
      • REPARTITION_HASH_FIRST
      • REPARTITION_SORT_MERGE

      FullOuteJoin 支持

        • OPTIMIZER_CHOOSES
        • REPARTITION_SORT_MERGE

        七.Cross

        Cross转换将两个数据集合并成一个数据集。它构建两个输入数据集元素的所有成对组合,也就是说,它构建一个笛卡尔积。Cross转换要么对每一对元素调用用户定义的Cross函数,要么输出一个Tuple2。

        1.Cross with Self-Definded Function

        Cross转换可以调用用户定义的Cross函数。cross函数接收第一个输入的一个元素和第二个输入的一个元素,并恰好返回一个结果元素。下述示例计算两个元素的距离,并返回 id 和 dist。

        val coords1: DataSet[Coord] = env.fromElements(Coord(1, 1, 2), Coord(2, 1, 2), Coord(3, 4, 5), Coord(4, 1, 2))
            val coords2: DataSet[Coord] = env.fromElements(Coord(11, 1, 3), Coord(12, 12, 2), Coord(13, 5, 7), Coord(14, 4, 3))
            val distances = coords1.cross(coords2) {
              (c1, c2) =>
                val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
                (c1.id, c2.id, dist)
            }
            distances.print()

        image.gif

        2.Cross with DataSet Size Hint

        为了指导优化器选择正确的执行策略,你可以提示数据集的大小交叉如下所示,input2 比较小则选择 crossWithTine,反之使用 crossWithHuge。

        val coords1: DataSet[Coord] = env.fromElements(Coord(1, 1, 2), Coord(2, 1, 2), Coord(3, 4, 5), Coord(4, 1, 2))
            val coords2: DataSet[Coord] = env.fromElements(Coord(11, 1, 3), Coord(12, 12, 2), Coord(13, 5, 7), Coord(14, 4, 3))
            // crossWithTiny or crossWithHuge
            val distances = coords1.crossWithTiny(coords2) {
              (c1, c2) =>
                val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
                (c1.id, c2.id, dist)
            }
            distances.print()

        image.gif

        八.CoGroup

        CoGroup 转换联合处理两个数据集的分组。两个数据集按照一个定义的键分组,共享相同键的两个数据集的分组一起交给一个用户定义的 co-group 函数。如果对于特定键,只有一个数据集有组,则使用该组和空组调用 co-group 函数。co-group 函数可以分别遍历两个组的元素,并返回任意数量的结果元素。与Reduce、groureduce和Join类似,可以使用不同的键选择方法定义键。

        v1 中包含 key 在 data1 中的全部元素构成的迭代器,v2 中包含 key 在 data2 中的全部元素构成的迭代器,这里可以在内部实现笛卡尔积或者其他操作。

        val data1: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3))
            val data2: DataSet[(String, Double)] = env.fromElements(("A", 2D), ("B", 3D), ("B", 3D))
            // Value is not a object
            val outputV2 = data1.coGroup(data2).where(0).equalTo(0) {
              (v1: Iterator[(String, Int)], v2: Iterator[(String, Double)], out: Collector[Double]) => (
                v1.toArray.foreach(x => {
                  v2.toArray.foreach(y => {
                    out.collect(x._2 * y._2)
                  })
                })
              )
            }
            outputV2.print()

        image.gif

        Tips:

        一定要在 Self-Function 中声明 v1 和 v2 的类型,按照官方 API 直接写 (v1, v2, out: Collector[Double]),会显示如下报错 value xxx is not a member of Object :

        image.gif编辑

        九.DataSet Change

        1.Union

        生成两个数据集的并集,这两个数据集必须是相同的类型。多个数据集的联合可以通过多个联合调用来实现,如下所示:

        val data1: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3))
            val data3: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3))
           // 生成两个数据集的并集,这两个数据集必须是相同的类型。多个数据集的联合可以通过多个联合调用来实现
            val dataUnion = data1.union(data3)
            dataUnion.print()

        image.gif

        除了 DataSet,DataStreaming 也可以使用 union 合并,从而达到多流合并的目的。

        2.Rebalance

        均匀地重新平衡数据集的并行分区,以消除数据倾斜。和 Spark 的 repartition 很像。

        val dataUnion = data1.union(data3)
            // 均匀地重新平衡数据集的并行分区,以消除数据倾斜。
            dataUnion.rebalance().print()

        image.gif

        3.Hash-Partition

        对给定键上的数据集进行哈希分区。在 Spark 中可以继承 HashPartition 实现自定义 Hash 分区。

        val dataUnion = data1.union(data3)
            // 相同的 Hash 存在一个分区
            dataUnion.partitionByHash(0).print()

        image.gif

        4.Range-Partition

        根据给定的键对数据集进行范围分区。这里需要与上述区分开,Hash-Partition 只要 Hash 值一致就会分在同一个分区,而 Range-Partition 则需要相同 key。

        val dataUnion = data1.union(data3)
            // 相同的 key 存在一个分区
            dataUnion.partitionByRange(0).print()

        image.gif

        5.Sort-Partition

        在指定字段上按指定顺序对数据集的所有分区进行本地排序。 字段1升序后再按字段0降序排序,这样得到的 partition 数据是排好序的。

        val dataUnion = data1.union(data3)
            dataUnion.sortPartition(1, Order.ASCENDING)
              .sortPartition(0, Order.DESCENDING)
              .print()

        image.gif

        十.First-n

        返回数据集的前n个(任意)元素。First-n可以应用于常规数据集、分组数据集或分组排序数据集。这个函数类似于 spark 的 take。

        val data1: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3))
            val data3: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3))
            val dataUnion = data1.union(data3)
            // 取 Top3
            dataUnion.first(3).print()
            // 数据按字段0分组后每个组取 Top 1
            dataUnion.groupBy(0).first(1).print()
            // 数据按字段0分组后按字段1升序排序并取 TOP 2
            dataUnion.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()

        image.gif

        Final Summary

        上述解释大多翻译自  Flink 1.14.1 官方 API,有很多好用的方法,配合 scala 的语法糖写起来也很简洁,Transformation 大致就这些,后续继续介绍 Sink 相关以及 DataStream 相关。

        相关实践学习
        基于Hologres轻松玩转一站式实时仓库
        本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
        Linux入门到精通
        本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
        目录
        相关文章
        |
        3月前
        |
        SQL 大数据 API
        大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
        大数据-118 - Flink DataSet 基本介绍 核心特性 创建、转换、输出等
        88 0
        |
        1月前
        |
        数据处理 数据安全/隐私保护 流计算
        Flink 三种时间窗口、窗口处理函数使用及案例
        Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
        177 27
        |
        2月前
        |
        Java Scala
        Scala 方法与函数
        Scala 方法与函数
        28 1
        |
        3月前
        |
        Java Shell 流计算
        Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
        Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
        35 1
        Flink-02 Flink Java 3分钟上手 Stream SingleOutputStreamOpe ExecutionEnvironment DataSet FlatMapFunction
        |
        3月前
        |
        Kubernetes Cloud Native 流计算
        Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
        Flink-12 Flink Java 3分钟上手 Kubernetes云原生下的Flink集群 Rancher Stateful Set yaml详细 扩容缩容部署 Docker容器编排
        112 3
        |
        3月前
        |
        存储 Java 数据处理
        Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
        Flink-01 介绍Flink Java 3分钟上手 HelloWorld 和 Stream ExecutionEnvironment DataSet FlatMapFunction
        57 1
        |
        3月前
        |
        SQL 消息中间件 分布式计算
        大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
        大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
        50 0
        |
        5月前
        |
        Java 关系型数据库 MySQL
        实时计算 Flink版操作报错合集之在使用批处理模式中使用flat_aggregate函数时报错,该如何解决
        在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
        |
        6月前
        |
        SQL Java 数据处理
        实时计算 Flink版产品使用问题之使用MavenShadePlugin进行relocation并遇到只包含了Java代码而未包含Scala代码,该怎么办
        实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
        |
        6月前
        |
        SQL Java 数据处理
        实时计算 Flink版产品使用问题之开窗函数(WindowFunction)如何做开窗
        实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。