Structured_Sink_Trigger | 学习笔记

简介: 快速学习 Structured_Sink_Trigger

开发者学堂课程【大数据Spark2020版(知识精讲与实战演练)第五阶段:Structured_Sink_Trigger】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/692/detail/12158


Structured_Sink_Trigger

内容介绍

一.Tigger

二.   步骤

三.连续流处理

 

一.Tigger

指定批次,Structured Sink 批次设置较重要.

1.Tigger 目标

掌握如何控制 StructuredStreaming 的处理时间

 

二.  步骤

微批次处理

连续流处理

1.微批次处理

image.png

数据在时间线不断产生,流式数据。

每个结果是中间批次产生的结果,速度较快。

并不是真正的流,而是缓存一个批次网期的数据,后处理这一批次的数据

2.通用流程

步骤

1.根据 Spark 提供的调试用的数据源 Rate 创建流式 DataFrame

·Rate 数据源会定期提供一个由两列 timestamp,value 组成的数据,value 是一个随机数

2.处理利和聚合数据,计算每个个位数和十位数各有多少条数烟

·对 value 求 log10 即可得出其位数

·后按照位数进行分组,最终就可以看到每个位数的数据行多少个

代码

val spark =SparkSession.builder()

.master(="local[6]")

appName("socket_processor")

getorcreate()

import org.apache.spark.sql.functions._

import spark.implicits._

spark.sparkContext.setLogLevel("ERROR")

val source spark.readstream

format("rate")

.load()

val result source.select(1og10('value)cast IntegerType as 'key.'value)

agg(count('key)as count)

select('key,'count)

where('key.isNotNull)

sort('key.asc)

3. 代码

import org.apache.spark.sq1.SparkSession

import org.apache.spark.sql.streaming.OutputMode

object Triggers

def main(args:Array[String]):Unit =

//创建数据源

val spark SparkSession.builder()

appName(name="triggers")

master(master ="local[6]")getorcreate()

// timestamp,value

val source spark.readstream

.format(source="rate")

//一般用于测试演示,模拟一个假的数据源

.load()

//简单处理

val result source

//落地

source.writestream

format(source "console")

outputMode(OutputMode.Complete())

start()

.awaitTermination()

若 Complete 中间必须有操作,其展示的是全集操作。

若不存在聚合操作,则不可用。

image.png

限制输出  spark sparkContext.setloglevel(“wang”)

image.png

下一个批次立即执行,

用 trigger(trigger.Processingtime())可以指定时间间隔,

按照固定的时间间隔划分批次

如果指定间隔为零,相当于默认批次划分

如果前一批次的时间提前完成,等待此间隔到达之后,才会进入下一个批次

如果前一个批次延迟完成,么下一个皮子会在前一个批次完成后立即执行

image.png

image.png

trigger(trigger.once()):只执行一次,运行后不再处理。

遗留信息处理完整会用到。

image.png

微批次无需考虑数据一致性,数据更容易处理,但时间颇慢三

 

二.连续流处理

1.介绍

微批次会将收到的数据按照批次划分为不同的 DataFrame, 后执行 DataFrame, 所以其数据的处理适迟取决于每个

DataFrame 的处理速度,最快也只能在一~个 DataFrame 结束后立刻执行下一个,位快可以达到 100ms 左右的端到端

延迟

而连续流处理可以做到大约 1ms 的端到端数据处理延迟

连续流处理可以达到 at-least-once 的容错语义

从 Spark2.3 版本开始支持连续流处理,我们所采用的 2.2 版木还没有这个特性并且这个静性度止到 2.4 发然是

实验性质,不建议在生产环境中使用

·操作

的流程

2.步骤

使用特殊的 Trig8er 完成功能

3.代码

result.writestream

outputMode(OutputMode.Complete())

format("console")

trigger(Trigger.Continuous("1 second"))

start()

awaitTermination()

4.限制(限制较多)

只支持 Map 类的有类型探作

只支持普通的的 Q@ 类操作,不支持聚给

Source 只支持 Kafka

Sink 只支特 Kafka,Console,Memory

相关文章
|
Java 开发工具
开发工具系列 之 同一个电脑上安装多个版本的JDK
这篇文章介绍了如何在一台电脑上安装和配置多个版本的JDK,包括从官网下载所需JDK、安装过程、配置环境变量以及如何查看和切换当前使用的JDK版本,并提到了如果IDEA和JDK版本不兼容时的解决方法。
开发工具系列 之 同一个电脑上安装多个版本的JDK
|
消息中间件 存储 分布式计算
Spark从入门到入土(四):SparkStreaming集成kafka
Spark从入门到入土(四):SparkStreaming集成kafka
Spark从入门到入土(四):SparkStreaming集成kafka
|
存储 安全 API
Android Google Pay接入
本文介绍了如何将 Google Play 结算库集成到您的应用中以开始销售商品。包含一些代码示例,它们基于 GitHub 上的官方示例应用。
2655 0
Android Google Pay接入
|
分布式计算 Hadoop 数据挖掘
|
消息中间件 分布式计算 Kafka
Spark-Spark Streaming例子整理(二)
Spark Streaming从Flume Poll数据 一、Spark Streaming on Polling from Flume实战 二、Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据。
3569 0
|
2天前
|
数据采集 人工智能 安全
|
12天前
|
云安全 监控 安全
|
3天前
|
自然语言处理 API
万相 Wan2.6 全新升级发布!人人都能当导演的时代来了
通义万相2.6全新升级,支持文生图、图生视频、文生视频,打造电影级创作体验。智能分镜、角色扮演、音画同步,让创意一键成片,大众也能轻松制作高质量短视频。
1034 151
|
3天前
|
编解码 人工智能 机器人
通义万相2.6,模型使用指南
智能分镜 | 多镜头叙事 | 支持15秒视频生成 | 高品质声音生成 | 多人稳定对话