flink cep 如何实现 跳过策略

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: flink cep 如何实现 跳过策略

指示匹配过程后的跳过策略

今天讲的是 flink cep 如何实现 多个窗口之间的滚动匹配

即避免以下这种情况出现,当然是否需要避免取决你的工作需求或者要学习什么东西

flink cep pattern 代码


```scala

//定义一个匹配模式

val pattern: Pattern[weather, weather] = Pattern

     .begin[weather]("one")

     //一次或多次

     .timesOrMore(12)

     //时间是12小时的范围 相当于窗口

     .within(Time.hours(12))

```


```bash

out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]}

out -> {[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ]}

out -> {[1,May 10 17:00:00 ],[1,May 10 18:00:00 ],[1,May 10 19:00:00 ],[1,May 10 20:00:00 ]}

```

然而你想要的是这样的匹配规则,则是匹配过一次之后就不再使用这条数据作为其他匹配的数据源


```bash

out -> {[1,May 10 15:00:00 ],[1,May 10 16:00:00 ],[1,May 10 17:00:00 ],[1,May 10 18:00:00 ]}

out -> {[1,May 10 19:00:00 ],[1,May 10 20:00:00 ],[1,May 10 21:00:00 ],[1,May 10 22:00:00 ]}

out -> {[1,May 10 23:00:00 ],[1,May 11 00:00:00 ],[1,May 11 01:00:00 ],[1,May 11 02:00:00 ]}

```

修改匹配模式代码


```scala

   /**

    * AfterMatchSkipStrategy 指示匹配过程后的跳过策略

    */

    //定义一个匹配模式

   val pattern: Pattern[weather, weather] = Pattern

   //AfterMatchSkipStrategy 指示匹配过程后的跳过策略

   //skipPastLastEvent  丢弃在发出的匹配结束之前开始的每个部分匹配

     .begin[weather]("one" , AfterMatchSkipStrategy.skipPastLastEvent())

           //一次或多次

     .timesOrMore(1)

     //时间是12小时的范围 相当于窗口

     .within(Time.hours(5))

```

## 得到臆想的数据

然后就可以得到我们想要的数据啦  看一下我的实际数据


```bash

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2428,台南,9999.0,9999.0,9999.0)))

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2429,台中,9999.0,9999.0,9999.0)))

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2426,台北,9999.0,9999.0,9999.0)))

==== Map(one -> Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))

---- Some(Buffer(weather(Tue May 10 15:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 16:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 17:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 18:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0), weather(Tue May 10 19:00:00 CST 2022,台湾省,2427,台东,9999.0,9999.0,9999.0)))

```

## **前方有危险**

这个方法的应用实际上是有问题的,我们来分析一下

我们的规则是匹配过的数据不可以再次进行匹配,然而当我们设置了`.timesOrMore(1)`我们会发现数据里每次只有一条数据,这是为什么呢?

我分析`.timesOrMore(1)`会使模式匹配将所有可以匹配到的数据数据量达到1次时就进行收集,如果我们加上了`.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())`,這個我们刚刚学了,他是使将所有匹配过的数据不再进行匹配,这个规则再加上`.timesOrMore(1)`会默认有一条1条数据的匹配,所以造成了所有的数据匹配都是一条。!?

## **重点碧坑**

如果我们需要用到`.begin[weather]("one",AfterMatchSkipStrategy.skipPastLastEvent())`和`.timesOrMore(1)`那末我们一定要考虑每次匹配的数据至少有几条,其实这里就已经固定了条数,而不再是或者更更多条数的匹配规则,这里可以考虑动态更改匹配条数,可以做数据库持久化,然后实时动态更新。动态更改所需匹配的固定条数,这里由于才疏学浅,尚未摸索到更好的方法,后续几天如果有解决方案我会更新这个博客。



`革命尚未成功-同志仍须努力`



## **跳过策略**

接下来我们讨论不同跳过策略对匹配结果的影响:


- 不跳过(NO_SKIP)

代码调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。所以这里会输出完整的 6 个匹配。


- 跳至下一个(SKIP_TO_NEXT)

代码调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过a1 开始的所有其他匹配,直接从下一个 a2 开始匹配起。当然 a2 也是如此跳过其他匹配。最终得到(a1 a2 a3 b),(a2 a3 b),(a3 b)。可以看到,这种跳过策略跟使用.greedy()效果是相同的。


-  跳过所有子匹配(SKIP_PAST_LAST_EVENT)

代码调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配(a1 a2 a3 b)之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终得到(a1 a2 a3 b),这是最为精简的跳过策略。


- 跳至第一个(SKIP_TO_FIRST[a])

代码调用 AfterMatchSkipStrategy.skipToFirst(“a”),这里传入一个参数,指明跳至哪个模式的第一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳到以最开始一个 a(也就是 a1)为开始的匹配,相当于只留下 a1 开始的匹配。最终得到(a1 a2 a3 b),(a1 a2 b),(a1 b)。


- 跳至最后一个(SKIP_TO_LAST[a])

代码调用 AfterMatchSkipStrategy.skipToLast(“a”),同样传入一个参数,指明跳至哪个模式的最后一个匹配事件。找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
103 0
|
26天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
398 2
探索Flink动态CEP:杭州银行的实战案例
|
3月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
106 0
|
3月前
|
分布式计算 监控 大数据
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
大数据-129 - Flink CEP 详解 Complex Event Processing - 复杂事件处理
87 0
|
5月前
|
监控 Java API
【揭秘】如何用Flink CEP揪出那些偷偷摸摸连续登录失败的“捣蛋鬼”?——一场数据流中的侦探游戏
【8月更文挑战第26天】Flink 是一款先进的流处理框架,提供复杂事件处理(CEP)功能以识别实时数据流中的特定模式。CEP 在 Flink 中通过 `CEP` API 实现,支持基于模式匹配的事件检测。本文通过监测用户连续三次登录失败的具体案例介绍 Flink CEP 的工作原理与应用方法。首先创建 Flink 环境并定义数据源,接着利用 CEP 定义连续三次失败登录的模式,最后处理匹配结果并输出警报。Flink CEP 能够轻松扩展至更复杂的场景,如异常行为检测和交易欺诈检测等,有效应对多样化的业务需求。
66 0
|
6月前
|
监控 关系型数据库 MySQL
实时计算 Flink版产品使用问题之在进行全量数据初始化时,连接器一般会采用什么策略
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
数据处理 Apache 流计算
【Flink】Flink的CEP机制
【4月更文挑战第21天】【Flink】Flink的CEP机制
|
8月前
|
资源调度 监控 Java
实时计算 Flink版产品使用合集之如何使用CEP库进行数据处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
定位技术 流计算
在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
【2月更文挑战第12天】在Flink CEP中,可以通过定义带有时间约束的模式来匹配事件的持续时间
72 3
|
8月前
|
SQL 消息中间件 Apache
flink问题之cep超时事件如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
108 1

热门文章

最新文章