开发者社区> 问答> 正文

Flink1.7.2,TableApi的自定义聚合函数中如何使用自定义状态

尝试在Flink的自定义聚合函数中使用State,发现open函数中通过FunctionContext无法获取到RuntimeContext

如何在聚合函数中使用State?

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

import org.apache.flink.api.java.typeutils.TupleTypeInfo

import org.apache.flink.table.functions.{AggregateFunction, FunctionContext}

import java.lang.{Iterable => JIterable}

class IntDiffSumAccumulator extends JTuple2[Int, Boolean]

class IntDiffSumFunction extends AggregateFunction[Int, IntDiffSumAccumulator] {

override def open(context: FunctionContext): Unit = {

// Flink1.7.2 这里没法获取到 RuntimeContext,没有办法初始化State

//getRuntimeContext.getState(desc)

val a = this.hashCode()

print(s"hashCode:$a")

super.open(context)

}

override def createAccumulator(): IntDiffSumAccumulator = {

val acc = new IntDiffSumAccumulator()

acc.f0 = 0

acc.f1 = false

acc

}

def accumulate(accumulator: IntDiffSumAccumulator, value: Int): Unit = {

accumulator.f0 += value

accumulator.f1 = true

}

override def getValue(accumulator: IntDiffSumAccumulator): Int = {

if (accumulator.f1) {

accumulator.f0

} else {

Int.MinValue

}

}

def merge(acc: IntDiffSumAccumulator, its: JIterable[IntDiffSumAccumulator]) = {

val iter = its.iterator()

while (true) {

val a = iter.next()

if (a.f1) {

acc.f0 += a.f0

acc.f1 = true

}

}

}

def resetAccumulator(acc: IntDiffSumAccumulator) = {

acc.f0 = 0

acc.f1 = false

}

override def getAccumulatorType: TypeInformation[IntDiffSumAccumulator] =

new TupleTypeInfo[IntDiffSumAccumulator](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO)

}*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-08 14:48:35 2447 0
1 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载