模式匹配在SQL中应用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 模式匹配在Flink中经常听到,即CEP。CEP在DataStream API中应用已经非常成熟了,在近两年FlinkSQl中也逐渐应用起来,离线场景中如何应用模式匹配是本文主要研究的方向

很多场景中都会应用到模式匹配如:用户异常行为实时监测、银行卡异地监控、下单未支付等等;FlinkSQL中使用MATCH_RECOGNIZE子句进行复杂事件处理,我们先看个FlinkSQL中如何识别

  1. 在FlinkSQL client中创建一个测试表Ticket 其schema 如下
Ticket
     |-- symbol: String                           # 股票的代号
     |-- price: Long                              # 股票的价格
     |-- tax: Long                                # 股票应纳税额
     |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # 更改这些值的时间点
Flink SQL>CREATETABLE Ticket (>             symbol string,>             price int,>             tax int,>             rowtime TIMESTAMP(3),>             WATERMARK FOR rowtime AS rowtime --模式匹配必须要有水位线>) WITH (>'connector'='filesystem',>'format'='csv',>'path'='file:///mnt/ps/SAS/BigData/file/ticket.csv'>);[INFO] Execute statement succeed.
Flink SQL>
  1. 为了简化,我们只考虑单个股票 ACME 的传入数据。其中的行是连续追加的。查询数据如下
symbol                          price                            tax                        rowtime
ACME                             1212021-09-0109:00:00.000ACME                             1722021-09-0109:00:01.000ACME                             1912021-09-0109:00:02.000ACME                             2132021-09-0109:00:03.000ACME                             2522021-09-0109:00:04.000ACME                             1812021-09-0109:00:05.000ACME                             1512021-09-0109:00:06.000ACME                             1422021-09-0109:00:07.000ACME                             2422021-09-0109:00:08.000ACME                             2522021-09-0109:00:09.000ACME                             1912021-09-0109:00:10.000
  1. 现在的任务是找出一个单一股票价格不断下降的时期
SELECT*FROM Ticket
    MATCH_RECOGNIZE (--只能用于追加表        PARTITION BY symbol --按symbol分组,相同数据会在一个节点进行计算ORDERBY rowtime  --同一组下按事件时间进行排序        MEASURES  --定义输出            START_ROW.rowtimeAS start_tstamp,            LAST(PRICE_DOWN.rowtime)AS bottom_tstamp,            LAST(PRICE_UP.rowtime)AS end_tstamp
        ONE ROW PER MATCH --匹配成功输出一条        AFTER MATCH SKIP TO LAST PRICE_UP --从匹配成功的事件序列中最后一个对应价格上升的事件开始匹配下一次        PATTERN (START_ROW PRICE_DOWN+ PRICE_UP)--定义3个事件:开始行 价格下降 价格回升(+号代表一个或多个数据)        DEFINE  --定义事件的具体含义            PRICE_DOWN AS--上一条价格下降事件的价格为空并且下降事件的价格小于开始行的价格或者下降事件的价格小于上一条的价格(LAST(PRICE_DOWN.price,1)ISNULLAND PRICE_DOWN.price< START_ROW.price)OR                    PRICE_DOWN.price< LAST(PRICE_DOWN.price,1),            PRICE_UP AS--价格回升事件的价格大于上一条价格下降价格                PRICE_UP.price> LAST(PRICE_DOWN.price,1)) MR;--结果如下 从2021-09-01 09:00:04.000开始下降,直到2021-09-01 09:00:08.00回涨symbol      start_tstamp                  bottom_tstamp                     end_tstamp
ACME        2021-09-0109:00:04.0002021-09-0109:00:07.0002021-09-0109:00:08.000
  • 离线SQL如何去实现呢
  1. 先生成示例数据源
with tb1 as(select        symbol,        price,        tax,        rowtime
fromvalues('ACME',12,1,'2021-09-01 09:00:00'),('ACME',17,2,'2021-09-01 09:00:01'),('ACME',19,1,'2021-09-01 09:00:02'),('ACME',21,3,'2021-09-01 09:00:03'),('ACME',25,2,'2021-09-01 09:00:04'),('ACME',18,1,'2021-09-01 09:00:05'),('ACME',15,1,'2021-09-01 09:00:06'),('ACME',14,2,'2021-09-01 09:00:07'),('ACME',24,2,'2021-09-01 09:00:08'),('ACME',25,2,'2021-09-01 09:00:09'),('ACME',19,1,'2021-09-01 09:00:10')               t(symbol,price,tax,rowtime))
  1. 通过计算上一条和下一条的股票价格差判断是否连续下降
tb2 as(select        symbol,        price,        tax,        rowtime,        price-lag(price,1,price) over(partition by symbol orderby rowtime) lag_price_diff,        lead(price,1,price) over(partition by symbol orderby rowtime)-price lead_price_diff,        lead(rowtime,1,rowtime) over(partition by symbol orderby rowtime)as lead_rowtime
from tb1
)--结果展示如下symbol  price tax rowtime lag_price_diff  lead_price_diff
ACME    1212021-09-0109:00:0005ACME    1722021-09-0109:00:0152ACME    1912021-09-0109:00:0222ACME    2132021-09-0109:00:0324ACME    2522021-09-0109:00:044-7ACME    1812021-09-0109:00:05-7-3ACME    1512021-09-0109:00:06-3-1ACME    1422021-09-0109:00:07-110ACME    2422021-09-0109:00:08101ACME    2522021-09-0109:00:091-6ACME    1912021-09-0109:00:10-60
  1. 差值为负值即价格下降,根据此进行划分标签
tb3 as(select        symbol,        price,        tax,        rowtime,        lag_price_diff,        lead_price_diff,        lead_rowtime,        sum(if(lag_price_diff>0,1,0)) over(partition by symbol orderby rowtime) flag
from tb2
where lag_price_diff<0or lead_price_diff <0)--结果如下symbol  price tax rowtime lag_price_diff  lead_price_diff lead_rowtime  flag
ACME  2522021-09-0109:00:044-72021-09-0109:00:051ACME  1812021-09-0109:00:05-7-32021-09-0109:00:061ACME  1512021-09-0109:00:06-3-12021-09-0109:00:071ACME  1422021-09-0109:00:07-1102021-09-0109:00:081ACME  2522021-09-0109:00:091-62021-09-0109:00:102ACME  1912021-09-0109:00:10-602021-09-0109:00:102
  1. 标签flag为2的不符合连续下降,只有一条差值位置,不是连续下降,需要进行过滤
tb4 as(select        symbol,        price,        tax,        rowtime,        lead_rowtime,        flag,        sum(if(lag_price_diff<0,1,0)) over(partition by symbol,flag) ct
from tb3
)--结果如下symbol  price tax rowtime lead_rowtime  flag  ct
ACME  2522021-09-0109:00:042021-09-0109:00:0513ACME  1812021-09-0109:00:052021-09-0109:00:0613ACME  1512021-09-0109:00:062021-09-0109:00:0713ACME  1422021-09-0109:00:072021-09-0109:00:0813ACME  2522021-09-0109:00:092021-09-0109:00:1021ACME  1912021-09-0109:00:102021-09-0109:00:1021
  1. 根据真实情况即连续的定义对数据进行过滤,统计结果
select    symbol,--flag,    min(rowtime) start_tstamp,    max(rowtime) bottom_tstamp,    max(lead_rowtime) end_tstamp
from tb4
where ct >1groupby symbol,flag;--结果如下symbol  start_tstamp        bottom_tstamp       end_tstamp
ACME    2021-09-0109:00:042021-09-0109:00:072021-09-0109:00:08
  • 下面我们在看一个电商中的场景,用户浏览商品后会进行下单,下单后有可能会进行支付,我们需要分析某日某商品进行浏览、收藏、下单、支付的用户
  1. 先生成简单的示例数据
with tb1 as(select        user_id,        shop_id,        user_behav,        op_time,        substr(op_time,1,10) dt
fromvalues('1001','A1','浏览','2021-09-01 17:03:01'),('1001','A1','收藏','2021-09-01 17:04:12'),('1001','A2','浏览','2021-09-01 17:02:02'),('1001','A2','收藏','2021-09-01 17:03:42'),('1001','A2','下单','2021-09-01 17:06:25'),('1002','A1','浏览','2021-09-01 17:00:32'),('1002','A1','收藏','2021-09-01 17:03:12'),('1002','A1','浏览','2021-09-01 17:03:45'),('1002','A1','下单','2021-09-01 17:05:41'),('1002','A1','支付','2021-09-01 17:06:26'),('1003','A1','浏览','2021-09-01 17:08:13'),('1003','A1','浏览','2021-09-01 17:09:14')               t(user_id,shop_id,user_behav,op_time))
  1. 我们只看A1店铺的数据,使用collect_list或者wm_concat(Maxcomputer内置函数,Hive中是concat_wm)进行汇总用户的行为
tb2 as(select        dt,        user_id,-- collect_list(user_behav)        wm_concat(",",user_behav) behavs
from tb1
where shop_id ='A1'groupby dt,user_id
)--展示结果如下dt  user_id behavs
2021-09-011001  浏览,收藏
2021-09-011002  浏览,收藏,浏览,下单,支付
2021-09-011003  浏览,浏览
  1. 匹配规则使用like,这里需要下单之后的行为为支付
select    dt,    user_id,    behavs
from tb2
where behavs like'%浏览%收藏%下单_支付%';--结果如下dt  user_id behavs
2021-09-011002  浏览,收藏,浏览,下单,支付

使用离线SQL分析分析匹配,主要是按维度把所有行为路径进行汇总拼接,然后使用字符匹配或者复杂的使用正则匹配,实际业务分析过程中,如有类似需求,可以参考上述方式。如有更好的方式,欢迎探讨。

拜了个拜

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
20天前
|
Java 前端开发 容器
Struts 2:在技术变革的风暴中航行,探索框架的革命性未来与创新融合之路
【8月更文挑战第31天】Struts 2作为一款成熟的企业级Java Web框架,凭借其稳定性和灵活性自2007年以来广受欢迎。它基于MVC设计模式,支持插件架构和RESTful服务,并能与Spring框架良好集成。然而,面对微服务架构和容器化技术(如Docker和Kubernetes)的兴起,Struts 2需提供更轻量级和支持指南来适应变化。通过深化与现代前端框架(如React和Vue.js)及AI技术的集成,并强化安全性与开发工具,Struts 2有望保持竞争力并迎接未来挑战。
30 0
|
21天前
|
SQL 安全 数据库
|
21天前
|
SQL 测试技术 数据处理
|
21天前
|
SQL 数据采集 数据挖掘
深入理解SQL中的DISTINCT语句及其应用
【8月更文挑战第31天】
34 0
|
21天前
|
SQL 数据处理 数据库
SQL正则表达式应用:文本数据处理的强大工具——深入探讨数据验证、模式搜索、字符替换等核心功能及性能优化和兼容性问题
【8月更文挑战第31天】SQL正则表达式是数据库管理和应用开发中处理文本数据的强大工具,支持数据验证、模式搜索和字符替换等功能。本文通过问答形式介绍了其基本概念、使用方法及注意事项,帮助读者掌握这一重要技能,提升文本数据处理效率。尽管功能强大,但在不同数据库系统中可能存在兼容性问题,需谨慎使用以优化性能。
31 0
|
21天前
|
SQL 数据管理 关系型数据库
SQL与云计算:利用云数据库服务实现高效数据管理——探索云端SQL应用、性能优化、安全性与成本效益,为企业数字化转型提供全方位支持
【8月更文挑战第31天】在数字化转型中,企业对高效数据管理的需求日益增长。传统本地数据库存在局限,而云数据库服务凭借自动扩展、高可用性和按需付费等优势,成为现代数据管理的新选择。本文探讨如何利用SQL和云数据库服务(如Amazon RDS、Google Cloud SQL和Azure SQL Database)实现高效的数据管理。通过示例和最佳实践,展示SQL在云端的应用、性能优化、安全性及成本效益,助力企业提升竞争力。
38 0
|
21天前
|
SQL 监控 关系型数据库
"SQL性能瓶颈大揭秘:一步步教你揪出慢查询元凶,从根源解决数据库拖沓问题,让应用速度飞起来!"
【8月更文挑战第31天】作为一名数据库管理员或开发者,面对复杂系统时,运行缓慢的SQL查询常常令人头疼。本文将指导你如何诊断并解决这些问题。首先,通过性能监控工具识别出问题查询;其次,利用`EXPLAIN`分析其执行计划,了解索引使用情况;接着,优化查询语句,如使用合适索引、减少JOIN操作等;再者,优化数据库设计,采用分区表或调整硬件资源;最后,持续监控性能并调优。通过这些步骤,可有效提升数据库的整体性能。
24 0
|
21天前
|
SQL 存储 NoSQL
从SQL到NoSQL:理解不同数据库类型的选择与应用——深入比较数据模型、扩展性、查询语言、一致性和适用场景,为数据存储提供全面决策指南
【8月更文挑战第31天】在信息技术飞速发展的今天,数据库的选择至关重要。传统的SQL数据库因其稳定的事务性和强大的查询能力被广泛应用,而NoSQL数据库则凭借其灵活性和水平扩展性受到关注。本文对比了两种数据库类型的特点,帮助开发者根据应用场景做出合理选择。SQL数据库遵循关系模型,适合处理结构化数据和复杂查询;NoSQL数据库支持多种数据模型,适用于非结构化或半结构化数据。SQL数据库在一致性方面表现优异,但扩展性较差;NoSQL数据库则设计之初便考虑了水平扩展性。SQL使用成熟的SQL语言,NoSQL的查询语言更为灵活。
29 0
|
2月前
|
SQL 弹性计算 资源调度
云服务器 ECS产品使用问题之bin/spark-sql --master yarn如何进行集群模式运行
云服务器ECS(Elastic Compute Service)是各大云服务商阿里云提供的一种基础云计算服务,它允许用户租用云端计算资源来部署和运行各种应用程序。以下是一个关于如何使用ECS产品的综合指南。
|
28天前
|
SQL 存储 NoSQL
数据模型与应用场景对比:SQL vs NoSQL
【8月更文第24天】随着大数据时代的到来,数据存储技术也在不断演进和发展。传统的SQL(Structured Query Language)数据库和新兴的NoSQL(Not Only SQL)数据库各有优势,在不同的应用场景中发挥着重要作用。本文将从数据模型的角度出发,对比分析SQL和NoSQL数据库的特点,并通过具体的代码示例来说明它们各自适用的场景。
40 0