Structured_Sink_Kafka | 学习笔记

简介: 快速学习 Structured_Sink_Kafka

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Kafka】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12156


Structured_Sink_Kafka

1.场景

有很多时候 ETL 过后的数据,需要再次放入 Kafka 在 Kafka 后,可能会有流式程序统-将数据落地到 HDFS 或者 HBase.

image.png

将 kafka 整合落地非常有必要

从 Kafka 中获取数据,简单处理,再次放入 Kafka

2.步骤

(1)从卡夫卡读取数据,生成原数据集

连接卡夫卡生成 dateframe

从 deadframe 中取出表示卡消息内容的 value 列,并转为 string

(2)对源数据集选择列

解析 CSV 格式的数据

生成正确类型的结果集

再次落地 kafka

(3)代码

object KafkaSink {

def main(args: Array[String]): Unit = {

System . setProperty("hadoop . home . dir", "C:lwinutil")

//创建 SparkSession

val spark = SparkSession. builder(). appName( name - "hdfs_ sink")master( master =" local[6]"), getorCreate( )

import spark . implicits._

//获取 Kafka 数据

val source: Dataset[String]e spark . readStream

. format( source = "kafka")

.option("kafka.bootstrap.servers","node01:9092,node02:9092, node03:9092"). option(" subscribe", "streaming test 2")

. option("startingOffsets", "earliest")

. load()

.selectExpr( exprs = "CAST(value AS STRING) as value")

as[String]

//处理 csV, Dataset(String), Dataset(id, name, category)

val result = source. map(item => {

val arr = item.split( regex "::")

(arr(e). toInt, arr(1). toString, arr(2) tostring)

}).as[(Int, string, string)]. toDF( colNames "id", "name", "category")

//落地到 kafka

result. writeStream

.format( source = "kafka")

. outputMode(outputMode .Append())

. option(" checkpointLocation", "checkpoint")

.option("kafka.bootstrap.servers","node01:9092,

node02:9992, .node03:992")

.option("topic", "streamingtest 3")

//指定落地地点

. start( )

. awaitTermination( )

修改成 kafka,指定的写入即可运行成功

相关文章
|
消息中间件 数据采集 域名解析
数据采集-Lua集成kafka流程跑通|学习笔记
快速学习数据采集-Lua集成kafka流程跑通
数据采集-Lua集成kafka流程跑通|学习笔记
|
6月前
|
消息中间件 存储 分布式计算
Hadoop学习笔记(HDP)-Part.19 安装Kafka
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
166 0
Hadoop学习笔记(HDP)-Part.19 安装Kafka
|
消息中间件 Kafka 调度
Kafka 消费者案例 | 学习笔记
快速学习 Kafka 消费者案例
172 0
Kafka 消费者案例  |  学习笔记
|
消息中间件 负载均衡 Kafka
Kafka 生产者案例 | 学习笔记
快速学习 Kafka 生产者案例
135 0
Kafka 生产者案例  |  学习笔记
|
消息中间件 存储 弹性计算
消息队列 kafka 销售指南| 学习笔记
快速学习消息队列 kafka 销售指南
消息队列 kafka 销售指南| 学习笔记
|
消息中间件 弹性计算 分布式计算
Kafka 数据如何同步到 MaxCompute | 学习笔记
快速学习 Kafka 数据如何同步到 MaxCompute,介绍了 Kafka 数据如何同步到 MaxCompute系统机制, 以及在实际应用过程中如何使用。
170 0
Kafka 数据如何同步到 MaxCompute | 学习笔记
|
消息中间件 分布式计算 大数据
Structured_Source_Kafka_整合 | 学习笔记
快速学习 Structured_Source_Kafka_整合
Structured_Source_Kafka_整合 | 学习笔记
|
消息中间件 JSON 大数据
Structured_Source_Kafka_回顾 | 学习笔记
快速学习 Structured_Source_Kafka_回顾
Structured_Source_Kafka_回顾 | 学习笔记
|
消息中间件 JSON 分布式计算
Structred_Source_Kafka_需求 | 学习笔记
快速学习 Structred_Source_Kafka_需求
Structred_Source_Kafka_需求 | 学习笔记
|
消息中间件 JSON 分布式计算
Structred_Source_Kafka_连接 | 学习笔记
快速学习 Structred_Source_Kafka_连接
Structred_Source_Kafka_连接 | 学习笔记