SparkStreaming 案例_运行 | 学习笔记

简介: 快速学习 SparkStreaming 案例_运行

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:SparkStreaming 案例_运行】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12133


SparkStreaming 案例_运行

内容介绍:

一、初始化程序

二、出现问题

三、重新运行 main 方法

四、代码的改动

 

一、初始化程序

首先,把 class 改成 object,main 才能运行。

输入 [root@cdh01 ~]# nc -lk 0.0.0.0 9999,这个时候就会去监听所有网卡上的 9999 端口。

val sparkconf = new SparkConf().setAppName("streaming word count" ). setNaster("l1ocal[6]")

val ssc = new Streamingcontext(sparkconf,seconds(1))

val lines: ReceiverInputDstream[ string] = ssc.socketTextstream(

hostname = "192.168.169.101",

port = 9999,

storageLevel = StorageLevel.MEMORY_AND_DISK_SER

 

二、出现问题

编译的时候出现了问题

image.png

错误的出现是因为在进行导包的时候,只进行了对 spark 的导包,没有导 spark 依赖的 hadoop 的包,所以要对其进行导包。

<dependency>

<grouprd>org.apache.hadoopc /groupid>

<artifactid>hadoop-client</ artifactid>

<version>2.7.5< i version>

< / dependency>

 

三、重新运行 main 方法

出现如图所示代码代表运行成功。

image.png

 

四、代码的改动

数据是一个批次一个批次进行处理的,每一个批次都会打印很多的日志,开始去产生数据。日志代码过多会影响观影体验,于是可以去做两个小的改动。

1.去掉多余的日志,让每个批次的时间变得久一些

val ssc = new Streamingcontext(sparkconf,Seconds(1))改为 val ssc = new Streamingcontext(sparkconf,Seconds(5))

1 秒改为 5 秒以后可以清晰看到每个批次的数据,接下来输入ssc.sparkContext.setLogLevel(WARN""),然后去运行程序。

运行结果如下

Tine; 1561902150000 ms

Tine; 1561902150000 ms

Tine; 1561902150000 ms

Tine; 1561902150000 ms

Tine; 1561902150000 ms

可以看到批次变得不是很密集了,日至也变少了,再去产生相应的数据。

例如在 z.cdh01 中输入以下内容:

[ root@cdh01 ~]# nc -lk 0.0.0.0 9999

hello ^H^H^H

hello wrod^H^H

hello spark

切换至 streaming 可以得到如下对应数据

Time: 1561982225008m5

(wr,1)

( hello,1)

Time: 1561902230a0o ms

(he1lo,1)

( spark,1)

Time: 1561982235008 ms

Time: 15619022408o8 ms

继续在 z.cdh01 中输入以下内容:

h^H spark

Spark hadoop

Spark hello

Hello hadoop

切换至 streaming 可以得到如下对应数据

Time; 1561982255a08 ms

-------------------

( hadoop,1)

(,1)

hel1o,1)( sgark,3)

Time: 1561982260808 ms

---------------

( hadoop,1)

( hello,1)

过程中发现,输入了 hello spark,hello hadoop,但是程序中只有一个 hello,是因为刚才所输入的内容分散至两个批次之中,所以这个统计并不是对于全局来进行统计的,它的每一次统计都是对于每一个批次的内容进行统计,得到的统计结果并不是全局的,只是一个批次的内容。

相关文章
|
网络协议 算法 物联网
OSPF的历史与发展
OSPF的历史与发展
359 0
|
存储 SQL API
Flink教程(23)- Flink高级特性(Streaming File Sink)
Flink教程(23)- Flink高级特性(Streaming File Sink)
1080 0
|
Java 运维
开发与运维内存问题之文件句柄泄漏如何解决
开发与运维内存问题之文件句柄泄漏如何解决
216 3
|
Linux 虚拟化 开发者
一键将CentOs的yum源更换为国内阿里yum源
一键将CentOs的yum源更换为国内阿里yum源
19332 31
|
存储 监控 安全
数据访问权限
【6月更文挑战第24天】数据访问权限
507 5
|
算法 程序员
程序员必知:字符串压缩(三)之短字符串压缩
程序员必知:字符串压缩(三)之短字符串压缩
454 0
|
SQL 安全
CTF--Web安全--SQL注入之‘绕过方法’
CTF--Web安全--SQL注入之‘绕过方法’
1289 0
|
机器学习/深度学习 vr&ar 图形学
通义万相-虚拟模特是怎么实现的
通义万相-虚拟模特是怎么实现的
332 0
|
Java 开发工具 Android开发
java代码混淆工具Allatori
java代码混淆工具Allatori
1378 0
|
分布式计算 大数据 Spark