开发者社区> 问答> 正文

FLINK SQL中时间戳怎么处理?

如题: 

我向kafka中输出了json格式的数据 

{"id":5,"price":40,"timestamp":1584942626828,"type":"math"} 

{"id":2,"price":70,"timestamp":1584942629638,"type":"math"} 

{"id":2,"price":70,"timestamp":1584942634951,"type":"math"} 

.... 

其中timestamp字段是13位时间戳,对应的SQL表中应该怎么处理成时间格式呢? 

  - name: bookpojo 

    type: source-table 

    connector:  

      property-version: 1 

      type: kafka 

      version: "universal" 

      topic: pojosource 

      startup-mode: earliest-offset 

      properties: 

        zookeeper.connect: localhost:2181 

        bootstrap.servers: localhost:9092 

        group.id: testGroup 

    format:  

      property-version: 1 

      type: json 

      schema: "ROW<id INT, type STRING, price INT, timestamp TIMESTAMP>" 

    schema:  

      - name: id 

        data-type: INT 

      - name: type 

        data-type: STRING 

      - name: price 

        data-type: INT 

      - name: timestamp 

        data-type: TIMESTAMP(3) 

上述配置,好像有问题。 

我在官网中找到这样一句说明: 

字符串和时间类型:不修剪值。文字"null"也可以理解。时间类型必须根据Java SQL时间格式进行格式化,并以毫秒为单位。例如: 2018-01-01日期,20:43:59时间和2018-01-01 20:43:59.999时间戳。 

时间一定得是字符串类型且带毫秒吗?*来自志愿者整理的flink邮件归档

展开
收起
玛丽莲梦嘉 2021-12-02 16:39:28 1295 0
1 条回答
写回答
取消 提交回答
  • 你定义的Kafka source使用JsonRowDeserializationSchema 解析json字符串并将其转换为Flink types  [1]。  目前JsonRowDeserializationSchema 仅支持 RFC 3339兼容的时间字符串 [2]。 

    [1]  https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L446  [2]  https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/TimeFormats.java#L38*来自志愿者整理的FLINK邮件归档

    2021-12-02 17:34:07
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载