开发者社区> 问答> 正文

MapState 无法更新怎么办

主要代码如下:

class getRule extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource] { private var carEfenceState: MapState[String, Boolean] = _

override def open(parameters: Configuration): Unit = {

carEfenceState = getRuntimeContext.getMapState(new

MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],

classOf[Boolean]))

}

override def processBroadcastElement(in2: List[Rule], context:

KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],

KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =

{

context.getBroadcastState(ruleStateDescriptor).put("rules", in2)

}

override def processElement(kafkaSource: KafkaStreamSource,

readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,

List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:

Collector[KafkaStreamSource]): Unit = {

val ruleIterator =

readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()

while (ruleIterator.hasNext) {

val ruleMap: Map.Entry[String, List[Rule]] = ruleIterator.next()

val ruleList: List[Rule] = ruleMap.getValue

for (rule <- ruleList) {

val mapKey = kafkaSource.vno + rule.id

val tempState = carEfenceState.get(mapKey)

val currentState = if (tempState != null) tempState else false

// 业务逻辑

if (!currentState) {

...

carEfenceState.put(mapKey, true)

...

} else if (currentState) {

...

carEfenceState.remove(mapKey)

...

}

}

}

}

}

c*来自志愿者整理的flink邮件归档

展开
收起
EXCEED 2021-12-01 15:31:54 828 0
1 条回答
写回答
取消 提交回答
  • A read-only view of the {@link BroadcastState}.

    *

    • Although read-only, the user code should not modify the value returned by the {@link

    • #get(Object)} or the entries of the immutable iterator returned by the {@link

    • #immutableEntries()}, as this can lead to inconsistent states. The reason for this is that we do

    • not create extra copies of the elements for performance reasons.

    *

    • @param The key type of the elements in the {@link ReadOnlyBroadcastState}.

    • @param The value type of the elements in the {@link ReadOnlyBroadcastState}.

    */

    这是源码中对ReadOnlyBroadcastState的描述,希望对你有帮助*来自志愿者整理的flink邮件归档

    2021-12-01 15:54:55
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载