开发者社区> 问答> 正文

FlinkSQL 1.10 DDL无法指定水印怎么解决?

DDL语句如下:  CREATE TABLE ods_usage_naga_dsp_filter (  request row<value row<DSP_FILTER_LOG row<date varchar,placementid  varchar,reqid varchar,source varchar,time varchar,filter  array<row<filterName varchar,planId varchar>> >>>,  event_ts as to_timestamp(concat(date,time),'yyyy-MM-ddHH:mm:ss'),  WATERMARK FOR event_ts AS event_ts - interval '60' second  )WITH (  'connector.type' = 'kafka',  'format.fail-on-missing-field'='false',  'connector.version' = 'universal',  'connector.topic' = 'xxfilter',  'connector.startup-mode' = 'latest-offset',  'connector.properties.zookeeper.connect'='xx:2181',  'connector.properties.bootstrap.servers'='xx:9092',  'connector.properties.group.id'='xx_test',  'update-mode' = 'append',  'format.type' = 'json'  );  启动程序报错:  org.apache.flink.client.program.ProgramInvocationException: The main method  caused an error: From line 3, column 37 to line 3, column 42: Unknown  identifier 'date'  at  org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)  at  org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)  at  org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)  at  org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)  at  org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)  at java.security.AccessController.doPrivileged(Native Method)  at javax.security.auth.Subject.doAs(Subject.java:422)  at  org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)  at  org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)  Caused by: org.apache.calcite.runtime.CalciteContextException: From line 3,  column 37 to line 3, column 42: Unknown identifier 'date'  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)  at  sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)  at  sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at  org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834)  at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841)  at  org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667)  at  org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)  at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)  at  org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:593)  at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:237)  at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:219)  at  org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600)  at  org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587)  at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676)  at org.apache.calcite.sql.SqlNode.validateExpr(SqlNode.java:236)  at org.apache.calcite.sql.SqlOperator.validateCall(SqlOperator.java:407)  at org.apache.calcite.sql.SqlFunction.validateCall(SqlFunction.java:200)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.validateCall(SqlValidatorImpl.java:5304)  at org.apache.calcite.sql.SqlCall.validate(SqlCall.java:116)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:943)  at  org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930)  at  org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:490)  at  org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177)  at  org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130)  at  org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)  at  org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)  at com.cootek.streaming.flink.SQLSubmit$.callCreateTable(SQLSubmit.scala:31)  at com.cootek.streaming.flink.SQLSubmit$.callCommand(SQLSubmit.scala:15)  at  com.cootek.streaming.FlinkBootstrap$$anonfun$1.apply(FlinkBootstrap.scala:26)  at  com.cootek.streaming.FlinkBootstrap$$anonfun$1.apply(FlinkBootstrap.scala:26)  at scala.collection.Iterator$class.foreach(Iterator.scala:891)  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)  at  com.cootek.streaming.FlinkBootstrap$.delayedEndpoint$com$cootek$streaming$FlinkBootstrap$1(FlinkBootstrap.scala:26)  at  com.cootek.streaming.FlinkBootstrap$delayedInit$body.apply(FlinkBootstrap.scala:11)  at scala.Function0$class.apply$mcV$sp(Function0.scala:34)  at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)  at scala.App$$anonfun$main$1.apply(App.scala:76)  at scala.App$$anonfun$main$1.apply(App.scala:76)  at scala.collection.immutable.List.foreach(List.scala:392)  at  scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)  at scala.App$class.main(App.scala:76)  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)  at  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)  at  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)  at java.lang.reflect.Method.invoke(Method.java:498)  at  org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)  ... 11 more  Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown  identifier 'date'  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)  at  sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)  at  sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)  at  org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)  at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)  ... 63 more  flink无法识别关键字date/time??*来自志愿者整理的flink邮件归档

展开
收起
玛丽莲梦嘉 2021-12-02 16:21:34 596 0
1 条回答
写回答
取消 提交回答
  • 你的datetime都是在嵌套结构内部的字段,需要用*request.value.date  request.value.time*来使用它们。*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:18:39
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
MaxCompute Logview参数详解和问题排查(废弃) 立即下载
大批量处理excel文件到ODPS中方案 立即下载
时序数据库TSDB新功能 - 如何用SQL进行时序查询 立即下载