Structured_Sink_Foreach | 学习笔记

简介: 快速学习 Structured_Sink_Foreach

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Foreach】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12157


Structured_Sink_Foreach

内容介绍

一.Foreach Writer

二.Foreach 模式

 

如何整合

官方提供的 think 只有 kafka 和 HDFS,如果将数据落地可以使用 Foreach,一个一个处理,非常安全

 

一.Foreach Writer

1.Foreach Writer 目标

掌握 Foreach 模式理解如何扩展 Structured Streaming 的 Sink,同时能够将数据落地到 MySQL

2. 步骤

需求

代码

3.需求

场景

大数据有一个常见的应用场景

(1)收集业务系统数据

(2)数据处理

(3)放入 OLTP 数据

//进行商业分析(增长率,人数等)

(4)外部通过 ECharts 获取并处理数据

//处理展示

这个场景下,StructuredStreaming 就需要处理数据水放入 MySQL 或者MongoDB,HBase 中以供 Web 程序可以

获取数据,图表的形式展示在前端

image.png


二.Foreach 模式

1.起因

·在 Structured Streaming 中,并未提供完整的 MySQL/JDBC 整合工具

·不止:MySQL 和 JDBC,可能会有其它的日标端需要写入

·很多时候 Structured Streaming 需要对按些第三方的系统,例阿里云的云存储,业马云的云存储第,但是 Spak 无法对所有第三方都提供支持,有时候需要自已编写。

解决方案

image.png

·既然无法河足所有的整合需求,StructuredStreaming 提供了 Foreach, 可以拿到每·个批次的数据

·通过 Foreach 拿到数据后,可以通过自定义写入方式,从而将数据落地到其它的系统

2.案例需求:

从 kafka 中读取数据处理后放入 MySQL

image.png

代码

1.创建 DataFrame 表示 Kafka 数据源

2.在源 DataFrame 中选择三列数据

3.创建 ForeachWriter 按收每一个批次的数据落地 MySQL

4.Foreach 落地数据

代码

Import org.apache.spark.sql.SparkSession

val spark SparkSession.builder()

master("local[6]")

appName("kafka integration")

getorCreate()

import spark.implicits._

val source spark

.readstream

format("kafka").option("kafka.bootstrap.servers","node01:9092,node02:9092,node03:9092")

option("subscribe","streaming-bank")

.option("startingoffsets","earliest")

.load()

.selectExpr( exprs = "CAST(value AS STRING) as value")

as[String]

//处理 csV, Dataset(String), Dataset(id, name, category)

val result = source. map(item => {

val arr = item.split( regex "::")

(arr(e). toInt, arr(1). toString, arr(2) tostring)

}).as[(Int, string, string)]. toDF( colNames "id", "name", "category")

class MySQLWriter extends ForeachWriter[Row]

private val driver ="com.mysql.jdbc.Driver"

private var connection:Connection

Private  val  url

="jdbc:mysql://node01:3306/streaming-movies-result"

private var statement:Statement =

Database

override def open(partitionId:Long,version:Long):Boolean =

class.forName(driver)

connection DriverManager.getconnection(url)

statement connection.createStatement()

true

override def process(value:Row):Unit =

statement.executeUpdate(sql =s"insert into movies values(${value.get(0)),$(value.get(1)),${value.get(2)})"

override def close(errororNull:Throwable):Unit =

connection.close()

result.writestream

foreach(new MySQLWriter)

.start()

awaitTermination()

第一步加载 driver

第二部创建 connection,url,创建 statement 执行 SQL 语句,将此类型放在 action 上。返回一个布尔值查看返回值是否正确。

写入前需要创建完成数据库和表

 

相关文章
Typora设置图片的相对路径(win系统)
Typora设置图片的相对路径(win系统)
1030 0
|
9月前
|
机器学习/深度学习 自然语言处理 算法
PyTorch PINN实战:用深度学习求解微分方程
物理信息神经网络(PINN)是一种将深度学习与物理定律结合的创新方法,特别适用于微分方程求解。传统神经网络依赖大规模标记数据,而PINN通过将微分方程约束嵌入损失函数,显著提高数据效率。它能在流体动力学、量子力学等领域实现高效建模,弥补了传统数值方法在高维复杂问题上的不足。尽管计算成本较高且对超参数敏感,PINN仍展现出强大的泛化能力和鲁棒性,为科学计算提供了新路径。文章详细介绍了PINN的工作原理、技术优势及局限性,并通过Python代码演示了其在微分方程求解中的应用,验证了其与解析解的高度一致性。
3007 5
PyTorch PINN实战:用深度学习求解微分方程
|
前端开发 JavaScript API
< 谈谈对 SPA(单页面应用)的理解 >
浅谈 SPA 相关知识,了解SPA相关优缺点 及 实现原理等。
338 1
< 谈谈对 SPA(单页面应用)的理解 >
|
网络协议 数据格式
【通信协议讲解】单片机基础重点通信协议解析与总结之ModBus(五)
【通信协议讲解】单片机基础重点通信协议解析与总结之ModBus(五)
325 1
|
Python
联合概率 边缘概率 条件概率 贝叶斯定理
联合概率 边缘概率 条件概率 贝叶斯定理
373 0
|
JavaScript
js如何判断数组内的值都为true
js如何判断数组内的值都为true
126 0
|
存储 监控 Java
详解ElasticAPM实现微服务的链路追踪(一)
Elastic APM实现链路追踪,首先要引用开源的APMAgent(APM代理),然后将监控的信息发送到APMServer,然后在转存入ElasticSearch,最后有Kibana展示;具体流程如下图所示:
详解ElasticAPM实现微服务的链路追踪(一)
|
Web App开发 弹性计算 Linux
CentOS 7迁移Anolis OS 8
龙蜥操作系统Anolis OS的体验。OpenAnolis社区提供的迁移工具leapp,充分考虑OS的差异兼容性问题,提供迁移评估,迁移实施,配置还原等步骤,用于实现CentOS7.x到Anolis OS 8的就地迁移。
|
Dubbo Java 关系型数据库
往期文章精选
往期文章精选
112 0