一、Structured Streaming概述
Structured Streaming是一个基于sparksql引擎开发的可伸展和容错的流处理引擎。Structured Streaming传输中的关键思想是将实时数据流视为被连续添加的表。
这导致了一个新的流处理模型,该模型与批处理模型非常相似。您将像在静态表上一样将流计算表示为类似于批处理的标准查询,Spark在无界输入表上将其作为增量查询运行。
程式设计模型
将输入数据流视为“输入表”。流上到达的每个数据项都像是将新行附加到输入表中。
对输入的查询将生成“结果表”。在每个触发间隔(例如,每1秒钟),新行将附加到输入表中,并最终更新结果表。无论何时更新结果表,我们都希望将更改后的结果行写入外部接收器。
“输出”定义为写到外部存储器的内容。可以在不同的模式下定义输出:
Complete Mode:整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。
Append Mode:仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。
Update Mode:仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1起可用)。请注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则等同于追加模式。
二、Structured Streaming与Socket集成
以complete输出为例
[caizhengjie@node1 kafka]$ nc -lk 9999 java java java python java java java python java hive hove hbase hbase hbase hive python java
package com.spark.test import org.apache.spark.sql.SparkSession /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/9/29 * @time : 3:36 下午 */ object StructuredStreamingTest { def main(args: Array[String]): Unit = { // 创建Spark Session对象 val spark = SparkSession .builder .master("local[2]") .appName("HdfsTest") .getOrCreate() // 创建一个流数据框架 val lines = spark.readStream .format("socket") .option("host","node1") .option("port",9999) .load() // 返回的是dataframe格式 import spark.implicits._ // 先将dataframe准换成dataset,在对数据进行处理 val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count() // 输出 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() } }
运行结果
三、Structured Streaming与Kafka集成
Kafka 0.10的结构化流集成,可从Kafka读取数据或向Kafka写入数据。
(1)通过IDEA工具
对于使用Maven项目定义的Scala 应用程序,需要加载pom.xml配置文件
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId> <version>${saprk.version}</version> </dependency>
启动kafka的生产者
bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test
>java java python python >hive hive java java
运行程序
package com.spark.test import org.apache.spark.sql.SparkSession /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/9/29 * @time : 4:49 下午 */ object StructuredStreamingKafka { def main(args: Array[String]): Unit = { // 创建Spark Session对象 val spark = SparkSession .builder .master("local[2]") .appName("HdfsTest") .getOrCreate() // 读取kafka stream流 val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "node1:9092") .option("subscribe", "test") .load() //返回的是dataframe格式 import spark.implicits._ // 先将dataframe准换成dataset,在对数据进行处理 val lines = df.selectExpr("CAST(value AS STRING)").as[String] val words = lines.flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count() // 输出 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() } }
运行结果:
(2)通过spark-shell
在通过spark-shell运行时,需要将下面的jar包拷贝到spark的jar目录下
kafka_2.11-2.1.1.jar和kafka-clients-2.1.1.jar在kafka的lib中能找到
spark-sql-kafka-0-10_2.11-2.4.6.jar和spark-streaming-kafka-0-10_2.11-2.4.6.jar需要到maven的仓库下找
启动kafka的生产者
bin/kafka-console-producer.sh --broker-list node192,node2:9092,node3:9092, --topic test
运行spark-shell
bin/spark-shell --master local[2]
scala> :paste // Entering paste mode (ctrl-D to finish) val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "node1:9092") .option("subscribe", "test") .load() import spark.implicits._ val lines = df.selectExpr("CAST(value AS STRING)").as[String] val words = lines.flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count() // 输出 val query = wordCounts.writeStream .outputMode("update") .format("console") .start() query.awaitTermination() // Exiting paste mode, now interpreting.
查看运行结果:
四、Structured Streaming与MySQL集成
关于Structured Streaming与MySQL集成可以见文档:
通常,我们希望能够将流的输出写入外部数据库(例如MySQL)。在撰写本文时,结构化流API不支持将外部数据库作为接收器。但是,这样做的话,API选项将像一样简单.format(“jdbc”).start(“jdbc:mysql/…”)。同时,我们可以使用foreach接收器来完成此任务。让我们创建一个自定义JDBC Sink,它扩展了ForeachWriter并实现了其方法。
我们现在就可以使用我们的JDBCSink了:
As batches are complete, counts by zip could be INSERTed/UPSERTed into MySQL as needed.