Flink checkpoint(一)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 快速学习 Flink checkpoint。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 Flink checkpoint(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10044


Flink checkpoint(一)

 

内容简介:

一、Checkpoint 与 state 的关系

二、什么是 state

三、如何在 Flink 中使用 state

四、Checkpoint 的执行机制

五、回答部分问题

 

一、Checkpoint 与 state 的关系

Checkpoint 是一个动词,它最终如果顺利会产生一个 complete Checkpoint ,它是一个动词,也就是在下图中会看到红框里面一共有 Trigger 了569027,没有 Failed ,如下图:

image.png

State 其实就是 Checkpoint 所必备的主要的数据,下图中可以看到官方中 state 的 size 是9.17 KB ,是很小的,如下图:

image.png

 

二、什么是 state

(1)state 的含义

先看一段代码:

env.socketTextStream(localhost,9000)

//split up the lines in pairs (2-tuples)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

这是一段非常直白的 Wellcome 的代码,如果执行上述代码,它会去监控本地的9000端口的数据,然后本地启动 netcat ,如果输入 hello world ,执行程序会自动输出什么?

之前提到它是 Wellcome ,便会输出 hello 1 、 world 1 ,那么如果再次输入 hello world  执行程序会输出什么?

hello a 、 world a ,为什么第二次流式计算,

第一次 hello world 来的时候就已经把结果输入进去了,

为什么第二次 hello world 来的时候 Flink 知道它已经看到过一次 hello world ,这就是 state 的作用, 

因为它把这个存储在 state 里面,它知道 hello 和 world 分别出现过一次, State 我们可以认为是流式计算中持久化的状态,可以看到数据输入进来,最后在每个地方会存储到 state backend 里面,  state backend 就是存储

state 的,如下图:

image.png

(2)state的分类

①Keyed State

在 Flink 中如果对 state 进行分类,

有这样两类:一个是 Keyed State ,它只能应用于 KeyedStream 的函数与操作中,它是已经划分好的,换言之每个 Key 只能属于某一个 Keyed State ,刚才实例中提到的的 hello world 的中的 hello 在作业没有发生变化的情况下,只要它一直在正常的运行中,那么 hello 永远只会出现在某一个State 的并发上面,不会去其他地方,再来看这段

word count 的代码:

env.socketTextStream(localhost,9000)

//split up the lines in pairs (2-tuples)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

其中的 keyBy 便是之前提到的使用 KeyedStream ,要创建KeyedStream ,并要对 key 进行划分,不同的 task 上不会出现相同的 key ,其中 sum 是调用内置的 StreamGroupedReduce UD 进行一个累积的计算。

在进一步讲之前先看一下下图中 keyBy  左边有三个并发,右边也是三个并发,于是左边的词进来之后用 keyBy ,并会根据 key 进行自分发,如下图:

image.png

可以看到 hello world  中的 hello 永远只会去到右下方并发路task 上面去,这个就是 keyBy 语。

②Operator State

另外一个 state 是 Operator State ,这个不需要作用在 KeyStream ,换言之无需 keyBy 也能拿到它,它用每一个 operator state 都仅与一个 operator 的实例绑定, Operator State 中比较常见的是 source state ,例如记录当前 source 的 offset ,

再来看一段 word count 代码 :

env.fromElements(WordCountData.WORDS)

//split up the lines in pairs (2-tuples) containing: (word,1)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field 1

.keyBy(0).sum(1)

.print();

可以看到 env 后面的 Function 换成了 fromElements ,这个源码里面是 Operator State ,

它会调用内置的 romElementsFunction ,以下这段代码就是源码里面的一部分,

如下:

public class FromElementsFunction implements SourceFunction, CheckpointedFunction { 

private transient ListState checkpointedState;

public FromElementsFunction(TypeSerializer serializer,T...elments) throws I0Exception {

this(serializer,Arrays.asList(elements));

}

可以看到它用到了 ListState 和 checkpointState ,这个就存储了相关的 Operator State 信息。

下面这个图是整个 State 负类子类的关系,这个比较黄的词是平常中经常用到的 State ,那么有个问题来了 这些 State 是类的定义,它们哪些属于 Key State ?

哪些属于 Operator State ?

比如是像 ValueState 代表说是 key 所对应的value 是单值 value , MapState 的 key 所对应的是 value 是map, ListState 的 key 所对应的 value 是 list ,

如下图:

image.png

看看其中哪些是 key 的 state ?即 ValueState 、 MapState 、ListState ,说明一下AggregatingState 、 ReducingState、 FoldingState 其实都是对 ValueState 的一个封装,上面加了一个function,那么哪些是 Operator State ?

即 ListState BroadcastState 。

③Managed 和 Raw

除了 key 和 Operator 一个维度来区分 State 还有一个维度,称之为 Managed 和 Raw  ,其中 Managed State 是由 Flink 管理的state ,刚才举例的所有 state 均是 managed ;

而 Raw State是Flink 仅提供 stream 可以进行存储数据,从它的角度来看这些可能只是一些 bytes , 比如下面的这段方法,就是相关的 Raw State的东西,

如下:

public interface StateSnapshotContext extends FunctionSnapshotContext {

/**

* Returns an output stream for keyed state

*/

KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception;

/**

* Returns an output stream for operator state

*/

OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;


三、如何在 Flink 中使用 State

先来看这段 word count 代码:

env.fromElements(WordCountData.WORDS)

//split up the lines in pairs (2-tuples) containing: (word,1)

.flatMap(new Tokenizer())

//group by the tuple field 0 and sum up tuple field1

.keyBy(0).sum(1)

.print();

之前sum 里面调用了内置的 StreamGroupedReduce

再来看看这段源码:

public class StreamGroupedReduce extends AbstractUdfStreamOperator>

implements OneInputStreamOperator {

private transient ValueState values;

@Override

public void open() throws Exception {

Super.open():

ValueStateDescriptor stated = new valueStateDescriptor<>(STATE_NAME, serializer);

values = getRuntimeContext().getState(stateId);

}

@Override

public void processElement(StreamRecord element) throws Exception {

IN value = element.getValue():

IN currentValue = values.value():

if (currentValue != null) {

IN reduce = userFunction.reduce(currentValue, value);

values.update(reduceed);

output.collect(element.replace(reduced));

} else {

value.update(value):

output.collect(element.replace(value));

}

}

可以看到里面有个 ValueState ,它就是 StreamGroupedReduce 使用 state 的地方 open 方法里面可以对它进行初始化

通过 RuntimeContext 去访问 state如果你想使用它就在processElement 中使用包括 valueupdate 则是对 state 进行更新

另外一方面如果在 state 中使用 Operator State同样使用内置的 FromElementsFunction我们以源码的方式去展示一下

如下:

public class FromElementsFunction implements SourceFunction, CheckpointedFunction {

private transient ListState checkpointedState

@Override

public void initializeState(FunctionInitializationContext context) throws Exception {

Precondition.checkState(this.checkpointedState == null,

The + getClass().gesSimpleName() + has already been initialized.);

this.checkpointedState = context.getOperatorStateStore().getLiState(

new ListStateDescriptor<>(from-elements-state,IntSerializer,INSTANCE)

);

if (context.isRestored()) {

List retrievedStates = new ArrayList<>():

for (Integer entry :this.checkpointedState.get()){

retrievedStates.add(entry);

//given that the parallelism of the function is1,we can only have 1 state

Preconditions.checkArgument(retrievedStates,sze() == 1,getClass().getSimpleName() +retrieved invalid state.);

this.numElementsToSkip = retrievedStates.get(0);

}

}

@Override

public void snapshotState(FunctionSnapshotContextcontext) throws Exception {

Preconditions.checkState(this.checkpointedState != null,

The + getClass().getSimpleName() + has not been properly initialized.);

this.checkpointedState.clear();

this.checkpointedState.add(this.numElementsEmitted);

}

要访问它对 Operator State 进行显示的初始化,即这个 list 里面需要去存储需要去初始化是需要显示声明的换言之需要继承接口来显示的对进行初始化和存储checkpointedState就是这里使用的 Operator State可以在另外的方法里面

比如在 process 里就可以对此进行读写更新的这个逻辑实际上就是每次在snapshot 时把内存里面的 numElementsEmitted 加载到 ListState里面相当于就是说下次如果发生 failed会直接从 numElementsEmitted 进行恢复

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
美团 Flink 大作业部署问题之 Checkpoint 跨机房副本的制作能力如何实现
|
3月前
|
容灾 流计算
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
美团 Flink 大作业部署问题之Checkpoint 的 metadata 文件包含什么信息
|
3月前
|
存储 调度 流计算
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
Flink 新一代流计算和容错问题之如何实现 Generalized Log-Based Incremental Checkpoint
|
3月前
|
存储 缓存 数据处理
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
Flink 新一代流计算和容错问题之中间数据流动缓慢导致 Checkpoint 慢的问题要如何解决
|
3月前
|
关系型数据库 MySQL 数据处理
实时计算 Flink版产品使用问题之mini-cluster模式下,怎么指定checkpoint的时间间隔
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
存储 监控 Serverless
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
Serverless 应用的监控与调试问题之Flink对于Checkpoint Barrier流动缓慢的问题要如何解决
|
3月前
|
缓存 流计算
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
美团 Flink 大作业部署问题之根据已存在的 Checkpoint 副本进行增量的副本制作如何实现
|
3月前
|
分布式计算 流计算
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
美团 Flink 大作业部署问题之Checkpoint Replicate Service 跨 HDFS 集群的副本制作是如何实现的
|
3月前
|
流计算
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
美团 Flink 大作业部署问题之新启动作业的 Checkpoint 跨作业文件引用的问题要如何避免
|
3月前
|
流计算 索引
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
美团 Flink 大作业部署问题之RocksDBStateBackend 的增量 Checkpoint 要怎么制作
下一篇
无影云桌面