Structured Streaming架构原理详解!

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 笔记

一、Structured Streaming概述


Structured Streaming是一个基于sparksql引擎开发的可伸展和容错的流处理引擎。Structured Streaming传输中的关键思想是将实时数据流视为被连续添加的表。

这导致了一个新的流处理模型,该模型与批处理模型非常相似。您将像在静态表上一样将流计算表示为类似于批处理的标准查询,Spark在无界输入表上将其作为增量查询运行。


程式设计模型

将输入数据流视为“输入表”。流上到达的每个数据项都像是将新行附加到输入表中。1.png

对输入的查询将生成“结果表”。在每个触发间隔(例如,每1秒钟),新行将附加到输入表中,并最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部接收器。

2.png

“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:


Complete Mode:整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。

Append Mode:仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。

Update Mode:仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等同于追加模式。3.png


二、Structured Streaming与Socket集成


以complete输出为例

[caizhengjie@node1 kafka]$ nc -lk 9999
java java java 
python java 
java java python java hive hove
hbase
hbase hbase hive python java
package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 3:36 下午
 */
object StructuredStreamingTest {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 创建一个流数据框架
        val lines = spark.readStream
                .format("socket")
                .option("host","node1")
                .option("port",9999)
                .load() // 返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val words = lines.as[String].flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果

4.png


三、Structured Streaming与Kafka集成

Kafka 0.10的结构化流集成,可从Kafka读取数据或向Kafka写入数据。


(1)通过IDEA工具

对于使用Maven项目定义的Scala 应用程序,需要加载pom.xml配置文件

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
      <version>${saprk.version}</version>
</dependency>

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test
>java java python python
>hive hive java java 

运行程序

package com.spark.test
import org.apache.spark.sql.SparkSession
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/9/29
 * @time : 4:49 下午
 */
object StructuredStreamingKafka {
    def main(args: Array[String]): Unit = {
        // 创建Spark Session对象
        val spark = SparkSession
                .builder
                .master("local[2]")
                .appName("HdfsTest")
                .getOrCreate()
        // 读取kafka stream流
        val df = spark
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "node1:9092")
                .option("subscribe", "test")
                .load() //返回的是dataframe格式
        import spark.implicits._
        // 先将dataframe准换成dataset,在对数据进行处理
        val lines = df.selectExpr("CAST(value AS STRING)").as[String]
        val words =  lines.flatMap(_.split(" "))
        val wordCounts = words.groupBy("value").count()
        // 输出
        val query = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .start()
        query.awaitTermination()
    }
}

运行结果:

5.png

(2)通过spark-shell

在通过spark-shell运行时,需要将下面的jar包拷贝到spark的jar目录下

6.png

kafka_2.11-2.1.1.jar和kafka-clients-2.1.1.jar在kafka的lib中能找到

spark-sql-kafka-0-10_2.11-2.4.6.jar和spark-streaming-kafka-0-10_2.11-2.4.6.jar需要到maven的仓库下找

7.png

image.png

启动kafka的生产者

bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test

运行spark-shell

bin/spark-shell --master local[2]
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "node1:9092")
.option("subscribe", "test")
.load()
import spark.implicits._
val lines = df.selectExpr("CAST(value AS STRING)").as[String]
val words =  lines.flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
// 输出
val query = wordCounts.writeStream
.outputMode("update")
.format("console")
.start()
query.awaitTermination()
// Exiting paste mode, now interpreting.

查看运行结果:

9.png


四、Structured Streaming与MySQL集成


关于Structured Streaming与MySQL集成可以见文档:

https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html


通常,我们希望能够将流的输出写入外部数据库(例如MySQL)。在撰写本文时,结构化流API不支持将外部数据库作为接收器。但是,这样做的话,API选项将像一样简单.format(“jdbc”).start(“jdbc:mysql/…”)。同时,我们可以使用foreach接收器来完成此任务。让我们创建一个自定义JDBC Sink,它扩展了ForeachWriter并实现了其方法。10.png

我们现在就可以使用我们的JDBCSink了:

11.png

As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.

12.png


相关文章
|
2月前
|
存储 SQL 关系型数据库
MySQL进阶突击系列(03) MySQL架构原理solo九魂17环连问 | 给大厂面试官的一封信
本文介绍了MySQL架构原理、存储引擎和索引的相关知识点,涵盖查询和更新SQL的执行过程、MySQL各组件的作用、存储引擎的类型及特性、索引的建立和使用原则,以及二叉树、平衡二叉树和B树的区别。通过这些内容,帮助读者深入了解MySQL的工作机制,提高数据库管理和优化能力。
|
2月前
|
人工智能 前端开发 编译器
【AI系统】LLVM 架构设计和原理
本文介绍了LLVM的诞生背景及其与GCC的区别,重点阐述了LLVM的架构特点,包括其组件独立性、中间表示(IR)的优势及整体架构。通过Clang+LLVM的实际编译案例,展示了从C代码到可执行文件的全过程,突显了LLVM在编译器领域的创新与优势。
103 3
|
3月前
|
运维 持续交付 云计算
深入解析云计算中的微服务架构:原理、优势与实践
深入解析云计算中的微服务架构:原理、优势与实践
113 3
|
20天前
|
Java Linux C语言
《docker基础篇:2.Docker安装》包括前提说明、Docker的基本组成、Docker平台架构图解(架构版)、安装步骤、阿里云镜像加速、永远的HelloWorld、底层原理
《docker基础篇:2.Docker安装》包括前提说明、Docker的基本组成、Docker平台架构图解(架构版)、安装步骤、阿里云镜像加速、永远的HelloWorld、底层原理
268 89
|
12天前
|
存储 缓存 监控
ClickHouse 架构原理及核心特性详解
ClickHouse 是由 Yandex 开发的开源列式数据库,专为 OLAP 场景设计,支持高效的大数据分析。其核心特性包括列式存储、字段压缩、丰富的数据类型、向量化执行和分布式查询。ClickHouse 通过多种表引擎(如 MergeTree、ReplacingMergeTree、SummingMergeTree)优化了数据写入和查询性能,适用于电商数据分析、日志分析等场景。然而,它在事务处理、单条数据更新删除及内存占用方面存在不足。
132 21
|
12天前
|
存储 消息中间件 druid
Druid 架构原理及核心特性详解
Druid 是一个分布式、支持实时多维OLAP分析的列式存储数据处理系统,适用于高速实时数据读取和灵活的多维数据分析。它通过Segment、Datasource等元数据概念管理数据,并依赖Zookeeper、Hadoop和Kafka等组件实现高可用性和扩展性。Druid采用列式存储、并行计算和预计算等技术优化查询性能,支持离线和实时数据分析。尽管其存储成本较高且查询语言功能有限,但在大数据实时分析领域表现出色。
54 19
|
12天前
|
存储 SQL NoSQL
Doris 架构原理及核心特性详解
Doris 是百度内部孵化的OLAP项目,现已开源并广泛应用。它采用MPP架构、向量化执行引擎和列存储技术,提供高性能、易用性和实时数据处理能力。系统由FE(管理节点)和BE(计算与存储节点)组成,支持水平扩展和高可用性。Doris 适用于海量数据分析,尤其在电商、游戏等行业表现出色,但资源消耗较大,复杂查询优化有局限性,生态集成度有待提高。
46 15
|
9天前
|
Java 网络安全 开发工具
Git进阶笔记系列(01)Git核心架构原理 | 常用命令实战集合
通过本文,读者可以深入了解Git的核心概念和实际操作技巧,提升版本管理能力。
|
29天前
|
机器学习/深度学习 算法 PyTorch
深度强化学习中SAC算法:数学原理、网络架构及其PyTorch实现
软演员-评论家算法(Soft Actor-Critic, SAC)是深度强化学习领域的重要进展,基于最大熵框架优化策略,在探索与利用之间实现动态平衡。SAC通过双Q网络设计和自适应温度参数,提升了训练稳定性和样本效率。本文详细解析了SAC的数学原理、网络架构及PyTorch实现,涵盖演员网络的动作采样与对数概率计算、评论家网络的Q值估计及其损失函数,并介绍了完整的SAC智能体实现流程。SAC在连续动作空间中表现出色,具有高样本效率和稳定的训练过程,适合实际应用场景。
134 7
深度强化学习中SAC算法:数学原理、网络架构及其PyTorch实现
|
11天前
|
机器学习/深度学习 人工智能 自然语言处理
一文彻底讲透GPT架构及推理原理
本篇是作者从开发人员的视角,围绕着大模型正向推理过程,对大模型的原理的系统性总结,希望对初学者有所帮助。

热门文章

最新文章