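Flink version tested: 1.11.0
Does Flink SQL support the following INSERT syntax, which names the target columns and their order in the statement itself?
Insert into tableName(col1[,col2]) select col1[,col2]
Testing so far has turned up the following problems.
Table definitions: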
create table t1(a int,b string,c int) with ();
create table t2(a int,b string,c int) with ();
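Problem 1: Testing shows that INSERT INTO matches the query schema against the sink schema by declaration order, not by column name.
Test statement:
insert into t2 select t1.a,t1.c, t1.b from t1;
Error message: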
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)
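The error above is consistent with a purely positional check: the i-th column of the query is compared with the i-th column of the sink, and column names are ignored. The Python sketch below models that behavior for illustration only. It is not Flink's implementation: `check_positional` is a hypothetical helper, and it compares type names for exact equality, whereas the real validator also applies implicit casts.

```python
from typing import List, Tuple

Column = Tuple[str, str]  # (column name, type name)


def check_positional(query: List[Column], sink: List[Column]) -> List[str]:
    """Return mismatch descriptions; an empty list means the schemas line up."""
    if len(query) != len(sink):
        return [f"column count differs: query has {len(query)}, sink has {len(sink)}"]
    problems = []
    for i, ((q_name, q_type), (s_name, s_type)) in enumerate(zip(query, sink)):
        # Only the types at the same position are compared; names play no role.
        if q_type != s_type:
            problems.append(f"position {i}: query {q_name} {q_type} vs sink {s_name} {s_type}")
    return problems


sink = [("a", "INT"), ("b", "STRING"), ("c", "INT")]

# select t1.a, t1.c, t1.b: positions 1 and 2 disagree with the sink.
print(check_positional([("a", "INT"), ("c", "INT"), ("b", "STRING")], sink))

# select t1.a, t1.b, t1.c: same order as the sink definition, so no mismatch.
print(check_positional([("a", "INT"), ("b", "STRING"), ("c", "INT")], sink))
```

Under this model, writing the select list in the sink's declared order (t1.a, t1.b, t1.c) is what makes the statement validate.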
Problem 2: The syntax Insert into tableName(col1[,col2]) select col1[,col2] is accepted, but it has no actual effect; columns are still matched by declaration order.
Test statement:
insert into t2(a,c,b) select t1.a,t1.c, t1.b from t1;
Error message:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
Query schema: [a: INT, c: INT, b: VARCHAR(2147483647)]
Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.explain(TableEnvironmentImpl.java:632)
Problem 3: The same error occurs when the INSERT INTO column list has fewer columns than the sink schema.
Test statement:
insert into t2(a,b)
select t1.a, t1.b from t1;
Error message:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.t2 do not match.
Query schema: [a: INT, c: VARCHAR(2147483647)]
Sink schema: [a: INT, b: VARCHAR(2147483647), c: INT]
at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:100)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:213)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:204)
at scala.Option.map(Option.scala:146)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:98)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$1.apply(StreamPlanner.scala:80)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:80)
at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:43)
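at org.apache.flink.table.api.internal.TableEnvironmentImp
Summary:
The current implementation limits the flexibility of both querying and writing.
An insert succeeds only when the select list follows the column order defined in the sink schema,
which is cumbersome when there are many columns.
In addition, there is a real need to insert into only some of the columns, and that is currently not supported.
*From the Flink mailing list archive compiled by volunteers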
Flink indeed does not support this syntax at the moment. I have created an issue [1] where you can track the progress of this feature.
[1] https://issues.apache.org/jira/browse/FLINK-18726
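Until that feature is available, one possible workaround is to generate the statement so that the select list always follows the sink's declared order, filling any column that is not being written with a typed NULL. The Python sketch below only builds the SQL text. `build_insert` and its parameters are hypothetical names introduced for illustration, and the approach assumes the omitted sink columns are nullable.

```python
from typing import Dict, List, Tuple


def build_insert(sink_table: str,
                 sink_schema: List[Tuple[str, str]],
                 source_table: str,
                 assignments: Dict[str, str]) -> str:
    """Build an INSERT whose select list follows the sink's column order.

    assignments maps a sink column name to the source expression written into it.
    Sink columns without an assignment are filled with CAST(NULL AS <type>).
    """
    unknown = set(assignments) - {name for name, _ in sink_schema}
    if unknown:
        raise ValueError(f"not sink columns: {sorted(unknown)}")
    select_items = []
    for name, sql_type in sink_schema:
        if name in assignments:
            select_items.append(assignments[name])
        else:
            # Assumption: the sink column accepts NULL.
            select_items.append(f"CAST(NULL AS {sql_type})")
    return f"INSERT INTO {sink_table} SELECT {', '.join(select_items)} FROM {source_table}"


t2_schema = [("a", "INT"), ("b", "STRING"), ("c", "INT")]

# Intent of: insert into t2(a,c,b) select t1.a,t1.c, t1.b from t1
print(build_insert("t2", t2_schema, "t1", {"a": "t1.a", "c": "t1.c", "b": "t1.b"}))

# Intent of: insert into t2(a,b) select t1.a, t1.b from t1
print(build_insert("t2", t2_schema, "t1", {"a": "t1.a", "b": "t1.b"}))
```

Keeping the sink schema in one place means a wide table only needs its column order written down once; each insert then states just the columns it actually fills.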