开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC中如果其他的表用了jdbc,那后面这些其他表有更新,能同步过来吗?

问题1:Flink CDC中如果其他的表用了jdbc,那后面这些其他表有更新,能同步过来吗?还会是一张流表吗?
It also prevents that the join result is updated when a joined Customer row is updated in the future. 我看官方文档这意思是,其他的表就彻底成了一张维表了,其他表有更新也不会做同步。
e6f1f2538a867e9195105fd8a79c1844.png
问题2:所以大数据量的多张流表join该怎么整?多张流表普通join还是会把多张表的数据都加载到内存里呀,内存根本不可能够用。按你们说的方案,那就只剩一张流表了,其他都成维表了。

展开
收起
十一0204 2023-07-26 08:14:22 75 0
3 条回答
写回答
取消 提交回答
  • 在 Flink CDC 中,如果其他的表使用了 JDBC 连接器进行读取,并且这些表有更新,是可以实时同步到 Flink 中的。Flink 提供了多种连接器和源来与不同类型的数据源进行交互,包括 JDBC 连接器。

    当其他表发生更新时(如插入、更新或删除操作),Flink 可以通过定期或实时查询数据库来获取最新的数据变更,并将其作为流数据传输到 Flink 任务中进行处理。你可以使用 Flink 的 JDBC 连接器和相应的查询语句来配置并监控这些表的变化。

    以下是一些在 Flink 中与其他表进行实时同步的常见方法:

    1. 使用 JDBC 连接器:使用 Flink 的 JDBC 连接器来连接其他表,配置相应的 JDBC URL、用户名、密码等信息,并编写查询语句来读取数据。然后,通过适当的窗口操作和处理逻辑,将这些数据与其他数据流进行关联和计算。

    2. 使用 CDC 连接器:对于支持 Change Data Capture(CDC)的数据库,可以使用 Flink 的 CDC 连接器来捕获数据库的变更事件,并将这些变更事件流转发到 Flink 中进行处理。这样,无论其他表是通过 JDBC 还是 CDC 连接器进行更新,都可以实时同步到 Flink 中。

    需要注意的是,具体的实现方式取决于数据库的类型、版本和支持的功能。不同的数据库可能具有不同的 CDC 实现方式或支持的功能。你需要根据所使用的数据库和 Flink 版本,查阅相应的文档,并了解其支持的特性和限制。

    希望这个解答能够回答你的问题!如果还有其他疑问,请随时提问。

    2023-07-31 22:49:06
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    Flink CDC可以捕获数据库中的变化并将其转换为数据流,您可以将这些数据流与其他表的数据流进行Join操作,从而实现对其他表的同步更新。
    具体来说,您可以使用Flink的JDBC Connector将其他表的数据流读取到Flink中,然后将其与CDC捕获的数据流进行Join操作。在Join操作中,您可以指定Join条件,从而将两个数据流中的数据进行匹配。当其他表中的数据发生更新时,JDBC Connector将会重新读取数据并将其发送到Flink中,Flink则会自动将更新的数据与CDC捕获的数据流进行Join操作,从而实现实时同步。
    以下是一个示例代码,演示了如何使用Flink JDBC Connector将其他表的数据流与CDC捕获的数据流进行Join操作:
    java
    Copy
    // 创建CDC数据流
    DataStreamSource cdcStream = env.addSource(new FlinkCDCSourceFunction(
    "mysql-cdc",
    MySQLSource.builder()
    .hostname("localhost")
    .port(3306)
    .databaseList("db1")
    .tableList("table1")
    .username("root")
    .password("root")
    .deserializer(new StringDebeziumDeserializationSchema())
    .build()
    ));

    // 创建JDBC数据流
    JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:3306/db2")
    .setUsername("root")
    .setPassword("root")
    .setQuery("SELECT * FROM table2")
    .setRowTypeInfo(new RowTypeInfo(TypeInformation.of(Integer.class), TypeInformation.of(String.class)))
    .finish();
    DataStreamSource jdbcStream = env.createInput(jdbcInputFormat);

    // 将CDC数据流和JDBC数据流进行Join操作
    DataStream joinedStream = cdcStream
    .keyBy(new KeySelector() {
    @Override
    public Integer getKey(String value) throws Exception {
    // 根据CDC数据流中的key进行分组
    return Integer.parseInt(value.split(":")[0]);
    }
    })
    .connect(jdbcStream.keyBy(new KeySelector() {
    @Override
    public Integer getKey(Row value) throws Exception {
    // 根据JDBC数据流中的key进行分组
    return (Integer) value.getField(0);
    }
    }))
    .flatMap(new CoFlatMapFunction() {
    private MapState state;

        @Override
        public void open(Configuration parameters) throws Exception {
            // 在open方法中创建MapState,用于保存JDBC数据流中的数据
            MapStateDescriptor<Integer, String> descriptor = new MapStateDescriptor<>("jdbc_data", TypeInformation.of(Integer.class), TypeInformation.of(String.class));
            state = getRuntimeContext().getMapState(descriptor);
        }
    
        @Override
        public void flatMap1(String value, Collector<String> out) throws Exception {
            // 处理CDC数据流
            int key = Integer.parseInt(value.split(":")[0]);
            String data = value.split(":")[1];
            String jdbcData = state.get(key);
            if (jdbcData != null) {
                out.collect(key + ":" + data + ":" + jdbcData);
            }
        }
    
        @Override
        public void flatMap2(Row value, Collector<String> out) throws Exception {
            // 处理JDBC数据流
            int key = (Integer) value.getField(0);
            String data = (String) value.getField(1);
            state.put(key, data);
        }
    });
    

    在上述示例中,使用Flink JDBC Connector将table2表中的数据流读取到Flink中,然后与CDC捕获的table1表数据流进行Join操作。在Join操作中,使用connect方法将两个数据流连接起来,然后使用flatMap方法进行Join操作。在flatMap方法中,使用MapState保存JDBC数据流中的数据,当CDC数据流中的数据和JDBC数据流中的数据进行匹配时,输出Join后的结果

    2023-07-29 15:52:27
    赞同 展开评论 打赏
  • 意中人就是我呀!

    "回答1:对的,维表更新不会更新之前取走的数据。
    此回答整理至钉群“Flink CDC 社区”。"

    2023-07-26 12:15:41
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 覃立辉 立即下载
    Flink CDC Meetup PPT - 孙家宝 立即下载
    Flink CDC Meetup PPT - 徐榜江 立即下载