flinkcdc connector mysql 有个bug, 我mysql中表的字段是 bigint unsigned zerofill 类型的 , 直接source读取就报错,什么原因?
如果在 Flink CDC 中使用 MySQL Connector 读取具有 bigint unsigned zerofill
类型的字段时出现错误,可能是因为 Flink CDC 的 MySQL Connector 不支持该特定数据类型。
Flink CDC MySQL Connector 目前对特定的数据类型支持存在一些限制。bigint unsigned zerofill
是一个 MySQL 特有的数据类型,它在无符号的情况下将使用零填充,这可能导致与 Flink CDC 连接器之间的类型不匹配或解析问题。
为了解决这个问题,你可以尝试以下方法:
修改数据类型:如果可能的话,将 MySQL 表中的字段类型修改为兼容的类型。例如,将 bigint unsigned zerofill
修改为 bigint
,然后重新启动 Flink CDC。
自定义转换器(Custom Deserialization):如果你无法修改表的字段类型,可以考虑使用自定义转换器来处理读取错误。通过实现自定义的反序列化逻辑,你可以在 CDC 数据源中将 bigint unsigned zerofill
数据类型正确地转换为 Flink 支持的类型。
请注意,自定义转换器可能需要编写一些额外的代码,并确保正确处理数据类型转换、验证和解析等方面的逻辑。可以参考 Flink 的文档了解如何实现自定义反序列化逻辑。
另外,你也可以查看 Flink 社区的相关讨论、提交 Issue 或联系 Flink 开发团队来了解是否有计划修复这个问题的更新版本。
Flink CDC连接MySQL的JDBC驱动器和Debezium的MySQL Connector的确存在一些兼容性问题,可能导致直接使用Flink CDC的MySQL CDC Source读取MySQL数据时出现错误。
其中一个常见的问题是,当MySQL表中存在多个主键时,Flink CDC的MySQL CDC Source会抛出以下异常信息:
Copy
java.lang.IllegalArgumentException: Multiple primary keys are not supported.
这是因为Flink CDC的MySQL CDC Source目前只支持单一主键的表,当表中存在多个主键时会抛出上述异常。
解决这个问题的方法是使用Debezium的MySQL Connector作为CDC源,并通过Flink的Debezium JSON格式解析器将CDC数据转换为Flink的数据格式。例如:
java
Copy
DebeziumDeserializationSchema deserializer = new DebeziumDeserializationSchema.Builder()
.setFormat(DebeziumDeserializationSchema.DEZYM_FORMAT_JSON)
.setResultType(RowData.class)
.build();
Properties debeziumProperties = new Properties();
debeziumProperties.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
debeziumProperties.setProperty("database.hostname", "localhost");
debeziumProperties.setProperty("database.port", "3306");
debeziumProperties.setProperty("database.user", "root");
debeziumProperties.setProperty("database.password", "root");
debeziumProperties.setProperty("database.server.id", "1");
debeziumProperties.setProperty("database.server.name", "test");
debeziumProperties.setProperty("database.whitelist", "test.*");
debeziumProperties.setProperty("database.history", MemoryDatabaseHistory.class.getName());
debeziumProperties.setProperty("table.whitelist", "test.my_table");
debeziumProperties.setProperty("snapshot.mode", "initial");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("test", deserializer, kafkaProps);
consumer.setStartFromEarliest();
DataStream stream = env.addSource(new DebeziumSourceFunction<>(debeziumProperties, deserializer))
.name("Debezium MySQL Source")
.setParallelism(1);
上述示例中,使用了Flink的DebeziumSourceFunction作为CDC源,并通过DebeziumDeserializationSchema将CDC数据转换为Flink的数据格式。这种方法可以避免使用Flink CDC的MySQL CDC Source时可能出现的兼容性问题。
"bigint unsigned 类型映射成decimal(20,0)就行了,官方文档类型映射有得。
https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)
此回答整理至钉群“Flink CDC 社区”。"
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。