#环境
Flink 版本: 1.10.1 kafka 版本:2.12-2.2.2 DAG:source kafka->etl->sink kafka 在at least once模式下运行正常,改为extractly once执行checkpoint每次都不成功。
#配置
尝试过增加以下配置: props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, conf.get(SINK_TRANSACTION_TIMEOUT_CONFIG, "300000")); props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "crazy_flink");
#错误日志
020-07-28 19:04:43,601 jobmanager-future-thread-8 INFO 1371 CheckpointCoordinator: Discarding checkpoint 2 of job 00000000000000000000000000000000. org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator BatchWindow_KAFKA_BAICHUAN_ETL_03 -> Sink: KafkaSink_KAFKA_BAICHUAN_ETL_03 (249/256). Failure reason: Checkpo int was declined. at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:434) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1403) at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1337) at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:974) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:870) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:843) at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:803) at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:86) at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:177) at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:155) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:133) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.IllegalStateException: Pending record count must be zero at this point: 8551 at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:970) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:893) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99) at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:317) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:979) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) ... 19 more 2020-07-28 19:04:43,614 flink-akka.actor.default-dispatcher-36 INFO 200 JobMaster: Trying to recover from a global failure. org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold. at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleTaskLevelCheckpointException(CheckpointFailureManager.java:87) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.failPendingCheckpointDueToTaskFailure(CheckpointCoordinator.java:1467) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.discardCheckpoint(CheckpointCoordinator.java:1377) at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:719) at org.apache.flink.runtime.scheduler.SchedulerBase.lambda$declineCheckpoint$5(SchedulerBase.java:819) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadP
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。