一、实验目的
掌握Spark结构化流中的水印技术。
掌握Spark结构化流中数据去重操作。
二、实验内容
1、在Spark结构化流程序中处理延迟到达的数据。
2、在Spark结构化流程序中处理重复到达的数据。
三、实验原理
在现实世界中,流数据往往会不按顺序到达,以及因为网络拥挤、网络中断或数据生成器(如移动设备等)不在线而延迟到达。在流处理引擎中,水印是一种常用的技术,用于处理延迟数据,以及限制维护它所需的状态数量。
从结构化流的角度来看,水印是event time的移动阈值,它落后于目前所见的最新事件时间。当新数据以一个较新的事件时间到达时,最新的事件时间将被更新,这将导致水印的移动。下图说明了一个例子,水印被定义为10分钟。水印线由实线表示,它在最新事件时间线(由虚线表示)后面。每个矩形框代表一段数据,它的事件时间就在盒子下面。事件时间10:07的那片数据有点晚了,大约10:12;然而,这仍然落在10:03和10:13之间的阈值上。因此,它将像往常一样处理。事件时间10:15的那片数据属于同一类别。然而,事件时间10:04的那片数据到达非常晚,大约10:22,它低于水印线,因此它将被忽略,而不会被处理。
在数据处理的世界中,去除重复数据是一种常见的需求,在批处理过程中这样做并不困难。然而,在流处理中,由于流数据的无界性,它更具挑战性。结构化流使得流应用程序能够轻松地处理重复数据,这些应用程序可以通过在到达时删除重复的数据来保证精确一次性(exactly once)处理。
四、实验环境
硬件:x86_64 ubuntu 16.04服务器
软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3
五、实验步骤
5.1 启动Spark集群
1、启动HDFS集群和Spark集群。在终端窗口下,输入如下命令:
1. $ start-dfs.sh 2. $ cd /opt/spark 3. $ ./sbin/start-all.sh
2、在HDFS上创建本实验中流处理程序要监听的文件数据源目录。在终端窗口下,输入如下命令:
1. $ hdfs dfs -mkdir -p /data/dataset/streaming/input 2. $ hdfs dfs -mkdir -p /data/dataset/streaming/input2
5.2 在Spark结构化流程序中处理延迟到达的数据
1、在”/data/dataset/streaming/mobile/“目录下,有两个移动电话事件数据的文件:
/data/dataset/streaming/mobile/file1.json:
1. {"id":"phone1","action":"open","ts":"2018-03-02T10:15:33"} 2. {"id":"phone2","action":"open","ts":"2018-03-02T10:22:35"} 3. {"id":"phone3","action":"open","ts":"2018-03-02T10:33:50"}
/data/dataset/streaming/mobile/file2.json:
1. {"id":"phone4","action":"open","ts":"2018-03-02T10:29:35"} 2. {"id":"phone5","action":"open","ts":"2018-03-02T10:11:35"}
以上数据文件中,带有延迟到达的数据。
2、启动spark-shell。在终端窗口下,执行以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)
1. $ spark-shell --master spark://localhost:7077
3、编写处理延迟数据的流应用程序。在spark-shell窗口下,进入到paste模式,然后输入如下的代码:
1. import org.apache.spark.sql.types._ 2. import org.apache.spark.sql.functions._ 3. 4. val mobileDataSchema = new StructType().add("id", StringType, false) 5. .add("action", StringType, false) 6. .add("ts", TimestampType, false) 7. 8. val mobileSSDF = spark.readStream.schema(mobileDataSchema).json("hdfs://localhost:9000/data/dataset/streaming/input") 9. 10. // 设置一个带有水印的streaming DataFrame,并按ts和action列分组 11. val windowCountDF = mobileSSDF.withWatermark("ts", "10 minutes") 12. .groupBy(window($"ts", "10 minutes"), $"action") 13. .count 14. 15. // 将计算结果输出到控制台 16. val mobileMemorySQ = windowCountDF.writeStream.format("console").option("truncate", "false").outputMode("update").start()
4、复制file1.json文件到HDFS的input目录并检查输出。另打开一个终端,执行如下的复制命令:
1. $ hdfs dfs -put /data/dataset/streaming/mobile/file1.json /data/dataset/streaming/input/
流程序处理file1.json的输出。正如期望的,每一行都落在它自己的窗口内。切换到流应用程序执行所在窗口,看到输出如下所示:
1. +-------------------------------------------+-------+------+ 2. | window| action| count| 3. +-------------------------------------------+-------+------+ 4. | [2018-03-02 10:20:00, 2018-03-02 10:30:00]| open | 1 | 5. | [2018-03-02 10:30:00, 2018-03-02 10:40:00]| open | 1 | 6. | [2018-03-02 10:10:00, 2018-03-02 10:20:00]| open | 1 | 7. +-------------------------------------------+-------+------+
5、复制file2.json文件到HDFS的input目录并检查输出。切换到另一个终端,执行如下的复制命令:
1. $ hdfs dfs -put /data/dataset/streaming/mobile/file2.json /data/dataset/streaming/input/
流程序处理file2.json的输出。切换到流应用程序执行所在窗口,注意到窗口10:20 to 10:30的count现在被更新为2,窗口10:10:00 and 10:20:00没有变化:
1. +-------------------------------------------+-------+------+ 2. | window| action| count| 3. +-------------------------------------------+-------+------+ 4. | [2018-03-02 10:20:00, 2018-03-02 10:30:00]| open | 2 | 5. +-------------------------------------------+-------+------+
如前所述,因为file2.json文件中的最后一行的时间戳落在10分钟的水印阈值之外,因此它不会被处理。
6、停止流程序的执行。在流程序执行所在窗口,执行以下代码:
1. mobileMemorySQ.stop
5.3 在Spark结构化流程序中处理重复到达的数据
1、在/data/dataset/streaming/deduplication/目录下,有两个移动电话事件数据的文件:file1.json和file2.json。这些文件的内容显示如下。
file1.json - 每一行都是唯一的id和ts列
1. {"id":"phone1","action":"open","ts":"2018-03-02T10:15:33"} 2. {"id":"phone2","action":"open","ts":"2018-03-02T10:22:35"} 3. {"id":"phone3","action":"open","ts":"2018-03-02T10:23:50"}
file2.json - 前两行是file1.json中前两行的重复,第三行是唯一的,第四行也是唯一的,但延迟到达(不被处理)
1. {"id":"phone1","action":"open","ts":"2018-03-02T10:15:33"} 2. {"id":"phone2","action":"open","ts":"2018-03-02T10:22:35"} 3. {"id":"phone4","action":"open","ts":"2018-03-02T10:29:35"} 4. {"id":"phone5","action":"open","ts":"2018-03-02T10:01:35"}
2、启动spark-shell。在终端窗口下,执行以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)
1. $ spark-shell --master spark://localhost:7077
3、编写处理重复数据的流应用程序。在spark-shell窗口下,进入到paste模式,然后输入如下的代码:
1. // 使用dropDuplicates API对数据去重 2. import org.apache.spark.sql.types._ 3. import org.apache.spark.sql.functions._ 4. 5. val mobileDataSchema = new StructType().add("id", StringType, false) 6. .add("action", StringType, false) 7. .add("ts", TimestampType, false) 8. 9. // 读取数据源 10. val mobileDupSSDF = spark.readStream.schema(mobileDataSchema).json("hdfs://localhost:9000/data/dataset/streaming/input2") 11. 12. // 数据处理和去重 13. val windowCountDupDF = mobileDupSSDF.withWatermark("ts", "10 minutes") 14. .dropDuplicates("id", "ts") 15. .groupBy("id").count 16. 17. // 将计算结果输出到控制台 18. val mobileMemoryDupSQ = windowCountDupDF.writeStream.format("console") 19. .option("truncate", "false") 20. .outputMode("update") 21. .start()
4、复制file1.json文件到input2目录并检查输出。另打开一个终端,执行如下的复制命令:
1. $ hdfs dfs -put /data/dataset/streaming/deduplication/file1.json /data/dataset/streaming/input2/
查看流程序处理file1.json的输出。切换到流应用程序执行所在窗口,看到输出如下所示:
1. +-------+------+ 2. | id | count| 3. +-------+------+ 4. | phone3| 1| 5. | phone1| 1| 6. | phone2| 1| 7. +-------+------+
5、复制file2.json文件到input2目录并检查输出。切换到另一个终端,执行如下的复制命令:
1. $ hdfs dfs -put /data/dataset/streaming/deduplication/file2.json /data/dataset/streaming/input2/
查看流程序处理file2.json的输出。切换到流应用程序执行所在窗口,看到输出如下所示:
1. +-------+------+ 2. | id | count| 3. +-------+------+ 4. | phone4| 1| 5. +-------+------+
如预期,在file2.json被复制到input2目录中之后,只有一行显示在控制台中。原因是前两行是file1.json中前两行的重复,因此它们被过滤掉了。最后一行的时间戳是10:10,这被认为是迟到数据,因为时间戳比10分钟的水印阈值更早。因此,最后一行没有被处理,并删除掉了。
6、停止流程序的执行。在流程序执行所在窗口,执行以下代码:
1. mobileMemoryDupSQ.stop
六、 实验知识测试
无
七、实验拓展
1、给定一个集合元素,请编写代码,将其转换为DataFrame:
无