点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(已更完)
Flink(正在更新!)
章节内容
上节我们完成了如下的内容:
Flink 状态类型
案例1: Flink State 求平均值
Flink State状态描述
广播状态
基本概念
所有并行实例,这些实例将他们维持为状态,不广播另一个流事件,而是将其发送到同一运营的各个实例,并与广播流的事件一起处理。
新的广播状态非常适合需要加入低吞吐量和高吞吐量或需要动态更新其处理逻辑的应用程序,我们将使用后一个用例的具体示例来解释广播状态。
广播状态下的动态模式评估:想象一下,一个电子商务网站将所有用户的交互捕获为用户操作流,运营该网站的公司有兴趣分析交互以增加收入,改善用户体验,以及监测和防止恶意行为。
该网站实现了一个流应用程序,用于检测用户事件流上的模式,但是,公司希望每次模式更改时都避免修改和重新部署应用程序。相反,应用程序从模式流接收新模式时摄取第二个模式流并更新其活动模式。
在上下文中,我们将逐步讨论此应用程序,并展示它如何利用ApacheFlink的广播状态功能。
我们示例应用程序摄取两个数据流,第一个流在网站上提供用户操作,并在上图的左上方展示,用户交互事件包括操作的类型(用户登录、用户注销、添加购物车或者完成付款)和用户的ID,其由颜色编码。
图中所示的用户动作时间流包含用户1001的注销动作,其后是用户1003的支付完成事件,以及用户1002的添加购物车动作。
第二流提供应用将执行的动作模式,评估,模式由两个连续的动作组成。在上图上,模式流包含以下两个:
模式1:用户登录并立即注销而不无需浏览电子商务网站上的其他页面。
模式2:用户将商品添加到购物车并在不完成购买的情况下注销。
这些模式由助于企业更好的分析用户行为,检测恶意行为并改善网站体验。例如,如果项目被添加到购物车而没有后续购买,网站团队可以适当地采取措施来更好的了解用户未完成购买的原因,并启动特定的流程改善网站的转换(比如折扣、优惠卷等)。
在上图的右侧,该图显示了操作员的三个并行任务,即摄取模式和用户操作流,评估操作流上的模式,并在下游接收到新模式时,替换为当前活动模式。原则上,还可以实现运算符以同时评估更复杂的模式或多个模式,这些模式可以单独添加或者移除。
我们将描述模式匹配应用程序如何处理用户操作和模式流:
首先,将模式发送给操作员,该模式被广播到运营商的所有三个并行任务。任务将模式存储在其广播状态中,由于广播状态只应用使用广播数据进行更新,因此所有任务的状态始终预期相同。
接下来,第一个用户操作按用户ID分区并发送到操作员任务,分区可确保同一用户的所有操作都由同一个任务处理,上图显示了操作员任务消耗第一个模式和前三个操作事件后应用程序的状态。
当任务收到新的用户操作时,它会通过查看用户的最新和先前操作来评估当前活动模式,对于每个用户,操作员先前的操作存储在键控状态。由于上图中的任务到目前为止仅为每个用户收到了一个操作(我们刚启动应用程序),因此不需要评估该模式。最后,用户键控状态中的先前操作被更新为最新动作,以便能够在同一用户的下一个动作到达时查找它。
在处理前三个动作之后,下一个事件(用户1001的注销动作)被运送到处理用户1001的事件的任务,当任务接收到动作时,它从广播状态中查找到当前模式并且用户1001的先前操作。由于模式匹配的两个动作,因此任务发出模式匹配事件,最后,任务通过使用最新操作覆盖上一个事件来更新其键控状态。
当新模式到达模式流时,它被广播到所有任务,并且每个任务通过新模式替换当前模式来更新广播状态。
一旦用新模式更新广播状态,匹配逻辑就像之前一样继续,用户动作时间由密钥分区并负责任务评估。
如何使用
那么如何使用广播状态实现应用程序?
到目前为止,概念上讨论了该应用程序并解释了它如何使用广播状态来评估事件流上的动态模式,接下来,我们将展示如何使用 Flink的DataStream API和广播状态功能实现示例应用程序。
让我们从应用程序的输入数据开始,我们有两个数据流,操作和模式,在这一点上,我们并不关心流来来何处。
这些流可以从ApacheKafka或者Kinesis或任何其他系统中摄取。并与两个字段的POJO:
DataStream<Action> actions = ???` `DataStream<Pattern> patterns = ??? Action``Pattern
Action:Long userId,String action
Pattern:String firstAction String secondAction
作为第一步,我们在属性上键入操作流。接下来,我们准备广播状态,广播状态始终为Flink提供的最通用的状态原语。由于我们的应用程序一次只评估和存储一个,我们将广播状态配置为具体有键类型和值类型。使用广播状态,我们在流上应用转换并接收,在我们获得了KeyedStream和广播流之后,我们都流式传输并应用了一个userId。
编写代码
package icu.wzk; import org.apache.flink.api.common.state.*; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.util.Collector; public class BroadCastDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); // 两套数据流 1:用户行为 2:模式 UserAction user1 = new UserAction(1001L, "login"); UserAction user2 = new UserAction(1003L, "pay"); UserAction user3 = new UserAction(1002L, "car"); UserAction user4 = new UserAction(1001L, "logout"); UserAction user5 = new UserAction(1003L, "car"); UserAction user6 = new UserAction(1002L, "logout"); DataStreamSource<UserAction> actions = env.fromElements(user1, user2, user3, user4, user5, user6); MyPattern myPattern1 = new MyPattern("login", "logout"); MyPattern myPattern2 = new MyPattern("car", "logout"); DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1, myPattern2); KeyedStream<UserAction, Long> keyed = actions .keyBy(new KeySelector<UserAction, Long>() { @Override public Long getKey(UserAction value) throws Exception { return value.getUserId(); } }); // 将模式流广播到下游的所有算子 MapStateDescriptor<Void, MyPattern> broadcastStateDescriptor = new MapStateDescriptor<>( "patterns", Types.VOID, Types.POJO(MyPattern.class)); BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(broadcastStateDescriptor); SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyed .connect(broadcastPatterns) .process(new PatternEvaluator()); // 匹配结果输出到控制台 process.print(); env.execute("BroadCastDemo"); } // 用户行为类 public static class UserAction { private Long userId; private String userAction; public UserAction(Long userId, String userAction) { this.userId = userId; this.userAction = userAction; } public Long getUserId() { return userId; } public String getUserAction() { return userAction; } } // 模式类 public static class MyPattern { private String firstAction; private String secondAction; // 无参构造函数 public MyPattern() { } public MyPattern(String firstAction, String secondAction) { this.firstAction = firstAction; this.secondAction = secondAction; } public String getFirstAction() { return firstAction; } public void setFirstAction(String firstAction) { this.firstAction = firstAction; } public String getSecondAction() { return secondAction; } public void setSecondAction(String secondAction) { this.secondAction = secondAction; } } public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> { private transient ValueState<String> prevActionState; @Override public void open(Configuration parameters) throws Exception { prevActionState = getRuntimeContext() .getState(new ValueStateDescriptor<>("lastAction", Types.STRING)); } /** * 每个一个Action数据,触发一次执行 * @author wzk * @date 11:21 2024/7/29 **/ @Override public void processElement(UserAction value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception { // 把用户行为流和模式流中的模式进行匹配 ReadOnlyBroadcastState<Void, MyPattern> patterns = ctx .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class))); MyPattern myPattern = patterns.get(null); String prevAction = prevActionState.value(); if (myPattern != null && prevAction != null) { if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) { // 匹配成功 System.out.println("匹配成功: " + ctx.getCurrentKey()); out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern)); } else { // 匹配失败 System.out.println("匹配失败: " + ctx.getCurrentKey()); } } prevActionState.update(value.getUserAction()); } /** * 每次来一个模式 Pattern 的时候触发 * @author wzk * @date 11:29 2024/7/29 **/ @Override public void processBroadcastElement(MyPattern value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception { BroadcastState<Void, MyPattern> broadcastState = ctx .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class))); broadcastState.put(null, value); } } }
processBroadcastElement():为广播流的每个调用记录,在我们的函数中,我们只是使用键将接收到的记录放入广播状态(记住,我们只存储一个模式)。PatternEvaluator Pattern null MapState
processElement():为键控流的每个记录调用,它提供对广播状态的只读访问,以防止修改导致跨函数的并行示例的不同广播状态,从广播状态检索当前模式的方法和从键控状态检索用户的先前动作。如果两者都存在,则检查先后和当前操作是否与模式匹配,并且如果是这种情况则发出模式匹配记录。最后,它将键控状态更新为当前用户操作。processElement() ParrternEvaluator
onTimer():在先前注册的计时器出发时调用,定时器可以在任何处理方法中注册,并用于执行计算或将清理状态,我们在示例中没有实现此方法以保持代码简洁。但是,当用户在一段时间内未处于活动状态时,它可以用于删除用户的最后一个操作,以避免由于非活动用户而导致状态增长。
运行测试
数据结果为:
匹配失败: 1003 匹配成功: 1001 匹配失败: 1002 3> (1001,icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7)
控制台输出的结果为: