flink cdc 怎么做断点续传啊我程序不小心出了问题。异常了。重启,默认的init全量拉数据
通过采集模块-业务数据采集之FlinkCDC DataStreamAPI 进行打包进行断点续传测试
注意:测试环境最好使用linux系统的jar提交 在idea上可能会出现ck保存失败问题 断点续传测试: - 1.自动保存的ck(关闭自动删除) 用ck启动 - 2.使用手动的savepoint启动
因为设置的ck文件系统是hadoop 所以需要添加flink和hadoop的继承 在主机环境变量添加 - FLINK集成HADOOP需要 export HADOOP_CLASSPATH=hadoop classpath
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source一下 source /etc/profile.d/my_ini.sh
使用单机模式启动Flink bin/start-cluster.sh
在webUI上手动cancel job作业 变更监听的mysql库的表的数据 观察是否断点续传
flinkcdc 做断点续传,需要将flinkcdc读取binlog的位置信息以状态方式保存在checkpoint中即可。 开启checkpoint,每隔5s执行一次ck,指定ck的一致性语义。
env.enableCheckpointing(5000L); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传, 需要从Checkpoint或者Savepoint启动程序,通过这种方式来实现断点续传
public class flinkCDC {
public static void main(String[] args) throws Exception {
//1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2.flinkcdc 做断点续传,需要将flinkcdc读取binlog的位置信息以状态方式保存在checkpoint中即可.
//(1)开启checkpoint 每隔5s 执行一次ck 指定ck的一致性语义
env.enableCheckpointing(5000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//3.设置任务关闭后,保存最后后一次ck数据.
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,2000L));
env.setStateBackend(new FsStateBackend("hdfs://192.168.1.204:9000/flinkCDC"));
//4.设置访问HDFS的用户名
System.setProperty("HADOOP_USER_NAME","root");
//5.创建Sources数据源
Properties prop = new Properties();
prop.setProperty("scan.startup.mode","initial"); //"scan.startup.mode","initial" 三种要补充解释下
DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
.hostname("192.168.1.205")
.port(3306)
.username("root")
.password("Root@123")
.tableList("ssm.order") //这里的表名称,书写格式:db.table 不然会报错
.databaseList("ssm")
.debeziumProperties(prop)
.deserializer(new StringDebeziumDeserializationSchema())
.build();
//6.添加数据源
DataStreamSource<String> source = env.addSource(mysqlSource);
//7.打印数据
source.print();
//8.执行任务
env.execute();
}
}
这个可用提供一种思路,首先需要设置一个检查点,每五秒执行一次,检查是否应答了,如果没有应答就重新进行请求处理,保存上次已经处理的进度,下次可用在这个基础上面继续进行传送。里面最重要的一个点是保存最后一次的ck数据。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。