大佬们请教一个flink 重启生产者的问题,我通过检查点重启作业,但是sink的Kafka topic换了,就一直报错说生产者尝试使用producer id,但是当前没有给其分配对应的transactional id,这个怎么解决?我通过网上了解到的 producer启动时Kafka会为其分配一个唯一的producer id,这个transactional id是我们自己手动指定的?
在Flink中,Kafka的producer id是由Flink自动分配的,而transactional id需要手动指定。当通过检查点重启作业时,如果Kafka topic已经更改,则需要重新配置Kafka producer以使用新的topic和transactional id。
以下是一些可能有用的步骤:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group");
props.setProperty("enable.auto.commit", "false");
props.setProperty("transactional.id", "my-txn-id");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromElements("hello", "world");
stream
.addSink(new KafkaSinkFunction<>(topic, transactionalId, producer));
env.execute("My Flink Job");
class KafkaSinkFunction<T> extends RichSinkFunction<T> {
private final String topic;
private final String transactionalId;
private final Producer<String, String> producer;
public KafkaSinkFunction(String topic, String transactionalId, Producer<String, String> producer) {
this.topic = topic;
this.transactionalId = transactionalId;
this.producer = producer;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void invoke(T value, Context context) throws Exception {
producer.send(new ProducerRecord<>(topic, value));
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。