Spark Streaming之foreachRDD操作详解

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 笔记

DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作, 那么,压根儿就不会执行定义的计算逻辑。


此外,即使你使用了foreachRDD output操作,也必须在里面对RDD执行action操作,才能触 发对每一个batch的计算逻辑。否则,光有foreachRDD output操作,在里面没有对RDD执行 action操作,也不会触发任何逻辑。1.png通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。


误区一:在RDD的foreach操作外部,创建Connection

这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象, 实际上一般是不支持序列化的,也就无法被传输。

dstream.foreachRDD( rdd => {
  val connection = createNewConnection() 
  rdd.foreach( record => {
    connection.send(record) 
  } 
})

误区二:在RDD的foreach操作内部,创建Connection

这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。 而通常来说,Connection的创建,是很消耗性能的。

dstream.foreachRDD( rdd => {
  rdd.foreach( record => {
    val connection = createNewConnection() 
    connection.send(record) 
    connection.close() 
  })
})

合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象

这样就相当于为RDD的每个partition创建一个Connection对象,节省资源的多了。

dstream.foreachRDD( rdd => {
  rdd.foreachPartition( partitionOfRecords => {
    val connection = createNewConnection()
    partitionOfRecords.foreach( record => {
      connection.send(record)
      connection.close() 
    })
  })
})

合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从 静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。

这样的话,甚至在多个RDD的 partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

dstream.foreachRDD( rdd => {
  rdd.foreachPartition( partitionOfRecords => {
    val connection = ConnectionPool.getConnection() 
    partitionOfRecords.foreach( record => {
      connection.send(record)
    )}
    ConnectionPool.returnConnection(connection) 
  })
})

案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数 据库中。

首先封装一个静态连接池:

package com.kfk.spark.common;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.LinkedList;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/18
 * @time : 7:26 下午
 */
public class ConnectionPool {
    private static LinkedList<Connection> connectionQueue;
    /**
     * 加载驱动
     */
    static {
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
    /**
     * 获取连接,多线程访问并发控制
     * @return
     */
    public synchronized static Connection getConnection(){
        try {
            if (connectionQueue == null){
                connectionQueue = new LinkedList<Connection>();
                for (int i = 0;i < 10;i++){
                    Connection conn = DriverManager.getConnection(
                            "jdbc:mysql://bigdata-pro-m04:3306/spark?useSSL=false",
                            "root",
                            "199911"
                    );
                    connectionQueue.push(conn);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
        return connectionQueue.poll();
    }
    /**
     * 还回去一个连接
     * @param conn
     */
    public static void returnConnection(Connection conn){
        connectionQueue.push(conn);
    }
}

编写实现功能代码:

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 下午
 */
public class CommStreamingContext {
    public static JavaStreamingContext getJssc(){
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(2));
    }
}
package com.kfk.spark.foreachrdd_project;
import com.kfk.spark.common.CommStreamingContext;
import com.kfk.spark.common.ConnectionPool;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/18
 * @time : 7:49 下午
 */
public class ForeachPersistMySQL {
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();
        // 要使用UpdateStateByKey算子就必须设置一个Checkpoint目录,开启Checkpoint机制
        // 以便于内存数据丢失时,可以从Checkpoint中恢复数据
        jssc.checkpoint("hdfs://bigdata-pro-m04:9000/user/caizhengjie/datas/sparkCheckpoint");
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("bigdata-pro-m04",9999);
        // flatmap
        JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
        // map
        JavaPairDStream<String,Integer> pair =  words.mapToPair(word -> new Tuple2<>(word,1));
        // 通过spark来维护一份每个单词的全局统计次数
        JavaPairDStream<String,Integer> wordcount = pair.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) throws Exception {
                Integer newValues = 0;
                if (state.isPresent()){
                    newValues = state.get();
                }
                for (Integer value : values){
                    newValues += value;
                }
                return Optional.of(newValues);
            }
        });
        // foreachRDD
        wordcount.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
            @Override
            public void call(JavaPairRDD<String, Integer> stringIntegerJavaPairRdd) throws Exception {
                stringIntegerJavaPairRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {
                    @Override
                    public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                        Tuple2<String, Integer> wordcount = null;
                        Connection conn = ConnectionPool.getConnection();
                        while (tuple2Iterator.hasNext()){
                            wordcount = tuple2Iterator.next();
                            String sql = "insert into spark.wordcount(word,count) values('"+wordcount._1+"', '"+wordcount._2+"')";
                            Statement statement = conn.createStatement();
                            statement.executeUpdate(sql);
                        }
                        ConnectionPool.returnConnection(conn);
                    }
                });
            }
        });
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}





相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
7天前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
24 0
|
1月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
38 1
|
1月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
58 3
|
3月前
|
分布式计算 Java Scala
如何处理 Spark Streaming 的异常情况?
【6月更文挑战第16天】如何处理 Spark Streaming 的异常情况?
172 56
|
2月前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之 Spark Local模式启动报错,是什么原因
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
SQL 分布式计算 数据处理
MaxCompute操作报错合集之使用Spark查询时函数找不到的原因是什么
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
分布式计算 监控 数据处理
Spark Streaming:解锁实时数据处理的力量
【7月更文挑战第15天】Spark Streaming作为Spark框架的一个重要组成部分,为实时数据处理提供了高效、可扩展的解决方案。通过其微批处理的工作模式和强大的集成性、容错性特性,Spark Streaming能够轻松应对各种复杂的实时数据处理场景。然而,在实际应用中,我们还需要根据具体需求和资源情况进行合理的部署和优化,以确保系统的稳定性和高效性。
|
3月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
2月前
|
分布式计算 Apache Spark
|
3月前
|
分布式计算 大数据 数据处理
MaxCompute操作报错合集之spark客户端执行时,报错,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。