
Why are there duplicate messages when a Flink job consuming Kafka is restarted from a savepoint?

Hi all, I have a Flink job that consumes Kafka topic A and writes to Kafka topic B. After restarting the job from a savepoint, I found duplicate records in topic B. Can anyone help me understand why? Thanks!

My versions: Flink 1.12.4, Kafka 2.0.1, Java 1.8

Core code:

env.enableCheckpointing(300000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);

tableEnv.createTemporaryView("data_table", dataDS);
String sql = "select * from data_table a inner join hive_catalog.dim.dim.project for system_time as of a.proctime as b on a.id = b.id";
Table table = tableEnv.sqlQuery(sql);
DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);

// Kafka producer parameters
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, kafkaBufferMemory);
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 300000);
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

// Transactional (exactly-once) sink with a pool of 5 Kafka producers
resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new JSONSchema(), producerProps, new FlinkFixedPartitioner<>(), FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
        .setParallelism(sinkParallelism);

*From the Flink mailing list archive, compiled by volunteers
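A side note on the settings above, offered as a sketch rather than a diagnosis of the duplicates: with Semantic.EXACTLY_ONCE, a Kafka transaction stays open until the checkpoint that covers it completes, so the Flink documentation recommends a transaction.timeout.ms well above the maximum checkpoint duration plus the maximum restart duration. The broker also rejects any value above its transaction.max.timeout.ms (15 minutes by default). In the code above, both the checkpoint interval and TRANSACTION_TIMEOUT_CONFIG are 300000 ms, which leaves no margin. The helper below is hypothetical (the class, its methods, and the 2-minute downtime figure are assumptions, not from the post) and only does the arithmetic.

```java
import java.time.Duration;

/** Hypothetical helper: sanity-checks a producer transaction timeout. */
final class TxnTimeoutCheck {

    /** Kafka broker default for transaction.max.timeout.ms (15 minutes). */
    static final Duration BROKER_DEFAULT_MAX = Duration.ofMinutes(15);

    /** True if the timeout is strictly larger than checkpoint interval plus expected downtime. */
    static boolean hasMargin(Duration txnTimeout, Duration checkpointInterval, Duration expectedDowntime) {
        return txnTimeout.compareTo(checkpointInterval.plus(expectedDowntime)) > 0;
    }

    /** True if the broker would accept this producer timeout. */
    static boolean acceptedByBroker(Duration txnTimeout, Duration brokerMax) {
        return txnTimeout.compareTo(brokerMax) <= 0;
    }

    public static void main(String[] args) {
        Duration checkpointInterval = Duration.ofMillis(300_000); // from env.enableCheckpointing(300000)
        Duration txnTimeout = Duration.ofMillis(300_000);         // from TRANSACTION_TIMEOUT_CONFIG
        Duration expectedDowntime = Duration.ofMinutes(2);        // assumed, not from the post

        System.out.println("has margin: " + hasMargin(txnTimeout, checkpointInterval, expectedDowntime));
        System.out.println("accepted by broker: " + acceptedByBroker(txnTimeout, BROKER_DEFAULT_MAX));
    }
}
```

A transaction that expires before it is committed leads to lost records rather than duplicated ones, so this is a separate risk from the one asked about.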

EXCEED 2021-12-02 16:02:50
1 answer
  • I restarted the job from a savepoint, using the following commands:

    cancel command:

    /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
    -yid application_1625497885855_698371 \
    -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint \
    59cf6ccc83aa163bd1e0cd3304dfe06a

    savepoint path printed by the cancel command:

    hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494

    restart command:

    /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
    -m yarn-cluster \
    -yjm 4096 -ytm 4096 \
    -ynm User_Click_Log_Split_All \
    -yqu syh_offline \
    -ys 2 \
    -d \
    -p 64 \
    -s hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494 \
    -n \
    -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
    /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar

    *From the Flink mailing list archive, compiled by volunteers

    2021-12-02 16:22:58
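One thing worth checking, as a possible explanation rather than a confirmed diagnosis: with Semantic.EXACTLY_ONCE the sink writes to topic B inside Kafka transactions, and Kafka consumers default to isolation.level=read_uncommitted. A reader of topic B with the default setting can therefore see records from a transaction that was still open when the job was cancelled (and was later aborted), followed by the same records written again after the restore. The sketch below builds properties for whatever consumer reads topic B. The class name, group id, and broker address are hypothetical, and the keys are the plain-string forms of the standard Kafka consumer settings.

```java
import java.util.Properties;

/** Hypothetical helper: properties for a consumer that reads the sink topic (topic B). */
final class TopicBReaderProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Kafka's default is "read_uncommitted", which also returns records from
        // open or aborted transactions. "read_committed" returns only committed ones.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the original post.
        Properties props = build("localhost:9092", "topic-b-checker");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

With read_committed, records in topic B become visible only after the corresponding checkpoint completes, so with the 300000 ms checkpoint interval above a reader may lag by up to about five minutes.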