开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

哪位大佬碰到Flink CDC作业动态添加表,whose schema 爆这个错的?cdc2.2

哪位大佬碰到Flink CDC作业动态添加表,whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case爆这个错的?cdc2.2

展开
收起
真的很搞笑 2023-07-01 19:57:08 173 0
2 条回答
写回答
取消 提交回答
  • 在 Flink CDC 中,如果您在作业运行时动态添加表,并且出现了 "whose schema isn't known to this connector" 的异常,可能是因为 Flink CDC 需要在运行时知道表的 schema 信息,才能正确地解析数据。如果在动态添加表时没有提供正确的 schema 信息,就会出现该异常。

    为了解决这个问题,您可以尝试以下方法:

    1. 显式指定表的 schema 信息:在动态添加表时,可以显式指定表的 schema 信息。例如,您可以在代码中使用 StreamTableEnvironment 的 registerTableSource 方法,同时传入表名和 schema 对象,以注册动态表。示例代码如下:

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    TableSchema schema = new TableSchema(new String[] {"id", "name"}, new TypeInformation[] {Types.INT, Types.STRING});
    tableEnv.registerTableSource("dynamic_table", new CustomTableSource(schema));
    

    需要注意的是,CustomTableSource 是您自定义的 TableSource 实现类,用于读取动态表的数据。

    2. 使用动态表工厂创建动态表:Flink 提供了动态表工厂(DynamicTableFactory),可以根据传入的参数动态创建表。您可以通过实现 TableFactory 接口,并在 createTableSource() 方法中返回包含表 schema 信息的 TableSource 对象。然后,在注册表时使用 DynamicTableFactory 创建动态表。具体实现细节和示例代码请参考 Flink 官方文档。

    2023-07-30 13:25:56
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果您在运行时动态添加表,可能会出现 "whose schema isn't known" 的异常,这通常是由于没有为动态表指定正确的 schema 信息导致的。
    为了解决这个问题,您可以尝试以下方法:
    使用表工厂创建动态表:在动态添加表时,可以使用 TableFactory 创建动态表。例如,您可以使用 FlinkJdbcTableSourceFactory 创建动态表,并在 createTableSource() 方法中返回动态表的 schema 信息。示例代码如下:
    clojure
    Copy
    public class CustomTableSourceFactory implements TableFactory {
    @Override
    public TableSource<?> createTableSource(Context context) {
    TableSchema schema = new TableSchema(
    new String[]{"id", "name", "price"},
    new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE}
    );
    JdbcTableSource jdbcTableSource = JdbcTableSource.builder()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:3306/test")
    .setUsername("root")
    .setPassword("123456")
    .setQuery("select * from dynamic_table")
    .setRowType(schema.toRowType())
    .build();
    return jdbcTableSource;
    }
    }
    使用 StreamTableEnvironment 注册动态表:在动态添加表时,可以使用 StreamTableEnvironment 注册动态表,并在注册表时指定表的 schema 信息。例如,您可以在代码中使用 StreamTableEnvironment 的 registerTableSource 方法,同时传入表名和 schema 对象,以注册动态表。示例代码如下:
    pgsql
    Copy
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    TableSchema schema = new TableSchema(new String[] {"id", "name", "price"}, new TypeInformation[] {Types.INT, Types.STRING, Types.DOUBLE});
    tableEnv.registerTableSource("dynamic_table", new CustomTableSource(schema));
    需要注意的是,以上示例代码中的 CustomTableSource 是您自定义的 TableSource 实现类,用于读取动态表的数据。

    2023-07-30 11:02:27
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载