目前有个需求想要实现Flink SQL的保存点,但是由于在SQL中operator uid是随机生成的,一旦修改SQL会导致无法读取到Savepoints中的状态信息。
想到一种方式是正常执行Savepoint操作,然后启动的时候手动读取Savepoint中的内容,获取Kafka每个分区的消费offset,再替换到SQL代码中。
目前通过在源码中打印相关日志可以发现,保存点触发时,消费的记录正常读取到9,但是手动读取的时候,却找不到这个而信息。
下面是我手动读取的代码,以下内容是参考Savepoint单元测试中的读取案例:
String savepointPath = "hdfs://namenode:8020/flink/savepoints/test040603/savepoint-a2cfcd-2ee1c4afcf9f"; CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorageAccess .resolveCheckpointPointer(savepointPath);
try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) { CheckpointMetadata metadata = Checkpoints .loadCheckpointMetadata(stream, Thread.currentThread().getContextClassLoader(), savepointPath); System.out.println(metadata); } catch (IOException e) { e.printStackTrace(); }
通过debug发现,有三个operator,但是记录的offset是1038,而不是9,不知道正确的读取方法是什么?*来自志愿者整理的flink
需要通过BatchEnvironment,把Savepoint当做输入,然后构造序列化解析器和类型等信息,通过DataSet.collect()进行解析,就可以读取到目标数据了。希望对你有所帮助~*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。