
Flink 1.10 JSON parsing

Hi,

1. When parsing JSON data, why is decimal used here rather than bigint? [image: image.png]

2. When using connect, I get an exception while parsing the elements of a JSON array. Is this caused by misuse on my side, or is it a bug?

json:
{"business":"riskt","data":[{"tracking_number":"0180024020920","invoice_no":"2020021025"}],"database":"installmentdb","table":"t_sales_order","ts":1581576074069,"type":"UPDATE","putRowNum":19999}

jsonSchema:
{"type":"object","properties":{"business":{"type":"string"},"data":{"type":"array","items":[{"type":"object","properties":{"tracking_number":{"type":"string"},"invoice_no":{"type":"string"}}}]},"database":{"type":"string"},"table":{"type":"string"},"ts":{"type":"integer"},"type":{"type":"string"},"putRowNum":{"type":"integer"}}}

connect:

streamTableEnv
    .connect(
        new Kafka()
            .version("0.11")
            .topic("mysql_binlog_test_str")
            .startFromEarliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")
    )
    .withFormat(
        new Json()
            .jsonSchema(
                "{\"type\":\"object\",\"properties\":{"
                + "\"business\":{\"type\":\"string\"},"
                + "\"data\":{\"type\":\"array\",\"items\":[{\"type\":\"object\",\"properties\":{"
                + "\"tracking_number\":{\"type\":\"string\"},\"invoice_no\":{\"type\":\"string\"}}}]},"
                + "\"database\":{\"type\":\"string\"},"
                + "\"table\":{\"type\":\"string\"},"
                + "\"ts\":{\"type\":\"integer\"},"
                + "\"type\":{\"type\":\"string\"},"
                + "\"putRowNum\":{\"type\":\"integer\"}}}")
    )
    .withSchema(
        new Schema()
            .field("business", DataTypes.STRING())
            .field("data",
                DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.ROW(
                    DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                    DataTypes.FIELD("invoice_no", DataTypes.STRING())))))
            .field("database", DataTypes.STRING())
            .field("table", DataTypes.STRING())
            .field("ts", DataTypes.DECIMAL(38, 18))
            .field("type", DataTypes.STRING())
            .field("putRowNum", DataTypes.DECIMAL(38, 18))
    )
    .createTemporaryTable("Test");

Exception:

Caused by: java.io.IOException: Failed to deserialize JSON object.
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.lang.ClassCastException: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode cannot be cast to org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:411)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
    ... 7 more

*Compiled by volunteers from the Flink mailing list archive

玛丽莲梦嘉 2021-12-02 16:43:53
1 answer
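  • Looking at your data, "data" is an array, so the schema definition for data needs to be changed to ARRAY(ROW(FIELD("tracking_number", STRING), FIELD("invoice_no", STRING))). I would also suggest removing .jsonSchema(...): since 1.10, flink-json can automatically derive the JSON schema from the table schema. *Compiled by volunteers from the Flink mailing list archive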

    2021-12-02 17:39:10
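
Applied to the table definition from the question, the two suggestions in the answer might look roughly like the sketch below. It assumes Flink 1.10 with the Kafka 0.11 connector and flink-json on the classpath. The class name JsonArrayTableExample and the method registerTestTable are hypothetical, and the numeric columns simply keep the DECIMAL(38, 18) types used in the question. This is a table descriptor fragment, not a complete job.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

// Hypothetical wrapper class, used only to make the fragment self-contained.
public class JsonArrayTableExample {

    // Assumption: streamTableEnv is created elsewhere, as in the question.
    public static void registerTestTable(StreamTableEnvironment streamTableEnv) {
        streamTableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic("mysql_binlog_test_str")
                    .startFromEarliest()
                    .property("zookeeper.connect", "localhost:2181")
                    .property("bootstrap.servers", "localhost:9092")
            )
            // No .jsonSchema(...): per the answer, flink-json 1.10 derives
            // the JSON schema from the table schema declared below.
            .withFormat(new Json())
            .withSchema(
                new Schema()
                    .field("business", DataTypes.STRING())
                    // "data" is a JSON array of objects, so it is declared as
                    // ARRAY(ROW(...)) instead of ROW(FIELD("f0", ROW(...))).
                    .field("data",
                        DataTypes.ARRAY(DataTypes.ROW(
                            DataTypes.FIELD("tracking_number", DataTypes.STRING()),
                            DataTypes.FIELD("invoice_no", DataTypes.STRING()))))
                    .field("database", DataTypes.STRING())
                    .field("table", DataTypes.STRING())
                    .field("ts", DataTypes.DECIMAL(38, 18))
                    .field("type", DataTypes.STRING())
                    .field("putRowNum", DataTypes.DECIMAL(38, 18))
            )
            .createTemporaryTable("Test");
    }
}
```

The only changes relative to the question are the type of the data field and the removal of the explicit JSON schema. The ClassCastException in the stack trace (ArrayNode cannot be cast to ObjectNode) is consistent with the original ROW declaration being applied to a JSON array.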