Task
在 Flink 中,Task 是一个阶段多个功能相同 subTask 的集合,Flink 会尽可能地将 operator 的 subtask 链接(chain)在一起形成 task。每个 task 在一个线程中执行。将 operators 链接成 task 是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
要是之前学过Spark,这里可以用Spark的思想来看,Flink的Task就好比Spark中的Stage,而我们知道Spark的Stage是根据宽依赖来拆分的。所以我们也可以认为Flink的Task也是根据宽依赖拆分的(尽管Flink中并没有宽依赖的概念),这样会更好理解,如下图:
Operator Chain(算子链)
在Flink中,为了分布式执行,Flink会将算子子任务链接在一起形成任务。每个任务由一个线程执行。将算子链接在一起形成任务是一种有用的优化:它减少了线程间切换和缓冲的开销,并增加了整体吞吐量,同时降低了延迟。
举个例子,假设我们有一个简单的Flink流处理程序,它从一个源读取数据,然后应用map
和filter
操作,最后将结果写入到一个接收器。这个程序可能看起来像这样:
DataStream<String> data = env.addSource(new CustomSource()); data.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }) .filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return value.startsWith("A"); } }) .addSink(new CustomSink());
在这个例子中,map
和filter
操作可以被链接在一起形成一个任务,被优化为算子链,这意味着它们将在同一个线程中执行,而不是在不同的线程中执行并通过网络进行数据传输。
Task Slots
Task Slots即是任务槽,slot 在 Flink 里面可以认为是资源组,Flink 将每个任务分成子任务并且将这些子任务分配到 slot 来并行执行程序,我们可以通过集群的配置文件来设定 TaskManager 的 slot 数量:taskmanager.numberOfTaskSlots: 8。
例如,如果 Task Manager 有2个 slot,那么它将为每个 slot 分配 50% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。
需要注意的是,slot 目前仅仅用来隔离内存,不会涉及 CPU 的隔离。在具体应用时,可以将 slot 数量配置为机器的 CPU 核心数,尽量避免不同任务之间对 CPU 的竞争。这也是开发环境默认并行度设为机器 CPU 数量的原因。
分发规则
- 不同的Task下的subtask要分发到同一个TaskSlot中,降低数据传输、提高执行效率。
- 相同的Task下的subtask要分发到不同的TaskSlot。
Slot共享组
如果希望某个算子对应的任务完全独占一个 slot,或者只有某一部分算子共享 slot,在Flink中,可以通过在代码中使用slotSharingGroup
方法来设置slot共享组。Flink会将具有相同slot共享组的操作放入同一个slot中,同时保持不具有slot共享组的操作在其他slot中。这可以用来隔离slot。
例如,你可以这样设置:
dataStream.map(...).slotSharingGroup("group1");
默认情况下,所有操作都被分配相同的SlotSharingGroup。
这样,只有属于同一个 slot 共享组的子任务,才会开启 slot 共享;不同组之间的任务是完全隔离的,必须分配到不同的 slot 上。
并行度和Slots的例子
听了上面并行度和Slots的理论,可能有点疑惑,通过一个例子简单说明下:
假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示最多能并行执行9个任务。
假设我们写了一个WordCount程序,有四个转换算子:source —> flatMap —> reduce —> sink。
当所有算子并行度相同时,容易看出source和flatMap可以优化合并算子链,于是最终有三个任务节点:source & flatMap,reduce 和sink。 如果我们没有任何并行度设置,而配置文件中默认parallelism.default=1,那么程序运行的默认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot只有1个。9个slot只用了1个,有8个空闲。如图所示:
我们可以直接把并行度设置为 9,这样所有 3*9=27 个任务就会完全占用 9 个 slot。这是当前集群资源下能执行的最大并行度,计算资源得到了充分的利用。
另外再考虑对于某个算子单独设置并行度的场景。例如,如果我们考虑到输出可能是写入文件,那会希望不要并行写入多个文件,就需要设置 sink 算子的并行度为 1。这时其他的算子并行度依然为 9,所以总共会有 19 个子任务。根据 slot 共享的原则,它们最终还是会占用全部的 9 个 slot,而 sink 任务只在其中一个 slot 上执行,通过这个例子也可以明确地看到,整个流处理程序的并行度,就应该是所有算子并行度中最大的那个,这代表了运行程序需要的 slot 数量。
DataSource数据源
Flink内嵌支持的数据源非常多,比如HDFS、Socket、Kafka、Collections。Flink也提供了addSource方式,可以自定义数据源,下面介绍一些常用的数据源。
File Source
- 通过读取本地、HDFS文件创建一个数据源。
如果读取的是HDFS上的文件,那么需要导入Hadoop依赖
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.6.5</version> </dependency>
代码示例:每隔10s去读取HDFS指定目录下的新增文件内容,并且进行WordCount。
import org.apache.flink.api.java.io.TextInputFormat import org.apache.flink.core.fs.Path import org.apache.flink.streaming.api.functions.source.FileProcessingMode import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment //在算子转换的时候,会将数据转换成Flink内置的数据类型,所以需要将隐式转换导入进来,才能自动进行类型转换 import org.apache.flink.streaming.api.scala._ object FileSource { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment //读取hdfs文件 val filePath = "hdfs://node01:9000/flink/data/" val textInputFormat = new TextInputFormat(new Path(filePath)) //每隔10s中读取 hdfs上新增文件内容 val textStream = env.readFile(textInputFormat,filePath,FileProcessingMode.PROCESS_CONTINUOUSLY,10) textStream.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1).print() env.execute() } }
readTextFile底层调用的就是readFile方法,readFile是一个更加底层的方式,使用起来会更加的灵活
Collection Source
基于本地集合的数据源,一般用于测试场景,没有太大意义。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ object CollectionSource { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.fromCollection(List("hello flink msb","hello msb msb")) stream.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1).print() env.execute() } }
Socket Source
接受Socket Server中的数据。
val initStream:DataStream[String] = env.socketTextStream("node01",8888)
Kafka Source
Flink接受Kafka中的数据,首先要配置flink与kafka的连接器依赖。
Maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.9.2</version> </dependency>
代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment val prop = new Properties() prop.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092") prop.setProperty("group.id","flink-kafka-id001") prop.setProperty("key.deserializer",classOf[StringDeserializer].getName) prop.setProperty("value.deserializer",classOf[StringDeserializer].getName) /** * earliest:从头开始消费,旧数据会频繁消费 * latest:从最近的数据开始消费,不再消费旧数据 */ prop.setProperty("auto.offset.reset","latest") val kafkaStream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] { override def isEndOfStream(t: (String, String)): Boolean = false override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = { val key = new String(consumerRecord.key(), "UTF-8") val value = new String(consumerRecord.value(), "UTF-8") (key, value) } //指定返回数据类型 override def getProducedType: TypeInformation[(String, String)] = createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String]) }, prop)) kafkaStream.print() env.execute()
Transformations
Transformations算子可以将一个或者多个算子转换成一个新的数据流,使用Transformations算子组合可以进行复杂的业务处理。
Map
DataStream → DataStream
遍历数据流中的每一个元素,产生一个新的元素。
FlatMap
DataStream → DataStream
遍历数据流中的每一个元素,产生N个元素 N=0,1,2,......。
Filter
DataStream → DataStream
过滤算子,根据数据流的元素计算出一个boolean类型的值,true代表保留,false代表过滤掉。
KeyBy
DataStream → KeyedStream
根据数据流中指定的字段来分区,相同指定字段值的数据一定是在同一个分区中,内部分区使用的是HashPartitioner。
指定分区字段的方式有三种:
1、根据索引号指定 2、通过匿名函数来指定 3、通过实现KeySelector接口 指定分区字段
val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.generateSequence(1, 100) stream .map(x => (x % 3, 1)) //根据索引号来指定分区字段 // .keyBy(0) //通过传入匿名函数 指定分区字段 // .keyBy(x=>x._1) //通过实现KeySelector接口 指定分区字段 .keyBy(new KeySelector[(Long, Int), Long] { override def getKey(value: (Long, Int)): Long = value._1 }) .sum(1) .print() env.execute()
Reduce
KeyedStream:根据key分组 → DataStream
注意,reduce是基于分区后的流对象进行聚合,也就是说,DataStream类型的对象无法调用reduce方法。
.reduce((v1,v2) => (v1._1,v1._2 + v2._2))
代码例子:读取kafka数据,实时统计各个卡口下的车流量。
- 实现kafka生产者,读取卡口数据并且往kafka中生产数据:
val prop = new Properties() prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092") prop.setProperty("key.serializer", classOf[StringSerializer].getName) prop.setProperty("value.serializer", classOf[StringSerializer].getName) val producer = new KafkaProducer[String, String](prop) val iterator = Source.fromFile("data/carFlow_all_column_test.txt", "UTF-8").getLines() for (i <- 1 to 100) { for (line <- iterator) { //将需要的字段值 生产到kafka集群 car_id monitor_id event-time speed //车牌号 卡口号 车辆通过时间 通过速度 val splits = line.split(",") val monitorID = splits(0).replace("'","") val car_id = splits(2).replace("'","") val eventTime = splits(4).replace("'","") val speed = splits(6).replace("'","") if (!"00000000".equals(car_id)) { val event = new StringBuilder event.append(monitorID + "\t").append(car_id+"\t").append(eventTime + "\t").append(speed) producer.send(new ProducerRecord[String, String]("flink-kafka", event.toString())) } Thread.sleep(500) } }
- 实现kafka消费者:
val env = StreamExecutionEnvironment.getExecutionEnvironment val props = new Properties() props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092") props.setProperty("key.deserializer",classOf[StringDeserializer].getName) props.setProperty("value.deserializer",classOf[StringDeserializer].getName) props.setProperty("group.id","flink001") props.getProperty("auto.offset.reset","latest") val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(),props)) stream.map(data => { val splits = data.split("\t") val carFlow = CarFlow(splits(0),splits(1),splits(2),splits(3).toDouble) (carFlow,1) }).keyBy(_._1.monitorId) .sum(1) .print() env.execute()
Aggregations
KeyedStream → DataStream
Aggregations代表的是一类聚合算子,具体算子如下:
keyedStream.sum(0) keyedStream.sum("key") keyedStream.min(0) keyedStream.min("key") keyedStream.max(0) keyedStream.max("key") keyedStream.minBy(0) keyedStream.minBy("key") keyedStream.maxBy(0) keyedStream.maxBy("key")
代码例子:实时统计各个卡口最先通过的汽车的信息
val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(),props)) stream.map(data => { val splits = data.split("\t") val carFlow = CarFlow(splits(0),splits(1),splits(2),splits(3).toDouble) val eventTime = carFlow.eventTime val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val date = format.parse(eventTime) (carFlow,date.getTime) }).keyBy(_._1.monitorId) .min(1) .map(_._1) .print() env.execute()
Union 真合并
DataStream → DataStream
Union of two or more data streams creating a new stream containing all the elements from all the streams
合并两个或者更多的数据流产生一个新的数据流,这个新的数据流中包含了所合并的数据流的元素。
注意:需要保证数据流中元素类型一致
val env = StreamExecutionEnvironment.getExecutionEnvironment val ds1 = env.fromCollection(List(("a",1),("b",2),("c",3))) val ds2 = env.fromCollection(List(("d",4),("e",5),("f",6))) val ds3 = env.fromCollection(List(("g",7),("h",8))) val unionStream = ds1.union(ds2,ds3) unionStream.print() env.execute() 输出: ("a", 1) ("b", 2) ("c", 3) ("d", 4) ("e", 5) ("f", 6) ("g", 7) ("h", 8)