开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Kafka】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/692/detail/12156
Structured_Sink_Kafka
1.场景
有很多时候 ETL 过后的数据,需要再次放入 Kafka 在 Kafka 后,可能会有流式程序统-将数据落地到 HDFS 或者 HBase.
将 kafka 整合落地非常有必要
从 Kafka 中获取数据,简单处理,再次放入 Kafka
2.步骤
(1)从卡夫卡读取数据,生成原数据集
连接卡夫卡生成 dateframe
从 deadframe 中取出表示卡消息内容的 value 列,并转为 string
(2)对源数据集选择列
解析 CSV 格式的数据
生成正确类型的结果集
再次落地 kafka
(3)代码
object KafkaSink {
def main(args: Array[String]): Unit = {
System . setProperty("hadoop . home . dir", "C:lwinutil")
//创建 SparkSession
val spark = SparkSession. builder(). appName( name - "hdfs_ sink")master( master =" local[6]"), getorCreate( )
import spark . implicits._
//获取 Kafka 数据
val source: Dataset[String]e spark . readStream
. format( source = "kafka")
.option("kafka.bootstrap.servers","node01:9092,node02:9092, node03:9092"). option(" subscribe", "streaming test 2")
. option("startingOffsets", "earliest")
. load()
.selectExpr( exprs = "CAST(value AS STRING) as value")
as[String]
//处理 csV, Dataset(String), Dataset(id, name, category)
val result = source. map(item => {
val arr = item.split( regex "::")
(arr(e). toInt, arr(1). toString, arr(2) tostring)
}).as[(Int, string, string)]. toDF( colNames "id", "name", "category")
//落地到 kafka
result. writeStream
.format( source = "kafka")
. outputMode(outputMode .Append())
. option(" checkpointLocation", "checkpoint")
.option("kafka.bootstrap.servers","node01:9092,
node02:9992, .node03:992")
.option("topic", "streamingtest 3")
//指定落地地点
. start( )
. awaitTermination( )
修改成 kafka,指定的写入即可运行成功