
How do I define a table from a complex Kafka message body?

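Hi all: I have defined a table that reads data from Kafka, but I don't know how to parse the messages or which format.type to use. If json and avro can't handle this, do I have to write a custom format? If so, I don't know how to write one, so any pointers would be appreciated.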

The table is defined as follows:

CREATE TABLE MyUserTable (
  uuid VARCHAR,
  orgId VARCHAR
) with (
  'connector.type' = 'kafka',
  'connector.version' = '0.11',
  'connector.topic' = 'topic_name',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'testGroup',
  'format.type' = '?'
)

The Kafka message body looks like this; it doesn't seem to follow a standard format such as Avro:

{
  "beforeData": [],
  "byteSize": 272,
  "columnNumber": 32,
  "data": [
    { "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG" },
    { "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.000000", "type": "DOUBLE" },
    { "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.000000", "type": "DOUBLE" },
    { "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING" },
    { "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.000000000000000000000000000000", "type": "DOUBLE" },
    { "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.000000", "type": "DOUBLE" },
    { "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING" }
  ],
  "dataCount": 0,
  "dataMetaData": {
    "connector": "mysql",
    "pos": 1000368076,
    "row": 0,
    "ts_ms": 1625565737000,
    "snapshot": "false",
    "db": "testdb",
    "table": "flow_person_t"
  },
  "key": "APPLY_PERSON_ID",
  "memorySize": 1120,
  "operation": "insert",
  "rowIndex": -1,
  "timestamp": "1970-01-01 00:00:00"
}

*From the volunteer-compiled Flink mailing list archive
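Structurally this is still ordinary (if nested) JSON, so a JSON parser reads it without trouble. The awkward parts are that the real column values sit inside the `data` array, and that `rawData` mixes JSON numbers (`10017`) with strings (`"11000.000000"`), the intended type being carried only by the `type` tag. The sketch below illustrates this in plain Python, outside Flink. It is not a Flink API: `MESSAGE` is a trimmed copy of the sample above, and `CASTS` / `typed_columns` are hypothetical names that assume only the `LONG`, `DOUBLE` and `STRING` tags seen in the sample. A custom Flink format or UDF would have to do equivalent work in Java.

```python
import json

# Trimmed copy of the sample message (three of the seven "data" entries).
MESSAGE = """
{"beforeData": [], "byteSize": 272, "columnNumber": 32,
 "data": [
   {"byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG"},
   {"byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.000000", "type": "DOUBLE"},
   {"byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING"}],
 "dataCount": 0,
 "dataMetaData": {"connector": "mysql", "pos": 1000368076, "row": 0,
                  "ts_ms": 1625565737000, "snapshot": "false",
                  "db": "testdb", "table": "flow_person_t"},
 "key": "APPLY_PERSON_ID", "memorySize": 1120, "operation": "insert",
 "rowIndex": -1, "timestamp": "1970-01-01 00:00:00"}
"""

# Hypothetical mapping from the message's "type" tags to Python casts;
# only the tags that appear in the sample are handled.
CASTS = {"LONG": int, "DOUBLE": float, "STRING": str}


def typed_columns(message: dict) -> dict:
    """Return {columnName: value}, casting each rawData by its type tag."""
    return {
        entry["columnName"]: CASTS[entry["type"]](entry["rawData"])
        for entry in message["data"]
    }


msg = json.loads(MESSAGE)
print(msg["dataMetaData"]["table"], msg["operation"])
print(typed_columns(msg))
```

Because `int()` and `float()` accept both numbers and numeric strings, the cast works whichever way `rawData` happens to be encoded.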

Asked by 塔塔塔塔塔塔 on 2021-12-02 15:14:21
1 answer
  • In fact, your data field is a JSON array; you only need to explode it into rows (column-to-row) and read the columnName field.

    2021-12-02 15:39:07
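The answer's column-to-row suggestion means turning each element of the `data` array into its own row while repeating the per-message metadata on every row. In Flink SQL the analogous step would typically be declaring `data` as an ARRAY of ROW in the table schema and expanding it with `CROSS JOIN UNNEST`; check the documentation for your Flink version. The plain-Python sketch below only illustrates the shape of the result. `explode` is a hypothetical helper, `MESSAGE` is a trimmed version of the sample, and `rawData` is kept as a string on the assumption that its mixed number/string encoding makes a string column the safest common type.

```python
import json
from typing import List, Tuple

# Trimmed sample: the metadata fields used below plus two "data" entries.
MESSAGE = json.loads("""
{"data": [
   {"byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG"},
   {"byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING"}],
 "dataMetaData": {"db": "testdb", "table": "flow_person_t"},
 "operation": "insert"}
""")

Row = Tuple[str, str, str, str, str, str]


def explode(message: dict) -> List[Row]:
    """Emit one row per element of the "data" array (column-to-row):
    (db, table, operation, columnName, type, rawData as a string)."""
    meta = message["dataMetaData"]
    return [
        (meta["db"], meta["table"], message["operation"],
         entry["columnName"], entry["type"], str(entry["rawData"]))
        for entry in message["data"]
    ]


for row in explode(MESSAGE):
    print(row)
```

Each printed tuple corresponds to one exploded row; grouping those rows back into one wide record per message (for example by `key`) would be a separate pivot step.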