
Flink SQL 1.11 writing to JDBC: field types do not match

Hello, I'm using Flink SQL 1.11 to write data to JDBC and ran into a field type mismatch. Is something wrong with my type definitions? My SQL file and the exception log are below.

    SET stream.enableCheckpointing=1000*60;
    SET stream.setParallelism=3;

    -- Kafka cdbp zdao source table
    create TABLE cloud_behavior_source(
        operation STRING,
        operation_channel STRING,
        time STRING,
        ip STRING,
        lat STRING,
        lng STRING,
        user_id STRING,
        device_id STRING,
        imei STRING,
        targets ARRAY<ROW<type STRING,value STRING>>,
        product_name STRING,
        product_version STRING,
        product_vendor STRING,
        platform STRING,
        platform_version STRING,
        languaage STRING,
        locale STRING,
        other_para MAP<STRING, STRING NULL>
    ) with (
        'connector'='kafka',
        'topic'='cloud_behavior',
        'properties.bootstrap.servers'='',
        'properties.group.id'='testGroup',
        'format'='avro',
        'scan.startup.mode'='earliest-offset'
    );

    -- Hbase zdao uv statistics sink table
    create TABLE cloud_behavior_sink(
        operation STRING,
        operation_channel STRING,
        ip STRING,
        lat STRING,
        lng STRING,
        user_id STRING,
        device_id STRING
    ) with (
        'connector'='jdbc',
        'url'='jdbc:mysql://hosts:3306/d_bigdata',
        'table-name'='flink_sql_test',
        'username'='',
        'password'='',
        'sink.buffer-flush.max-rows'='100'
    );

    -- Business logic
    insert into cloud_behavior_sink select * from cloud_behavior_source;

    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/data1/flink/flink-1.11.1-log/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


The program finished with the following exception:

    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
    Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY<ROW<type VARCHAR(2147483647), value VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
    Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
    Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.cloud_behavior_sink do not match.
    Query schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), time: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647), imei: VARCHAR(2147483647), targets: ARRAY<ROW<type VARCHAR(2147483647), value VARCHAR(2147483647)>>, product_name: VARCHAR(2147483647), product_version: VARCHAR(2147483647), product_vendor: VARCHAR(2147483647), platform: VARCHAR(2147483647), platform_version: VARCHAR(2147483647), languaage: VARCHAR(2147483647), locale: VARCHAR(2147483647), other_para: MAP<VARCHAR(2147483647), VARCHAR(2147483647)>]
    Sink schema: [operation: VARCHAR(2147483647), operation_channel: VARCHAR(2147483647), ip: VARCHAR(2147483647), lat: VARCHAR(2147483647), lng: VARCHAR(2147483647), user_id: VARCHAR(2147483647), device_id: VARCHAR(2147483647)]
        at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:229)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
        at scala.Option.map(Option.scala:146)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callDML(SqlSubmit.java:97)
        at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.callCommand(SqlSubmit.java:72)
        at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.run(SqlSubmit.java:53)
        at com.intsig.flink.streaming.streaming_project.common.sql_submit.SqlSubmit.main(SqlSubmit.java:24)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        ... 11 more

*From Flink content compiled by volunteers

Asked by 毛毛虫雨, 2021-12-06 15:37:39
1 answer
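  • Your source and sink do not even have the same number of fields: `select *` returns all 18 columns of the source table, while the sink table declares only 7. The table schema of the query in the insert statement must be the same as the schema of the sink table. For example, you can write the data with the following SQL:

    insert into cloud_behavior_sink
    select
    operation,
    operation_channel,
    ip,
    lat,
    lng,
    user_id,
    device_id
    from cloud_behavior_source;

    *From Flink content compiled by volunteers
    2021-12-06 16:06:29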
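When the two schemas in a message like this are long, it can help to diff the field names mechanically instead of reading them by eye. The following is only a minimal Python sketch of that idea, not anything provided by Flink: the helper names `diff_schemas` and `build_insert` are hypothetical, and the field lists are copied by hand from the "Query schema" and "Sink schema" parts of the exception in the question.

```python
# Field names copied by hand from the exception message in the question.
query_fields = [
    "operation", "operation_channel", "time", "ip", "lat", "lng",
    "user_id", "device_id", "imei", "targets", "product_name",
    "product_version", "product_vendor", "platform", "platform_version",
    "languaage", "locale", "other_para",
]
sink_fields = [
    "operation", "operation_channel", "ip", "lat", "lng",
    "user_id", "device_id",
]


def diff_schemas(query, sink):
    """Hypothetical helper: return (fields only in the query, fields only in the sink)."""
    query_set, sink_set = set(query), set(sink)
    extra = [name for name in query if name not in sink_set]
    missing = [name for name in sink if name not in query_set]
    return extra, missing


def build_insert(sink_table, source_table, sink_cols):
    """Hypothetical helper: build an INSERT that selects exactly the sink columns, in sink order."""
    cols = ",\n  ".join(sink_cols)
    return f"INSERT INTO {sink_table}\nSELECT\n  {cols}\nFROM {source_table};"


extra, missing = diff_schemas(query_fields, sink_fields)
print("Only in query:", extra)
print("Only in sink:", missing)

# Generating the statement is only safe when every sink column exists in the source.
if not missing:
    print(build_insert("cloud_behavior_sink", "cloud_behavior_source", sink_fields))
```

The sketch compares names only. The column types and the column order also have to line up with the sink, which is why the generated statement lists the columns in the sink's own order.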