全网最详细4W字Flink入门笔记(上) 2

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 全网最详细4W字Flink入门笔记(上)

Task

在 Flink 中,Task 是一个阶段多个功能相同 subTask 的集合,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。

要是之前学过Spark,这里可以用Spark的思想来看,Flink的Task就好比Spark中的Stage,而我们知道Spark的Stage是根据宽依赖来拆分的。所以我们也可以认为Flink的Task也是根据宽依赖拆分的(尽管Flink中并没有宽依赖的概念),这样会更好理解,如下图:

Operator Chain(算子链)

在Flink中,为了分布式执行,Flink会将算子子任务链接在一起形成任务。每个任务由一个线程执行。将算子链接在一起形成任务是一种有用的优化:它减少了线程间切换和缓冲的开销,并增加了整体吞吐量,同时降低了延迟

举个例子,假设我们有一个简单的Flink流处理程序,它从一个源读取数据,然后应用mapfilter操作,最后将结果写入到一个接收器。这个程序可能看起来像这样:

DataStream<String> data = env.addSource(new CustomSource());
data.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return value.toUpperCase();
    }
})
.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return value.startsWith("A");
    }
})
.addSink(new CustomSink());

在这个例子中,mapfilter操作可以被链接在一起形成一个任务,被优化为算子链,这意味着它们将在同一个线程中执行,而不是在不同的线程中执行并通过网络进行数据传输

Task Slots

Task Slots即是任务槽,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序,我们可以通过集群的配置文件来设定 TaskManager 的 slot 数量:taskmanager.numberOfTaskSlots: 8。

例如,如果 Task Manager 有2个 slot,那么它将为每个 slot 分配 50% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。

需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因

分发规则

  • 不同的Task下的subtask要分发到同一个TaskSlot中,降低数据传输、提高执行效率
  • 相同的Task下的subtask要分发到不同的TaskSlot

Slot共享组

如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,在Flink中,可以通过在代码中使用slotSharingGroup方法来设置slot共享组。Flink会将具有相同slot共享组的操作放入同一个slot中,同时保持不具有slot共享组的操作在其他slot中。这可以用来隔离slot。

例如,你可以这样设置:

dataStream.map(...).slotSharingGroup("group1");

默认情况下,所有操作都被分配相同的SlotSharingGroup。

这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享;不同组之间的任务是完全隔离的,必须分配到不同的 slot 上。

并行度和Slots的例子

听了上面并行度和Slots的理论,可能有点疑惑,通过一个例子简单说明下:

假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示最多能并行执行9个任务。

假设我们写了一个WordCount程序,有四个转换算子:source —> flatMap —> reduce —> sink

当所有算子并行度相同时,容易看出source和flatMap可以优化合并算子链,于是最终有三个任务节点:source & flatMap,reduce 和sink。 如果我们没有任何并行度设置,而配置文件中默认parallelism.default=1,那么程序运行的默认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot只有1个。9个slot只用了1个,有8个空闲。如图所示:

我们可以直接把并行度设置为 9,这样所有 3*9=27 个任务就会完全占用 9 个 slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。

另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有 19 个子任务。根据 slot 共享的原则,它们最终还是会占用全部的 9 个 slot,而 sink 任务只在其中一个 slot 上执行,通过这个例子也可以明确地看到,整个流处理程序的并行度,就应该是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量

DataSource数据源

Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections。Flink也提供了addSource方式,可以自定义数据源,下面介绍一些常用的数据源。

File Source

  • 通过读取本地、HDFS文件创建一个数据源。

如果读取的是HDFS上的文件,那么需要导入Hadoop依赖

<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.6.5</version>
</dependency>

代码示例:每隔10s去读取HDFS指定目录下的新增文件内容,并且进行WordCount。

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
//在算子转换的时候,会将数据转换成Flink内置的数据类型,所以需要将隐式转换导入进来,才能自动进行类型转换
import org.apache.flink.streaming.api.scala._
object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //读取hdfs文件
    val filePath = "hdfs://node01:9000/flink/data/"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    //每隔10s中读取 hdfs上新增文件内容
    val textStream = env.readFile(textInputFormat,filePath,FileProcessingMode.PROCESS_CONTINUOUSLY,10)
    textStream.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1).print()
    env.execute()
  }
}

readTextFile底层调用的就是readFile方法,readFile是一个更加底层的方式,使用起来会更加的灵活


Collection Source

基于本地集合的数据源,一般用于测试场景,没有太大意义。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object CollectionSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromCollection(List("hello flink msb","hello msb msb"))
    stream.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1).print()
    env.execute()
  }
}

Socket Source

接受Socket Server中的数据。

val initStream:DataStream[String] = env.socketTextStream("node01",8888)

Kafka Source

Flink接受Kafka中的数据,首先要配置flink与kafka的连接器依赖。

Maven依赖:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.2</version>
</dependency>

代码:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val prop = new Properties()
    prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id","flink-kafka-id001")
    prop.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    prop.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    /**
      * earliest:从头开始消费,旧数据会频繁消费
      * latest:从最近的数据开始消费,不再消费旧数据
      */
    prop.setProperty("auto.offset.reset","latest")
 val kafkaStream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      override def isEndOfStream(t: (String, String)): Boolean = false
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) =    {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }
      //指定返回数据类型
      override def getProducedType: TypeInformation[(String, String)] =
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
    }, prop))
    kafkaStream.print()
    env.execute()

Transformations

Transformations算子可以将一个或者多个算子转换成一个新的数据流,使用Transformations算子组合可以进行复杂的业务处理。

Map

DataStream → DataStream

遍历数据流中的每一个元素,产生一个新的元素。

FlatMap

DataStream → DataStream

遍历数据流中的每一个元素,产生N个元素 N=0,1,2,......。

Filter

DataStream → DataStream

过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉。

KeyBy

DataStream → KeyedStream

根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner。

指定分区字段的方式有三种:

1、根据索引号指定 2、通过匿名函数来指定 3、通过实现KeySelector接口  指定分区字段

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1, 100)
    stream
      .map(x => (x % 3, 1))
      //根据索引号来指定分区字段
      //      .keyBy(0)
      //通过传入匿名函数 指定分区字段
      //      .keyBy(x=>x._1)
      //通过实现KeySelector接口  指定分区字段
      .keyBy(new KeySelector[(Long, Int), Long] {
      override def getKey(value: (Long, Int)): Long = value._1
    })
      .sum(1)
      .print()
    env.execute()

Reduce

KeyedStream:根据key分组 → DataStream

注意,reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法

.reduce((v1,v2) => (v1._1,v1._2 + v2._2))

代码例子:读取kafka数据,实时统计各个卡口下的车流量。

  • 实现kafka生产者,读取卡口数据并且往kafka中生产数据:
val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("key.serializer", classOf[StringSerializer].getName)
    prop.setProperty("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](prop)
    val iterator = Source.fromFile("data/carFlow_all_column_test.txt", "UTF-8").getLines()
    for (i <- 1 to 100) {
      for (line <- iterator) {
        //将需要的字段值 生产到kafka集群  car_id monitor_id event-time speed
        //车牌号 卡口号 车辆通过时间 通过速度
        val splits = line.split(",")
        val monitorID = splits(0).replace("'","")
        val car_id = splits(2).replace("'","")
        val eventTime = splits(4).replace("'","")
        val speed = splits(6).replace("'","")
        if (!"00000000".equals(car_id)) {
          val event = new StringBuilder
          event.append(monitorID + "\t").append(car_id+"\t").append(eventTime + "\t").append(speed)
          producer.send(new ProducerRecord[String, String]("flink-kafka", event.toString()))
        }
        Thread.sleep(500)
      }
    }
  • 实现kafka消费者:
val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","flink001")
    props.getProperty("auto.offset.reset","latest")
    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new   SimpleStringSchema(),props))
    stream.map(data => {
      val splits = data.split("\t")
      val carFlow = CarFlow(splits(0),splits(1),splits(2),splits(3).toDouble)
      (carFlow,1)
    }).keyBy(_._1.monitorId)
        .sum(1)
        .print()
    env.execute()

Aggregations

KeyedStream → DataStream

Aggregations代表的是一类聚合算子,具体算子如下:

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")

代码例子:实时统计各个卡口最先通过的汽车的信息

val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(),props))
    stream.map(data => {
      val splits = data.split("\t")
      val carFlow = CarFlow(splits(0),splits(1),splits(2),splits(3).toDouble)
      val eventTime = carFlow.eventTime
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      val date = format.parse(eventTime)
      (carFlow,date.getTime)
    }).keyBy(_._1.monitorId)
        .min(1)
        .map(_._1)
        .print()
    env.execute()

Union 真合并

DataStream → DataStream

Union of two or more data streams creating a new stream containing all the elements from all the streams

合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素

注意:需要保证数据流中元素类型一致

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromCollection(List(("a",1),("b",2),("c",3)))
    val ds2 = env.fromCollection(List(("d",4),("e",5),("f",6)))
    val ds3 = env.fromCollection(List(("g",7),("h",8)))
    val unionStream = ds1.union(ds2,ds3)
    unionStream.print()
    env.execute()
输出:
("a", 1)
("b", 2)
("c", 3)
("d", 4)
("e", 5)
("f", 6)
("g", 7)
("h", 8)


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
276 3
|
Java Linux API
flink入门-流处理
flink入门-流处理
167 0
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
183 0
|
8月前
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
210 3
|
8月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
8月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
8月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
1439 3
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
126 0
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
223 0