Flink CDC1.4及以上,flink cdc 同步oracle数据库的时候做snapshot,会报Call snapshotState() on closed source, checkpoint failed。这个问题有那位知道怎么解决吗?
这个问题可能是由于 Flink CDC 在同步 Oracle 数据库时在完成快照后关闭了源导致的。您可以在 Flink CDC 的官方文档中找到相关的解决方案:
```yaml
mySqlSource:
type: mysql-source
database: testDB
table.whitelist: myTable
parallelism: 1
snapshot.incremental.snapshot.isolation.level: read-committed
snapshot.new.snapshot.select.sql: SELECT * FROM myTable
snapshot.mode: when_needed # 开启实时模式
# 解决 Call snapshotState() on closed source, checkpoint failed 错误
mySqlSource:
snapshot.fetch-size: 5000
其中 snapshot.incremental.snapshot.isolation.level: read-committed
和 snapshot.new.snapshot.select.sql
设置是为了开启实时模式,避免在快照完成后关闭源;snapshot.fetch-size
设置则可以限制每次快照获取的数据量。
同时,您还可以尝试降低 checkpoint 间隔,以及检查网络环境、硬件资源等其他可能导致性能瓶颈的因素。
这个问题可能是由于在关闭源表后尝试调用snapshotState()方法导致的。为了解决这个问题,你可以尝试以下步骤:
FlinkCDCSource
的getChangelogStream()
方法来获取一个ChangelogStream
对象,然后使用createSnapshot()
方法来创建快照。FlinkCDCSource<Row> source = ...;
ChangelogStream changelogStream = source.getChangelogStream();
changelogStream.createSnapshot();
FlinkCDCSource
的getChangelogStream()
方法来获取一个ChangelogStream
对象,然后使用commit()
方法来提交所有挂起的操作。FlinkCDCSource<Row> source = ...;
ChangelogStream changelogStream = source.getChangelogStream();
changelogStream.commit();
Thread.sleep()
方法来实现这个功能。FlinkCDCSource<Row> source = ...;
ChangelogStream changelogStream = source.getChangelogStream();
changelogStream.commit();
Thread.sleep(5000); // 等待5秒
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。