Flink CDC中我flinksql写入doris报这个错,请教下大家,是doris不支持flinksql的sql接口么,Query schema: [station_id: VARCHAR(100), station_name: VARCHAR(255), total_switch_electric_quantity: DECIMAL(38, 2), total_pay_amount: DECIMAL(38, 2), total_order_id_count: BIGINT NOT NULL]
Sink schema: [f0: RAW('com.ibm.icu.impl.Row', ?)]
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:1005)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:340)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:307)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:310)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:324)
at com.anyixing.energo.realtime.app.dws.FlinkKafkaSQLExample01.main(FlinkKafkaSQLExample01.java:161)?
根据错误信息,sink 的 schema 与 query 的 schema 不匹配。您需要确保 sink 和 query 的 schema 完全相同。
查询的 schema 包括以下列:
[station_id: VARCHAR(100), station_name: VARCHAR(255), total_switch_electric_quantity: DECIMAL(38, 2), total_pay_amount: DECIMAL(38, 2), total_order_id_count: BIGINT NOT NULL]
而 sink 的 schema 是一个未知的类型 RAW('com.ibm.icu.impl.Row', ?)
。您需要将 sink 的 schema 更改为与查询的 schema 相同。、
从错误信息来看,问题出在Flink SQL写入Doris时,源表和目标表的schema不匹配。具体来说,源表的schema是:
station_id: VARCHAR(100), station_name: VARCHAR(255), total_switch_electric_quantity: DECIMAL(38, 2), total_pay_amount: DECIMAL(38, 2), total_order_id_count: BIGINT NOT NULL
而目标表的schema是:
f0: RAW('com.ibm.icu.impl.Row', ?)
为了解决这个问题,你需要确保源表和目标表的schema是一致的。你可以尝试修改目标表的schema,使其与源表的schema相匹配。例如,你可以将目标表的schema修改为:
f0: STRUCT(
station_id: VARCHAR(100),
station_name: VARCHAR(255),
total_switch_electric_quantity: DECIMAL(38, 2),
total_pay_amount: DECIMAL(38, 2),
total_order_id_count: BIGINT
)
然后重新运行Flink SQL任务,看看是否还会出现相同的错误。
这个错误是因为Flink SQL的输出格式与Doris的输入格式不匹配。在Flink SQL中,您需要指定一个输出格式,例如ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ('separatorChar' = ',', 'quoteChar' = '"', 'escapeChar' = '\')。然而,Doris似乎并不支持这种格式,因此导致了错误。
为了解决这个问题,您可能需要查看Doris的文档,看看它支持哪种输入格式,然后在Flink SQL中使用相应的格式。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。