配置不生效: .setStartingOffsets(OffsetsInitializer.timestamp(1706321869181L))
大家有遇到过这种问题吗?
配置:
<properties>
<hbase.version>2.4.4</hbase.version>
<hadoop.version>3.2.1</hadoop.version>
<flink.version>1.13.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
return KafkaSource.<Tuple4<String, String, byte[], Long>>builder()
.setProperties(properties)
.setTopics(topics)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.timestamp(1706321869181L))
.setDeserializer(new DefaultRecordDeserializationSchema())
.build();
根据您提供的配置和代码,问题可能出在setStartingOffsets
方法的参数上。您设置的起始偏移量为1706321869181L
,这个时间戳对应的日期是2024-01-01 00:00:00
,这可能超出了您的Kafka数据的时间范围。
为了解决这个问题,您可以尝试以下方法:
OffsetsInitializer.latest()
方法来设置起始偏移量。修改后的代码如下:
return KafkaSource.<Tuple4<String, String, byte[], Long>>builder()
.setProperties(properties)
.setTopics(topics)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.latest()) // 修改为从最新数据开始消费
.setDeserializer(new DefaultRecordDeserializationSchema())
.build();
如果问题仍然存在,请检查其他配置项是否正确,以及您的Kafka和Flink环境是否正常。
Flink 1.13.2版本中设置起始偏移量为指定时间戳的方法看起来是正确的。若配置未生效,请检查以下方面:
log.message.timestamp.type
参数,并且该时间戳类型需是CreateTime
或LogAppendTime
之一。版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。