基于Spark的应用水印技术和流数据去重

简介: 基于Spark的应用水印技术和流数据去重

一、实验目的

掌握Spark结构化流中的水印技术。

  掌握Spark结构化流中数据去重操作。

二、实验内容

1、在Spark结构化流程序中处理延迟到达的数据。

  2、在Spark结构化流程序中处理重复到达的数据。

三、实验原理

在现实世界中,流数据往往会不按顺序到达,以及因为网络拥挤、网络中断或数据生成器(如移动设备等)不在线而延迟到达。在流处理引擎中,水印是一种常用的技术,用于处理延迟数据,以及限制维护它所需的状态数量。

   从结构化流的角度来看,水印是event time的移动阈值,它落后于目前所见的最新事件时间。当新数据以一个较新的事件时间到达时,最新的事件时间将被更新,这将导致水印的移动。下图说明了一个例子,水印被定义为10分钟。水印线由实线表示,它在最新事件时间线(由虚线表示)后面。每个矩形框代表一段数据,它的事件时间就在盒子下面。事件时间10:07的那片数据有点晚了,大约10:12;然而,这仍然落在10:03和10:13之间的阈值上。因此,它将像往常一样处理。事件时间10:15的那片数据属于同一类别。然而,事件时间10:04的那片数据到达非常晚,大约10:22,它低于水印线,因此它将被忽略,而不会被处理。

759540ea989a46ea837296be7c0dc207.png

在数据处理的世界中,去除重复数据是一种常见的需求,在批处理过程中这样做并不困难。然而,在流处理中,由于流数据的无界性,它更具挑战性。结构化流使得流应用程序能够轻松地处理重复数据,这些应用程序可以通过在到达时删除重复的数据来保证精确一次性(exactly once)处理。

四、实验环境

硬件:x86_64 ubuntu 16.04服务器

  软件:JDK 1.8,Spark-2.3.2,Hadoop-2.7.3

五、实验步骤

5.1 启动Spark集群

1、启动HDFS集群和Spark集群。在终端窗口下,输入如下命令:

1.  $ start-dfs.sh
2.  $ cd /opt/spark
3.  $ ./sbin/start-all.sh

2、在HDFS上创建本实验中流处理程序要监听的文件数据源目录。在终端窗口下,输入如下命令:

1.  $ hdfs dfs -mkdir -p /data/dataset/streaming/input
2.  $ hdfs dfs -mkdir -p /data/dataset/streaming/input2

5.2 在Spark结构化流程序中处理延迟到达的数据

1、在”/data/dataset/streaming/mobile/“目录下,有两个移动电话事件数据的文件:

  /data/dataset/streaming/mobile/file1.json:

1.  {"id":"phone1","action":"open","ts":"2018-03-02T10:15:33"}
2.  {"id":"phone2","action":"open","ts":"2018-03-02T10:22:35"}
3.  {"id":"phone3","action":"open","ts":"2018-03-02T10:33:50"}

/data/dataset/streaming/mobile/file2.json:

1.  {"id":"phone4","action":"open","ts":"2018-03-02T10:29:35"}
2.  {"id":"phone5","action":"open","ts":"2018-03-02T10:11:35"}

以上数据文件中,带有延迟到达的数据。

  2、启动spark-shell。在终端窗口下,执行以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.  $ spark-shell --master spark://localhost:7077

3、编写处理延迟数据的流应用程序。在spark-shell窗口下,进入到paste模式,然后输入如下的代码:

1.  import org.apache.spark.sql.types._
2.  import org.apache.spark.sql.functions._
3.       
4.  val mobileDataSchema = new StructType().add("id", StringType, false)
5.                                         .add("action", StringType, false)
6.                                         .add("ts", TimestampType, false)
7.       
8.  val mobileSSDF = spark.readStream.schema(mobileDataSchema).json("hdfs://localhost:9000/data/dataset/streaming/input")
9.       
10. // 设置一个带有水印的streaming DataFrame,并按ts和action列分组
11. val windowCountDF = mobileSSDF.withWatermark("ts", "10 minutes")
12.                               .groupBy(window($"ts", "10 minutes"), $"action")
13.                               .count
14. 
15. // 将计算结果输出到控制台
16. val mobileMemorySQ = windowCountDF.writeStream.format("console").option("truncate", "false").outputMode("update").start()

4、复制file1.json文件到HDFS的input目录并检查输出。另打开一个终端,执行如下的复制命令:

1.  $ hdfs dfs -put /data/dataset/streaming/mobile/file1.json  /data/dataset/streaming/input/

流程序处理file1.json的输出。正如期望的,每一行都落在它自己的窗口内。切换到流应用程序执行所在窗口,看到输出如下所示:

1.  +-------------------------------------------+-------+------+
2.  | window| action| count|
3.  +-------------------------------------------+-------+------+
4.  | [2018-03-02 10:20:00, 2018-03-02 10:30:00]| open | 1 |
5.  | [2018-03-02 10:30:00, 2018-03-02 10:40:00]| open | 1 |
6.  | [2018-03-02 10:10:00, 2018-03-02 10:20:00]| open | 1 |
7.  +-------------------------------------------+-------+------+

5、复制file2.json文件到HDFS的input目录并检查输出。切换到另一个终端,执行如下的复制命令:

1.  $ hdfs dfs -put /data/dataset/streaming/mobile/file2.json  /data/dataset/streaming/input/

流程序处理file2.json的输出。切换到流应用程序执行所在窗口,注意到窗口10:20 to 10:30的count现在被更新为2,窗口10:10:00 and 10:20:00没有变化:

1.  +-------------------------------------------+-------+------+
2.  | window| action| count|
3.  +-------------------------------------------+-------+------+
4.  | [2018-03-02 10:20:00, 2018-03-02 10:30:00]| open | 2 |
5.  +-------------------------------------------+-------+------+

如前所述,因为file2.json文件中的最后一行的时间戳落在10分钟的水印阈值之外,因此它不会被处理。

  6、停止流程序的执行。在流程序执行所在窗口,执行以下代码:

1.  mobileMemorySQ.stop

5.3 在Spark结构化流程序中处理重复到达的数据

1、在/data/dataset/streaming/deduplication/目录下,有两个移动电话事件数据的文件:file1.json和file2.json。这些文件的内容显示如下。

 file1.json - 每一行都是唯一的id和ts列

1.  {"id":"phone1","action":"open","ts":"2018-03-02T10:15:33"}
2.  {"id":"phone2","action":"open","ts":"2018-03-02T10:22:35"}
3.  {"id":"phone3","action":"open","ts":"2018-03-02T10:23:50"}

file2.json - 前两行是file1.json中前两行的重复,第三行是唯一的,第四行也是唯一的,但延迟到达(不被处理)

1.  {"id":"phone1","action":"open","ts":"2018-03-02T10:15:33"}
2.  {"id":"phone2","action":"open","ts":"2018-03-02T10:22:35"}
3.  {"id":"phone4","action":"open","ts":"2018-03-02T10:29:35"}
4.  {"id":"phone5","action":"open","ts":"2018-03-02T10:01:35"}

2、启动spark-shell。在终端窗口下,执行以下命令:(注意,请将以下命令中的localhost替换为虚拟机实际的机器名)

1.  $ spark-shell --master spark://localhost:7077

3、编写处理重复数据的流应用程序。在spark-shell窗口下,进入到paste模式,然后输入如下的代码:

1.  // 使用dropDuplicates API对数据去重
2.  import org.apache.spark.sql.types._
3.  import org.apache.spark.sql.functions._
4.       
5.  val mobileDataSchema = new StructType().add("id", StringType, false)
6.                                         .add("action", StringType, false)
7.                                         .add("ts", TimestampType, false)
8.       
9.  // 读取数据源
10. val mobileDupSSDF = spark.readStream.schema(mobileDataSchema).json("hdfs://localhost:9000/data/dataset/streaming/input2")
11.      
12. // 数据处理和去重
13. val windowCountDupDF = mobileDupSSDF.withWatermark("ts", "10 minutes")
14.                                     .dropDuplicates("id", "ts")
15.                                     .groupBy("id").count
16.      
17. // 将计算结果输出到控制台
18. val mobileMemoryDupSQ = windowCountDupDF.writeStream.format("console")
19.                                         .option("truncate", "false")
20.                                         .outputMode("update")
21.                                         .start()

4、复制file1.json文件到input2目录并检查输出。另打开一个终端,执行如下的复制命令:

1.  $  hdfs dfs -put /data/dataset/streaming/deduplication/file1.json  /data/dataset/streaming/input2/

查看流程序处理file1.json的输出。切换到流应用程序执行所在窗口,看到输出如下所示:

1.  +-------+------+
2.  | id    | count|
3.  +-------+------+
4.  | phone3|     1|
5.  | phone1|     1|
6.  | phone2|     1|
7.  +-------+------+

5、复制file2.json文件到input2目录并检查输出。切换到另一个终端,执行如下的复制命令:

1.  $ hdfs dfs -put /data/dataset/streaming/deduplication/file2.json  /data/dataset/streaming/input2/

查看流程序处理file2.json的输出。切换到流应用程序执行所在窗口,看到输出如下所示:

1.  +-------+------+
2.  | id    | count|
3.  +-------+------+
4.  | phone4|     1|
5.  +-------+------+

如预期,在file2.json被复制到input2目录中之后,只有一行显示在控制台中。原因是前两行是file1.json中前两行的重复,因此它们被过滤掉了。最后一行的时间戳是10:10,这被认为是迟到数据,因为时间戳比10分钟的水印阈值更早。因此,最后一行没有被处理,并删除掉了。

 6、停止流程序的执行。在流程序执行所在窗口,执行以下代码:

1.  mobileMemoryDupSQ.stop

六、 实验知识测试

七、实验拓展

1、给定一个集合元素,请编写代码,将其转换为DataFrame:


相关文章
|
4月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
443 1
|
9月前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
775 1
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
169 2
|
3月前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
161 1
|
6月前
|
分布式计算 大数据 数据处理
Apache Spark的应用与优势:解锁大数据处理的无限潜能
【8月更文挑战第23天】Apache Spark以其卓越的性能、易用性、通用性、弹性与可扩展性以及丰富的生态系统,在大数据处理领域展现出了强大的竞争力和广泛的应用前景。随着大数据技术的不断发展和普及,Spark必将成为企业实现数字化转型和业务创新的重要工具。未来,我们有理由相信,Spark将继续引领大数据处理技术的发展潮流,为企业创造更大的价值。
|
5月前
|
分布式计算 Java Apache
Apache Spark Streaming技术深度解析
【9月更文挑战第4天】Apache Spark Streaming是Apache Spark生态系统中用于处理实时数据流的一个重要组件。它将输入数据分成小批次(micro-batch),然后利用Spark的批处理引擎进行处理,从而结合了批处理和流处理的优点。这种处理方式使得Spark Streaming既能够保持高吞吐量,又能够处理实时数据流。
92 0
|
6月前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
79 1
|
6月前
|
分布式计算 资源调度 测试技术
“Spark Streaming异常处理秘籍:揭秘如何驯服实时数据流的猛兽,守护你的应用稳如泰山,不容错过!”
【8月更文挑战第7天】Spark Streaming 是 Apache Spark 中的关键组件,用于实时数据流处理。部署时可能遭遇数据问题、资源限制或逻辑错误等异常。合理处理这些异常对于保持应用稳定性至关重要。基础在于理解其异常处理机制,通过 DSC 将数据流切分为 RDD。对于数据异常,可采用 try-catch 结构捕获并处理;资源层面异常需优化 Spark 配置,如调整内存分配;逻辑异常则需加强单元测试及集成测试。结合监控工具,可全面提升应用的健壮性和可靠性。
95 3
|
7月前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
184 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
6月前
|
分布式计算 Hadoop 大数据
大数据处理框架在零售业的应用:Apache Hadoop与Apache Spark
【8月更文挑战第20天】Apache Hadoop和Apache Spark为处理海量零售户数据提供了强大的支持
103 0

热门文章

最新文章