Flink State状态
Flink是一个有状态的流式计算引擎,所以会将中间计算结果(状态)进行保存,默认保存到TaskManager的堆内存中,但是当task挂掉,那么这个task所对应的状态都会被清空,造成了数据丢失,无法保证结果的正确性,哪怕想要得到正确结果,所有数据都要重新计算一遍,效率很低。想要保证 At -least-once 和 Exactly-once,需要把数据状态持久化到更安全的存储介质中,Flink提供了堆内内存、堆外内存、HDFS、RocksDB等存储介质。
先来看下Flink提供的状态有哪些,Flink中状态分为两种类型:
- Keyed State
基于KeyedStream上的状态,这个状态是跟特定的Key绑定,KeyedStream流上的每一个Key都对应一个State,每一个Operator可以启动多个Thread处理,但是相同Key的数据只能由同一个Thread处理,因此一个Keyed状态只能存在于某一个Thread中,一个Thread会有多个Keyed state。 - Non-Keyed State(Operator State)
Operator State与Key无关,而是与Operator绑定,整个Operator只对应一个State。比如:Flink中的Kafka Connector就使用了Operator State,它会在每个Connector实例中,保存该实例消费Topic的所有(partition, offset)映射。
Flink针对Keyed State提供了以下可以保存State的数据结构
- ValueState:类型为T的单值状态,这个状态与对应的Key绑定,最简单的状态,通过update更新值,通过value获取状态值。
- ListState:Key上的状态值为一个列表,这个列表可以通过add方法往列表中添加值,也可以通过get()方法返回一个Iterable来遍历状态值。
- ReducingState:每次调用add()方法添加值的时候,会调用用户传入的reduceFunction,最后合并到一个单一的状态值。
- MapState<UK, UV>:状态值为一个Map,用户通过put或putAll方法添加元素,get(key)通过指定的key获取value,使用entries()、keys()、values()检索。
- AggregatingState
<IN, OUT>
:保留一个单值,表示添加到状态的所有值的聚合。和ReducingState
相反的是, 聚合类型可能与添加到状态的元素的类型不同。使用add(IN)
添加的元素会调用用户指定的AggregateFunction
进行聚合。 - FoldingState<T, ACC>:已过时建议使用AggregatingState 保留一个单值,表示添加到状态的所有值的聚合。 与
ReducingState
相反,聚合类型可能与添加到状态的元素类型不同。 使用add(T)
添加的元素会调用用户指定的FoldFunction
折叠成聚合值。
案例1:使用ValueState keyed state检查车辆是否发生了急加速
object ValueStateTest { case class CarInfo(carId: String, speed: Long) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("node01", 8888) stream.map(data => { val arr = data.split(" ") CarInfo(arr(0), arr(1).toLong) }).keyBy(_.carId) .map(new RichMapFunction[CarInfo, String]() { //保存上一次车速 private var lastTempState: ValueState[Long] = _ override def open(parameters: Configuration): Unit = { val lastTempStateDesc = new ValueStateDescriptor[Long]("lastTempState", createTypeInformation[Long]) lastTempState = getRuntimeContext.getState(lastTempStateDesc) } override def map(value: CarInfo): String = { val lastSpeed = lastTempState.value() this.lastTempState.update(value.speed) if ((value.speed - lastSpeed).abs > 30 && lastSpeed != 0) "over speed" + value.toString else value.carId } }).print() env.execute() } }
案例2:使用 MapState 统计单词出现次数
import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.{MapState, MapStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ //MapState 实现 WordCount object KeyedStateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.fromCollection(List("I love you","hello spark","hello flink","hello hadoop")) val pairStream = stream.flatMap(_.split(" ")).map((_,1)).keyBy(_._1) pairStream.map(new RichMapFunction[(String,Int),(String,Int)] { private var map:MapState[String,Int] = _ override def open(parameters: Configuration): Unit = { //定义map state存储的数据类型 val desc = new MapStateDescriptor[String,Int]("sum",createTypeInformation[String],createTypeInformation[Int]) //注册map state map = getRuntimeContext.getMapState(desc) } override def map(value: (String, Int)): (String, Int) = { val key = value._1 val v = value._2 if(map.contains(key)){ map.put(key,map.get(key) + 1) }else{ map.put(key,1) } val iterator = map.keys().iterator() while (iterator.hasNext){ val key = iterator.next() println("word:" + key + "\t count:" + map.get(key)) } value } }).setParallelism(3) env.execute() } }
案例3:使用ReducingState统计每辆车的速度总和
import com.msb.state.ValueStateTest.CarInfo import org.apache.flink.api.common.functions.{ReduceFunction, RichMapFunction} import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ //统计每辆车的速度总和 object ReduceStateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("node01", 8888) stream.map(data => { val arr = data.split(" ") CarInfo(arr(0), arr(1).toLong) }).keyBy(_.carId) .map(new RichMapFunction[CarInfo, CarInfo] { private var reduceState: ReducingState[Long] = _ override def map(elem: CarInfo): CarInfo = { reduceState.add(elem.speed) println("carId:" + elem.carId + " speed count:" + reduceState.get()) elem } override def open(parameters: Configuration): Unit = { val reduceDesc = new ReducingStateDescriptor[Long]("reduceSpeed", new ReduceFunction[Long] { override def reduce(value1: Long, value2: Long): Long = value1 + value2 }, createTypeInformation[Long]) reduceState = getRuntimeContext.getReducingState(reduceDesc) } }) env.execute() } }
案例4:使用AggregatingState统计每辆车的速度总和
import com.msb.state.ValueStateTest.CarInfo import org.apache.flink.api.common.functions.{AggregateFunction, ReduceFunction, RichMapFunction} import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor, ReducingState, ReducingStateDescriptor} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ //统计每辆车的速度总和 object ReduceStateTest { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val stream = env.socketTextStream("node01", 8888) stream.map(data => { val arr = data.split(" ") CarInfo(arr(0), arr(1).toLong) }).keyBy(_.carId) .map(new RichMapFunction[CarInfo, CarInfo] { private var aggState: AggregatingState[Long,Long] = _ override def map(elem: CarInfo): CarInfo = { aggState.add(elem.speed) println("carId:" + elem.carId + " speed count:" + aggState.get()) elem } override def open(parameters: Configuration): Unit = { val aggDesc = new AggregatingStateDescriptor[Long,Long,Long]("agg",new AggregateFunction[Long,Long,Long] { //初始化累加器值 override def createAccumulator(): Long = 0 //往累加器中累加值 override def add(value: Long, acc: Long): Long = acc + value //返回最终结果 override def getResult(accumulator: Long): Long = accumulator //合并两个累加器值 override def merge(a: Long, b: Long): Long = a+b },createTypeInformation[Long]) aggState = getRuntimeContext.getAggregatingState(aggDesc) } }) env.execute() } }
CheckPoint & SavePoint
有状态流应用中的检查点(checkpoint),其实就是所有任务的状态在某个时间点的一个快照(一份拷贝)。简单来讲,就是一次“存盘”,让我们之前处理数据的进度不要丢掉。在一个流应用程序运行时,Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的enableCheckpointing()方法就可以开启检查点。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment(); env.enableCheckpointing(1000);
这里传入的参数是检查点的间隔时间,单位为毫秒。
除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。
检查点具体的持久化存储位置,取决于“检查点存储”(CheckpointStorage)的设置。默认情况下,检查点存储在 JobManager 的堆(heap)内存中。而对于大状态的持久化保存,Flink也提供了在其他存储位置进行保存的接口,这就是 CheckpointStorage。具体可以通过调用检查点配置的 setCheckpointStorage()来配置,需要传入一个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)。对于实际生产应用,我们一般会将 CheckpointStorage 配置为高可用的分布式文件系统(HDFS,S3 等)。
Flink中基于异步轻量级的分布式快照技术提供了Checkpoint容错机制,分布式快照可以将同一时间点Task/Operator的状态数据全局统一快照处理,包括上面提到的用户自定义使用的Keyed State和Operator State,当未来程序出现问题,可以基于保存的快照容错。
CheckPoint原理
Flink会在输入的数据集上间隔性地生成checkpoint barrier,通过栅栏(barrier)将间隔时间段内的数据划分到相应的checkpoint中。当程序出现异常时,Operator就能够从上一次快照中恢复所有算子之前的状态,从而保证数据的一致性。例如在KafkaConsumer算子中维护offset状态,当系统出现问题无法从Kafka中消费数据时,可以将offset记录在状态中,当任务重新恢复时就能够从指定的偏移量开始消费数据。
默认情况Flink不开启检查点,用户需要在程序中通过调用方法配置和开启检查点,另外还可以调整其他相关参数
- Checkpoint开启和时间间隔指定
开启检查点并且指定检查点时间间隔为1000ms,根据实际情况自行选择,如果状态比较大,则建议适当增加该值
env.enableCheckpointing(1000)
- exactly-ance和at-least-once语义选择
选择exactly-once语义保证整个应用内端到端的数据一致性,这种情况比较适合于数据要求比较高,不允许出现丢数据或者数据重复,与此同时,Flink的性能也相对较弱,而at-least-once语义更适合于时廷和吞吐量要求非常高但对数据的一致性要求不高的场景。如下通过setCheckpointingMode()方法来设定语义模式,默认情况下使用的是exactly-once模式
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
- Checkpoint超时时间
超时时间指定了每次Checkpoint执行过程中的上限时间范围,一旦Checkpoint执行时间超过该阈值,Flink将会中断Checkpoint过程,并按照超时处理。该指标可以通过setCheckpointTimeout方法设定,默认为10分钟
env.getCheckpointConfig.setCheckpointTimeout(5 * 60 * 1000)
- Checkpoint之间最小时间间隔
该参数主要目的是设定两个Checkpoint之间的最小时间间隔,防止Flink应用密集地触发Checkpoint操作,会占用了大量计算资源而影响到整个应用的性能
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
- 最大并行执行的Checkpoint数量
在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个Checkpoint,进而提升Checkpoint整体的效率
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
- 任务取消后,是否删除Checkpoint中保存的数据
设置为RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留CheckPoint数据,以便根据实际需要恢复到指定的CheckPoint
设置为DELETE_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会删除CheckPoint数据,只有Job执行失败的时候才会保存CheckPoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
- 容忍的检查的失败数
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)
SavePoint原理
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用Checkpoints的机制。Savepoints是用户以手工命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
要使用Savepoints,需要按照以下步骤进行:
- 配置状态后端: 在Flink中,状态可以保存在不同的后端存储中,例如内存、文件系统或分布式存储系统(如HDFS)。要启用Savepoint,您需要在Flink配置文件中配置合适的状态后端。通常,使用分布式存储系统作为状态后端是比较常见的做法,因为它可以提供更好的可靠性和容错性。
- 生成Savepoint: 在您的Flink应用程序运行时,可以通过以下方式手动触发生成Savepoint:
bin/flink savepoint <jobID> [targetDirectory]
- 其中,
<jobID>
是您要保存状态的Flink作业的Job ID,[targetDirectory]
是可选的目标目录,用于保存Savepoint数据。如果没有提供targetDirectory
,Savepoint将会保存到Flink配置中所配置的状态后端中。 - 恢复Savepoint: 要恢复到Savepoint状态,可以通过以下方式提交作业:
bin/flink run -s :savepointPath [:runArgs]
- 其中,
savepointPath
是之前生成的Savepoint的路径,runArgs
是您提交作业时的其他参数。 - 确保应用程序状态的兼容性: 在使用Savepoints时,应用程序的状态结构和代码必须与生成Savepoint的版本保持兼容。这意味着在更新应用程序代码后,可能需要做一些额外的工作来保证状态的向后兼容性,以便能够成功恢复到旧的Savepoint。
StateBackend状态后端
在Flink中提供了StateBackend来存储和管理状态数据
Flink一共实现了三种类型的状态管理器:MemoryStateBackend、FsStateBackend、RocksDBStateBackend
MemoryStateBackend
基于内存的状态管理器将状态数据全部存储在JVM堆内存中。基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。
Flink将MemoryStateBackend作为默认状态后端管理器
env.setStateBackend(new MemoryStateBackend(100*1024*1024))
注意:聚合类算子的状态会同步到JobManager内存中,因此对于聚合类算子比较多的应用会对JobManager的内存造成一定的压力,进而影响集群。
FsStateBackend
和MemoryStateBackend有所不同,FsStateBackend是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是HDFS分布式文件系统
env.setStateBackend(new FsStateBackend("path",true))
如果path是本地文件路径,其格式:file:///
如果path是HDFS文件路径,格式为:hdfs://
第二个参数代表是否异步保存状态数据到HDFS,异步方式能够尽可能避免checkpoint的过程中影响流式计算任务。FsStateBackend更适合任务量比较大的应用,例如:包含了时间范围非常长的窗口计算,或者状态比较大的场景。
RocksDBStateBackend
RocksDBStateBackend是Flink中内置的第三方状态管理器,和前面的状态管理器不同,RocksDBStateBackend需要单独引入相关的依赖包到工程中。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>1.9.2</version> </dependency>
env.setStateBackend(new RocksDBStateBackend("hdfs://"))
RocksDBStateBackend采用异步的方式进行状态数据的Snapshot,任务中的状态数据首先被写入本地RockDB中,这样在RockDB仅会存储正在进行计算的热数据,而需要进行CheckPoint的时候,会把本地的数据直接复制到远端的FileSystem中。
与FsStateBackend相比,RocksDBStateBackend在性能上要比FsStateBackend高一些,主要是因为借助于RocksDB在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但RocksDBStateBackend和MemoryStateBackend相比性能就会较弱一些。RocksDB克服了State受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。