开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flink 1.15 在写flink sql 程序,不能从检查点恢复数据,希望大佬帮忙看看

flink 1.15 在写flink sql 程序on yarn的时候,设置了检查点,手动取消任务,然后从检查点启动程序,好像不能从检查点读取到状态。希望大佬帮忙看看,困扰好几天了。

说明: flink版本:flink 1.15 运行环境:flink on yarn的应用模式 代码模式:在java中写的flink sql

逻辑描述: 1.建一张读取kafka数据的来源表 2.建一张用于sink的mysql表 3.汇总kafka来源表的数据,然后插入sink表(2个字段:编号名称,编号的数量)

问题描述: 1.启动程序 2.往kafka写入2条相同编号(00001)的数据 3.程序会统计这个编号的数据量写入sink表,结果为:00001,2 4.手动停止程序 5.从检查点启动程序 6.往kafka再写入一条编号(00001)的数据 7.程序将结果写入sink表,结果为:00001,1 问题来了,这里统计的编号00001的数量为1,正常来说从检查点恢复,统计的结果应该是数量为3才对。

代码: 1.读取kafka数据代码: String createSourceKakfaDDL ="create table source_kafka_table_report_info( \n" + " msg_id STRING,\n" + " report_no STRING,\n" + " min_bms_soc decimal(11,2),\n" + " max_bms_soc decimal(11,2),\n" + " min_ev_mileage decimal(11,2),\n" + " max_ev_mileage decimal(11,2),\n" + " min_fuel_mileage decimal(11,2),\n" + " max_fuel_mileage decimal(11,2),\n" + " min_fuel_remaining decimal(11,2),\n" + " max_fuel_remaining decimal(11,2),\n" + " BCM_TyrePressure_LF decimal(11,2),\n" + " BCM_TyrePressure_RF decimal(11,2),\n" + " BCM_TyrePressure_LR decimal(11,2),\n" + " BCM_TyrePressure_RR decimal(11,2),\n" + " file_path STRING,\n" + " collect_time TIMESTAMP(3),\n" + " vin STRING,\n" + " min_mileage decimal(11,2),\n" + " max_mileage decimal(11,2),\n" + " proctime AS PROCTIME(),\n" + //" ts AS TO_TIMESTAMP(collect_time),"+ //" ts AS TO_TIMESTAMP(FROM_UNIXTIME(collect_time/1000))\n" + " WATERMARK FOR collect_time AS collect_time - INTERVAL '1' SECOND" + " ) WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = 'topic-atp-vat-report-info',\n" + " 'properties.bootstrap.servers' = '"+bootstrap_servers+"',\n" + //" 'scan.startup.mode' = 'earliest-offset',\n" + " 'scan.startup.mode' = 'latest-offset'," + " 'properties.group.id' = 'test_group03',\n" +
" 'format' = 'json' \n" + " )"; tableEnv.executeSql(createSourceKakfaDDL);

2.设置检查点代码: env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("hdfs://nameservice:8020/user/hdfs/flink/test01_status"); env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));

3.创建输出表代码: String createOutDDL ="create table eol_test_report_xt_test(" + " report_no string," + " report_count decimal(11,2)," + " primary key (report_no) not enforced" + ") WITH (" + "'connector' = 'jdbc'," + " 'url' ='"+mysql_url+"'," + " 'driver' = 'com.mysql.jdbc.Driver', " + " 'table-name' = 'eol_test_report_xt_test', " + " 'username' = '"+mysql_username+"', " + " 'password' = '"+mysql_password+"', " + " 'sink.buffer-flush.max-rows' = '10000', " + " 'sink.buffer-flush.interval' = '5s', " + " 'sink.max-retries' = '3', " + " 'sink.parallelism' = '2' " + ")";

    tableEnv.executeSql(createOutDDL);

4.插入目标表代码: String insertSinkDDL ="insert into eol_test_report_xt_test"+ " ("+ " report_no," + " report_count" + " )" + " select " + " t01.report_no, "+ " count(*) as report_count" + " from source_kafka_table_report_info t01" + " group by t01.report_no "+ ""; tableEnv.executeSql(insertSinkDDL);

5.从检查点恢复程序命令: bin/flink run-application -t yarn-application -d -Dtaskmanager.memory.process.size=2048M -Dtaskmanager.numberOfTaskSlots=3 -p 1 -c com.dfsk.atp.vat.faultdetection.Test01 ./projects/atp-vat-flink-data-1.0-SNAPSHOT.jar --mode dev -s hdfs://nameservice:8020/user/hdfs/flink/test01_status/ccbab43446872f8b69e165b54b191cfb/chk-10/_metadata

展开
收起
游客fuzojzpl5x2bu 2023-06-15 16:30:00 268 0
4 条回答
写回答
取消 提交回答
  • 公众号:网络技术联盟站,InfoQ签约作者,阿里云社区签约作者,华为云 云享专家,BOSS直聘 创作王者,腾讯课堂创作领航员,博客+论坛:https://www.wljslmz.cn,工程师导航:https://www.wljslmz.com

    在 Flink 中,检查点是一种重要的容错机制,用于保障流处理任务的数据一致性和可靠性。如果您在使用 Flink SQL 编写程序时,遇到了无法从检查点恢复数据的问题,可能有以下几个原因:

    1. 检查点开启失败。在 Flink 中,需要显式地开启检查点功能才能使用它。您可以通过在 Flink SQL 程序中设置相关参数来启用检查点机制,比如设置 checkpointing.mode 和 checkpointing.interval 参数。如果检查点开启失败,将无法进行数据恢复。可以查看 Flink 的日志文件或控制台输出,查看有无相关错误信息。

    2. 状态后端配置错误。在使用检查点机制时,需要将状态保存到状态后端中。Flink 支持多种状态后端,包括内存、文件系统、HDFS、RocksDB 等。如果状态后端配置错误,可能导致无法从检查点恢复数据。建议您仔细检查状态后端配置是否正确。

    3. 数据源读取错误。如果 Flink SQL 程序中的数据源读取出现错误,可能导致无法正确生成检查点。例如,数据源读取超时、连接中断等情况都可能导致数据生成不完整。建议您仔细检查数据源读取部分是否正常。

    4. 程序逻辑错误。Flink SQL 程序本身可能存在逻辑错误,导致无法正确恢复数据。建议您检查程序逻辑是否正确,并尝试对程序进行调试。

    2023-06-16 15:03:41
    赞同 展开评论 打赏
  • 北京阿里云ACE会长

    在 Flink SQL 中使用检查点功能需要进行以下设置:

    首先需要在 Flink 集群中启用状态后端,例如使用 RocksDB 等状态后端。在 Flink 的配置文件中设置 state.backend 参数,指定状态后端类型,例如 state.backend: rocksdb。

    然后需要设置检查点相关的参数,例如检查点间隔时间、并发检查点数等参数。在 Flink SQL 中,可以通过 SET 命令设置这些参数,例如:

    pgsql Copy SET env.checkpoint.interval = 60000; SET env.checkpoint.max-concurrent-checkpoints = 1; 上述代码设置了检查点间隔时间为 60 秒,最大并发检查点数为 1。

    在 Flink SQL 中创建表时,需要设置表的属性,包括是否支持检查点、是否需要恢复等属性。例如,在创建 Kafka 来源表时,可以设置如下属性: scheme Copy 'connector' = 'kafka', 'topic' = 'source_topic', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'source_group_id', 'scan.startup.mode' = 'earliest-offset', 'scan.topic-partition-discovery.interval' = '60000', 'scan.checkpoint-enabled' = 'true', 'scan.checkpoint-mode' = 'exactly-once', 'scan.checkpoint-restore' = 'true', 'scan.checkpoint-restore.path' = 'hdfs://localhost:9000/flink/checkpoints', 'scan.checkpoint-restore.type' = 'filesystem', 'scan.checkpoint-restore.external.path' = 'true' 上述代码设置了 Kafka 来源表的检查点相关属性,包括启用检查点、设置检查点模式为精确一次、设置检查点恢复路径为 HDFS、设置外部路径等。

    在 Flink SQL 中执行作业时,可以设置作业的属性,例如并行度、任务取消时是否保留状态等。例如,在执行汇总数据的作业时,可以设置如下属性: Copy SET execution.parallelism = 4; SET execution.savepoint-mode = 'NEVER'; 上述代码设置了作业的并行度为 4,设置任务取消时不保留状态。

    如果您的程序不能从检查点恢复数据,可以检查一下上述设置是否正确,并查看具体的错误日志,以确定问题所在。同时,在 Flink SQL 编程时,建议使用 Flink 的 Java/Scala API 来编写程序,这样可以更好地控制程序的状态和检查点,以避免出现一些不可预期的问题。

    2023-06-16 08:02:48
    赞同 展开评论 打赏
  • 值得去的地方都没有捷径

    您好,从您提供的信息来看,可能是因为在从检查点启动程序后,程序无法正确恢复状态。请检查以下几个方面:

    检查您的 Flink 集群的配置文件,确保在检查点启动时,状态后端和检查点存储路径的配置正确。您可以使用 Flink UI 来查看检查点的状态和存储路径是否正确。

    检查您的 SQL 程序是否正确设置了状态后端和检查点存储路径。您可以在程序中添加以下代码:

    SET execution.checkpointing.mode = 'EXACTLY_ONCE'; SET execution.checkpointing.interval = '1m'; SET execution.checkpointing.timeout = '10m'; SET execution.checkpointing.min-pause = '500ms'; SET state.backend = 'filesystem'; SET state.checkpoints.dir = 'hdfs:///flink/checkpoints'; 请根据您的实际情况修改检查点相关的配置参数。

    检查您的 SQL 程序是否正确处理了从检查点恢复的情况。例如,您需要在程序中添加以下代码来处理从检查点恢复的情况: INSERT INTO mysql_sink_table SELECT report_no, COUNT(*) AS cnt FROM source_kafka_table_report_info GROUP BY report_no 请注意,您需要在 SQL 程序中正确处理状态的恢复和更新,以确保程序能够正确地从检查点恢复状态并继续处理数据。

    希望这些建议能够帮助您解决问题。如果问题仍然存在,请提供更多的信息和代码,以便我们更好地帮助您。

    2023-06-15 18:50:06
    赞同 展开评论 打赏
  • 根据你提供的代码描述,您的 Flink SQL 程序应该可以使用检查点机制实现重启后状态的恢复。不过,我注意到您的问题描述中说“往 Kafka 再写入一条编号(00001) 的数据,程序将结果写入 sink 表,结果为:00001,1”, 这个现象看起来是 Flink 程序并没有完全从检查点中恢复状态。

    针对您遇到的问题,我提供以下一些可能的原因和解决方案供您参考:

    1. 请确保您对 Flink 程序所使用的 state 的定义以及 checkpoint 的设置与标准实践一致。state 的定义错误或 checkpoint 没有正确配置可能会导致状态恢复失败。

    2. 检查一下是否存在特殊情况,比如网络中断或存储系统出现故障等原因,导致检查点数据无法正常保存,从而导致恢复失败。可以在程序运行时配置相关监控和告警机制来检测这些问题。

    3. 如果第二次写入相同的编号(00001) 的数据不是在手动取消任务后马上发生的,而是在等待一段时间后再发送的,那么可能是因为部分检查点数据已经过期或被丢弃。在这种情况下,可以调整相关配置,如存储检查点的时间间隔、保留检查点的数量和过期时间,来避免数据丢失。

    4. 如果前三点都没有解决问题,可以考虑在 Flink 程序中添加一些日志语句,来追踪代码执行过程中出现的问题,以便定位解决。

    2023-06-15 16:38:02
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    SQL Server 2017 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载