在 new ProcessWindowFunction()中创建了ValueState,想在第二天0点的时候ValueState清空开始重新计算,但是返现ValueState并没有清空,而是叠加前一天的继续计算,这个.clear()方法应该在什么时候加,才能生效呢?
--部分代码
.window(TumblingProcessingTimeWindows.of(Time.days(1))) .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(10)))
.process(new ProcessWindowFunction[(String,String,Long), String, Tuple, TimeWindow] {
private var pv_st: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
pv_st = getRuntimeContext.getState[Long](new ValueStateDescriptor[Long]("pv_stCount", classOf[Long]))
}
override def process(key: Tuple, context: Context, elements: Iterable[(String,String,Long)], out: Collector[String]): Unit = {
var c_st = 0
val elementsIterator = elements.iterator
// 遍历窗口数据,获取唯一word
while (elementsIterator.hasNext) {
val ac_name = elementsIterator.next()._2
if(!ac_name.isEmpty && ac_name.equals("listentime")){
c_st +=1
}
}
val time: Date = new Date()
val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val date = dateFormat.format(time)
// add current
pv_st.update(pv_st.value() + c_st)
var jsonStr = ""+key.getField(0)+"_"+date+"&" // json格式开始
jsonStr += "{"+
""yesterday_foreground_play_pv":""+pv_st.value()+
""}";
//判断逻辑,是否到第二天,如果到第二天状态数据全部清空,重新累加
if(stateDate.equals("") || stateDate.equals(date)){
stateDate=date
out.collect(jsonStr)
}else{
out.collect(jsonStr)
pv_st.clear()
stateDate=date
}
}
})*来自志愿者整理的flink邮件归档```js
从代码看 if(stateDate.equals("") || stateDate.equals(date)) 无法判断究竟是从哪里获取到stateDate变量赋值,不清楚你这里里面的判断逻辑是否能生效。
其次,state.clear() 之后,再次获取时,返回值会是null,代码片段里面也没有看出来哪里有对这个的校验。*来自志愿者整理的FLINK邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。