Connect (pseudo-merge)
DataStream, DataStream → ConnectedStreams
Connects two data streams while keeping each stream's data type, and lets the two streams share state.
```scala
val ds1 = env.socketTextStream("node01", 8888)
val ds2 = env.socketTextStream("node01", 9999)
val wcStream1 = ds1.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val wcStream2 = ds2.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val restStream: ConnectedStreams[(String, Int), (String, Int)] = wcStream2.connect(wcStream1)
```
CoMap, CoFlatMap
ConnectedStreams → DataStream
CoMap and CoFlatMap are not the names of specific operators but of a family of operations.
Any map applied to a ConnectedStreams is called a CoMap.
Any flatMap applied to a ConnectedStreams is called a CoFlatMap.
First way to implement a CoMap:
```scala
restStream.map(new CoMapFunction[(String, Int), (String, Int), (String, Int)] {
  // computation on the first stream
  override def map1(value: (String, Int)): (String, Int) = {
    (value._1 + ":first", value._2 + 100)
  }

  // computation on the second stream
  override def map2(value: (String, Int)): (String, Int) = {
    (value._1 + ":second", value._2 * 100)
  }
}).print()
```
Second way to implement a CoMap:
```scala
restStream.map(
  // computation on the first stream
  x => (x._1 + ":first", x._2 + 100),
  // computation on the second stream
  y => (y._1 + ":second", y._2 * 100)
).print()
```
Code example: a configuration file stores license plate numbers together with the owners' real names; for each plate number arriving in the data stream, look up the owner's name in real time (note: the configuration file may change at any time).
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val filePath = "data/carId2Name"
val carId2NameStream = env.readFile(new TextInputFormat(new Path(filePath)), filePath,
  FileProcessingMode.PROCESS_CONTINUOUSLY, 10)
val dataStream = env.socketTextStream("node01", 8888)
dataStream.connect(carId2NameStream).map(new CoMapFunction[String, String, String] {
  private val hashMap = new mutable.HashMap[String, String]()

  // look up the owner's name for the plate number coming from the socket stream
  override def map1(value: String): String = {
    hashMap.getOrElse(value, "not found name")
  }

  // load each line of the config file into the in-memory map
  override def map2(value: String): String = {
    val splits = value.split(" ")
    hashMap.put(splits(0), splits(1))
    value + " loaded..."
  }
}).print()
env.execute()
```
First way to implement a CoFlatMap:
```scala
ds1.connect(ds2).flatMap(
  // computation on the first stream
  (x, c: Collector[String]) => {
    x.split(" ").foreach(w => c.collect(w))
  },
  // computation on the second stream
  (y, c: Collector[String]) => {
    y.split(" ").foreach(d => c.collect(d))
  }
).print()
```
Second way to implement a CoFlatMap:
```scala
ds1.connect(ds2).flatMap(
  // computation on the first stream
  x => x.split(" "),
  // computation on the second stream
  y => y.split(" ")
).print()
```
Third way to implement a CoFlatMap:
```scala
ds1.connect(ds2).flatMap(new CoFlatMapFunction[String, String, (String, Int)] {
  // computation on the first stream
  override def flatMap1(value: String, out: Collector[(String, Int)]): Unit = {
    val words = value.split(" ")
    words.foreach(x => out.collect((x, 1)))
  }

  // computation on the second stream
  override def flatMap2(value: String, out: Collector[(String, Int)]): Unit = {
    val words = value.split(" ")
    words.foreach(x => out.collect((x, 1)))
  }
}).print()
```
Split
DataStream → SplitStream
Splits one stream into two or more streams based on a condition.
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1, 100)
val splitStream = stream.split(d => {
  d % 2 match {
    case 0 => List("even")
    case 1 => List("odd")
  }
})
splitStream.select("even").print()
env.execute()
```
Select
SplitStream → DataStream
Selects one or more streams from a SplitStream.
```scala
splitStream.select("even").print()
```
Iterate
DataStream → IterativeStream → DataStream
The Iterate operator provides support for iterating over a data stream.
An iteration consists of two parts: the iteration body and the termination condition. Records that do not satisfy the termination condition are fed back into the stream for another iteration; records that satisfy it are sent downstream:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("node01", 8888)
val stream = initStream.map(_.toLong)
stream.iterate {
  iteration => {
    // define the iteration body
    val iterationBody = iteration.map(x => {
      println(x)
      if (x > 0) x - 1 else x
    })
    // values > 0 are fed back into the stream; values <= 0 are sent downstream
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
}.print()
env.execute()
```
Function Classes and Rich Function Classes
When using Flink operators, you can pass in either an anonymous function or a function class object.
Function classes come in two kinds: regular function classes and rich function classes.
Compared with a regular function class, a rich function class can access the runtime context (Context), provides lifecycle methods, and can manage state, which makes more complex functionality possible.
| Regular function class | Rich function class |
| ---------------------- | ------------------- |
| MapFunction            | RichMapFunction     |
| FlatMapFunction        | RichFlatMapFunction |
| FilterFunction         | RichFilterFunction  |
| ......                 | ......              |
- Use a regular function class to filter out vehicles whose speed exceeds 100:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("./data/carFlow_all_column_test.txt")
stream.filter(new FilterFunction[String] {
  override def filter(value: String): Boolean = {
    if (value != null && !"".equals(value)) {
      val speed = value.split(",")(6).replace("'", "").toLong
      if (speed > 100) false else true
    } else false
  }
}).print()
env.execute()
```
- Use a rich function class to map license plate numbers to the owners' real names; the mapping table is stored in Redis.
Add the Redis dependency and write the mapping data into Redis.
```xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>${redis.version}</version>
</dependency>
```
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
stream.map(new RichMapFunction[String, String] {
  private var jedis: Jedis = _

  // initialization method, called once per parallel subtask before any elements are processed;
  // a good place to create the Redis connection
  override def open(parameters: Configuration): Unit = {
    // getRuntimeContext (provided by AbstractRichFunction) exposes the Flink runtime context
    val taskName = getRuntimeContext.getTaskName
    val subtasks = getRuntimeContext.getTaskNameWithSubtasks
    println("=========open======" + "taskName:" + taskName + "\tsubtasks:" + subtasks)
    jedis = new Jedis("node01", 6379)
    jedis.select(3)
  }

  // called once for every element
  override def map(value: String): String = {
    val name = jedis.get(value)
    if (name == null) "not found name" else name
  }

  // called after all elements have been processed; close the Redis connection here
  override def close(): Unit = {
    jedis.close()
  }
}).setParallelism(2).print()
env.execute()
```
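The example above exercises the runtime context and the lifecycle methods (open/close). The other rich-function capability mentioned earlier, managing state, could look roughly like the following sketch (the class name CountPerKey and the surrounding stream are assumptions for illustration; keyed state like this only works after a keyBy):

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

// counts how many records have been seen per key, using keyed ValueState
class CountPerKey extends RichFlatMapFunction[(String, Long), (String, Long)] {
  private var countState: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    // register the state descriptor through the runtime context
    countState = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", classOf[Long]))
  }

  override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = {
    // an unset ValueState yields 0 here (null unboxes to 0 for a Scala Long)
    val newCount = countState.value() + 1
    countState.update(newCount)
    out.collect((value._1, newCount))
  }
}

// usage (assuming a stream of (String, Long) pairs):
// stream.keyBy(_._1).flatMap(new CountPerKey).print()
```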
ProcessFunction
ProcessFunction belongs to the low-level API; the map, filter, flatMap and other operators covered earlier are higher-level abstractions built on top of it.
The lower the API level, the more powerful it is and the more information it exposes: you can access element state, event time, register timers, and so on.
- Code example: monitor every vehicle; when its speed exceeds 100, emit an over-speed warning 2 seconds later:
```scala
object MonitorOverSpeed02 {
  case class CarInfo(carId: String, speed: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01", 8888)
    stream.map(data => {
      val splits = data.split(" ")
      val carId = splits(0)
      val speed = splits(1).toLong
      CarInfo(carId, speed)
    }).keyBy(_.carId)
      // a KeyedStream's process takes a KeyedProcessFunction;
      // a DataStream's process takes a ProcessFunction
      .process(new KeyedProcessFunction[String, CarInfo, String] {
        override def processElement(value: CarInfo,
                                    ctx: KeyedProcessFunction[String, CarInfo, String]#Context,
                                    out: Collector[String]): Unit = {
          val currentTime = ctx.timerService().currentProcessingTime()
          if (value.speed > 100) {
            val timerTime = currentTime + 2 * 1000
            ctx.timerService().registerProcessingTimeTimer(timerTime)
          }
        }

        override def onTimer(timestamp: Long,
                             ctx: KeyedProcessFunction[String, CarInfo, String]#OnTimerContext,
                             out: Collector[String]): Unit = {
          val warnMsg = "warn... time:" + timestamp + " carID:" + ctx.getCurrentKey
          out.collect(warnMsg)
        }
      }).print()
    env.execute()
  }
}
```
Summary
When using operators such as map and filter, you can pass in an anonymous function, a regular function class object (MapFunction, FilterFunction), or a rich function class object (RichMapFunction, RichFilterFunction). A rich function class object gives you the task's runtime context, lifecycle methods, state management, and so on.
If the business logic is too complex for the operators Flink provides, the process operator lets you work directly with the lower-level API (runtime context, lifecycle methods, side outputs, timer service, etc.).
A KeyedStream's process takes a KeyedProcessFunction.
A DataStream's process takes a ProcessFunction.
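Only the keyed case is shown above. As a rough sketch of the non-keyed case (assuming stream is a DataStream[String] of numbers; the tag name is made up), a plain ProcessFunction still has access to the Context, for example for the side outputs mentioned above:

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// records that cannot be parsed go to a side output instead of the main stream
val errorTag = OutputTag[String]("parse-error")

val parsed = stream.process(new ProcessFunction[String, Long] {
  override def processElement(value: String,
                              ctx: ProcessFunction[String, Long]#Context,
                              out: Collector[Long]): Unit = {
    try {
      out.collect(value.toLong)
    } catch {
      // timers are not available without keyBy, but side outputs are
      case _: NumberFormatException => ctx.output(errorTag, value)
    }
  }
})

parsed.print()
parsed.getSideOutput(errorTag).print()
```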
Sink
Flink ships with many built-in sinks that can write the processed data to HDFS, Kafka, Redis, Elasticsearch, MySQL, and so on.
In production, a common pattern is to consume data from Kafka and store the results in Redis or MySQL.
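There is no MySQL example in this section, but as a rough sketch (the host, database, table and credentials below are made-up placeholders), results could be written to MySQL with a custom RichSinkFunction over plain JDBC, following the same open/invoke/close lifecycle as the rich functions above:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

// writes (word, count) pairs into a MySQL table; assumes wc(word) has a UNIQUE key
class MySQLSink extends RichSinkFunction[(String, Int)] {
  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    // one connection per parallel subtask
    conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123456")
    stmt = conn.prepareStatement(
      "INSERT INTO wc(word, cnt) VALUES (?, ?) ON DUPLICATE KEY UPDATE cnt = ?")
  }

  override def invoke(value: (String, Int)): Unit = {
    stmt.setString(1, value._1)
    stmt.setInt(2, value._2)
    stmt.setInt(3, value._2)
    stmt.executeUpdate()
  }

  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

// usage: result.addSink(new MySQLSink)
```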
Redis Sink
Data processed by Flink can be stored in Redis so it can be queried in real time.
Flink has a built-in Redis connector; you only need to add the Redis connector dependency:
```xml
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
</dependency>
```
The WordCount results are written to Redis using the HSET data type; the code is as follows:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.socketTextStream("node01", 8888)
val result = stream.flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(0)
  .sum(1)

// standalone Redis
val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("node01").setPort(6379).build()

// Redis cluster
/*
val addresses = new util.HashSet[InetSocketAddress]()
addresses.add(new InetSocketAddress("node01", 6379))
addresses.add(new InetSocketAddress("node01", 6379))
val clusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(addresses).build()
*/

result.addSink(new RedisSink[(String, Int)](config, new RedisMapper[(String, Int)] {
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET, "wc")
  }

  override def getKeyFromData(t: (String, Int)): String = t._1

  override def getValueFromData(t: (String, Int)): String = t._2 + ""
}))
env.execute()
```
Kafka Sink
Writing results to a Kafka topic is also supported out of the box. It requires the Kafka connector dependency, which is the same one used for reading from Kafka, so if it was added earlier there is no need to add it again:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink-version}</version>
</dependency>
```
```scala
import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01", 8888)
    val result = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    // props.setProperty("key.serializer", classOf[StringSerializer].getName)
    // props.setProperty("value.serializer", classOf[StringSerializer].getName)

    /**
     * FlinkKafkaProducer(defaultTopic: String,
     *                    serializationSchema: KafkaSerializationSchema[IN],
     *                    producerConfig: Properties,
     *                    semantic: FlinkKafkaProducer.Semantic)
     */
    result.addSink(new FlinkKafkaProducer[(String, Int)]("wc",
      new KafkaSerializationSchema[(String, Int)] {
        override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
          new ProducerRecord("wc", element._1.getBytes(), (element._2 + "").getBytes())
        }
      }, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    env.execute()
  }
}
```