主要代码如下:
class getRule extends KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule], KafkaStreamSource] { private var carEfenceState: MapState[String, Boolean] = _
override def open(parameters: Configuration): Unit = {
carEfenceState = getRuntimeContext.getMapState(new
MapStateDescriptor[String, Boolean]("carEfenceState", classOf[String],
classOf[Boolean]))
}
override def processBroadcastElement(in2: List[Rule], context:
KeyedBroadcastProcessFunction[String, KafkaStreamSource, List[Rule],
KafkaStreamSource]#Context, collector: Collector[KafkaStreamSource]): Unit =
{
context.getBroadcastState(ruleStateDescriptor).put("rules", in2)
}
override def processElement(kafkaSource: KafkaStreamSource,
readOnlyContext: KeyedBroadcastProcessFunction[String, KafkaStreamSource,
List[Rule], KafkaStreamSource]#ReadOnlyContext, collector:
Collector[KafkaStreamSource]): Unit = {
val ruleIterator =
readOnlyContext.getBroadcastState(ruleStateDescriptor).immutableEntries().iterator()
while (ruleIterator.hasNext) {
val ruleMap: Map.Entry[String, List[Rule]] = ruleIterator.next()
val ruleList: List[Rule] = ruleMap.getValue
for (rule <- ruleList) {
val mapKey = kafkaSource.vno + rule.id
val tempState = carEfenceState.get(mapKey)
val currentState = if (tempState != null) tempState else false
// 业务逻辑
if (!currentState) {
...
carEfenceState.put(mapKey, true)
...
} else if (currentState) {
...
carEfenceState.remove(mapKey)
...
}
}
}
}
}
c*来自志愿者整理的flink邮件归档
A read-only view of the {@link BroadcastState}.
*
Although read-only, the user code should not modify the value returned by the {@link
#get(Object)} or the entries of the immutable iterator returned by the {@link
#immutableEntries()}, as this can lead to inconsistent states. The reason for this is that we do
not create extra copies of the elements for performance reasons.
*
@param The key type of the elements in the {@link ReadOnlyBroadcastState}.
@param The value type of the elements in the {@link ReadOnlyBroadcastState}.
*/
这是源码中对ReadOnlyBroadcastState的描述,希望对你有帮助*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。