开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_HDFS】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12155
Structured_Sink_HDFS
HDFS Sink
使用 StructuredStreaming 可以从 kafka,HDFS 读取数据,也可以从其他读取数据。
Source 是可自定义的。
如何从 sink 落地数据。
如何将数据落地到 HFDS 和落地到 kafka,如何使用 foreach 落地到其他数据源,最终对整个内容进行简单说明,对 tigger 进行里了解,定义每一个批次的间隔,对内容进行原理性说明,最终对错误恢复进行说明,面试会被提问其相关问题以下都会解释说明。
1.目标
能够使用 Spark 将流式数据的处理结果放入 HDFS
2.步骤
场景和需求
代码实现
3.场景和需求
场景
·Kafka 往往作为数据系统和业务系统之问的桥梁
·数据系统一般由批量处理和流式处理两个部分组成
(Structured Streaming Structured Streaming)
·在 Kafka 作为整个数据平台入口的场景下,需要使用 Structured Streaming 按收 Kafka 的数据(以文件形式)并放置于 HDFS
上,后续才可以进行批量处理
放置 HBase 内也可进行查询
注意:
可以直接放入外表的目录中进行直接查询
案例需求
从 kafka 接收数据,从给定的数据集中,裁剪部分列落地于 HDFS
打开 intellij 创建文件
4. 代码
代码实现
(1)从 Kafka 读取数据,生成源数据集
对数据进行处理
连接 Kafka 生成 DataFrame
从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型
def main(args:Array[String]):Unit =
//创建数据源
val spark SparkSession.builder()
master("local[6]")
appName("kafka integration")
getorCreate()
import spark.implicits._
val source spark
.readstream
format("kafka")
.option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")
option("subscribe","Streaming _test_2")
.option("startingoffsets","earliest")
.load()
//只要 value (别名无所谓)
.selectExpr(exprs =“CAST(value AS STRING)”)
.as[String]
打开 spark 点击 Files,Dataset 数据集进行拷贝,进入目录解压
将电影评分数据集放入 kafka 执行
(2)对源数据集选扦列
解析 CSV 格式的数据
生成正确类型的结果集
落地 HDFS
整体代码
import org.apache.spark.sql.SparkSession
val spark SparkSession.builder()
master("local[6]")
appName("kafka integration")、
def main(args:Array[String]):Unit =
//创建数据源
val spark SparkSession.builder()
master("local[6]")
appName("kafka integration")
getorCreate()
import spark.implicits._
val source spark
.readstream
format("kafka")
.option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")
option("subscribe","streaming-bank")
.option("startingoffsets","earliest")
.load()
.as[String]
/1:Toy Story (1995):Animation Children's/Comedy
CSV,Dataset(string),Dataset(id,name,category)
val result source.map(item =>
val arr item.split(regex"::"
(arr(0).toInt,arr(1).tostring,arr(2).tostring)
//转成元组类型,可以为每一列指定名称。
)).as[(Int,String,String)].tooF(coINames="id","name","category")
//4,落地到 HDFS 中(打印输出)
result.writeStream
format(source "parquet")
//parquet
格式
option("path","/dataset/streaming/moives/")
//指定目录 streaming/moives/
.start()}
//启动
awaitTerminatiom()
只有一列,String
Value 转成 string 命名 value
提供了一个新的数据集,后续都使用此数据集。打开 spark day 11,在 files 中点击 dataset,拷贝解压数据集.
使用文本工具打开,电影 id,名称 name,分类。拷贝到 kafka
进行读取,类似 csv
如何转化处理:
拿到 source,map 命名为一条数据(是一条字符串)
As 元组类型,todf 为每一列指定类型,拿到数据后,落地到 HDFS 中。
指定文件存在本地目录中
在运行代码前,需要创建 topic 和生产者,拷贝电影信息,分批拷贝数据
运行出现问题(没有引入环境变量)
将环境变量放入 HDFS
出错:
在落地必须设置 checkpoint
放入根目录下
oPtion checkpointlocation“,”checkpoint 运行
点击 dataset,查看数据成功落地。(写对应域名)