开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去

FlinkCDC同步oracle数据的时候,数据总是要个两三分钟才能同步过去,不管是插入的一条数据还是多条数据,间隔的时间也不固定。请问是设置有啥问题吗?代码如下:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing();
env.getCheckpointConfig().setCheckpointInterval(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setParallelism(1);

    EnvironmentSettings Settings = EnvironmentSettings.newInstance()
            .inStreamingMode()
            .build();

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, Settings);

    String sourceDDL = "CREATE TABLE TS_XX_source (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'oracle-cdc',\n" +
                    " 'hostname' = '******',\n" +
                    " 'port' = '******',\n" +
                    " 'username' = '******',\n" +
                    " 'password' = '******',\n" +
                    " 'database-name' = '******',\n" +
                    " 'schema-name' = '******'," +
                    " 'table-name' = '******',\n" +
                    " 'scan.startup.mode' = 'latest-offset'\n" +
                    ")";

    String sinkDDL = "CREATE TABLE TS_XX_sink (\n" +
                    "ACCT_ID DECIMAL,\n" +
                    " SEQ DECIMAL,\n" +
                    " CC_TYPE DECIMAL,\n" +
                    " CREDIT_LIMIT DECIMAL,\n" +
                    " EFF_DATE TIMESTAMP(3),\n" +
                    " EXP_DATE TIMESTAMP(3),\n" +
                    " SP_ID DECIMAL,\n" +
                    " primary key (ACCT_ID) not enforced\n" +
                    ") WITH (\n" +
                    " 'connector' = 'jdbc',\n" +
                    " 'driver' = 'oracle.jdbc.driver.OracleDriver',\n" +
                    " 'url' = '******',\n" +
                    " 'username' = '****',\n" +
                    " 'password' = '****',\n" +
                    " 'table-name' = '****'\n" +
                    ")";

    String transformDmlSQL =  "insert into TS_XX_sink select * from TS_XX_source";

    tableEnv.executeSql(sourceDDL).print();
    tableEnv.executeSql(sinkDDL).print();
    tableEnv.executeSql(transformDmlSQL).print();

    env.execute("sync-cdc");
}

展开
收起
iwhale-小周 2023-12-04 14:59:51 131 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    从您提供的代码来看,您已经设置了Flink CDC的检查点配置。但是,关于数据同步速度慢的问题,可能与以下因素有关:

    1. 网络延迟:Oracle数据库和Flink CDC之间的网络连接可能会影响数据同步速度。请确保网络连接稳定且带宽足够。

    2. Oracle数据库性能:Oracle数据库的性能可能会影响数据同步速度。请检查数据库的负载情况,如CPU、内存和磁盘I/O等。

    3. Flink CDC配置:您可以尝试调整Flink CDC的配置参数,以优化数据同步速度。例如,增加checkpointInterval的值,以便更频繁地触发检查点。同时,可以考虑增加minPauseBetweenCheckpoints的值,以便在检查点之间有更多的时间间隔。

    4. Flink集群资源:Flink集群的资源(如CPU、内存和磁盘I/O)也会影响数据同步速度。请确保集群资源充足,以便更好地处理数据同步任务。

    5. 数据量:如果数据量很大,数据同步速度可能会受到影响。在这种情况下,您可以考虑分批处理数据,或者使用其他优化方法来提高数据同步速度。

    2023-12-04 20:14:10
    赞同 展开评论 打赏
  • 根据你提供的代码,有几点可能会影响数据同步的速度:

    1. checkpoint 配置:你的 checkpoint 配置设置为每 2 秒进行一次,并且最小暂停时间是 1.5 秒。这可能会导致频繁的 checkpoint 操作,从而影响性能。你可以尝试增大 checkpoint 间隔和最小暂停时间。
    2. 并行度:你设置了环境的并行度为 1,这意味着只有一个线程在处理数据流。增加并行度可以提高处理速度,但也要注意资源使用情况。
    3. 网络延迟:由于 Oracle CDC 是基于日志传输的,所以网络延迟也可能是一个因素。检查源数据库和目标数据库之间的网络连接是否稳定。
    4. Oracle CDC 的性能:Oracle CDC 连接器本身可能存在性能问题。你可以尝试调整连接器的相关参数,或者查看是否有其他优化方法。

    为了进一步诊断问题,你可以考虑以下步骤:

    1. 监控和日志:通过 Flink Web UI 监控任务的运行状态,并检查日志以查找任何异常或错误信息。
    2. 性能测试:在生产环境中进行一些简单的性能测试,插入少量数据并观察同步所需的时间。这可以帮助确定问题是否与数据量有关。
    3. 实验性更改:逐步更改配置并观察对性能的影响。例如,尝试增加 checkpoint 间隔、增加并行度等。
    2023-12-04 16:05:47
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
PostgresChina2018_樊文凯_ORACLE数据库和应用异构迁移最佳实践 立即下载
PostgresChina2018_王帅_从Oracle到PostgreSQL的数据迁移 立即下载
Oracle云上最佳实践 立即下载

相关镜像