如题:
我向kafka中输出了json格式的数据
{"id":5,"price":40,"timestamp":1584942626828,"type":"math"}
{"id":2,"price":70,"timestamp":1584942629638,"type":"math"}
{"id":2,"price":70,"timestamp":1584942634951,"type":"math"}
....
其中timestamp字段是13位时间戳,对应的SQL表中应该怎么处理成时间格式呢?
- name: bookpojo
type: source-table
connector:
property-version: 1
type: kafka
version: "universal"
topic: pojosource
startup-mode: earliest-offset
properties:
zookeeper.connect: localhost:2181
bootstrap.servers: localhost:9092
group.id: testGroup
format:
property-version: 1
type: json
schema: "ROW<id INT, type STRING, price INT, timestamp TIMESTAMP>"
schema:
- name: id
data-type: INT
- name: type
data-type: STRING
- name: price
data-type: INT
- name: timestamp
data-type: TIMESTAMP(3)
上述配置,好像有问题。
我在官网中找到这样一句说明:
字符串和时间类型:不修剪值。文字"null"也可以理解。时间类型必须根据Java SQL时间格式进行格式化,并以毫秒为单位。例如: 2018-01-01日期,20:43:59时间和2018-01-01 20:43:59.999时间戳。
时间一定得是字符串类型且带毫秒吗?*来自志愿者整理的flink邮件归档
你定义的Kafka source使用JsonRowDeserializationSchema 解析json字符串并将其转换为Flink types [1]。 目前JsonRowDeserializationSchema 仅支持 RFC 3339兼容的时间字符串 [2]。
[1] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L446 [2] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java#L38*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。