FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去,不管是插入的一条数据还是多条数据,间隔的时间也不固定。请问是设置有啥问题吗?代码如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing();
env.getCheckpointConfig().setCheckpointInterval(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setParallelism(1);
EnvironmentSettings Settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);
String sourceDDL = "CREATE TABLE TS_XX_source (\n" +
"ACCT_ID DECIMAL,\n" +
" SEQ DECIMAL,\n" +
" CC_TYPE DECIMAL,\n" +
" CREDIT_LIMIT DECIMAL,\n" +
" EFF_DATE TIMESTAMP(3),\n" +
" EXP_DATE TIMESTAMP(3),\n" +
" SP_ID DECIMAL,\n" +
" primary key (ACCT_ID) not enforced\n" +
") WITH (\n" +
" 'connector' = 'oracle-cdc',\n" +
" 'hostname' = '******',\n" +
" 'port' = '******',\n" +
" 'username' = '******',\n" +
" 'password' = '******',\n" +
" 'database-name' = '******',\n" +
" 'schema-name' = '******'," +
" 'table-name' = '******',\n" +
" 'scan.startup.mode' = 'latest-offset'\n" +
")";
String sinkDDL = "CREATE TABLE TS_XX_sink (\n" +
"ACCT_ID DECIMAL,\n" +
" SEQ DECIMAL,\n" +
" CC_TYPE DECIMAL,\n" +
" CREDIT_LIMIT DECIMAL,\n" +
" EFF_DATE TIMESTAMP(3),\n" +
" EXP_DATE TIMESTAMP(3),\n" +
" SP_ID DECIMAL,\n" +
" primary key (ACCT_ID) not enforced\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
" 'url' = '******',\n" +
" 'username' = '****',\n" +
" 'password' = '****',\n" +
" 'table-name' = '****'\n" +
")";
String transformDmlSQL = "insert into TS_XX_sink select * from TS_XX_source";
tableEnv.executeSql(sourceDDL).print();
tableEnv.executeSql(sinkDDL).print();
tableEnv.executeSql(transformDmlSQL).print();
env.execute("sync-cdc");
}
从您提供的代码来看,您已经设置了Flink CDC的检查点配置。但是,关于数据同步速度慢的问题,可能与以下因素有关:
网络延迟:Oracle数据库和Flink CDC之间的网络连接可能会影响数据同步速度。请确保网络连接稳定且带宽足够。
Oracle数据库性能:Oracle数据库的性能可能会影响数据同步速度。请检查数据库的负载情况,如CPU、内存和磁盘I/O等。
Flink CDC配置:您可以尝试调整Flink CDC的配置参数,以优化数据同步速度。例如,增加checkpointInterval
的值,以便更频繁地触发检查点。同时,可以考虑增加minPauseBetweenCheckpoints
的值,以便在检查点之间有更多的时间间隔。
Flink集群资源:Flink集群的资源(如CPU、内存和磁盘I/O)也会影响数据同步速度。请确保集群资源充足,以便更好地处理数据同步任务。
数据量:如果数据量很大,数据同步速度可能会受到影响。在这种情况下,您可以考虑分批处理数据,或者使用其他优化方法来提高数据同步速度。
根据你提供的代码,有几点可能会影响数据同步的速度:
为了进一步诊断问题,你可以考虑以下步骤:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。