开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinkcdc connector mysql 有个bug直接source读取就报错,什么原因?

flinkcdc connector mysql 有个bug, 我mysql中表的字段是 bigint unsigned zerofill 类型的 , 直接source读取就报错,什么原因?

展开
收起
小易01 2023-07-26 08:24:22 172 0
3 条回答
写回答
取消 提交回答
  • 如果在 Flink CDC 中使用 MySQL Connector 读取具有 bigint unsigned zerofill 类型的字段时出现错误,可能是因为 Flink CDC 的 MySQL Connector 不支持该特定数据类型。

    Flink CDC MySQL Connector 目前对特定的数据类型支持存在一些限制。bigint unsigned zerofill 是一个 MySQL 特有的数据类型,它在无符号的情况下将使用零填充,这可能导致与 Flink CDC 连接器之间的类型不匹配或解析问题。

    为了解决这个问题,你可以尝试以下方法:

    1. 修改数据类型:如果可能的话,将 MySQL 表中的字段类型修改为兼容的类型。例如,将 bigint unsigned zerofill 修改为 bigint,然后重新启动 Flink CDC。

    2. 自定义转换器(Custom Deserialization):如果你无法修改表的字段类型,可以考虑使用自定义转换器来处理读取错误。通过实现自定义的反序列化逻辑,你可以在 CDC 数据源中将 bigint unsigned zerofill 数据类型正确地转换为 Flink 支持的类型。

    请注意,自定义转换器可能需要编写一些额外的代码,并确保正确处理数据类型转换、验证和解析等方面的逻辑。可以参考 Flink 的文档了解如何实现自定义反序列化逻辑。

    另外,你也可以查看 Flink 社区的相关讨论、提交 Issue 或联系 Flink 开发团队来了解是否有计划修复这个问题的更新版本。

    2023-07-31 22:36:26
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    Flink CDC连接MySQL的JDBC驱动器和Debezium的MySQL Connector的确存在一些兼容性问题,可能导致直接使用Flink CDC的MySQL CDC Source读取MySQL数据时出现错误。
    其中一个常见的问题是,当MySQL表中存在多个主键时,Flink CDC的MySQL CDC Source会抛出以下异常信息:
    Copy
    java.lang.IllegalArgumentException: Multiple primary keys are not supported.
    这是因为Flink CDC的MySQL CDC Source目前只支持单一主键的表,当表中存在多个主键时会抛出上述异常。
    解决这个问题的方法是使用Debezium的MySQL Connector作为CDC源,并通过Flink的Debezium JSON格式解析器将CDC数据转换为Flink的数据格式。例如:
    java
    Copy
    DebeziumDeserializationSchema deserializer = new DebeziumDeserializationSchema.Builder()
    .setFormat(DebeziumDeserializationSchema.DEZYM_FORMAT_JSON)
    .setResultType(RowData.class)
    .build();

    Properties debeziumProperties = new Properties();
    debeziumProperties.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
    debeziumProperties.setProperty("database.hostname", "localhost");
    debeziumProperties.setProperty("database.port", "3306");
    debeziumProperties.setProperty("database.user", "root");
    debeziumProperties.setProperty("database.password", "root");
    debeziumProperties.setProperty("database.server.id", "1");
    debeziumProperties.setProperty("database.server.name", "test");
    debeziumProperties.setProperty("database.whitelist", "test.*");
    debeziumProperties.setProperty("database.history", MemoryDatabaseHistory.class.getName());
    debeziumProperties.setProperty("table.whitelist", "test.my_table");
    debeziumProperties.setProperty("snapshot.mode", "initial");

    FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("test", deserializer, kafkaProps);
    consumer.setStartFromEarliest();

    DataStream stream = env.addSource(new DebeziumSourceFunction<>(debeziumProperties, deserializer))
    .name("Debezium MySQL Source")
    .setParallelism(1);
    上述示例中,使用了Flink的DebeziumSourceFunction作为CDC源,并通过DebeziumDeserializationSchema将CDC数据转换为Flink的数据格式。这种方法可以避免使用Flink CDC的MySQL CDC Source时可能出现的兼容性问题。

    2023-07-29 15:41:31
    赞同 展开评论 打赏
  • 意中人就是我呀!

    "bigint unsigned 类型映射成decimal(20,0)就行了,官方文档类型映射有得。
    d4681093dfe6274800ca39e75368eb95.png
    https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
    此回答整理至钉群“Flink CDC 社区”。"

    2023-07-26 12:16:49
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
One Box: 解读事务与分析一体化数据库 HybridDB for MySQL 立即下载
One Box:解读事务与分析一体化数据库HybridDB for MySQL 立即下载
如何支撑HTAP场景-HybridDB for MySQL系统架构和技术演进 立即下载

相关镜像