
Flink with state.backend set to rocksdb and checkpoints stored on HDFS: checkpoints frequently fail. What causes checkpoints to time out and expire? (checkpoint timeout is set to 60s)

Resolved

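The Flink job runs on YARN. The Kafka source topic has 20 partitions, and the job parallelism is set to 10, with -yn 2 -ys 5 -ytm 26600 -yjm 5120. I have read online that this error can occur when the parallelism is too high and too many files are open, so that creating files on HDFS fails, but my parallelism is not high. Does anyone know the cause? Any advice is appreciated. The full error is as follows: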
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 4763 for operator Window(TumblingProcessingTimeWindows(20000), ProcessingTimeTrigger, DeviceFilterAggrateFunction, DeviceFilterWindowFunction) (9/10).}

at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Caused by: java.lang.Exception: Could not materialize checkpoint 4763 for operator Window(TumblingProcessingTimeWindows(20000), ProcessingTimeTrigger, DeviceFilterAggrateFunction, DeviceFilterWindowFunction) (9/10).

at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more

Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxxx/user/portal/flink/checkpoints/884b263235d825095de14a1912bf4ea5/chk-4763/f0f939d2-43e5-4863-8835-374305febfa0 in order to obtain the stream state handle

at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more

Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://xxxx/user/portal/flink/checkpoints/884b263235d825095de14a1912bf4ea5/chk-4763/f0f939d2-43e5-4863-8835-374305febfa0 in order to obtain the stream state handle

at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:325)
at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:2507)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2559)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /user/portal/flink/checkpoints/884b263235d825095de14a1912bf4ea5/chk-4763/f0f939d2-43e5-4863-8835-374305febfa0 (inode 1341207482): File does not exist. [Lease. Holder: DFSClient_NONMAPREDUCE_28531506_1, pendingcreates: 19]

at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3436)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3526)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3493)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:787)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:536)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

at org.apache.hadoop.ipc.Client.call(Client.java:1476)
at org.apache.hadoop.ipc.Client.call(Client.java:1413)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy10.complete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:462)
at sun.reflect.GeneratedMethodAccessor42.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy11.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2506)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2482)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2447)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:311)
... 12 more

2018-11-18 16:00:32,999 INFO org.apache.flink.runtime.taskmanager.Task - Triggering cancellation of task code Window(TumblingProcessingTimeWindows(20000), ProcessingTimeTrigger, DeviceFilterAggrateFunction, DeviceFilterWindowFunction) (9/10) (62f79b2d3e6d7262440d4d5d355f345a).
2018-11-18 16:00:32,999 WARN org.apache.hadoop.hdfs.DFSClient - Caught exception
java.lang.InterruptedException: sleep interrupted

at java.lang.Thread.sleep(Native Method)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2525)
at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2482)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2447)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:311)
at org.apache.flink.runtime.state.CheckpointStreamWithResultProvider$PrimaryStreamOnly.closeAndFinalizeCheckpointStreamResult(CheckpointStreamWithResultProvider.java:77)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.materializeMetaData(RocksDBKeyedStateBackend.java:2507)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalSnapshotOperation.runSnapshot(RocksDBKeyedStateBackend.java:2559)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

2018-11-18 16:00:33,247 INFO org.apache.hadoop.hdfs.DFSClient - Could not complete /user/portal/flink/checkpoints/884b263235d825095de14a1912bf4ea5/shared/02c4013b-a1a5-499d-a210-d52220ad3320 retrying...
2018-11-18 16:00:41,321 INFO org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /mnt/dfs/6/yarn/local/usercache/portal/appcache/application_1542185049150_263136/flink-io-715c79eb-ce7b-4bf0-9c9f-fa029027174d/job_884b263235d825095de14a1912bf4ea5_op_WindowOperator_ea632d67b7d595e5b851708ae9ad79d6__9_10__uuid_d7bd17ff-4032-4f7d-a4d6-307c1f3e84ba.
2018-11-18 16:00:41,344 INFO org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation - Closing zookeeper sessionid=0x36001e76ee21304

莱昂007 2018-11-19 15:12:19
1 answer
  • 倪完:

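    The checkpoint file was deleted. Check the HDFS NameNode logs to see which machine's application deleted it. If it was the machine hosting the JobMaster, the JobMaster most likely cleaned the file up itself after the checkpoint timed out. In that case, first increase the checkpoint timeout (if the checkpoint files are large, 60s may not be enough; you can also switch to incremental checkpoints to reduce the checkpoint size), and second, still track down the actual cause of the timeout.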

    2019-07-17 23:15:25
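
To act on the first suggestion in the answer above (finding out which machine deleted the checkpoint file), one option is to filter the NameNode audit log for delete operations that cover the path from the LeaseExpiredException. The following is only a minimal sketch. It assumes audit logging is enabled on the NameNode and that entries use the usual `ip=/<address> cmd=delete src=<path>` key=value layout; the class name `CheckpointDeleteFinder` and the method `deleterOf` are hypothetical names chosen for illustration, not part of Flink or Hadoop.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/** Sketch: report which client IPs deleted a given checkpoint file or one of its parent directories. */
final class CheckpointDeleteFinder {

    // Assumed audit-log layout: "... ip=/10.0.0.7 <tab> cmd=delete <tab> src=/some/path <tab> dst=null ..."
    private static final Pattern DELETE_ENTRY =
            Pattern.compile("ip=/(\\S+)\\s+cmd=delete\\s+src=(\\S+)");

    /** Returns the client IP if this line is a delete of checkpointFile or of a directory containing it. */
    static Optional<String> deleterOf(String auditLine, String checkpointFile) {
        Matcher m = DELETE_ENTRY.matcher(auditLine);
        if (!m.find()) {
            return Optional.empty(); // not a delete entry
        }
        String ip = m.group(1);
        String src = m.group(2);
        // Deleting the file itself or any parent directory (for example chk-4763) removes the file.
        boolean covers = checkpointFile.equals(src) || checkpointFile.startsWith(src + "/");
        return covers ? Optional.of(ip) : Optional.empty();
    }

    public static void main(String[] args) throws Exception {
        // args[0]: path to the audit log; args[1]: HDFS path taken from the exception message
        try (Stream<String> lines = Files.lines(Paths.get(args[0]))) {
            lines.map(line -> deleterOf(line, args[1]))
                 .filter(Optional::isPresent)
                 .map(Optional::get)
                 .distinct()
                 .forEach(System.out::println);
        }
    }
}
```

If the printed address belongs to the host running the JobMaster, that would be consistent with the explanation above that the file was cleaned up after the checkpoint timed out; any other address would point to a different application deleting it.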