开发者社区> 问答> 正文

请教如何在flink job 自定义kafka source function 及时提交offset

我有这样一个业务场景,在checkpoint disable的情况下,不想用auto.commit 功能在等待interval时间后提交offsets, 想要在flink job sink 完成后及时提交offset, 我阅读了一下flinkKafkaConsumer的源码,没有找到实现方式,麻烦请教大神有什么建议的实现方式吗? 或者还有什么其他的建议? 谢谢!  附件中是我写的测试代码。 

53行我想new 一个kafkaConsumer出来,调用commitSync()方法但不管用。 

KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer(properties); 

kafkaConsumer.commitSync();*来自志愿者整理的flink邮件归档

展开
收起
CCCC 2021-12-02 15:07:02 900 0
1 条回答
写回答
取消 提交回答
  • 我猜测,要是可以在invoke的时候,拿到sink当前对应的barriar的话,从这个barriar取出offset,或者在richSinkFunction时,调用getRuntimeContext()上下文对象,从这个上下文中,拿到当前的task  metrics,从而取出task的offset*来自志愿者整理的FLINK邮件归档

    2021-12-02 15:50:26
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载