全网最详细4W字Flink入门笔记(上) 3

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: 全网最详细4W字Flink入门笔记(上)

Connect 假合并

DataStream,DataStream → ConnectedStreams

合并两个数据流并且保留两个数据流的数据类型,能够共享两个流的状态

val ds1 = env.socketTextStream("node01", 8888)
val ds2 = env.socketTextStream("node01", 9999)
val wcStream1 = ds1.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1)
val wcStream2 = ds2.flatMap(_.split(" ")).map((_,1)).keyBy(0).sum(1)
val restStream: ConnectedStreams[(String, Int), (String, Int)] = wcStream2.connect(wcStream1)

CoMap, CoFlatMap

ConnectedStreams → DataStream

CoMap, CoFlatMap并不是具体算子名字,而是一类操作名称

凡是基于ConnectedStreams数据流做map遍历,这类操作叫做CoMap

凡是基于ConnectedStreams数据流做flatMap遍历,这类操作叫做CoFlatMap

CoMap第一种实现方式:

restStream.map(new CoMapFunction[(String,Int),(String,Int),(String,Int)] {
      //对第一个数据流做计算
      override def map1(value: (String, Int)): (String, Int) = {
        (value._1+":first",value._2+100)
      }
      //对第二个数据流做计算
      override def map2(value: (String, Int)): (String, Int) = {
        (value._1+":second",value._2*100)
      }
    }).print()

CoMap第二种实现方式:

restStream.map(
      //对第一个数据流做计算
      x=>{(x._1+":first",x._2+100)}
      //对第二个数据流做计算
      ,y=>{(y._1+":second",y._2*100)}
    ).print()

代码例子:现有一个配置文件存储车牌号与车主的真实姓名,通过数据流中的车牌号实时匹配出对应的车主姓名(注意:配置文件可能实时改变)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val filePath = "data/carId2Name"
val carId2NameStream = env.readFile(new TextInputFormat(new Path(filePath)),filePath,FileProcessingMode.PROCESS_CONTINUOUSLY,10)
val dataStream = env.socketTextStream("node01",8888)
dataStream.connect(carId2NameStream).map(new CoMapFunction[String,String,String] {
    private val hashMap = new mutable.HashMap[String,String]()
    override def map1(value: String): String = {
        hashMap.getOrElse(value,"not found name")
    }
    override def map2(value: String): String = {
        val splits = value.split(" ")
        hashMap.put(splits(0),splits(1))
        value + "加载完毕..."
    }
}).print()
env.execute()

CoFlatMap第一种实现方式:

ds1.connect(ds2).flatMap((x,c:Collector[String])=>{
      //对第一个数据流做计算
      x.split(" ").foreach(w=>{
        c.collect(w)
      })
    }
      //对第二个数据流做计算
      ,(y,c:Collector[String])=>{
      y.split(" ").foreach(d=>{
        c.collect(d)
      })
    }).print

CoFlatMap第二种实现方式:

ds1.connect(ds2).flatMap(
      //对第一个数据流做计算
      x=>{
      x.split(" ")
    }
      //对第二个数据流做计算
      ,y=>{
        y.split(" ")
      }).print()

CoFlatMap第三种实现方式:

ds1.connect(ds2).flatMap(new CoFlatMapFunction[String,String,(String,Int)] {
    //对第一个数据流做计算 
    override def flatMap1(value: String, out: Collector[(String, Int)]): Unit = {
        val words = value.split(" ")
        words.foreach(x=>{
          out.collect((x,1))
        })
      }
    //对第二个数据流做计算
    override def flatMap2(value: String, out: Collector[(String, Int)]): Unit = {
        val words = value.split(" ")
        words.foreach(x=>{
          out.collect((x,1))
        })
      }
    }).print()

Split

DataStream → SplitStream

根据条件将一个流分成两个或者更多的流

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,100)
val splitStream = stream.split(
    d => {
        d % 2 match {
            case 0 => List("even")
            case 1 => List("odd")
        }
    }
)
splitStream.select("even").print()
env.execute()

Select

SplitStream → DataStream

从SplitStream中选择一个或者多个数据流

splitStream.select("even").print()

Iterate

DataStream → IterativeStream → DataStream

Iterate算子提供了对数据流迭代的支持

迭代由两部分组成:迭代体、终止迭代条件,不满足终止迭代条件的数据流会返回到stream流中,进行下一次迭代,满足终止迭代条件的数据流继续往下游发送:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("node01",8888)
val stream = initStream.map(_.toLong)
stream.iterate {
    iteration => {
        //定义迭代逻辑
        val iterationBody = iteration.map ( x => {
            println(x)
            if(x > 0) x - 1
            else x
        } )
        //> 0  大于0的值继续返回到stream流中,当 <= 0 继续往下游发送
        (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
    }
}.print()
env.execute()

函数类和富函数类

在使用Flink算子的时候,可以通过传入匿名函数和函数类对象。

函数类分为:普通函数类、富函数类。

富函数类相比于普通的函数,可以获取运行环境的上下文(Context),拥有一些生命周期方法,管理状态,可以实现更加复杂的功能

普通函数类 富函数类
MapFunction RichMapFunction
FlatMapFunction RichFlatMapFunction
FilterFunction RichFilterFunction
...... ......
  • 使用普通函数类过滤掉车速高于100的车辆信息
val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.readTextFile("./data/carFlow_all_column_test.txt")
    stream.filter(new FilterFunction[String] {
      override def filter(value: String): Boolean = {
        if (value != null && !"".equals(value)) {
          val speed = value.split(",")(6).replace("'", "").toLong
          if (speed > 100)
            false
          else
            true
        }else
          false
      }
    }).print()
    env.execute()
  • 使用富函数类,将车牌号转化成车主真实姓名,映射表存储在Redis中

添加redis依赖,数据写入到redis。

<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>${redis.version}</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01", 8888)
    stream.map(new RichMapFunction[String, String] {
      private var jedis: Jedis = _
      //初始化函数  在每一个thread启动的时候(处理元素的时候,会调用一次)
      //在open中可以创建连接redis的连接
      override def open(parameters: Configuration): Unit = {
        //getRuntimeContext可以获取flink运行的上下文环境  AbstractRichFunction抽象类提供的
        val taskName = getRuntimeContext.getTaskName
        val subtasks = getRuntimeContext.getTaskNameWithSubtasks
        println("=========open======"+"taskName:" + taskName + "\tsubtasks:"+subtasks)
        jedis = new Jedis("node01", 6379)
        jedis.select(3)
      }
      //每处理一个元素,就会调用一次
      override def map(value: String): String = {
        val name = jedis.get(value)
        if(name == null){
          "not found name"
        }else
          name
      }
      //元素处理完毕后,会调用close方法
      //关闭redis连接
      override def close(): Unit = {
        jedis.close()
      }
    }).setParallelism(2).print()
    env.execute()

ProcessFunction(处理函数)

ProcessFunction属于低层次的API,我们前面讲的map、filter、flatMap等算子都是基于这层高层封装出来的。

越低层次的API,功能越强大,用户能够获取的信息越多,比如可以拿到元素状态信息、事件时间、设置定时器等

  • 代码例子:监控每辆汽车,车速超过100迈,2s钟后发出超速的警告通知:
object MonitorOverSpeed02 {
  case class CarInfo(carId:String,speed:Long)
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01",8888)
    stream.map(data => {
      val splits = data.split(" ")
      val carId = splits(0)
      val speed = splits(1).toLong
      CarInfo(carId,speed)
    }).keyBy(_.carId)
      //KeyedStream调用process需要传入KeyedProcessFunction
      //DataStream调用process需要传入ProcessFunction
      .process(new KeyedProcessFunction[String,CarInfo,String] {
      override def processElement(value: CarInfo, ctx: KeyedProcessFunction[String, CarInfo, String]#Context, out: Collector[String]): Unit = {
        val currentTime = ctx.timerService().currentProcessingTime()
        if(value.speed > 100 ){
          val timerTime = currentTime + 2 * 1000
          ctx.timerService().registerProcessingTimeTimer(timerTime)
        }
      }
      override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, CarInfo, String]#OnTimerContext, out: Collector[String]): Unit = {
        var warnMsg = "warn... time:" + timestamp + "  carID:" + ctx.getCurrentKey
        out.collect(warnMsg)
      }
    }).print()
    env.execute()
  }
}

总结

使用Map Filter....算子的适合,可以直接传入一个匿名函数、普通函数类对象(MapFuncation FilterFunction),富函数类对象(RichMapFunction、RichFilterFunction),传入的富函数类对象:可以拿到任务执行的上下文,生命周期方法、管理状态.....。

如果业务比较复杂,通过Flink提供这些算子无法满足我们的需求,通过process算子直接使用比较底层API(获取上下文、生命周期方法、测输出流、时间服务等)。

KeyedDataStream调用process,KeyedProcessFunction 。

DataStream调用process,ProcessFunction 。

Sink

Flink内置了大量sink,可以将Flink处理后的数据输出到HDFS、kafka、Redis、ES、MySQL等。

工程场景中,会经常消费kafka中数据,处理结果存储到Redis或者MySQL中

Redis Sink

Flink处理的数据可以存储到Redis中,以便实时查询

Flink内嵌连接Redis的连接器,只需要导入连接Redis的依赖就可以

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
</dependency>

WordCount写入到Redis中,选择的是HSET数据类型,代码如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01",8888)
    val result = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    //若redis是单机
    val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("node01").setPort(6379).build()
    //如果是 redis集群
    /*val addresses = new util.HashSet[InetSocketAddress]()
    addresses.add(new InetSocketAddress("node01",6379))
    addresses.add(new InetSocketAddress("node01",6379))
   val clusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(addresses).build()*/
    result.addSink(new RedisSink[(String,Int)](config,new RedisMapper[(String,Int)] {
      override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET,"wc")
      }
      override def getKeyFromData(t: (String, Int))  = {
        t._1
      }
      override def getValueFromData(t: (String, Int))  = {
        t._2 + ""
      }
    }))
    env.execute()

Kafka Sink

处理结果写入到kafka topic中,Flink也是默认支持,需要添加连接器依赖,跟读取kafka数据用的连接器依赖相同,之前添加过就不需要再次添加了

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
import java.lang
import java.util.Properties
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
object KafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01",8888)
    val result = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)
    val props = new Properties()
    props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
//    props.setProperty("key.serializer",classOf[StringSerializer].getName)
//    props.setProperty("value.serializer",classOf[StringSerializer].getName)
    /**
    public FlinkKafkaProducer(
     FlinkKafkaProducer(defaultTopic: String, serializationSchema: KafkaSerializationSchema[IN], producerConfig: Properties, semantic: FlinkKafkaProducer.Semantic)
      */
    result.addSink(new FlinkKafkaProducer[(String,Int)]("wc",new KafkaSerializationSchema[(String, Int)] {
      override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord("wc",element._1.getBytes(),(element._2+"").getBytes())
      }
    },props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))
    env.execute()
  }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
8月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
276 3
|
Java Linux API
flink入门-流处理
flink入门-流处理
167 0
|
存储 Java Linux
10分钟入门Flink--安装
本文介绍Flink的安装步骤,主要是Flink的独立部署模式,它不依赖其他平台。文中内容分为4块:前置准备、Flink本地模式搭建、Flink Standalone搭建、Flink Standalong HA搭建。
10分钟入门Flink--安装
|
分布式计算 Java API
Flink教程(04)- Flink入门案例
Flink教程(04)- Flink入门案例
183 0
|
8月前
|
存储 缓存 算法
[尚硅谷flink] 检查点笔记
[尚硅谷flink] 检查点笔记
210 3
|
8月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
|
8月前
|
存储 传感器 消息中间件
[尚硅谷 flink] 状态管理 笔记
[尚硅谷 flink] 状态管理 笔记
|
8月前
|
SQL 关系型数据库 Apache
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门
1439 3
|
存储 缓存 分布式计算
Flink教程(02)- Flink入门(下)
Flink教程(02)- Flink入门(下)
126 0
|
SQL 消息中间件 API
Flink教程(02)- Flink入门(上)
Flink教程(02)- Flink入门(上)
223 0