Flink SQL 功能解密系列 —— 流计算“撤回(Retraction)”案例分析

简介: 通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理方式。

什么是retraction(撤回)

通俗讲retract就是传统数据里面的更新操作,也就是说retract是流式计算场景下对数据更新的处理
方式。
首先来看下流场景下的一个词频统计列子。

image.png

没有retract会导致最终结果不正确↑:

image.png
retract发挥的作用

下面再分享两个双十一期间retract保证数据正确性的业务case:

case1: 菜鸟物流订单统计

同一个订单的商品在运输过程中,因为各种原因,物流公司是有可能从A变成B的。为了统计物流公司承担的订单数目,菜鸟团队使用blink计算的retraction机制进行变key汇总操作。

-- TT source_table 数据如下:
order_id      tms_company
0001           中通
0002           中通
0003           圆通

-- SQL代码
create view dwd_table as 
select
    order_id,
    StringLast(tms_company)
from source_table
group by order_id;

create view dws_table as 
select 
    tms_company,
    count(distinct order_id) as order_cnt
from dwd_table 
group by tms_company


此时结果为:
tms_company  order_cnt
中通          2
圆通          1

-----------------------
之后又来了一条新数据 0001的订单 配送公司改成 圆通了。这时,第一层group by的会先向下游发送一条 (0001,中通)的撤回消息,第二层group by节点收到撤回消息后,会将这个节点 中通对应的 value减少1,并更新到结果表中;然后第一层的分桶统计逻辑向下游正常发送(0001,圆通)的正向消息,更新了圆通物流对应的订单数目,达到了最初的汇总目的。

order_id      tms_company
0001           中通
0002           中通
0003           圆通
0001           圆通

写入ADS结果会是(满足需求)
tms_company  order_cnt
中通          1
圆通          2

case2: 天猫双十一购物车加购统计:

双11爆款清单与知名综艺IP“火星情报局”跨界合作,汪涵、撒贝宁、陶晶莹等大咖主持加盟,杭州、长沙两地联播,成功打造为“双11子IP”与“双11购物风向标”,树立电商内容综艺化、娱乐化创新典范,为长线模式探索打下基础。

首次深度联动线下场景,在银泰门店落地爆款清单超级大屏,商场人流截停率28%,用户互动时间占营业时间的40%。

选品模式创新,打造最全维度爆款清单:TOP2000性价比爆款+TOP100小黑盒推荐(新品清单)+TOP200买手天团推荐(人群/场景/地域 清单)

核心业务指标

  • 加购金额
  • 加购件数
  • 加购UV

业务计算逻辑

  • 来自TT的数据要进行去重;
  • 以投放场景和购物车维度进行分组,获取每个分组的最后一条(最新)数据;
  • 以投放场景和小时为维度进行分组,统计 加购金额,加购件数和加购UV 业务指标;

业务BlinkSQL代码

--Blink SQL
--********************************************************************--
--Comment: 天猫双11官方爆款清单统计计算
--********************************************************************--
CREATE TABLE dwd_mkt_membercart_ri(
    cart_id      BIGINT, -- '购物车id',
    sku_id       BIGINT, -- '存放商品的skuId,无sku时,为0',
    item_id      BIGINT, -- '外部id:商品id或者skuid',
    quantity     BIGINT, -- '购买数量',
    user_id      BIGINT, -- '用户id',
    status       BIGINT, -- '状态1:正常-1:删除',
    gmt_create   VARCHAR, -- '属性创建时间',
    gmt_modified VARCHAR, -- '属性修改时间',
    biz_id VARCHAR, -- 投放场景,
    start_time VARCHAR, -- 投放开始时间
    end_time VARCHAR, -- 投放结束时间
    activity_price_time VARCHAR, -- 活动开始时间
    price VARCHAR, -- 商品价格
    dbsync_operation BIGINT -- 时间自动用于排序
) 
WITH 
(
    type='tt'
    -- 其他信息省略
);

--groub by 方式重复,防止TT重发
CREATE VIEW distinct_dwd_mkt_membercart_ri AS 
SELECT
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation
FROM
    dwd_mkt_membercart_ri
GROUP BY    
    cart_id,
    sku_id,
    item_id,
    quantity,
    user_id,
    status,
    gmt_create,
    gmt_modified,
    biz_id, 
    start_time, 
    end_time, 
    activity_price_time,
    price, 
    dbsync_operation;

-- 每个投放和购物车数据的最后一条
CREATE VIEW tmp_dwd_mkt_membercart_ri AS 
SELECT 
    biz_id as biz_id,
    LAST_VALUE(user_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as user_id,
    LAST_VALUE(item_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as item_id, 
    LAST_VALUE(sku_id,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as sku_id,
    LAST_VALUE(start_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as start_time,
    LAST_VALUE(end_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as end_time,
    LAST_VALUE(activity_price_time,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as activity_price_time,
    LAST_VALUE(price,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as price,
    LAST_VALUE(quantity,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as quantity,
    LAST_VALUE(status,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as status,
    LAST_VALUE(gmt_modified,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_modified,
    LAST_VALUE(gmt_create,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as gmt_create,
    LAST_VALUE(dbsync_operation,UNIX_TIMESTAMP(gmt_modified)*10+dbsync_operation) as dbsync_operation
FROM distinct_dwd_mkt_membercart_ri
WHERE DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')=DATE_FORMAT(gmt_modified , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMdd')
GROUP BY 
cart_id,biz_id;

--存储小时维度的计算结果
CREATE TABLE result_mkt_membercart_ri_eh(
    id VARCHAR, 
    data_time VARCHAR,  
    all_preheating_cart_cnt BIGINT, -- 预热期间的 加购件数
    all_preheating_cart_alipay BIGINT,-- 预热期间的 加购金额
    eh_all_preheating_cart_uv BIGINT,-- 预热期间的 加购UV
    all_cart_cnt BIGINT, -- 投放期间的 加购件数
    all_cart_alipay BIGINT, -- 投放期间的 加购金额
    eh_all_cart_uv BIGINT, -- 投放期间的 加购UV
    primary key(id,data_time)
) WITH (
    type = 'custom',
     -- 其他信息省略
    timeDiv='hour'
) ;
--统计小时维度的 xx xx xx 业务指标
INSERT INTO result_mkt_membercart_ri_eh 
SELECT 
    biz_id,
    DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') data_time, 
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity else 0 end) as all_preheating_cart_cnt,
    `sum`(case when gmt_modified<=COALESCE(activity_price_time,end_time) then quantity*CAST(price AS BIGINT) else 0 end) as all_preheating_cart_alipay,
    `sum`((case when gmt_modified<=COALESCE(activity_price_time,end_time) then user_id end)) eh_all_preheating_cart_uv,
    `sum`(quantity) as all_cart_cnt,
    `sum`(quantity*CAST(price AS BIGINT)) as all_cart_alipay,
    `count`(distinct user_id) eh_all_cart_uv
FROM tmp_dwd_mkt_membercart_ri 
WHERE 
   status>0 
GROUP BY  biz_id ,DATE_FORMAT(gmt_create , 'yyyy-MM-dd HH:mm:ss' , 'yyyyMMddHH') ;

上面case2天猫业务场景里面的加购金额统计来说,当每个投放场景的购物车的数据发生变化时候,就意味着上面【CREATE VIEW tmp_dwd_mkt_membercart_ri 】中的LAST_VALUE发生变化,最外层的sum统计【INSERT INTO result_mkt_membercart_ri_eh 】就要将前一条的LAST_VALUE【VALUE-1】撤回,用update的新LAST_VALUE【VALUE-2】进行求和统计,这样blink就需要有一种机制将VALUE-1进行撤回,利用【VALUE-2】进行计算,这种机制我们称为retract。

retract 实现原理参考

https://docs.google.com/document/d/18XlGPcfsGbnPSApRipJDLPg5IFNGTQjnz7emkVpZlkw/edit#heading=h.cjkoun4w44l4

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
3月前
|
SQL 数据可视化 关系型数据库
MCP与PolarDB集成技术分析:降低SQL门槛与简化数据可视化流程的机制解析
阿里云PolarDB与MCP协议融合,打造“自然语言即分析”的新范式。通过云原生数据库与标准化AI接口协同,实现零代码、分钟级从数据到可视化洞察,打破技术壁垒,提升分析效率99%,推动企业数据能力普惠化。
301 3
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
831 43
|
4月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
304 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL 消息中间件 Kafka
Flink SQL 详解:流批一体处理的强大工具
Flink SQL 是 Apache Flink 提供的 SQL 引擎,支持流批一体处理,统一操作流数据与批数据,具备高性能、低延迟、丰富数据源支持及标准 SQL 兼容性,适用于实时与离线数据分析。
907 1
|
7月前
|
SQL 关系型数据库 MySQL
凌晨2点报警群炸了:一条sql 执行200秒!搞定之后,我总结了一个慢SQL查询、定位分析解决的完整套路
凌晨2点报警群炸了:一条sql 执行200秒!搞定之后,我总结了一个慢SQL查询、定位分析解决的完整套路
凌晨2点报警群炸了:一条sql 执行200秒!搞定之后,我总结了一个慢SQL查询、定位分析解决的完整套路
|
7月前
|
SQL 算法 数据挖掘
【SQL周周练】:利用行车轨迹分析犯罪分子作案地点
【SQL破案系列】第一篇: 如果监控摄像头拍下了很多车辆的行车轨迹,那么如何利用这些行车轨迹来分析车辆运行的特征,是不是能够分析出犯罪分子“踩点”的位置
239 15
|
8月前
|
SQL 关系型数据库 MySQL
【MySQL】SQL分析的几种方法
以上就是SQL分析的几种方法。需要注意的是,这些方法并不是孤立的,而是相互关联的。在实际的SQL分析中,我们通常需要结合使用这些方法,才能找出最佳的优化策略。同时,SQL分析也需要对数据库管理系统,数据,业务需求有深入的理解,这需要时间和经验的积累。
299 12
|
9月前
|
存储 Kubernetes 调度
|
关系型数据库 MySQL 网络安全
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
5-10Can't connect to MySQL server on 'sh-cynosl-grp-fcs50xoa.sql.tencentcdb.com' (110)")
|
SQL 存储 监控
SQL Server的并行实施如何优化?
【7月更文挑战第23天】SQL Server的并行实施如何优化?
555 13

热门文章

最新文章

相关产品

  • 实时计算 Flink版