大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 大数据-125 - Flink State 02篇 状态原理和原理剖析:广播状态

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)

HDFS(已更完)

MapReduce(已更完)

Hive(已更完)

Flume(已更完)

Sqoop(已更完)

Zookeeper(已更完)

HBase(已更完)

Redis (已更完)

Kafka(已更完)

Spark(已更完)

Flink(正在更新!)

章节内容

上节我们完成了如下的内容:


Flink 状态类型

案例1: Flink State 求平均值

Flink State状态描述

广播状态

基本概念

所有并行实例,这些实例将他们维持为状态,不广播另一个流事件,而是将其发送到同一运营的各个实例,并与广播流的事件一起处理。

新的广播状态非常适合需要加入低吞吐量和高吞吐量或需要动态更新其处理逻辑的应用程序,我们将使用后一个用例的具体示例来解释广播状态。

广播状态下的动态模式评估:想象一下,一个电子商务网站将所有用户的交互捕获为用户操作流,运营该网站的公司有兴趣分析交互以增加收入,改善用户体验,以及监测和防止恶意行为。

该网站实现了一个流应用程序,用于检测用户事件流上的模式,但是,公司希望每次模式更改时都避免修改和重新部署应用程序。相反,应用程序从模式流接收新模式时摄取第二个模式流并更新其活动模式。

在上下文中,我们将逐步讨论此应用程序,并展示它如何利用ApacheFlink的广播状态功能。

我们示例应用程序摄取两个数据流,第一个流在网站上提供用户操作,并在上图的左上方展示,用户交互事件包括操作的类型(用户登录、用户注销、添加购物车或者完成付款)和用户的ID,其由颜色编码。

图中所示的用户动作时间流包含用户1001的注销动作,其后是用户1003的支付完成事件,以及用户1002的添加购物车动作。

第二流提供应用将执行的动作模式,评估,模式由两个连续的动作组成。在上图上,模式流包含以下两个:


模式1:用户登录并立即注销而不无需浏览电子商务网站上的其他页面。

模式2:用户将商品添加到购物车并在不完成购买的情况下注销。

这些模式由助于企业更好的分析用户行为,检测恶意行为并改善网站体验。例如,如果项目被添加到购物车而没有后续购买,网站团队可以适当地采取措施来更好的了解用户未完成购买的原因,并启动特定的流程改善网站的转换(比如折扣、优惠卷等)。

在上图的右侧,该图显示了操作员的三个并行任务,即摄取模式和用户操作流,评估操作流上的模式,并在下游接收到新模式时,替换为当前活动模式。原则上,还可以实现运算符以同时评估更复杂的模式或多个模式,这些模式可以单独添加或者移除。


我们将描述模式匹配应用程序如何处理用户操作和模式流:

首先,将模式发送给操作员,该模式被广播到运营商的所有三个并行任务。任务将模式存储在其广播状态中,由于广播状态只应用使用广播数据进行更新,因此所有任务的状态始终预期相同。

接下来,第一个用户操作按用户ID分区并发送到操作员任务,分区可确保同一用户的所有操作都由同一个任务处理,上图显示了操作员任务消耗第一个模式和前三个操作事件后应用程序的状态。

当任务收到新的用户操作时,它会通过查看用户的最新和先前操作来评估当前活动模式,对于每个用户,操作员先前的操作存储在键控状态。由于上图中的任务到目前为止仅为每个用户收到了一个操作(我们刚启动应用程序),因此不需要评估该模式。最后,用户键控状态中的先前操作被更新为最新动作,以便能够在同一用户的下一个动作到达时查找它。

在处理前三个动作之后,下一个事件(用户1001的注销动作)被运送到处理用户1001的事件的任务,当任务接收到动作时,它从广播状态中查找到当前模式并且用户1001的先前操作。由于模式匹配的两个动作,因此任务发出模式匹配事件,最后,任务通过使用最新操作覆盖上一个事件来更新其键控状态。

当新模式到达模式流时,它被广播到所有任务,并且每个任务通过新模式替换当前模式来更新广播状态。

一旦用新模式更新广播状态,匹配逻辑就像之前一样继续,用户动作时间由密钥分区并负责任务评估。


如何使用

那么如何使用广播状态实现应用程序?

到目前为止,概念上讨论了该应用程序并解释了它如何使用广播状态来评估事件流上的动态模式,接下来,我们将展示如何使用 Flink的DataStream API和广播状态功能实现示例应用程序。


让我们从应用程序的输入数据开始,我们有两个数据流,操作和模式,在这一点上,我们并不关心流来来何处。

这些流可以从ApacheKafka或者Kinesis或任何其他系统中摄取。并与两个字段的POJO:

DataStream<Action> actions = ???`
`DataStream<Pattern> patterns = ???
Action``Pattern

Action:Long userId,String action

Pattern:String firstAction String secondAction

作为第一步,我们在属性上键入操作流。接下来,我们准备广播状态,广播状态始终为Flink提供的最通用的状态原语。由于我们的应用程序一次只评估和存储一个,我们将广播状态配置为具体有键类型和值类型。使用广播状态,我们在流上应用转换并接收,在我们获得了KeyedStream和广播流之后,我们都流式传输并应用了一个userId。


编写代码

package icu.wzk;


import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;


public class BroadCastDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // 两套数据流 1:用户行为 2:模式
        UserAction user1 = new UserAction(1001L, "login");
        UserAction user2 = new UserAction(1003L, "pay");
        UserAction user3 = new UserAction(1002L, "car");
        UserAction user4 = new UserAction(1001L, "logout");
        UserAction user5 = new UserAction(1003L, "car");
        UserAction user6 = new UserAction(1002L, "logout");
        DataStreamSource<UserAction> actions = env.fromElements(user1, user2, user3, user4, user5, user6);
        MyPattern myPattern1 = new MyPattern("login", "logout");
        MyPattern myPattern2 = new MyPattern("car", "logout");
        DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1, myPattern2);
        KeyedStream<UserAction, Long> keyed = actions
                .keyBy(new KeySelector<UserAction, Long>() {
                    @Override
                    public Long getKey(UserAction value) throws Exception {
                        return value.getUserId();
                    }
                });
        // 将模式流广播到下游的所有算子
        MapStateDescriptor<Void, MyPattern> broadcastStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(MyPattern.class));
        BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(broadcastStateDescriptor);

        SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyed
                .connect(broadcastPatterns)
                .process(new PatternEvaluator());

        // 匹配结果输出到控制台
        process.print();
        env.execute("BroadCastDemo");
    }

    // 用户行为类
    public static class UserAction {
        private Long userId;
        private String userAction;

        public UserAction(Long userId, String userAction) {
            this.userId = userId;
            this.userAction = userAction;
        }

        public Long getUserId() {
            return userId;
        }

        public String getUserAction() {
            return userAction;
        }
    }

    // 模式类
    public static class MyPattern {
        private String firstAction;
        private String secondAction;

        // 无参构造函数
        public MyPattern() {
        }

        public MyPattern(String firstAction, String secondAction) {
            this.firstAction = firstAction;
            this.secondAction = secondAction;
        }

        public String getFirstAction() {
            return firstAction;
        }

        public void setFirstAction(String firstAction) {
            this.firstAction = firstAction;
        }

        public String getSecondAction() {
            return secondAction;
        }

        public void setSecondAction(String secondAction) {
            this.secondAction = secondAction;
        }
    }

    public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> {

        private transient ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            prevActionState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        /**
         * 每个一个Action数据,触发一次执行
         * @author wzk
         * @date 11:21 2024/7/29
        **/
        @Override
        public void processElement(UserAction value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            // 把用户行为流和模式流中的模式进行匹配
            ReadOnlyBroadcastState<Void, MyPattern> patterns = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            MyPattern myPattern =  patterns.get(null);
            String prevAction = prevActionState.value();
            if (myPattern != null && prevAction != null) {
                if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) {
                    // 匹配成功
                    System.out.println("匹配成功: " + ctx.getCurrentKey());
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern));
                } else {
                    // 匹配失败
                    System.out.println("匹配失败: " + ctx.getCurrentKey());
                }
            }
            prevActionState.update(value.getUserAction());
        }

        /**
         * 每次来一个模式 Pattern 的时候触发
         * @author wzk
         * @date 11:29 2024/7/29
        **/
        @Override
        public void processBroadcastElement(MyPattern value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            BroadcastState<Void, MyPattern> broadcastState = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            broadcastState.put(null, value);
        }
    }

}

processBroadcastElement():为广播流的每个调用记录,在我们的函数中,我们只是使用键将接收到的记录放入广播状态(记住,我们只存储一个模式)。PatternEvaluator Pattern null MapState

processElement():为键控流的每个记录调用,它提供对广播状态的只读访问,以防止修改导致跨函数的并行示例的不同广播状态,从广播状态检索当前模式的方法和从键控状态检索用户的先前动作。如果两者都存在,则检查先后和当前操作是否与模式匹配,并且如果是这种情况则发出模式匹配记录。最后,它将键控状态更新为当前用户操作。processElement() ParrternEvaluator

onTimer():在先前注册的计时器出发时调用,定时器可以在任何处理方法中注册,并用于执行计算或将清理状态,我们在示例中没有实现此方法以保持代码简洁。但是,当用户在一段时间内未处于活动状态时,它可以用于删除用户的最后一个操作,以避免由于非活动用户而导致状态增长。

运行测试

数据结果为:

匹配失败: 1003
匹配成功: 1001
匹配失败: 1002
3> (1001,icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7)

控制台输出的结果为:

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
7天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
31 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
8天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
37 1
|
29天前
|
存储 分布式计算 druid
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
47 3
|
29天前
|
消息中间件 分布式计算 druid
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
大数据-154 Apache Druid 架构与原理详解 基础架构、架构演进
27 2
|
29天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
52 1
|
27天前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
|
28天前
|
SQL 运维 大数据
大数据实时计算产品的对比测评
在使用多种Flink实时计算产品后,我发现Flink凭借其流批一体的优势,在实时数据处理领域表现出色。它不仅支持复杂的窗口机制与事件时间处理,还具备高效的数据吞吐能力和精准的状态管理,确保数据处理既快又准。此外,Flink提供了多样化的编程接口和运维工具,简化了开发流程,但在界面友好度上还有提升空间。针对企业级应用,Flink展现了高可用性和安全性,不过价格因素可能影响小型企业的采纳决策。未来可进一步优化文档和自动化调优工具,以提升用户体验。
109 0
|
29天前
|
SQL 消息中间件 分布式计算
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(一)
49 0
|
29天前
|
SQL 大数据
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
大数据-141 - ClickHouse 集群 副本和分片 Zk 的配置 Replicated MergeTree原理详解(二)
59 0
|
29天前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
41 0