一.引言
上一篇文章讲到了 Flink 如何获取数据生成 DataSet,这篇文章主要讨论 DataSet 后续支持的 Transform 转换函数。相较于 Spark,Flink 提供了更多的 API 和更灵活的写法与实现。
编辑
Tips :
下述示例均以该 env 为基础实现
import org.apache.flink.api.scala.ExecutionEnvironment def main(args: Array[String]): Unit = { //获取flink执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //导入隐式转换 import org.apache.flink.api.scala._ }
二.常见 Transformation
1.Map
Map 转换在数据集的每个元素上应用用户定义的 Map 函数。它实现了一对一的映射,即函数必须恰好返回一个元素。下述方法将原始字符都增加一个 '_test' 后缀并输出 。
val data: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink")) data.map(x => x + "_test").print()
2.FlatMap
FlatMap 转换在数据集的每个元素上应用用户定义的映射函数。这个map函数的变体可以为每个输入元素返回任意多个结果元素(包括不返回)。下述方法会获得 1,2,2,3 四个元素组成的 DataSet。
val textLines: DataSet[String] = env.fromElements("1-2", "2-3") textLines.flatMap(x => x.split("-")).print()
3.Filter
Filter转换对数据集的每个元素应用一个用户定义的过滤器函数,并且只保留那些函数返回true的元素。
val textLines: DataSet[String] = env.fromElements("1-2", "2-3") textLines.filter(_.startsWith("1")).print()
4.MapPartition
MapPartition 在一个函数调用中转换一个并行分区。map-partition 函数以 Iterable 形式获取分区,并可以生成任意数量的结果值。每个分区中的元素数量取决于并行度和之前的操作。和 Spark 类似,也可以为每个 partition 启动一个 Client 连接从而达到减少连接数增加可复用性。
val data: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink")) data.mapPartition(partition => { // Do SomeThing partition.map(x => { x }) }).print()
三.Transformations on Grouped DataSet
reduce操作可以对分组的数据集进行操作。指定用于分组的键可以通过多种方式实现:
key expressions - 关键表达式
a key-selector function - 一个密钥选择器函数
one or more field position keys (Tuple DataSet only) - 一个或多个字段位置键(仅限元组数据集)
Case Class fields (Case Classes only) - 案例类字段(仅案例类)
1.Reduce
关键表达式指定数据集的每个元素的一个或多个字段。每个键表达式要么是公共字段的名称,要么是getter方法的名称。'.' 可以用于向下获取对象。关键字表达式“*”选择所有字段。下面的代码展示了使用 reduce 实现基本的 wordCount。
val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2)) wordCountWords.groupBy(_._1).reduce { (w1, w2) => { (w1._1, w1._2 + w2._2) } }.print()
这里 groupBy 可以使用 _._1 代表使用第一列进行分组,也可以通过数字,例如使用 groupBy(0) 代表使用第一列分组,如果内部采用 case Class 例如 Word(name: String, count: Int),也可以使用 groupBy("name") 方法进行 groupBy,这里使用方式十分灵活。除此之外,groupBy 支持同时根据多个字段进行分组,下述方法同时使用 name + age 两个字段进行分组。
case class Student(name: String, age: Int, var score: Double) val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60)) wordCountWordsOnClass.groupBy("name", "age").reduce { (st1, st2) => { st1.score = st1.score + st2.score st1 } }.print()
2.ReduceGroup
应用于分组数据集的groureduce转换为每个组调用用户定义的group-reduce函数。这个函数与Reduce的区别在于,用户定义的函数可以一次获得整个组。该函数在组的所有元素上使用Iterable调用,可以返回任意数量的结果元素。
val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2)) wordCountWords.groupBy(0).reduceGroup { in => { in.toSet } }.print()
reduceGroup 和 reduce 都将相同 key 的元素进行聚合,差别在于 reduce 是一个一个元素聚合,即上面的 reduce{ (st1, st2) },经过多轮 1对1 的聚合得到最终一个元素,reduceGroup 一次直接获得该 key 下的全部元素组成的 iterator,操作更加灵活,但是存在内存的隐患,例如该 key 下的大量数据,这里直接调用 iterator.toArray 就会内存溢出。
3.SortGroup
group-reduce函数使用Iterable访问组中的元素。可选地,Iterable可以按照指定的顺序分发组中的元素。在很多情况下,这有助于降低用户自定义group-reduce函数的复杂度,提高其效率。例如选取 TopK,或者取最后的 TokK 元素,对 group 函数进行预排序就很方便,下面例子为每一个分数找到名字字母排序最靠前的同学。
val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2)) wordCountWords.groupBy(1).sortGroup(0, Order.ASCENDING).reduceGroup { in => var prev: (String, Int) = null for (t <- in) { if (prev == null || prev != t) prev = t } prev }.print()
4.CombineGroup
与reduce函数相反,group-reduce函数不是隐式组合的。为了使group-reduce函数可组合,它必须实现GroupCombineFunction接口。下述方法继承 GroupCombineFunction,复写 combine 方法实现了和上面类似的 reduce 操作,将相同 key 的元素最终聚合为1个。
val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2)) class MyCombinableGroup extends GroupCombineFunction[(String, Int), (String, Int)] { override def combine( in: java.lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { val r: (String, Int) = in.iterator.asScala.reduce((a, b) => (a._1, a._2 + b._2)) // emit tuple with key and sum out.collect(r) } } wordCountWords.groupBy(0).combineGroup(new MyCombinableGroup).print()
GroupCombine 变换是组合函数 GroupReduceFunction中组合步骤的广义形式。上述示例同样可以继承 GroupReduceFunction 类实现。下面方法同时继承 GroupReduceFunction 和 GroupCombineFunction 并实现 reduce 和 combine 方法,二者的结果是一致的。
val wordCountWords = env.fromElements(("B", 1), ("A", 1), ("C", 1), ("A", 1), ("C", 2)) class MyCombinableGroupReducer extends GroupReduceFunction[(String, Int), String] with GroupCombineFunction[(String, Int), (String, Int)] { override def reduce( in: java.lang.Iterable[(String, Int)], out: Collector[String]): Unit = { val r: (String, Int) = in.iterator.asScala.reduce((a, b) => (a._1, a._2 + b._2)) // concat key and sum and emit out.collect(r._1 + "-" + r._2) } override def combine( in: java.lang.Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = { val r: (String, Int) = in.iterator.asScala.reduce((a, b) => (a._1, a._2 + b._2)) // emit tuple with key and sum out.collect(r) } } wordCountWords.groupBy(1).reduceGroup(new MyCombinableGroupReducer).print() wordCountWords.groupBy(1).combineGroup(new MyCombinableGroupReducer).print()
5.Group Summary
除去 sortGroup 外,上面介绍了 Reduce,GroupReduce,CombineReduce ,三者都实现了聚合处理,看上去功能是一致的,但其实三者都有不同的侧重点。
Reduce : 一个一个组合数据,灵活性强但整体性差,资源要求低
CombineGroup :先将一部分数据聚合,再将各部分数据统一聚合得到最终结果,类似分治
ReduceGroup:获取全部数据,整体性强但是可能有资源问题
CombineGroup 其实就是 Reduce 和 ReduceGroup 的折中,因此使用 CombineGroup 对数据进行聚合统计时,其结果可能并非是最终的真实结果,需要再加一个 ReduceGroup 才能得到最终结果。
val input: DataSet[String] = [..] // The words received as input val combinedWords: DataSet[(String, Int)] = input .groupBy(0) .combineGroup { (words, out: Collector[(String, Int)]) => var key: String = null var count = 0 for (word <- words) { key = word count += 1 } out.collect((key, count)) } val output: DataSet[(String, Int)] = combinedWords .groupBy(0) .reduceGroup { (words, out: Collector[(String, Int)]) => var key: String = null var sum = 0 for ((word, sum) <- words) { key = word sum += count } out.collect((key, sum)) }
四.Aggregate
1.Sum / Min / Max
上述三个函数为内置的聚合函数,注意 Aggregate 转换只能应用于元组数据集,并且仅支持用于分组的字段位置键,内置函数需要 import 引入。
import org.apache.flink.api.java.aggregation.Aggregations.{MIN, SUM} // 按字段1聚合,对聚合内容的字段0求和并且对字段二求最小值 val input: DataSet[(Int, String, Double)] = env.fromElements((1, "A", 2D), (2, "B", 1D), (0, "C", 5D), (0, "C", 3D)) input.groupBy(1).aggregate(SUM, 0).and(MIN, 2).print()
2.MinBy / MaxBy
MinBy (MaxBy)转换为每组元组选择一个元组。选中的元组是一个或多个指定字段值为最小(最大值)的元组。用于比较的字段必须是有效的关键字段,即可比性。如果多个元组具有最小(最大)字段值,则返回这些元组中的任意一个元组。下述示例根据字段 1 分组,并根据字段 0,2 排序,取最小的元祖。
val input: DataSet[(Int, String, Double)] = env.fromElements((1, "A", 2D), (2, "B", 1D), (0, "C", 5D), (0, "C", 3D)) val output: DataSet[(Int, String, Double)] = input .groupBy(1) // group DataSet on second field .minBy(0, 2) // select tuple with minimum values for first and third field. output.print()
3.Distinct
Distinct转换计算源数据集的不同元素的数据集。下面的代码从数据集中删除所有重复的元素。
A.使用 Tuple 去重
val input: DataSet[(Int, String, Double)] = env.fromElements((1, "A", 2D), (2, "B", 1D), (0, "C", 5D), (0, "C", 3D)) // distinct 先遇到的保留,后遇到的舍弃 input.distinct().print() // 多字段去重 input.distinct(0,1).print() // 指定条件去重 input.distinct {x => Math.abs(x._1) > 0}.print()
B.使用 CaseClass 去重
case class Student(name: String, age: Int, var score: Double) val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60)) // 基础去重 wordCountWordsOnClass.distinct().print() // 多字段去重 wordCountWordsOnClass.distinct("name", "age").print() // 全字段去重 wordCountWordsOnClass.distinct("_").print()
五.Join
Join转换将两个数据集连接成一个数据集。两个数据集的元素被连接在一个或多个键上,这些键可以被指定 :
a key expression - 一个关键表达式
a key-selector function - 一个密钥选择器函数
one or more field position keys (Tuple DataSet only) - 一个或多个字段位置键(仅限元组数据集)。
Case Class Fields - Case类字段
1.Default Join
默认的Join转换生成一个新的具有两个字段的元组数据集。每个元组在第一个元组字段中保存第一个输入数据集的一个联接元素,在第二个字段中保存第二个输入数据集的一个匹配元素。
val joinData1: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "D")) val joinData2: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "E")) joinData1.join(joinData2).where(_._1).equalTo(_._1).print()
编辑
2.Join With Function
Join转换还可以调用用户定义的连接函数来处理连接元组。连接函数接收第一个输入数据集的一个元素和第二个输入数据集的一个元素,并恰好返回一个元素。
case class Student(name: String, age: Int, var score: Double) val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60)) val weights: DataSet[(String, Double)] = env.fromElements(("A", 20), ("B", 30)) wordCountWordsOnClass.join(weights).where("name").equalTo(0) { (words, weight) => (words.name, words.score * weight._2) }.print()
3.Join with Flat-Join Function
与Map和FlatMap类似,FlatJoin的行为方式与Join相同,但它可以返回(收集)、零个、一个或多个元素,而不是返回一个元素。通过 Collector 和 if 控制输出的条件与个数。
case class Student(name: String, age: Int, var score: Double) val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60)) val weights: DataSet[(String, Double)] = env.fromElements(("A", 20), ("B", 30)) wordCountWordsOnClass.join(weights).where("name").equalTo(0) { (words, weight, out: Collector[String]) => if (words.score > 50) out.collect(words.name + "\t" + words.score * weight._2) }.print()
4.Join with DataSet Size Hint
为了指导 Flink Join 优化器选择正确的执行策略,你可以提示要连接的数据集的大小。
case class Student(name: String, age: Int, var score: Double) val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60)) // hint that the second DataSet is very small or large 提示右表大小 为了指导优化器选择正确的执行策略,你可以提示要连接的数据集的大小 wordCountWordsOnClass.joinWithTiny(weights).where("name").equalTo(0) { (words, weight) => (words.name, words.score * weight._2) }.print() wordCountWordsOnClass.joinWithHuge(weights).where("name").equalTo(0) { (words, weight) => (words.name, words.score * weight._2) }.print()
5.Join Algorithm Hints
Flink运行时可以以各种方式执行连接。在不同的情况下,每一种可能的方法都优于其他方法。系统尝试自动选择一种合理的方式,但如果您想强制执行连接的特定方式,则允许您手动选择策略。
case class Student(name: String, age: Int, var score: Double) val wordCountWordsOnClass = env.fromElements(Student("A", 10, 50), Student("B", 11, 50), Student("C", 12, 50), Student("A", 10, 60)) wordCountWordsOnClass.join(weights, JoinHint.BROADCAST_HASH_FIRST).where("name").equalTo(0) { (words, weight) => (words.name, words.score * weight._2) }.print()
这里引入了 JoinHint 选择 join 的连接方式,这里和 Hive 的 mapJoin 思想很类似,Hive 中使用 hive.auto.convert.join.nonconditionaltask.size 参数控制小表的大小,默认25M,日常使用中最好不要超过1G。Flink 共包含以下几种模式:
OPTIMIZER_CHOOSES | 不做提示,系统自动选择 |
BROADCAST_HASH_FIRST | 广播 input1,适用于 input1 很小 |
BROADCAST_HASH_SECOND | 广播 input2,适用于 input2 很小 |
REPARTITION_HASH_FIRST | shuffle 每个输入,用 input1 构建 hash 映射,适用于 input1 很小 |
REPARTITION_HASH_SECOND | shuffle 每个输入,用 input2 构建 hash 映射,适用于 input2 很小 |
REPARTITION_SORT_MERGE | shuffle 每个输入并排序。适用于 input1 或 input2 已排序 |
六.Outer Join
OuterJoin转换在两个数据集上执行左、右或完全外部连接。外部连接类似于常规(内部)连接,它创建的所有元素对的键值相等。此外,如果在另一边没有找到匹配的键,则“外部”一边的记录(如果是完整的,则为左、右或两者)将被保留。将匹配的一对元素(或一个元素和一个空值作为另一个输入)赋给一个JoinFunction,将这对元素转换为单个元素,或者赋给一个FlatJoinFunction,将这对元素转换为任意多个(包括没有)元素。两个数据集的元素被连接在一个或多个键上,这些键可以被指定:
a key expression - 一个关键表达式
a key-selector function - 一个密钥选择器函数
one or more field position keys (Tuple DataSet only) - 一个或多个字段位置键(仅限元组数据集)。
Case Class Fields - Case类字段
1.OuterJoin with Function
OuterJoin转换调用用户定义的连接函数来处理连接元组。连接函数接收第一个输入数据集的一个元素和第二个输入数据集的一个元素,并恰好返回一个元素。根据外部连接的类型(左、右、全),连接函数的两个输入元素中的一个可以为空。
val joinData1: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "D")) val joinData2: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "E")) joinData1.leftOuterJoin(joinData2).where(0).equalTo(0) { (o1, o2) => (o1._1, if (o1._2 == "C") -1 else o1._1) }.print()
2.OuterJoin with Alogrithm Hints
和 common join 类似,Flink运行时可以以各种方式执行外部连接。在不同的情况下,每一种可能的方法都优于其他方法。系统会尝试自动选择一种合理的方式,但如果您想强制执行外部连接的特定方式,则允许您手动选择策略。
val joinData1: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "D")) val joinData2: DataSet[(Int, String)] = env.fromElements((1, "A"), (2, "B"), (0, "C"), (0, "E")) joinData1.leftOuterJoin(joinData2, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0) { (o1, o2) => (o1._1, if (o1._2 == "C") -1 else o1._1) }.print()
OuterJoin 有以下限制:
LeftouterJoin 支持
OPTIMIZER_CHOOSES
BROADCAST_HASH_SECOND
REPARTITION_HASH_SECOND
REPARTITION_SORT_MERGE
RightOuterJoin 支持
OPTIMIZER_CHOOSES
BROADCAST_HASH_FIRST
REPARTITION_HASH_FIRST
REPARTITION_SORT_MERGE
FullOuteJoin 支持
OPTIMIZER_CHOOSES
REPARTITION_SORT_MERGE
七.Cross
Cross转换将两个数据集合并成一个数据集。它构建两个输入数据集元素的所有成对组合,也就是说,它构建一个笛卡尔积。Cross转换要么对每一对元素调用用户定义的Cross函数,要么输出一个Tuple2。
1.Cross with Self-Definded Function
Cross转换可以调用用户定义的Cross函数。cross函数接收第一个输入的一个元素和第二个输入的一个元素,并恰好返回一个结果元素。下述示例计算两个元素的距离,并返回 id 和 dist。
val coords1: DataSet[Coord] = env.fromElements(Coord(1, 1, 2), Coord(2, 1, 2), Coord(3, 4, 5), Coord(4, 1, 2)) val coords2: DataSet[Coord] = env.fromElements(Coord(11, 1, 3), Coord(12, 12, 2), Coord(13, 5, 7), Coord(14, 4, 3)) val distances = coords1.cross(coords2) { (c1, c2) => val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2)) (c1.id, c2.id, dist) } distances.print()
2.Cross with DataSet Size Hint
为了指导优化器选择正确的执行策略,你可以提示数据集的大小交叉如下所示,input2 比较小则选择 crossWithTine,反之使用 crossWithHuge。
val coords1: DataSet[Coord] = env.fromElements(Coord(1, 1, 2), Coord(2, 1, 2), Coord(3, 4, 5), Coord(4, 1, 2)) val coords2: DataSet[Coord] = env.fromElements(Coord(11, 1, 3), Coord(12, 12, 2), Coord(13, 5, 7), Coord(14, 4, 3)) // crossWithTiny or crossWithHuge val distances = coords1.crossWithTiny(coords2) { (c1, c2) => val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2)) (c1.id, c2.id, dist) } distances.print()
八.CoGroup
CoGroup 转换联合处理两个数据集的分组。两个数据集按照一个定义的键分组,共享相同键的两个数据集的分组一起交给一个用户定义的 co-group 函数。如果对于特定键,只有一个数据集有组,则使用该组和空组调用 co-group 函数。co-group 函数可以分别遍历两个组的元素,并返回任意数量的结果元素。与Reduce、groureduce和Join类似,可以使用不同的键选择方法定义键。
v1 中包含 key 在 data1 中的全部元素构成的迭代器,v2 中包含 key 在 data2 中的全部元素构成的迭代器,这里可以在内部实现笛卡尔积或者其他操作。
val data1: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3)) val data2: DataSet[(String, Double)] = env.fromElements(("A", 2D), ("B", 3D), ("B", 3D)) // Value is not a object val outputV2 = data1.coGroup(data2).where(0).equalTo(0) { (v1: Iterator[(String, Int)], v2: Iterator[(String, Double)], out: Collector[Double]) => ( v1.toArray.foreach(x => { v2.toArray.foreach(y => { out.collect(x._2 * y._2) }) }) ) } outputV2.print()
Tips:
一定要在 Self-Function 中声明 v1 和 v2 的类型,按照官方 API 直接写 (v1, v2, out: Collector[Double]),会显示如下报错 value xxx is not a member of Object :
编辑
九.DataSet Change
1.Union
生成两个数据集的并集,这两个数据集必须是相同的类型。多个数据集的联合可以通过多个联合调用来实现,如下所示:
val data1: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3)) val data3: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3)) // 生成两个数据集的并集,这两个数据集必须是相同的类型。多个数据集的联合可以通过多个联合调用来实现 val dataUnion = data1.union(data3) dataUnion.print()
除了 DataSet,DataStreaming 也可以使用 union 合并,从而达到多流合并的目的。
2.Rebalance
均匀地重新平衡数据集的并行分区,以消除数据倾斜。和 Spark 的 repartition 很像。
val dataUnion = data1.union(data3) // 均匀地重新平衡数据集的并行分区,以消除数据倾斜。 dataUnion.rebalance().print()
3.Hash-Partition
对给定键上的数据集进行哈希分区。在 Spark 中可以继承 HashPartition 实现自定义 Hash 分区。
val dataUnion = data1.union(data3) // 相同的 Hash 存在一个分区 dataUnion.partitionByHash(0).print()
4.Range-Partition
根据给定的键对数据集进行范围分区。这里需要与上述区分开,Hash-Partition 只要 Hash 值一致就会分在同一个分区,而 Range-Partition 则需要相同 key。
val dataUnion = data1.union(data3) // 相同的 key 存在一个分区 dataUnion.partitionByRange(0).print()
5.Sort-Partition
在指定字段上按指定顺序对数据集的所有分区进行本地排序。 字段1升序后再按字段0降序排序,这样得到的 partition 数据是排好序的。
val dataUnion = data1.union(data3) dataUnion.sortPartition(1, Order.ASCENDING) .sortPartition(0, Order.DESCENDING) .print()
十.First-n
返回数据集的前n个(任意)元素。First-n可以应用于常规数据集、分组数据集或分组排序数据集。这个函数类似于 spark 的 take。
val data1: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3)) val data3: DataSet[(String, Int)] = env.fromElements(("A", 1), ("B", 2), ("C", 3)) val dataUnion = data1.union(data3) // 取 Top3 dataUnion.first(3).print() // 数据按字段0分组后每个组取 Top 1 dataUnion.groupBy(0).first(1).print() // 数据按字段0分组后按字段1升序排序并取 TOP 2 dataUnion.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
Final Summary
上述解释大多翻译自 Flink 1.14.1 官方 API,有很多好用的方法,配合 scala 的语法糖写起来也很简洁,Transformation 大致就这些,后续继续介绍 Sink 相关以及 DataStream 相关。