AsynchronousException{java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 20 for operator Window(TumblingProcessingTimeWindows(5), ProcessingTimeTrigger, MyRichRedisWindowFuntion) (1/8).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=5249873 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.checkSize(MemCheckpointStreamFactory.java:64)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetBytes(MemCheckpointStreamFactory.java:145)
at org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory$MemoryCheckpointOutputStream.closeAndGetHandle(MemCheckpointStreamFactory.java:126)
at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:826)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$HeapSnapshotStrategy$1.performOperation(HeapKeyedStateBackend.java:759)
at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
fs.default-scheme: hdfs://@hadoop:9000/
fs.hdfs.hadoopconf: hdfs:///flink/data/
state.checkpoints.dir: hdfs:///flink/checkpoints/
state.checkpoints.num-retained: 20
state.savepoints.dir: hdfs:///flink/flink-savepoints/
state.backend.fs.checkpoint.dir: hdfs:///flink/state/checkpoints/
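The checkpoint directory is configured globally, so the window operator's checkpoints should be saved to that location as well.
So why are checkpoints still being handled in memory mode?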
Setting the checkpoint storage mode in code:
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints/workFlowCheckpoint"));
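HDFS directory after the fix:
However, in version 1.6.2 this class is not marked as Deprecated, so my questions are:
Is there anything incorrect about my fix? Or does the global checkpoint configuration simply not take effect for the window operator itself?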
Hello. You have actually misunderstood the relationship between the checkpoint configuration and the checkpoint storage.
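FsStateBackend <--> uses FsCheckpointStorage. The state data of each FsStateBackend is written to the DFS, and the returned handle does not contain the actual data, only a path (unless the actual data is smaller than state.backend.fs.memory-threshold, in which case it is stored in the returned handle).
MemoryStateBackend <--> uses MemoryBackendCheckpointStorage. The state data of each MemoryStateBackend is stored directly in the returned handle.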
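The difference between the two kinds of handle can be illustrated with a small stand-alone model. This is only a sketch and does not use Flink's classes: `HandleSketch`, `Handle`, `fsStyleHandle`, `memoryStyleHandle`, the file name `example-state-file` and the 1024-byte threshold in `main` are all hypothetical names and values chosen for illustration.

```java
public class HandleSketch {

    /** Hypothetical stand-in for a checkpoint state handle; not a Flink class. */
    public static final class Handle {
        public final byte[] inlineData; // set when the state bytes travel inside the handle
        public final String path;       // set when the handle only points at a file

        Handle(byte[] inlineData, String path) {
            this.inlineData = inlineData;
            this.path = path;
        }

        public boolean carriesData() {
            return inlineData != null;
        }
    }

    /** File-system style: small state is inlined, larger state is referenced by path. */
    public static Handle fsStyleHandle(byte[] state, int memoryThreshold, String checkpointDir) {
        if (state.length < memoryThreshold) {
            return new Handle(state, null);
        }
        // A real backend would write the bytes to the DFS here; this sketch only builds a path.
        return new Handle(null, checkpointDir + "/example-state-file");
    }

    /** Memory style: the state bytes are always carried inside the handle. */
    public static Handle memoryStyleHandle(byte[] state) {
        return new Handle(state, null);
    }

    public static void main(String[] args) {
        int threshold = 1024;                    // illustrative value, an assumption of this sketch
        String dir = "hdfs:///flink/checkpoints"; // directory taken from the question
        byte[] small = new byte[100];
        byte[] large = new byte[5249873];        // state size from the error message

        System.out.println("fs, small state inlined:     " + fsStyleHandle(small, threshold, dir).carriesData());
        System.out.println("fs, large state inlined:     " + fsStyleHandle(large, threshold, dir).carriesData());
        System.out.println("memory, large state inlined: " + memoryStyleHandle(large).carriesData());
    }
}
```

In the memory-style case the full 5249873 bytes ride along inside the handle, which is why a size limit is needed there at all.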
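If you only set the checkpoint path options without declaring FsStateBackend, the default MemoryStateBackend is used, so all state is returned directly to the JobManager (JM). Once the state grows large, the checkpoint fails because it exceeds the size limit (5 MB by default).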
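The numbers in the stack trace line up with that limit: maxSize=5242880 is exactly 5 * 1024 * 1024 bytes, and the reported Size=5249873 is only slightly above it. A minimal sketch of such a size check follows; it mirrors the idea behind the `checkSize` frame in the trace but is not Flink's code, and the class and method names are hypothetical.

```java
import java.io.IOException;

public class MemoryStateSizeCheck {

    // 5 MB in bytes; equals the maxSize value printed in the error message.
    public static final int DEFAULT_MAX_STATE_SIZE = 5 * 1024 * 1024;

    /** Returns true when a serialized state of the given size stays within the limit. */
    public static boolean fits(int size, int maxSize) {
        return size <= maxSize;
    }

    /** Rejects state that is larger than the permitted maximum. */
    public static void checkSize(int size, int maxSize) throws IOException {
        if (!fits(size, maxSize)) {
            throw new IOException("Size of the state is larger than the maximum permitted "
                    + "memory-backed state. Size=" + size + " , maxSize=" + maxSize + " .");
        }
    }

    public static void main(String[] args) {
        int size = 5249873; // Size value from the stack trace
        System.out.println("maxSize = " + DEFAULT_MAX_STATE_SIZE);
        System.out.println("over the limit by " + (size - DEFAULT_MAX_STATE_SIZE) + " bytes");
        try {
            checkSize(size, DEFAULT_MAX_STATE_SIZE);
        } catch (IOException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```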
If state.backend is not set in flink-conf.yaml and no state backend is set explicitly through the API, the default is heap (memory) mode.
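Following the answers above, the cluster-wide counterpart of the code fix would be to declare the backend explicitly in flink-conf.yaml rather than setting only the paths. A minimal sketch, assuming the `filesystem` value for state.backend and reusing the checkpoint directory from the question:

```yaml
# Declare the backend explicitly; without this key the memory-based default applies.
state.backend: filesystem
# Directory the checkpoint data is written to (path taken from the question).
state.checkpoints.dir: hdfs:///flink/checkpoints/
```

A backend set in code through env.setStateBackend(...) applies to that job, so the two approaches should not need to be combined.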