Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)

简介: Spark学习---6、SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)

这是本人的学习过程,看到的同道中人祝福你们心若有所向往,何惧道阻且长;

但愿每一个人都像星星一样安详而从容的,不断沿着既定的目标走完自己的路程;

最后想说一句君子不隐其短,不知则问,不能则学。

如果大家觉得我写的还不错的话希望可以收获关注、点赞、收藏(谢谢大家)

一、SparkStreaming概述

1.1 SparkStreaming是什么

SparkStreaming用于流式数据的处理。

(1)SparkS支持的数据输入源很多,例如kafka、Flume、HDFS等。

(2)数据输入可以用Spark的高度抽象原语如:map、Reduce、join、Window等进行运算

(3)而且结果也能保存在很多地方,例如HDFS、数据库等。

1.2 SparkStreaming架构原理

1.2.1 什么是DStream

SparkCore==>RDD

SparkSQL==>DataFrame、DataSet

SparkStreaming使用离散化流作为抽象表示,叫作DStream

DStream是随时间推移而受到的数据序列

在DStream是随时间推移而收到的数据都作为RDD存在,而DStream是由这些RDD所组成的序列(因此被称为离散化)。

所以,简单来说,DStream就是对RDD的实时数据处理场景的一种封装。

1.2.2 架构图

1、整体架构图

2、SparkStreaming架构图

1.2.3 背压机制

1、Spark1.5 以前的版本:

用户可以通过设置静态配置参数“spark.streaming.receiver.maxRate”的值来限制Receiver的数据接收速率。

优点:此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。

缺点:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

2、1.5版本及以后版本

Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。

背压机制(SparkStreaming Backpressure):

更具JobScheduler反馈作业的执行信息来动态调整Receiver数据接受率。

通过属性“spark.streaming.backpressure.enabled”来控制是否启用背压机制,默认值是false,即不启用。

1.3 SparkStreaming特点

1、易用

2、容错

3、易整合到Spark体系

二、DStream入门

2.1 WordCount案例实操

1、需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数。

2、添加依赖

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
</dependencies>

3、编写代码

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
/**
 * @ClassName Test01_wordCount
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/3 20:49
 * @Version 1.0
 */
public class Test01_wordCount {
    public static void main(String[] args) throws InterruptedException {
        //TODO 第一步 创建SparkConf对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test01_wordCount");
        //TODO 第二步 创建JavaStreamingContext对象,并设置批次时间
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));
        //TODO 第三步 地读取数据开始业务逻辑计算
        //1、对接数据源获取数据
        JavaReceiverInputDStream<String> lineDStream = ssc.socketTextStream("hadoop102", 44444);
        //2、切分
        JavaDStream<String> flatMapDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        //3、转换word->(word,1)
        JavaPairDStream<String, Integer> javaPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //4、统计单词个数
        JavaPairDStream<String, Integer> reduceByKeyDStream = javaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //5、输出结果
        reduceByKeyDStream.print();
        //TODO 第四步 启动阻塞进程
        ssc.start();
        ssc.awaitTermination();
    }
}

4、更改日志打印级别

如果不希望运行时打印大量日志,可以在resources文件夹中添加log4j2.properties文件,并添加日志配置信息。

# Set everything to be logged to the console 
rootLogger.level = ERROR 
rootLogger.appenderRef.stdout.ref = console  
# In the pattern layout configuration below, we specify an explicit `%ex` conversion 
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would 
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional 
# class packaging information. That extra information can sometimes add a substantial 
# performance overhead, so we disable it in our default logging config. 
# For more information, see SPARK-39361. 
appender.console.type = Console 
appender.console.name = console 
appender.console.target = SYSTEM_ERR 
appender.console.layout.type = PatternLayout 
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex  
# Set the default spark-shell/spark-sql log level to WARN. When running the 
# spark-shell/spark-sql, the log level for these classes is used to overwrite 
# the root logger's log level, so that the user can have different defaults 
# for the shell and regular Spark apps. 
logger.repl.name = org.apache.spark.repl.Main 
logger.repl.level = warn  
logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver 
logger.thriftserver.level = warn  
# Settings to quiet third party logs that are too verbose 
logger.jetty1.name = org.sparkproject.jetty 
logger.jetty1.level = warn 
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle 
logger.jetty2.level = error 
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper 
logger.replexprTyper.level = info 
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter 
logger.replSparkILoopInterpreter.level = info 
logger.parquet1.name = org.apache.parquet 
logger.parquet1.level = error 
logger.parquet2.name = parquet 
logger.parquet2.level = error  
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support 
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler 
logger.RetryingHMSHandler.level = fatal 
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry 
logger.FunctionRegistry.level = error  
# For deploying Spark ThriftServer 
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805 
appender.console.filter.1.type = RegexFilter 
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.* 
appender.console.filter.1.onMatch = deny appender.console.filter.1.onMismatch = neutral

5、启动程序并通过netcat发送数据(nc 再启动IEDA程序):

2.2 WordCount解析

在SparkStreaming中,DataStream是基础抽象,代表这数据流和经过算子计算的结果流。SparkStreaming仍然是基于批处理的思想来处理流式数据的,在内部实现上,将每一批次的数据疯转为一个RDD,DStream就是一系列RDD的封装,接下来就是Spark引擎来对这些RDD进行转换。DStream中批次与批次之间计算相互独立。

3、kafka数据源

3.1 版本选型

1、ReceiverAPI:需要一个专门的Executor去接受数据,然后发生给其他的Executor做计算。

存在的问题:接受数据的Executor和计算的Executor速度会有所不同,特别在接受数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。

2、DirectAPI:是由计算的Executor来主动消费kafka的数据,速度由自身控制。

3.2 对接Kafka数据源

1、需求:通过SparkStreaming读取kafka某个主题的数据并输出打印到控制台。

2、添加依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.3.0</version>
    </dependency>
</dependencies>

3、编写代码

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
/**
 * @ClassName Test02_kafkaDirectAuto
 * @Description TODO
 * @Author Zouhuiming
 * @Date 2023/7/3 21:05
 * @Version 1.0
 */
public class Test02_kafkaDirectAuto {
    public static void main(String[] args) throws InterruptedException {
        //1、创建配置对象
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Test02_kafkaDirectAuto");
        //2、创建JavaStreamingContext对象
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(3L));
        //3、业务逻辑
        //4、定义要消费的kafka主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        //5、定义kafka消费者配置以及创建消费者策略
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG,"ssID");
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        ConsumerStrategy<Object, Object> consumerStrategy = ConsumerStrategies.Subscribe(topics, kafkaParams);
        //6、对接kafka
        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaDStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), consumerStrategy);
        //7、转换数据结构 consumerRecord==》consumerRecord.value
        JavaDStream<String> lineDStream =kafkaDStream.map(new Function<ConsumerRecord<Object, Object>, String>() {
            @Override
            public String call(ConsumerRecord<Object, Object> v1) throws Exception {
                return v1.value().toString();
            }
        });
        //8、切分数据
        JavaDStream<String> flatMapDStream = lineDStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.stream(s.split(" ")).iterator();
            }
        });
        //9、转换数据结构 word->(word,1)
        JavaPairDStream<String, Integer> mapToPairDStream = flatMapDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        //10、按照key进行聚合value
        JavaPairDStream<String, Integer> reduceByKeyDStream = mapToPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //11、打印结果输出流
        reduceByKeyDStream.print();
        //x. 启动并阻止线程
        ssc.start();
        ssc.awaitTermination();
    }
}

运行结果


相关文章
|
3月前
|
分布式计算 大数据 Java
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
85 5
|
3月前
|
分布式计算 关系型数据库 MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL
64 3
|
3月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
84 0
|
3月前
|
分布式计算 算法 Spark
spark学习之 GraphX—预测社交圈子
spark学习之 GraphX—预测社交圈子
106 0
|
3月前
|
分布式计算 Scala Spark
educoder的spark算子学习
educoder的spark算子学习
29 0
|
4月前
|
分布式计算 Shell Scala
学习使用Spark
学习使用Spark
141 3
|
5月前
|
分布式计算 Shell Scala
如何开始学习使用Spark?
【8月更文挑战第31天】如何开始学习使用Spark?
147 2
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
144 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
68 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
400 9