开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC中哪位大佬碰到作业动态添加表,whose schema isn't known t

Flink CDC中哪位大佬碰到作业动态添加表,whose schema isn't known to this connector. One possible cause is an incomplete database history topic. Take a new snapshot in this case爆这个错的,cdc2.2,加了scanNewlyAddedTableEnabled?

展开
收起
真的很搞笑 2023-07-01 19:57:08 348 0
2 条回答
写回答
取消 提交回答
  • 在 Flink CDC 中,如果您在作业运行时动态添加表,并且出现了 "whose schema isn't known to this connector" 的异常,可能是因为 Flink CDC 在解析数据时需要知道表的 schema 信息。这个异常可能是由于数据库历史主题信息不完整导致的。在这种情况下,可以尝试执行以下操作:

    1. 确保数据库历史主题信息完整:    - 检查是否正确配置了数据库历史主题(database.history.topic)和相关的 Kafka 连接信息。    - 如果历史主题中没有包含新添加表的信息,可以尝试重新创建历史主题或者重新启动 CDC 作业以生成新的快照。

    2. 启用 scanNewlyAddedTableEnabled:    - 在 Flink CDC 2.2 版本中,提供了 scanNewlyAddedTableEnabled 参数,用于支持动态添加表的功能。    - 确保已经将该参数设置为 true,以便 CDC 连接器能够自动扫描并处理新添加的表。

    3. 显式注册新的动态表:    - 如果使用 Flink Table API 或 SQL 来处理数据,可以在代码中显式注册新添加的表,并提供正确的 schema 信息。    - 使用 StreamTableEnvironment 的 registerTableSource 方法,并传入表名和对应的 TableSource 对象,以注册动态表。    - 可以自定义 TableSource 实现类来读取动态表的数据,并提供正确的 schema 信息。

    需要注意的是,确保在动态添加表时提供正确的 schema 信息对于 Flink CDC 正确解析数据非常重要。根据您的具体情况,选择适合的方法来解决该问题。

    2023-07-30 13:25:55
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    在 Flink CDC 中,如果您在运行时动态添加表,可能会出现 "whose schema isn't known" 的异常。这是因为 Flink CDC 需要在运行时知道表的 schema 信息,才能正确地解析数据。如果在动态添加表时没有提供正确的 schema 信息,就会出现该异常。
    为了解决这个问题,您可以尝试以下方法:
    显式指定表的 schema 信息:在动态添加表时,可以显式指定表的 schema 信息。例如,您可以在代码中使用 StreamTableEnvironment 的 registerTableSource 方法,同时传入表名和 schema 对象,以注册动态表。示例代码如下:
    pgsql
    Copy
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    TableSchema schema = new TableSchema(new String[] {"id", "name"}, new TypeInformation[] {Types.INT, Types.STRING});
    tableEnv.registerTableSource("dynamic_table", new CustomTableSource(schema));
    需要注意的是,CustomTableSource 是您自定义的 TableSource 实现类,用于读取动态表的数据。
    使用 DataStream API 动态添加表:在 Flink CDC 中,您可以使用 DataStream API 来动态创建和添加表。具体而言,您可以使用 DataStream API 的 addSink 方法,将数据写入到动态表中。示例代码如下:
    clojure
    Copy
    DataStream> dynamicTableData = env.fromCollection(Arrays.asList(new Tuple2<>(1, "foo"), new Tuple2<>(2, "bar")));
    TableSchema schema = new TableSchema(new String[] {"id", "name"}, new TypeInformation[] {Types.INT, Types.STRING});
    tableEnv.fromDataStream(dynamicTableData, schema).executeInsert("dynamic_table");
    以上是两种常见的动态添加表的

    2023-07-30 11:02:26
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载