自版本1.5.0以来,ApacheFlink提供了一种新的状态类型,称为广播状态(Broadcast State)。在本文中,我们会解释什么是广播状态,并通过一个例子,演示如何将其应用于一个用来评估基于事件流的动态模式的应用程序。本文将指导大家学习广播状态的处理步骤和相关的源代码,以便在今后的实践中能实现此类的应用。
什么是广播状态?
广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个operator的所有并发实例,这些事件将被保存为状态。另一个流的事件不会被广播,而是发送给同一个operator的各个实例,并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大,一个吞吐小,或者需要动态修改处理逻辑的情况。我们将使用后者的一个具体实例来解释广播状态,并在本文的其余部分里对详细的API加以说明。
使用广播状态的动态模型评估(Dynamic Pattern Evaluation with Broadcast State)
假设一个电子商务类型的网站,它获取了所有用户的操作行为数据作为一个用户的操作流。网站的运营团队致力于分析用户的操作,来提高销售额,改善用户体验,并监测和预防恶意行为。我们要实现了一个流应用程序,用于检测用户事件流中的模式。但是,公司希望避免在每次模式有变化的时候还要修改和重新部署应用程序。反而,我们使用另外一个特征流来读取、更新当前特征,接下来我们通过一个实例逐步阐述如何通过ApacheFlink中的广播状态来完成相应工作。
实例的程序获取两个数据流。第一个流提供了网站上的用户操作行为数据,如上图左上方所示。一个用户的交互事件由操作的类型(用户登录、用户注销、添加到购物车或者完成付款等)和用户的ID(按颜色编码的)组成。图中的用户操作事件流包含用户1001的“登出”操作,然后是用户1003的“支付完成”事件,以及用户1002的“添加到购物车”操作。
第二个流提供了应用程序要评估的用户操作模式。模式是由两个连续的操作组成的。在上图中,模式流包含了以下两种:
• 模式1:用户登录并立即登出,并没有点击网站上其它的页面;
• 模式2:用户将商品添加到购物车,然后登出,而并没有完成购买操作;
这样的模式有助于企业更好地分析用户行为、检测恶意行为和提高网站体验。例如,如果只是将商品添加到购物车里而没有完成后续的支付,那么网站可以采取合适的方法,更好地了解用户没有购买的原因,并采取一定的措施以提高网站的购买转化率(例如提供优惠券、免运费等)。
在上图右侧,显示了一个operator的三个并发实例,这些实例获取模式和用户操作行为的数据流,评估数据流上的模式,并向下游发出模式匹配事件。为了简便起见,我们的例子中的operator只对一个进行两次后续操作行为的模式进行评估。当从模式流中获取到新模式的时候,将替换当前活动的模式。原则上,该operator也可以实现评估更复杂的模式或多个模式,这些模式可以单独添加或是删除。
我们将描述负责模式匹配的程序如何处理用户的操作和模式流。
首先,向operator发送一个模式,该模式被广播给这个operator的三个并发实例,接着,每个并发实例将模式存储在广播状态中。由于广播状态只能使用广播数据来进行更新,因此所有并发实例的状态都应该是相同的。
接下来,第一个用户信息流会基于用户 ID进行划分,并发送给operator的实例。分区会确保同一用户的所有操作都由同一并发实例处理。上图显示了在operator实例处理了第一个模式和前三个操作行为事件之后应用程序的状态。
当任务接收到新的用户操作数据时,它通过查看用户最新的和历史的操作记录来评估当前的活动模式。对于每个用户,operator都在keyed state中存储用户的上一个操作。到目前为止,由于上图中的任务只为每个用户接收一个操作(我们刚刚启动了应用程序),因此不需要评估模式。最后,keyed state中用户的上一个操作将更新为最新的操作,以便在同一用户的下一个操作行为到达时能够进行查找。
在前三个操作行为被处理了之后,下一个事件,即用户1001的注销操作,将被发送到处理用户1001的并发实例中。当并发实例接收到用户操作的数据时,它从广播状态和用户1001的上一个操作中查找当前的模式。由于这两个操作符合模式匹配,因此会往下游发送匹配事件。最后,该任务会通过使用最新的操作来覆盖前一个事件以更新其keyed state。
当一个新模式进入了模式流,它会被广播给所有任务,并且每个并发实例通过使用新模式替换当前模式来更新其广播状态。
一旦广播状态更新为新模式,那么匹配逻辑将像以前一样继续执行,即用户操作行为事件按键(key)进行分区,并由负责的并发实例进行评估。
如何实现广播状态的应用程序呢?
到目前为止,我们在概念上讨论了应用程序,并解释了如何使用广播状态来评估事件流上的动态模式。接下来,我们将展示如何使用Flink的DataStream API和广播状态功能实现该实例的程序代码。
让我们从程序的输入数据开始。有两个数据流:操作行为流和模式流,在这一点上,我们并不关心数据流从何而来。这些流可以从Apache Kafka、Kinesis或任何其它系统中获取。
Action 和 Pattern都是POJO,每个都含有两个字段:
• Action的字段: Long userId, String action
• Pattern的字段: String firstAction, String secondAction
作为第一步,我们将 userId作为操作行为流上的键。
接下来,我们准备广播状态。广播状态通常表示为MapState,这是Flink提供的最通用的状态接口类。
由于这个应用程序一次只评估和存储一个Pattern,所以我们将广播状态配置成具有键类型Void和值类型Pattern的MapState。MapState的键永远为 null。
以 MapStateDescriptor为参数,调用模式流上的broadcast 转换操作,得到一个 BroadcastStream对象 bcedPatterns。
在获得了 keyedstreamactionsByUser和广播流 bcedPatterns之后,我们对两个流使用了connect()方法,并在连接的流上调用了PatternEvaluator类(见下面PatternEvaluator的代码)。PatternEvaluator是实现KeyedBroadcastProcessFunction接口的自定义类。它调用了我们之前讨论过的模式匹配逻辑,并发出Tuple2的记录,其中包含用户ID和匹配的模式。
KeyedBroadcastProcessFunction接口提供了三种方法来处理数据记录和发出的结果:
• processBroadcastElement()方法:每次收到广播流的记录时会调用。在PatternEvaluator类中,我们只需使用null键将接收到的Pattern记录放入广播状态中(记住,我们只在MapState中存储一个模式);
• processElement()方法:接受到用户行为流的每条消息时会调用。并能够对广播状态进行只读操作,以防止导致跨越类中多个并发实例的不同广播状态的修改。PatternEvaluator类的processElement()方法从广播状态中获取当前模式,并从keyed state中获取用户的前一个操作。如果两者都存在,它会检查前一个和当前的操作行为是否与模式匹配,如果是这样,则会发出模式匹配记录。最后,它将keyed state更新为当前用户操作;
• onTimer()方法:当之前注册过的计时器触发时被调用。计时器可以在processElement方法中定义,用于执行计算或是清除状态。为了保持代码的简洁性,我们没有在例子中实现这个方法。但是,当用户在某段时间内没有操作时,它可以用来删除最后一个操作,以避免由于非活动用户而导致状态增长;
你可能注意到了KeyedBroadcastProcessFunction类方法的上下文对象,提供了对其它功能的访问方法,例如:
• 广播状态(读写或只读,取决于方法)
• TimerService,允许访问记录的时间戳、当前的水印,并可以注册计时器
• 当前键(仅在processElement()方法中可用)
• 一种将函数应用于每个已注册键的keyed state的方法(仅在processBroadcastElement()方法中可用)
KeyedBroadcastProcessFunction类与其它任何ProcessFunction类一样,完全可以调用Flink的状态和时间功能,因此可以用于实现复杂的程序逻辑。广播状态被设计成了多功能,能够适应不同的场景和用例。虽然我们只讨论了一个比较简单的应用程序,但是你可以通过多个方式使用广播状态来实现应用的需求。
结论
在本文中,我们通过学习一个应用程序的实例,来解释ApacheFlink的广播状态是什么,以及如何应用它来评估事件流上的动态模式。我们还讨论了它的API,并展示了相关的源代码。