Could someone help me take a look? Thanks.
I've been trying for a long time, but I still can't parse a nested JSON structure.
Error:

    Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
        at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
        at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
        at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
The format and schema defined for the nested JSON are as follows:
    .withFormat(new Json()
      .jsonSchema(
        """{
          |  type: 'object',
          |  properties: {
          |    database: { type: 'string' },
          |    table: { type: 'string' },
          |    maxwell_ts: { type: 'integer' },
          |    data: {
          |      type: 'object',
          |      properties: {
          |        reference_id: { type: 'string' },
          |        transaction_type: { type: 'integer' },
          |        merchant_id: { type: 'integer' },
          |        create_time: { type: 'integer' },
          |        status: { type: 'integer' }
          |      }
          |    }
          |  }
          |}""".stripMargin.replaceAll("\n", " ")))
    .withSchema(new Schema()
      .field("table", STRING())
      .field("database", STRING())
      .field("data", ROW(
        FIELD("reference_id", STRING()),
        FIELD("transaction_type", INT()),
        FIELD("merchant_id", INT()),
        FIELD("status", INT())))
      //.field("event_time", BIGINT())  // .from("maxwell_ts")
      //.rowtime(new Rowtime()
      //  //.timestampsFromField("ts" * 1000)
      //  .timestampsFromField("ts")
      //  .watermarksPeriodicBounded(60000))
    )
    bsTableEnv.sqlUpdate(
      """INSERT INTO yyyyy
        |SELECT table
        |     , database
        |     , data.reference_id
        |     , data.transaction_type
        |     , data.merchant_id
        |     , data.create_time
        |     , data.status
        |FROM xxxx""".stripMargin)
From the volunteer-curated Flink mailing-list archive.
You could try defining the source and format directly with DDL. For your data, the DDL would look roughly like this (the statement is cut off at this point in the archive):

    create table my_source (
      database varchar,
      maxwell_ts bigint,
      table varchar,
      data row<
        transaction_sn varchar,
        parent_id int,
        user_id int,
        amount int,
        reference_id varchar,
        status int,
        transaction_type int,
        merchant_id int,
        update_time int,
        create_time int
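A completed version of that DDL might look like the sketch below. This is an assumption-laden illustration, not the original answer's full statement: the WITH clause, connector choice (a Kafka source with the JSON format, in the pre-1.11 property style that matches this Flink version), and the placeholder topic/broker values are all hypothetical. Note that `table` and `database` are SQL reserved keywords, so they are escaped with backticks here.

```sql
-- Hypothetical completion; connector properties below are assumptions.
CREATE TABLE my_source (
  `database` VARCHAR,
  maxwell_ts BIGINT,
  `table` VARCHAR,
  data ROW<
    reference_id VARCHAR,
    transaction_type INT,
    merchant_id INT,
    create_time INT,
    status INT
  >
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'my_topic',                         -- assumed topic name
  'connector.properties.bootstrap.servers' = 'host:9092', -- assumed brokers
  'format.type' = 'json'
);

-- Nested fields are then reachable with dot syntax, e.g.:
-- SELECT data.transaction_type, data.status FROM my_source;
```

With a DDL-defined table, the JSON format derives its schema from the declared column types, so the separate jsonSchema string is no longer needed.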