Flink 通过 State Processor API 实现状态的读取和写入

简介: 在 1.9 版本之前,Flink 运行时的状态对于用户来说是一个黑盒,我们是无法访问状态数据的,从 Flink-1.9 版本开始,官方提供了 State Processor API 这让用户读取和更新状态成为了可能,我们可以通过 State Processor API 很方便的查看任务的状态,还可以在任务第一次启动的时候基于历史数据做状态冷启动。从此状态对于用户来说是透明的。下面就来看一下 State Processor API 的使用。

大家好,我是 JasonLee。


在 1.9 版本之前,Flink 运行时的状态对于用户来说是一个黑盒,我们是无法访问状态数据的,从 Flink-1.9 版本开始,官方提供了 State Processor API 这让用户读取和更新状态成为了可能,我们可以通过 State Processor API 很方便的查看任务的状态,还可以在任务第一次启动的时候基于历史数据做状态冷启动。从此状态对于用户来说是透明的。下面就来看一下 State Processor API 的使用。


添加依赖


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api_2.11</artifactId>
    <version>1.14.4</version>
</dependency>


Mapping Application State to DataSets


State Processor API 将流应用程序的状态映射到一个或多个可以单独处理的数据集。为了能够使用 API,我们先来理解一下任务的状态和 DataSets 之间是如何映射的。


让我们先看看有状态的 Flink 作业是什么样子的。Flink 作业由多个算子组成,通常有一个或多个 source 数据源,一些用于实际处理数据的算子,以及一个或多个 sink 算子。每个算子并行的在一个或多个 task 上运行,并且可以处理不同类型的状态。一个算子可以有 0、1 个或多个 operator states,这些状态被组织成 list,作用于所有的 tasks 上。如果 operator 应用于 keyed states,它还可以有 0 个、1 个或多个 keyed state,这些状态的作用域为从每个 record 中提取的 key。


下图显示了应用程序 MyApp,它由  Src、Proc 和 Snk 三个算子组成。Src 有一个 operator state 状态(os1), Proc 有一个 operator 状态(os2) 和两个 keyed state 状态(ks1, ks2),而 Snk 是无状态的。



MyApp 的 SavePoint 或 CheckPoint 由所有的状态数据组成,以便可以恢复每个 task 的状态。在使用 batch 作业处理保存点(或检查点)的数据时,我们需要将各个任务状态的数据映射到数据集或表中的心智模型。实际上,我们可以将保存点视为数据库。每个 operator(由其UID标识)代表 namespace。每一个算子的 operator state 在 namespace 里都映射到一个固定的表里,其中有一列包含所有 task 的状态数据。一个算子的所有 keyed state 都映射到由 key 的列组成的单个表,以及另外一列对应每一个 keyed state。下图显示了MyApp 的保存点如何映射到数据库。



该图显示了 Src 的 operator state 的值是如何映射到一个表的,该表有一列和五行,每一行代表 Src 的所有并行任务中的每个列表条目。算子 Proc 的 operator state(os2) 类似地映射到单个表。keyed state ks1 和 ks2 合并到一个包含三列的表中,一列表示 key,一列用于 ks1,一列用于 ks2。这个 keyed table 为两个 keyed state 的每个不同 key 保存一行。因为算子 Snk 没有任何状态,所以它的 namespace 是空的。


Reading State


读取状态首先需要指定一个有效的 savepoint 或 checkpoint 的路径,以及应该用于恢复数据的 StateBackend。恢复状态的兼容性保证与恢复 DataStream 应用程序时相同。


ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new HashMapStateBackend());


读取状态时支持三种不同类型的状态:


Operator State

Keyed State

Window State

Writing New Savepoints


也可以编写 Savepoints,它允许这样的用例,如基于历史数据的启动状态。每个 Savepoints 由一个或多个 BootstrapTransformation(下面会解释)组成,每个 BootstrapTransformation 都定义了单个算子的状态。


注意:state processor api 当前未提供 Scala API。因此,它将始终使用 Java 类型堆栈自动推断出序列化器。要为 Scala Datastream API 启动 savepoint 请在所有类型信息中手动传递。


初始化状态时支持四种不同类型的状态:


Operator State

Broadcast State

Keyed State

Window State

Modifying Savepoints


除了从临时创建一个 savepoint 外,你还可以基于现有的 Savepoints,当为现有作业启动单个新的算子时。


Savepoint
    .load(bEnv, oldPath, new HashMapStateBackend())
    .withOperator("uid", transformation)
    .write(newPath);


Read And Write State Demo


下面就来实现一下我们平时使用最多的 Keyed State 状态的读取和写入。


Read State
package flink.state;
import bean.Jason;
import bean.UserDefinedSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class FlinkStreamingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 设置任务的最大并行度 也就是keyGroup的个数
        env.setMaxParallelism(128);
        //env.getConfig().setAutoWatermarkInterval(1000L);
        // 设置开启checkpoint
        env.enableCheckpointing(10000L);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("file:///Users/jasonlee/flink-1.14.0/checkpoint");
        // 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        DataStreamSource<Jason> dataStreamSource = env.addSource(new UserDefinedSource());
        dataStreamSource.keyBy(k -> k.getName())
                .process(new KeyedProcessFunction<String, Jason, Jason>() {
                    private ValueState<Integer> state;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
                        state = getRuntimeContext().getState(stateDescriptor);
                    }
                    @Override
                    public void processElement(Jason value, KeyedProcessFunction<String, Jason, Jason>.Context ctx, Collector<Jason> out) throws Exception {
                        if (state.value() != null) {
                            System.out.println("状态里面有数据 :" + state.value());
                            value.setAge(state.value() + value.getAge());
                            state.update(state.value() + value.getAge());
                        } else {
                            state.update(value.getAge());
                        }
                        out.collect(value);
                    }
                }).uid("my-uid")
                .print("local-print");
        env.execute();
    }
}


代码非常简单,里面只用了一个 ValueState,来保存用户的 age ,key 是 name。要为带状态的算子设置唯一的 uid("my-uid"),在读取状态的时候需要指定算子的 uid。


先把这个任务跑起来,然后只要任务 checkpoint 做成功就可以把任务停掉了。



然后来看一下生成的 ck 文件。



可以看到做了 10 次 ck,那这里我们就来读取 chk-10 这个 ck 里面的状态。


读取和写入状态的代码如下:


package flink.state;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
public class FlinkReadAndUpdateState {
    private static final String ckPath = "file:///Users/jasonlee/flink-1.14.0/checkpoint/b02f75ede7e3b093eb3b58bdd5906de3/chk-10";
    private static final Collection<KeyedState> data =
            Arrays.asList(new KeyedState("hive", 1), new KeyedState("JasonLee1", 100), new KeyedState("hhase", 3));
    public static void main(String[] args) throws Exception {
        stateRead(ckPath);
        //stateWrite("");
    }
    /**
     * 从 ck 读取状态数据
     * @param ckPath
     * @throws Exception
     */
    public static void stateRead(String ckPath) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        bEnv.setParallelism(1);
        ExistingSavepoint savepoint = Savepoint.load(bEnv, ckPath, new HashMapStateBackend());
        DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
        List<KeyedState> keyedStates = keyedState.collect();
        for (KeyedState ks: keyedStates) {
            System.out.println(String.format("key: %s, value: %s", ks.key, ks.value));
        }
    }
    /**
     * 初始化状态数据
     * @param ckPath
     */
    public static void stateWrite(String ckPath) throws Exception {
        int maxParallelism = 128;
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<KeyedState> dataKeyedState = bEnv.fromCollection(data);
        BootstrapTransformation<KeyedState> transformation = OperatorTransformation
                .bootstrapWith(dataKeyedState)
                .keyBy(k -> k.key)
                .transform(new WriterFunction());
        Savepoint
                .create(new HashMapStateBackend(), maxParallelism)
                .withOperator("uid-test", transformation)
                .write("file:///Users/jasonlee/flink-1.14.0/checkpoint/init_state");
        bEnv.execute();
    }
    public static class WriterFunction extends KeyedStateBootstrapFunction<String, KeyedState> {
        ValueState<Integer> state;
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);
        }
        @Override
        public void processElement(KeyedState value, KeyedStateBootstrapFunction<String, KeyedState>.Context ctx) throws Exception {
            state.update(value.value);
        }
    }
    public static class ReaderFunction extends KeyedStateReaderFunction<String, KeyedState> {
        ValueState<Integer> state;
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
            state = getRuntimeContext().getState(stateDescriptor);
        }
        @Override
        public void readKey(
                String key,
                Context ctx,
                Collector<KeyedState> out) throws Exception {
            KeyedState data = new KeyedState();
            data.key = key;
            data.value = state.value();
            out.collect(data);
        }
    }
    public static class KeyedState {
        public String key;
        public int value;
        public KeyedState(String key, int value) {
            this.key = key;
            this.value = value;
        }
        public KeyedState() {}
    }
}


这里读取和写入状态的代码放到一起了,只需调用 savepoint 的 readKeyedState 方法指定一下上面代码里面设置的 uid,还需要继承 KeyedStateReaderFunction 实现 readKey 方法就可以了。代码比较简单,这里就不在多说。直接来看一下读取的结果。


执行这个代码,打印的状态数据如下:


key: JasonLee35, value: 35
key: JasonLee66, value: 66
key: JasonLee81, value: 81
key: JasonLee74, value: 74
key: JasonLee90, value: 90
key: JasonLee36, value: 36
key: JasonLee85, value: 85
key: JasonLee39, value: 39
key: JasonLee72, value: 72
key: JasonLee65, value: 65
key: JasonLee58, value: 58
key: JasonLee9, value: 9
key: JasonLee69, value: 69
key: JasonLee82, value: 82
key: JasonLee53, value: 53
key: JasonLee6, value: 6
key: JasonLee79, value: 79
key: JasonLee32, value: 32
key: JasonLee64, value: 64
key: JasonLee76, value: 76
key: JasonLee91, value: 91
key: JasonLee18, value: 18
key: JasonLee26, value: 26
key: JasonLee40, value: 40
key: JasonLee25, value: 25
key: JasonLee54, value: 54
key: JasonLee21, value: 21
key: JasonLee55, value: 55
key: JasonLee78, value: 78
key: JasonLee71, value: 71
key: JasonLee42, value: 42
key: JasonLee56, value: 56
key: JasonLee17, value: 17
key: JasonLee88, value: 88
key: JasonLee61, value: 61
key: JasonLee27, value: 27
key: JasonLee41, value: 41
key: JasonLee12, value: 12
key: JasonLee63, value: 63
key: JasonLee5, value: 5
key: JasonLee73, value: 73
key: JasonLee67, value: 67
key: JasonLee29, value: 29
key: JasonLee31, value: 31
key: JasonLee14, value: 14
key: JasonLee92, value: 92
key: JasonLee7, value: 7
key: JasonLee45, value: 45
key: JasonLee48, value: 48
key: JasonLee11, value: 11
key: JasonLee75, value: 75
key: JasonLee84, value: 84
key: JasonLee13, value: 13
key: JasonLee77, value: 77
key: JasonLee59, value: 59
key: JasonLee83, value: 83
key: JasonLee15, value: 15
key: JasonLee37, value: 37
key: JasonLee52, value: 52
key: JasonLee30, value: 30
key: JasonLee62, value: 62
key: JasonLee34, value: 34
key: JasonLee19, value: 19
key: JasonLee87, value: 87
key: JasonLee86, value: 86
key: JasonLee38, value: 38
key: JasonLee57, value: 57
key: JasonLee10, value: 10
key: JasonLee49, value: 49
key: JasonLee46, value: 46
key: JasonLee8, value: 8
key: JasonLee28, value: 28
key: JasonLee2, value: 2
key: JasonLee89, value: 89
key: JasonLee16, value: 16
key: JasonLee24, value: 24
key: JasonLee50, value: 50
key: JasonLee3, value: 3
key: JasonLee51, value: 51
key: JasonLee44, value: 44
key: JasonLee47, value: 47
key: JasonLee33, value: 33
key: JasonLee68, value: 68
key: JasonLee22, value: 22
key: JasonLee80, value: 80
key: JasonLee20, value: 20
key: JasonLee23, value: 23
key: JasonLee1, value: 1
key: JasonLee70, value: 70
key: JasonLee60, value: 60
key: JasonLee4, value: 4
key: JasonLee43, value: 43


可以看到这个就是我们写入的状态数据。


然后再来测试一下初始化状态数据,跟读取状态刚好相反,我们需要先写入一个状态到指定的路径。然后在指定这个状态路径启动任务。


运行上面写入的代码,会在 /Users/jasonlee/flink-1.14.0/checkpoint/init_state 路径下面生成一个 _metadata 文件。来看一下生成的文件。



这里我读取状态和写入状态用的是同一个算子,也就是上面的 KeyedProcessFunction 算子,注意在恢复状态的时候需要把算子的 uid 改成和 .withOperator("uid-test", transformation) 参数保持一致。


然后就可以 通过下面的命令指定 ck 启动任务。


flink run -d -m yarn-cluster \
-Dyarn.application.name=FlinkStreamingNewDemoHome \
-Dyarn.application.queue=flink \
-Dmetrics.reporter.promgateway.groupingKey="jobname=FlinkStreamingNewDemoHome" \
-Dmetrics.reporter.promgateway.jobName=FlinkStreamingNewDemoHome \
-c flink.state.FlinkStreamingDemo \
-Denv.java.opts="-Dflink_job_name=FlinkStreamingNewDemoHome" \
-s hdfs:///flink-rockdb/checkpoints/init_state/_metadata \
/home/jason/bigdata/jar/flink-1.14.x-1.0-SNAPSHOT.jar



从上图可以看出任务确实是从我们指定的 ck 恢复的,这里其实和指定 checkpoint 或 savepoint 恢复任务是一样的,可以再来看一下 TM 里我们在代码里面打印的日志。



因为我们初始化了 JasonLee1 100 所以从状态里面读取出来的是 100 然后第一条数据的 age 是 1 所以打印的 JasonLee1 101 是没问题的。整个读取和写入状态的流程就结束了,其他类型的状态这里就不在演示,用法基本都是一样的。实际使用的时候根据场景选择不同类型的状态就可以了。


总结


State Processor API 提供了状态的读取写入以及修改的功能,从此状态对于用户来说是可见的,我们可以方便的查看我们自己定义的状态,也可以在任务启动之前初始化历史数据作为状态的冷启动,极大的丰富了使用的场景。

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
844 12
Flink CDC YAML:面向数据集成的 API 设计
|
SQL 人工智能 关系型数据库
Flink CDC YAML:面向数据集成的 API 设计
Flink CDC YAML:面向数据集成的 API 设计
634 5
|
SQL 分布式计算 测试技术
概述Flink API中的4个层次
【7月更文挑战第14天】Flink的API分为4个层次:核心底层API(如ProcessFunction)、DataStream/DataSet API、Table API和SQL。
|
SQL 关系型数据库 API
实时计算 Flink版产品使用问题之如何使用stream api
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
Kubernetes Oracle 关系型数据库
实时计算 Flink版操作报错合集之用dinky在k8s上提交作业,会报错:Caused by: org.apache.flink.table.api.ValidationException:,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
584 0
|
6月前
|
缓存 监控 前端开发
顺企网 API 开发实战:搜索 / 详情接口从 0 到 1 落地(附 Elasticsearch 优化 + 错误速查)
企业API开发常陷参数、缓存、错误处理三大坑?本指南拆解顺企网双接口全流程,涵盖搜索优化、签名验证、限流应对,附可复用代码与错误速查表,助你2小时高效搞定开发,提升响应速度与稳定性。
|
6月前
|
JSON 算法 API
Python采集淘宝商品评论API接口及JSON数据返回全程指南
Python采集淘宝商品评论API接口及JSON数据返回全程指南
|
7月前
|
数据可视化 测试技术 API
从接口性能到稳定性:这些API调试工具,让你的开发过程事半功倍
在软件开发中,接口调试与测试对接口性能、稳定性、准确性及团队协作至关重要。随着开发节奏加快,传统方式已难满足需求,专业API工具成为首选。本文介绍了Apifox、Postman、YApi、SoapUI、JMeter、Swagger等主流工具,对比其功能与适用场景,并推荐Apifox作为集成度高、支持中文、可视化强的一体化解决方案,助力提升API开发与测试效率。
|
6月前
|
JSON API 数据安全/隐私保护
Python采集淘宝拍立淘按图搜索API接口及JSON数据返回全流程指南
通过以上流程,可实现淘宝拍立淘按图搜索的完整调用链路,并获取结构化的JSON商品数据,支撑电商比价、智能推荐等业务场景。
|
7月前
|
JSON 前端开发 API
如何调用体育数据足篮接口API
本文介绍如何调用体育数据API:首先选择可靠服务商并注册获取密钥,接着阅读文档了解基础URL、端点、参数及请求头,然后使用Python等语言发送请求、解析JSON数据,最后将数据应用于Web、App或分析场景,同时注意密钥安全、速率限制与错误处理。
727 152

热门文章

最新文章