Structured_案例_代码编写 | 学习笔记

简介: 快速学习 Structured_案例_代码编写

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_案例_代码编写】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12144


Structured_案例_代码编写

内容介绍

一. 目标和过程

二. 代码编写

 

一. 目标和过程

目标:

实现 Structured Streaming 部分的代码编写

步骤:

1.创建文件

2.创建 SparkSession

3.读取 Socket 数据生成 DataFrame

4.将 DataFrame 转为 Dataset, 使用有类型的 API 处理词频统计

5.生成结果集并写入控制台

第二步创建 SparkSession 和第三步读取 Socket 数据生成 DataFrame 都是为了数据的读取。第四步为了进行数据的处理,第四步来进行数据的落地。

这三大步也是以往在编写 structured streaming 的三大步骤。

第一步读取读数据,然后第二步处理,第三步落地。

 

二.代码编写

先进入到的 idea 当中,去创建一个新的包,区别于 Spark streaming 的包。然后包名叫做 cn.itcast.structured 。

然后创建文件,文件就命名为叫做 socket word count 。

然后接下来类创建出来以后把 class 改为 object

改为 object ,大家一定能猜到其实是要写 main 方法的把 main 方法给它表示出来。

image.png

代码编写步骤:

第一步肯定还是要去创建 Spark session 创建 Spark position

第二步要进行数据的读取,数据集的生成,生成数据读取。

第三步要去进行数据的处理。

第四步生成结果集的生成和输出。这就是的四大步骤。

但是在这之前,工程当中其实还没有导入 structure streaming 相关的 API ,要进行一个简单的导入,就把 pom 打开。

structure streaming 不需要导太多的东西,只需要导一个依赖,就是 dependency

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.2.0</version> 

接下来就已经导入完成了,它会进行相应的加载。等它加载完以后,就回到 socket word count,

创建 Spark session,代码如下所示

def main(args: Array[string]): Unit = {

//1.创建 Sparksession

val spark = Sparksession.builder()

.master(master = "local[6]")

.appName(name = "socketstructured")

·getOrCreate()

设置一个 master 类似于的 Spark streaming 地方,不能给 1,至少要大于 1。

进行数据集的生成。之前编写 Spark SQL 程序的时候是如何生成第一个 data frame ,使用 Spark 来进行 read ,但是读取批量的数据集的时候使用 read ,但是读取流的时候使用 read stream 这点需要注意。

然后要去 format  format 就是要去告诉他现在要读取的是 socket ,那读取到 socket 以后,去 option 指定的 host 是对应的还是要去看一下的 IP 的。

那就打开的 shell 窗口,输入 if config

image.png

然后来去看一下的 IP 是 192168169101,

编写代码如下:

val source: DataFrame = spark.readStream

.format(source = "socket")

.option("host",“192.168.169.101")

.option("port",9999)

.load()

时候大家能注意到是不是读取的就是一个 source 那它是什么样的一个数据类型非常简单,它就是一个 data frame 大家注意到了吧,也就是说除了和批处理不一样的地方就是批处理的时候使用 read ,而流式处理的时候,使用 read stream 就这点区别。要进行词频统计还是使用有类型的 API 会比较舒服。所以应该把 source 转为一个 data set 。

编写如下代码:

val sourceDs = source.as[string](...)

为其赋予一个类型就可以生成一个 source DS 大家能注意到变成了一个 data set string 如下:

val sourceDS: Dataset[string] = source.as[String](...)

但是后面它要求传入一个 encoder

但没有,要导入一个依赖,import Spark 点隐式转换点下划线,把所有的隐式转换导进来,这时拥有了 Encoder

代码如下:

import spark.implicits._

在数据处理之前,就顺手再做一件事情,要再去设置一下的日志级别,否则每一个批次打出来一堆日志,也会影响查看,

代码如下:

spark.sparkContext.setLogLevel(“WARN")

数据处理

编写代码如下:

sourceDS.flatMap(_.split( regex = "“))

.map((_,1))

·groupByKey(_·_1)

.count()

这里注意:处理的方式和 RDD 会有很显著的区别。 RDD 当中是使用 reduce by key 而这使用 group by key 并且不需要去指定怎么计算,直接通过一个方法就可以确定计算方式。

然后会生成一个新的一个 data site 命名为 words,

代码如下:

val words = sourceDsflatMap(_split(regex=““))

结果输出:

首先平常在进行批数据处理的时候,是不是使用  write  输出结果,但是是流,就要使用 words.writeStream 。

words.writeStream

.outputMode(OutputMode.Complete())

.format(source = “console")

.start()

.awaitTermination()

主体代码如下:

//1.创建 SparkSession

def main(args: Array[string]): Unit = {

//1.创建 Sparksession

val spark = Sparksession.builder()

.master(master = "local[6]")

.appName(name = "socketstructured")

·getOrCreate()

//2. 数据集的生成,数据读取

val source: DataFrame = spark.readStream

.format(source = "socket")

.option("host",“192.168.169.101")

.option("port",9999)

.load()

val sourceDS: Dataset[string] = source.as[String](...)

// 3. 数据的处理

sourceDS.flatMap(_.split( regex = "“))

.map((_,1))

·groupByKey(_·_1)

.count()

//4、结果集的生成和输出

words.writeStream

.outputMode(OutputMode.Complete())

.format(source = “console")

.start()

.awaitTermination()

相关文章
|
数据库 索引
评论功能里数据库的设计
【4月更文挑战第2天】本文探讨了评论系统的树形结构设计,提出了四种方法:邻接表、分段式path、Nested Set和Closure Table。针对评论业务功能,如加载评论页和查看回复,优先考虑邻接表和分段式path。采用邻接表思路,设计了评论表结构,包括Uid、Biz、BizID、RootID、PID、Content、索引和级联删除规则。同时提到了索引设计,如Uid、Biz+BizID、PID和Ctime/Utime,以优化查询性能。
380 3
|
8月前
|
SQL 分布式计算 大数据
《深度剖析Spark SQL:与传统SQL的异同》
Spark SQL是Apache Spark生态系统中用于处理结构化数据的组件,作为大数据时代的SQL利器,它在继承传统SQL语法和逻辑思维的基础上,重新定义了数据处理的效率与灵活性。相比传统SQL,Spark SQL支持分布式计算、内存处理及多种数据源,可高效应对PB级数据挑战。其核心概念DataFrame提供优化查询能力,使数据分析更便捷。两者虽有联系,但在处理规模、计算模式和优化策略上差异显著,共同满足不同场景下的数据需求。
494 35
|
9月前
|
Java Linux 应用服务中间件
在Rocky Linux 9上安装JDK并配置环境变量!
本教程介绍在Rocky Linux 9上安装JDK并配置环境变量的完整步骤。首先更新系统,清理旧版本JDK相关包及残留文件,确保环境干净。接着搜索并安装所需版本的JDK(如OpenJDK 17),验证安装是否成功。然后查找JDK安装路径,配置全局环境变量`JAVA_HOME`和`PATH`,最后验证环境变量设置。按照此流程操作,可顺利完成Java开发环境搭建,支持多版本切换(如JDK 8/11/17)。生产环境请谨慎操作,避免影响现有服务。
1499 21
|
编解码 Oracle Java
java9到java17的新特性学习--github新项目
本文宣布了一个名为"JavaLearnNote"的新GitHub项目,该项目旨在帮助Java开发者深入理解和掌握从Java 9到Java 17的每个版本的关键新特性,并通过实战演示、社区支持和持续更新来促进学习。
467 3
|
Java Linux
解决OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0...
解决OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0...
2524 0
|
存储 Kubernetes 监控
深度解析Kubernetes在微服务架构中的应用与优化
【10月更文挑战第18天】深度解析Kubernetes在微服务架构中的应用与优化
538 0
|
弹性计算 数据库 虚拟化
阿里云产品之云服务器ECS类各云产品简介及适用场景介绍
阿里云的云服务器ECS类产品并不是只有云服务器和轻量应用服务器两种产品,还包括弹性裸金属服务器、GPU云服务器、专有宿主机、FPGA云服务器、VMware服务等产品和服务均属于云服务器ECS类云产品,本文为大家介绍一下哪些云产品属于云服务器ECS类产品,他们各自的适用场景有哪些。
|
存储 机器学习/深度学习 编解码
计算机视觉的基础概念与入门
之前学习了一下 Python 环境下计算机视觉方面的一些应用(主要是 OpenCV),但是对于计算机视觉方面的种种概念都是一笔带过,计算机视觉是一个很大的领域,在深入它之前 ,有必要对其中的一些基础概念有一个宏观的理解。
|
Cloud Native jenkins Java
使用Jenkins实现持续集成与持续部署
【6月更文挑战第7天】本文介绍了如何使用Jenkins实现持续集成与持续部署,提高软件开发效率和质量。首先,解释了CI/CD的概念,持续集成通过自动化构建和测试减少错误,持续部署则自动将软件部署至生产环境。接着,详细阐述了Jenkins的安装配置、构建项目设置,以及如何通过代码提交触发构建、自动化测试和构建报告。此外,还讨论了Jenkins的持续部署功能,包括配置部署环境、自动化部署和回滚策略。最后,指出Jenkins在DevOps和云原生趋势中的重要角色。