public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set checkpoint interval to 10 seconds
env.enableCheckpointing(10000);
// Define source and transformation
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> transformation = source.map(new MyMapFunction());
// Define a checkpoint directory
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
// Example of how to handle the checkpoint stream
transformation.addSink(new MySink());
env.execute("My Job");
}
public static class MySink extends RichSinkFunction<String> {
private transient PrintWriter out;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Open the file stream
out = new PrintWriter(new FileOutputStream("output.txt"), true);
}
@Override
public void close() throws Exception {
super.close();
// Close the file stream
if (out != null) {
out.close();
}
}
@Override
public void invoke(String value) throws Exception {
// Write to the file stream
out.write(value);
}
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set checkpoint interval to 10 seconds
env.enableCheckpointing(10000);
// Define source and transformation
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
SingleOutputStreamOperator<String> transformation = source.map(new MyMapFunction());
// Define a checkpoint directory
env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
// Example of how to handle the checkpoint stream
transformation.addSink(new MySink());
env.execute("My Job");
// Make sure the file stream is closed before exiting
transformation.close();
}
以上就是确保checkpoint的文件句柄正确关闭的两种方法。同时,建议使用try-with-resources语句来确保文件流的正确关闭。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。