Structured_Sink_HDFS | 学习笔记

简介: 快速学习 Structured_Sink_HDFS

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_HDFS】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12155


Structured_Sink_HDFS

HDFS Sink

使用 StructuredStreaming 可以从 kafka,HDFS 读取数据,也可以从其他读取数据。

Source 是可自定义的。

如何从 sink 落地数据。

如何将数据落地到 HFDS 和落地到 kafka,如何使用 foreach 落地到其他数据源,最终对整个内容进行简单说明,对 tigger 进行里了解,定义每一个批次的间隔,对内容进行原理性说明,最终对错误恢复进行说明,面试会被提问其相关问题以下都会解释说明。

1.目标

能够使用 Spark 将流式数据的处理结果放入 HDFS

2.步骤

场景和需求

代码实现

3.场景和需求

场景

·Kafka 往往作为数据系统和业务系统之问的桥梁

·数据系统一般由批量处理和流式处理两个部分组成

(Structured Streaming Structured Streaming)

·在 Kafka 作为整个数据平台入口的场景下,需要使用 Structured Streaming 按收 Kafka 的数据(以文件形式)并放置于 HDFS

上,后续才可以进行批量处理

放置 HBase 内也可进行查询

注意:

可以直接放入外表的目录中进行直接查询

image.png

案例需求

从 kafka 接收数据,从给定的数据集中,裁剪部分列落地于 HDFS

打开 intellij 创建文件

4. 代码

代码实现

(1)从 Kafka 读取数据,生成源数据集

对数据进行处理

连接 Kafka 生成 DataFrame

从 DataFrame 中取出表示 Kafka 消息内容的 value 列并转为 String 类型

def main(args:Array[String]):Unit =

//创建数据源

val spark SparkSession.builder()

master("local[6]")

appName("kafka integration")

getorCreate()

import spark.implicits._

val source spark

.readstream

format("kafka").option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")

option("subscribe","Streaming _test_2")

.option("startingoffsets","earliest")

.load()

//只要 value (别名无所谓)

.selectExpr(exprs =“CAST(value AS STRING)”)

.as[String]

打开 spark 点击 Files,Dataset 数据集进行拷贝,进入目录解压

将电影评分数据集放入 kafka 执行

(2)对源数据集选扦列

解析 CSV 格式的数据

生成正确类型的结果集

落地 HDFS

整体代码

import org.apache.spark.sql.SparkSession

val spark SparkSession.builder()

master("local[6]")

appName("kafka integration")、

def main(args:Array[String]):Unit =

//创建数据源

val spark SparkSession.builder()

master("local[6]")

appName("kafka integration")

getorCreate()

import spark.implicits._

val source spark

.readstream

format("kafka").option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")

option("subscribe","streaming-bank")

.option("startingoffsets","earliest")

.load()

.as[String]

/1:Toy Story (1995):Animation Children's/Comedy

CSV,Dataset(string),Dataset(id,name,category)

val result source.map(item =>

val arr item.split(regex"::"

(arr(0).toInt,arr(1).tostring,arr(2).tostring)

//转成元组类型,可以为每一列指定名称。

)).as[(Int,String,String)].tooF(coINames="id","name","category") 

//4,落地到 HDFS 中(打印输出)

result.writeStream

format(source "parquet")

//parquet格式

option("path","/dataset/streaming/moives/")

//指定目录 streaming/moives/

.start()}

//启动

awaitTerminatiom()

只有一列,String

Value 转成 string 命名 value

提供了一个新的数据集,后续都使用此数据集。打开 spark day 11,在 files 中点击 dataset,拷贝解压数据集.

使用文本工具打开,电影 id,名称 name,分类。拷贝到 kafka

进行读取,类似 csv

image.png

如何转化处理:

拿到 source,map  命名为一条数据(是一条字符串)

As 元组类型,todf 为每一列指定类型,拿到数据后,落地到 HDFS 中。

指定文件存在本地目录中

在运行代码前,需要创建 topic 和生产者,拷贝电影信息,分批拷贝数据

image.png

运行出现问题(没有引入环境变量)

将环境变量放入 HDFS

image.png

出错:

在落地必须设置 checkpoint

image.png

放入根目录下

oPtion checkpointlocation“,”checkpoint   运行

点击 dataset,查看数据成功落地。(写对应域名)

相关文章
|
存储 分布式计算 负载均衡
Hadoop学习笔记(二)之HDFS
Hadoop学习笔记(二)之HDFS
|
6月前
|
存储 机器学习/深度学习 分布式计算
Hadoop学习笔记(HDP)-Part.12 安装HDFS
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
174 0
Hadoop学习笔记(HDP)-Part.12 安装HDFS
|
6月前
|
存储 SQL 分布式计算
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
Hadoop(HDFS+MapReduce+Hive+数仓基础概念)学习笔记(自用)
516 0
|
机器学习/深度学习 移动开发 分布式计算
配置 HDFS-配置 core-site-hdfs-size 以及 env.sh 等配置文件|学习笔记
快速学习配置 HDFS-配置 core-site-hdfs-size 以及 env.sh 等配置文件
409 0
|
存储 缓存 分布式计算
HDFS(二)|学习笔记
快速学习 HDFS(二)
155 0
HDFS(二)|学习笔记
|
SQL JSON 负载均衡
离线同步 mysql 数据到 HDFS2 | 学习笔记
快速学习离线同步 mysql 数据到 HDFS2
187 0
离线同步 mysql 数据到 HDFS2  |  学习笔记
|
SQL 消息中间件 JSON
离线同步 mysql 数据到 HDFS1 | 学习笔记
快速学习离线同步 mysql 数据到 HDFS1
157 0
离线同步 mysql 数据到 HDFS1  |  学习笔记
|
监控 开发工具 开发者
网站流量日志 Flume收集--hdfs--基于文件闲置策略滚动| 学习笔记
快速学习网站流量日志 Flume收集--hdfs--基于文件闲置策略滚动
网站流量日志 Flume收集--hdfs--基于文件闲置策略滚动| 学习笔记
|
存储 分布式计算 负载均衡
HDFS(一)|学习笔记
快速学习 HDFS(一)
109 0
HDFS(一)|学习笔记
|
分布式计算 资源调度 Hadoop
CDH 搭建_ Hadoop _ HDFS _主节点|学习笔记
快速学习 CDH 搭建_ Hadoop _ HDFS _主节点
369 0
CDH 搭建_ Hadoop _ HDFS _主节点|学习笔记