开发者社区> 问答> 正文

flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

Hi,社区的各位大家好: 我目前生产上面使用的是1.8.2版本,相对稳定 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究, 截止目前先后研究了1.10.1 1.11.1共2个大版本

在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加 状态后端使用的是rocksdb 的增量模式 StateBackend backend =new RocksDBStateBackend("hdfs:///checkpoints-data/",true); 设置了官网文档中找到的删除策略: TableConfig tableConfig = streamTableEnvironment.getConfig(); tableConfig.setIdleStateRetentionTime(Time.minutes(2), Time.minutes(7));

请问是我使用的方式不对吗?

通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator

版本影响:flink1.10.1 flink1.11.1 planner:blink planner source : kafka source 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

sql: insert into result select request_time ,request_id ,request_cnt ,avg_resptime ,stddev_resptime ,terminal_cnt ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'yyyy-MM-dd HH:mm:ss.SSS'),0,19) from ( select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time ,commandId as request_id ,count(*) as request_cnt ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime from log where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE),commandId

union all

select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1' MINUTE),'yyyy-MM-dd HH:mm:ss.SSS'),0,21) as request_time ,99999999 ,count(*) as request_cnt ,ROUND(avg(CAST(respTime as double)),2) as avg_resptime ,ROUND(stddev_pop(CAST(respTime as double)),2) as stddev_resptime from log where commandId in (104005 ,204005 ,404005) and errCode=0 and attr=0 group by TUMBLE(times, INTERVAL '1' MINUTE) )

source:

create table log ( eventTime bigint ,times timestamp(3) …………………… ,commandId integer ,watermark for times as times - interval '5' second ) with( 'connector' = 'kafka-0.10', 'topic' = '……', 'properties.bootstrap.servers' = '……', 'properties.group.id' = '……', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' )

sink1: create table result ( request_time varchar ,request_id integer ,request_cnt bigint ,avg_resptime double ,stddev_resptime double ,insert_time varchar ) with ( 'connector' = 'kafka-0.10', 'topic' = '……', 'properties.bootstrap.servers' = '……', 'properties.group.id' = '……', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' )*来自志愿者整理的flink邮件归档

展开
收起
说了是一只鲳鱼 2021-12-07 10:42:49 1019 0
1 条回答
写回答
取消 提交回答
  • SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下

    watermark[1]

    [1]

    https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html *来自志愿者整理的flink邮件归档

    2021-12-07 11:28:44
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
SQL Server在电子商务中的应用与实践 立即下载
GeoMesa on Spark SQL 立即下载
原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载