Spark Streaming之UpdateStateByKey算子详解

简介: 笔记

流处理中,有个状态(state)的概念:


无状态的:当前批次处理完之后,数据只与当前批次有关

有状态的:前后批次的数据处理完之后,之间是有关系的

updateStateByKey解读


updateStateByKey:返回的是一个新的并且带有状态的DStream,会根据每一个key进行更新,更新的规则是根据自己定义的function来确定的。


updateStateByKey操作允许您在使用新信息不断更新时保持任意状态。要使用它,您必须执行两个步骤:


定义状态:状态可以是任意数据类型。

定义状态更新功能:使用函数指定如何使用先前状态和输入流中的新值更新状态。

对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在 batch中是否有新的数据。

如果state更新函数返回none,那么key对应的state就会被删除。 当然,对于每个新出现的key,也会执行state更新函数。


注意,updateStateByKey操作,要求必须开启Checkpoint机制。


案例:基于缓存的实时wordcount程序(在实际业务场景中,这个是非常有用的)


定义state;以wordcount为例,value是作为state来处理的,是根据key来更新我们的value的

定义一个state update function:将旧state的值与新state的值根据所定义的规则给相互作用在一起

Java语言实现:

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 下午
 */
public class CommStreamingContext {
    public static JavaStreamingContext getJssc(){
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}
package com.kfk.spark.stream;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/15
 * @time : 10:08 下午
 */
public class StreamingUpdateStateByKeyJava {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();
        // 要使用UpdateStateByKey算子就必须设置一个Checkpoint目录,开启Checkpoint机制
        // 以便于内存数据丢失时,可以从Checkpoint中恢复数据
        jssc.checkpoint("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkCheckpoint");
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("bigdata-pro-m04",9999);
        // flatmap
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map
        JavaPairDStream<String,Integer> pair =  words.mapToPair(word -> new Tuple2<>(word,1));
        // 通过spark来维护一份每个单词的全局统计次数
        JavaPairDStream<String,Integer> wordcount = pair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                Integer newValues = 0;
                if (state.isPresent()){
                    newValues = state.get();
                }
                for (Integer value : values){
                    newValues += value;
                }
                return Optional.of(newValues);
            }
        });
        // lambda表达式写法
//        JavaPairDStream<String,Integer> wordcount = pair.updateStateByKey((values, state) -> {
//            Integer newValues = 0;
//            if (state.isPresent()){
//                newValues = state.get();
//            }
//
//            for (Integer value : values){
//                newValues += value;
//            }
//
//            return Optional.of(newValues);
//
//        });
        wordcount.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

Scala语言实现:

package com.kfk.spark.stream
import com.kfk.spark.common.CommStreamingContextScala
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/16
 * @time : 8:04 下午
 */
object StreamingUpdateStateByKeyScala {
    def main(args: Array[String]): Unit = {
        val jssc = CommStreamingContextScala.getJssc;
        val lines = jssc.socketTextStream("bigdata-pro-m04", 9999)
        // 要使用UpdateStateByKey算子就必须设置一个Checkpoint目录,开启Checkpoint机制
        // 以便于内存数据丢失时,可以从Checkpoint中恢复数据
        jssc.checkpoint("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkCheckpoint")
        // flatmap
        val words = lines.flatMap(word => word.split(" "))
        // map
        val pair = words.map(x => (x,1))
        // updateStateByKey
        val wordcount = pair.updateStateByKey((values : Seq[Int], state : Option[Int]) => {
            var newValue = state.getOrElse(0)
            for (value <- values){
                newValue += value
            }
            Option(newValue)
        })
        wordcount.print()
        jssc.start()
        jssc.awaitTermination()
    }
}

运行结果:

-------------------------------------------
Time: 1608175348000 ms
-------------------------------------------
(hive,2)
(php,1)
(python,1)
(java,6)
-------------------------------------------
Time: 1608175354000 ms
-------------------------------------------
(hive,2)
(php,1)
(python,2)
(java,7)
(hadoop,1)vb


相关文章
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
59 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
117 0
|
2月前
|
分布式计算 流计算 Spark
【赵渝强老师】Spark Streaming中的DStream
本文介绍了Spark Streaming的核心概念DStream,即离散流。DStream通过时间间隔将连续的数据流转换为一系列不连续的RDD,再通过Transformation进行转换,实现流式数据的处理。文中以MyNetworkWordCount程序为例,展示了DStream生成RDD的过程,并附有视频讲解。
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
76 0
|
3月前
|
SQL 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(一)
64 0
|
3月前
|
存储 分布式计算 大数据
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
大数据-101 Spark Streaming DStream转换 窗口操作状态 跟踪操作 附带多个案例(二)
65 0
|
3月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(一)
49 0
|
3月前
|
SQL 分布式计算 大数据
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
大数据-100 Spark 集群 Spark Streaming DStream转换 黑名单过滤的三种实现方式(二)
42 0
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
大数据-99 Spark 集群 Spark Streaming DStream 文件数据流、Socket、RDD队列流
45 0
|
3月前
|
消息中间件 分布式计算 Kafka
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
大数据-98 Spark 集群 Spark Streaming 基础概述 架构概念 执行流程 优缺点
58 0