JDBC connector似乎无法确保buffer中的数据在checkpoint的时候全部入库。这是因为 OutputFormat中没有一个接口,供checkpoint的时候调用。 从JDBC的connector的 代码来看,只能设定一个超时时间用以刷新数据,但还是可能存在丢数据的case。
我的问题是:是否有办法强制刷新buffer中的数据入库?
@Public public interface OutputFormat extends Serializable {
/** * Configures this output format. Since output formats are instantiated generically and hence parameterless, * this method is the place where the output formats set their basic fields based on configuration values. *
* This method is always called first on a newly instantiated output format. * * @param parameters The configuration with all parameters. */ void configure(Configuration parameters);
/** * Opens a parallel instance of the output format to store the result of its parallel instance. *
* When this method is called, the output format it guaranteed to be configured. * * @param taskNumber The number of the parallel instance. * @param numTasks The number of parallel tasks. * @throws IOException Thrown, if the output could not be opened due to an I/O problem. */ void open(int taskNumber, int numTasks) throws IOException;
/** * Adds a record to the output. *
* When this method is called, the output format it guaranteed to be opened. * * @param record The records to add to the output. * @throws IOException Thrown, if the records could not be added to to an I/O problem. */ void writeRecord(IT record) throws IOException;
/** * Method that marks the end of the life-cycle of parallel output instance. Should be used to close * channels and streams and release resources. * After this method returns without an error, the output is assumed to be correct. *
* When this method is called, the output format it guaranteed to be opened. * * @throws IOException Thrown, if the input could not be closed properly. */ void close() throws IOException; }
-- *来自志愿者整理的flink邮件归档
是的,感觉你是对的。 JdbcOutputFormat
会被 wrap 在 OutputFormatSinkFunction
中,而 OutputFormatSinkFunction
没有继承 CheckpointedFunction
,所以没法在 snapshotState 时候调用format.flush。*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。