开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:SparkStreaming 案例_运行】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12133
SparkStreaming 案例_运行
内容介绍:
一、初始化程序
二、出现问题
三、重新运行 main 方法
四、代码的改动
一、初始化程序
首先,把 class 改成 object,main 才能运行。
输入 [root@cdh01 ~]# nc -lk 0.0.0.0 9999,这个时候就会去监听所有网卡上的 9999 端口。
val sparkconf = new SparkConf().setAppName("streaming word count" ). setNaster("l1ocal[6]")
val ssc = new Streamingcontext(sparkconf,seconds(1))
val lines: ReceiverInputDstream[ string] = ssc.socketTextstream(
hostname = "192.168.169.101",
port = 9999,
storageLevel = StorageLevel.MEMORY_AND_DISK_SER
)
二、出现问题
编译的时候出现了问题
错误的出现是因为在进行导包的时候,只进行了对 spark 的导包,没有导 spark 依赖的 hadoop 的包,所以要对其进行导包。
<dependency>
<grouprd>org.apache.hadoopc /groupid>
<artifactid>hadoop-client</ artifactid>
<version>2.7.5< i version>
< / dependency>
三、重新运行 main 方法
出现如图所示代码代表运行成功。
四、代码的改动
数据是一个批次一个批次进行处理的,每一个批次都会打印很多的日志,开始去产生数据。日志代码过多会影响观影体验,于是可以去做两个小的改动。
1.去掉多余的日志,让每个批次的时间变得久一些
把 val ssc = new Streamingcontext(sparkconf,Seconds(1))改为 val ssc = new Streamingcontext(sparkconf,Seconds(5))
1 秒改为 5 秒以后可以清晰看到每个批次的数据,接下来输入ssc.sparkContext.setLogLevel(WARN""),然后去运行程序。
运行结果如下
Tine; 1561902150000 ms
Tine; 1561902150000 ms
Tine; 1561902150000 ms
Tine; 1561902150000 ms
Tine; 1561902150000 ms
可以看到批次变得不是很密集了,日至也变少了,再去产生相应的数据。
例如在 z.cdh01 中输入以下内容:
[ root@cdh01 ~]# nc -lk 0.0.0.0 9999
hello ^H^H^H
hello wrod^H^H
hello spark
切换至 streaming 可以得到如下对应数据
Time: 1561982225008m5
(wr,1)
( hello,1)
Time: 1561902230a0o ms
(he1lo,1)
( spark,1)
Time: 1561982235008 ms
Time: 15619022408o8 ms
继续在 z.cdh01 中输入以下内容:
h^H spark
Spark hadoop
Spark hello
Hello hadoop
切换至 streaming 可以得到如下对应数据
Time; 1561982255a08 ms
-------------------
( hadoop,1)
(,1)
hel1o,1)( sgark,3)
Time: 1561982260808 ms
---------------
( hadoop,1)
( hello,1)
过程中发现,输入了 hello spark,hello hadoop,但是程序中只有一个 hello,是因为刚才所输入的内容分散至两个批次之中,所以这个统计并不是对于全局来进行统计的,它的每一次统计都是对于每一个批次的内容进行统计,得到的统计结果并不是全局的,只是一个批次的内容。

