开发者社区> 问答> 正文

关于filesystem connector的一点疑问

Flink 1.11的filesystem connector,partition trigger[1]都是使用的默认值,所以分区可以多次提交 现在有这样的场景: 消费kafka数据写入hdfs中,分区字段是 day + hour ,是从事件时间截取出来的,如果数据延迟了,比如现在是19点了,来了17点的数据, 这条数据还能正确的写到17点分区里面吗?还是写到19点分区?还是会被丢弃? 有大佬知道吗,有实际验证过吗 感谢

附上简单sql: CREATE TABLE kafka ( a STRING, b STRING, c BIGINT, process_time BIGINT, e STRING, f STRING, g STRING, h INT, i STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'topic', 'properties.bootstrap.servers' = 'x', 'properties.group.id' = 'test-1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json', 'properties.flink.partition-discovery.interval-millis' = '300000' );

CREATE TABLE filesystem ( day STRING, hour STRING, a STRING, b STRING, c BIGINT, d BIGINT, e STRING, f STRING, g STRING, h INT, i STRING ) PARTITIONED BY (day, hour) WITH ( 'connector' = 'filesystem', 'format' = 'parquet', 'path' = 'hdfs://xx', 'parquet.compression'='SNAPPY', 'sink.partition-commit.policy.kind' = 'success-file' );

insert into filesystem select from_unixtime(process_time,'yyyy-MM-dd') as day, from_unixtime(process_time,'HH') as hour, a, b, c, d, e, f, g, h, i from kafka;

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html#partition-commit-trigger*来自志愿者整理的flink邮件归档

展开
收起
小阿矿 2021-12-03 16:16:33 505 0
1 条回答
写回答
取消 提交回答
  • 按照我的理解,partition time提交分区,是会在current watermark > partition time + commit delay 时机触发分区提交,得看你的sink.partition-commit.delay 设置的多久,如果超过之后,应当默认是会丢弃的吧。

    https://cloud.tencent.com/developer/article/1707182

    这个连接可以看一下 *来自志愿者整理的flink邮件归档

    2021-12-06 10:43:45
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
《Apache Flink-重新定义计算》PDF下载 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载