开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

Flink CDC请教下大家,是doris不支持flinksql的sql接口么?

Flink CDC中我flinksql写入doris报这个错,请教下大家,是doris不支持flinksql的sql接口么,Query schema: [station_id: VARCHAR(100), station_name: VARCHAR(255), total_switch_electric_quantity: DECIMAL(38, 2), total_pay_amount: DECIMAL(38, 2), total_order_id_count: BIGINT NOT NULL]
Sink schema: [f0: RAW('com.ibm.icu.impl.Row', ?)]
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:1005)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:340)
at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:307)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:310)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:324)
at com.anyixing.energo.realtime.app.dws.FlinkKafkaSQLExample01.main(FlinkKafkaSQLExample01.java:161)?

展开
收起
真的很搞笑 2023-11-15 08:22:00 120 0
3 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    根据错误信息,sink 的 schema 与 query 的 schema 不匹配。您需要确保 sink 和 query 的 schema 完全相同。
    查询的 schema 包括以下列:

    [station_id: VARCHAR(100), station_name: VARCHAR(255), total_switch_electric_quantity: DECIMAL(38, 2), total_pay_amount: DECIMAL(38, 2), total_order_id_count: BIGINT NOT NULL]
    

    而 sink 的 schema 是一个未知的类型 RAW('com.ibm.icu.impl.Row', ?)。您需要将 sink 的 schema 更改为与查询的 schema 相同。、

    2023-11-15 22:38:20
    赞同 展开评论 打赏
  • 月移花影,暗香浮动

    从错误信息来看,问题出在Flink SQL写入Doris时,源表和目标表的schema不匹配。具体来说,源表的schema是:

    station_id: VARCHAR(100), station_name: VARCHAR(255), total_switch_electric_quantity: DECIMAL(38, 2), total_pay_amount: DECIMAL(38, 2), total_order_id_count: BIGINT NOT NULL
    

    而目标表的schema是:

    f0: RAW('com.ibm.icu.impl.Row', ?)
    

    为了解决这个问题,你需要确保源表和目标表的schema是一致的。你可以尝试修改目标表的schema,使其与源表的schema相匹配。例如,你可以将目标表的schema修改为:

    f0: STRUCT(
        station_id: VARCHAR(100),
        station_name: VARCHAR(255),
        total_switch_electric_quantity: DECIMAL(38, 2),
        total_pay_amount: DECIMAL(38, 2),
        total_order_id_count: BIGINT
    )
    

    然后重新运行Flink SQL任务,看看是否还会出现相同的错误。

    2023-11-15 16:48:36
    赞同 展开评论 打赏
  • 这个错误是因为Flink SQL的输出格式与Doris的输入格式不匹配。在Flink SQL中,您需要指定一个输出格式,例如ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ('separatorChar' = ',', 'quoteChar' = '"', 'escapeChar' = '\')。然而,Doris似乎并不支持这种格式,因此导致了错误。

    为了解决这个问题,您可能需要查看Doris的文档,看看它支持哪种输入格式,然后在Flink SQL中使用相应的格式。

    2023-11-15 09:15:41
    赞同 1 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载