全网最详细4W字Flink入门笔记(上) 5

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 全网最详细4W字Flink入门笔记(上)

分区策略

在 Apache Flink 中,分区(Partitioning)是将数据流按照一定的规则划分成多个子数据流或分片,以便在不同的并行任务或算子中并行处理数据。分区是实现并行计算和数据流处理的基础机制。Flink 的分区决定了数据在作业中的流动方式,以及在并行任务之间如何分配和处理数据。

在 Flink 中,数据流可以看作是一个有向图,图中的节点代表算子(Operators),边代表数据流(Data Streams)。数据从源算子流向下游算子,这些算子可能并行地处理输入数据,而分区就是决定数据如何从一个算子传递到另一个算子的机制。

shuffle

场景:增大分区、提高并行度,解决数据倾斜

DataStream → DataStream

分区元素随机均匀分发到下游分区,网络开销比较大

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()

console result:上游数据比较随意的分发到下游

2> 1
1> 4
7> 10
4> 6
6> 3
5> 7
8> 2
1> 5
1> 8
1> 9

rebalance

场景:增大分区、提高并行度,解决数据倾斜

DataStream → DataStream

轮询分区元素,均匀的将元素分发到下游分区,下游每个分区的数据比较均匀,在发生数据倾斜时非常有用,网络开销比较大

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1,100)
val shuffleStream = stream.rebalance
shuffleStream.print()
env.execute()

console result:上游数据比较均匀的分发到下游

8> 6
3> 1
5> 3
7> 5
1> 7
2> 8
6> 4
4> 2
3> 9
4> 10

rescale

场景:减少分区  防止发生大量的网络传输   不会发生全量的重分区

DataStream → DataStream

通过轮询分区元素,将一个元素集合从上游分区发送给下游分区,发送单位是集合,而不是一个个元素

注意:rescale发生的是本地数据传输,而不需要通过网络传输数据,比如taskmanager的槽数。简单来说,上游的数据只会发送给本TaskManager中的下游。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result:stream1:1内容分发给stream2:1和stream2:2

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
5
9

stream2:2

3
7

stream2:3

2
6
10

stream2:4

4
8

broadcast

场景:需要使用映射表、并且映射表会经常发生变动的场景

DataStream → DataStream

上游中每一个元素内容广播到下游每一个分区中

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result:stream1:1、2内容广播到了下游每个分区中

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
3
5
7
9
2
4
6
8
10

global

场景:并行度降为1

DataStream → DataStream

上游分区的数据只分发给下游的第一个分区

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()

console result:stream1:1、2内容只分发给了stream2:1

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
3
5
7
9
2
4
6
8
10

forward

场景:一对一的数据分发,map、flatMap、filter 等都是这种分区策略

DataStream → DataStream

上游分区数据分发到下游对应分区中

partition1->partition1

partition2->partition2

注意:必须保证上下游分区数(并行度)一致,不然会有如下异常:

Forward partitioning does not allow change of parallelism
* Upstream operation: Source: Sequence Source-1 parallelism: 2,
* downstream operation: Sink: Unnamed-4 parallelism: 4
* stream.forward.writeAsText("./data/stream2").setParallelism(4)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result:stream1:1->stream2:1、stream1:2->stream2:2

stream1:1

1
3
5
7
9

stream1:2

2
4
6
8
10

stream2:1

1
3
5
7
9

stream2:2

2
4
6
8
10

keyBy

场景:与业务场景匹配

DataStream → DataStream

根据上游分区元素的Hash值与下游分区数取模计算出,将当前元素分发到下游哪一个分区

MathUtils.murmurHash(keyHash)(每个元素的Hash值) % maxParallelism(下游分区数)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
env.execute()

console result:根据元素Hash值分发到下游分区中

PartitionCustom

DataStream → DataStream

通过自定义的分区器,来决定元素是如何从上游分区分发到下游分区

object ShuffleOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val stream = env.generateSequence(1,10).map((_,1))
    stream.writeAsText("./data/stream1")
    stream.partitionCustom(new customPartitioner(),0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }
  class customPartitioner extends Partitioner[Long]{
    override def partition(key: Long, numPartitions: Int): Int = {
      key.toInt % numPartitions
    }
  }
}




本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
276 3
|
Java Linux API
flink入门-流处理
flink入门-流处理
167 0
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
183 0
|
8月前
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
210 3
|
8月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
8月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
8月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
1439 3
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
126 0
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
223 0