【最佳实践】实时计算Flink在广告行业的实时数仓建设实践

简介: 通过每个广告位上不同广告的投放地区、广告ID、设备唯一编码等信息,可以统计点击次数、投放次数等指标,可用于制定更高效的广告投放策略,降低投放成本,提高广告收益。

行业背景

  • 行业现状: 

    • 广告仍然是互联网公司的主要变现手段,2019年,中国广告市场总体规模达到8674.28亿元,较2018年增长了8.54%,据统计全球互联网市值前十的公司广告收入占比高达40%,可见其重要性。AI、大数据、智能投放等创新技术的普及应用,不仅创生了一批独角兽营销平台,而且大幅拉低了广告投放门槛,拓宽了广告市场空间。
  • 大数据在其行业中的作用:

    • 大数据技术的应用在改变我们生活及工作的同时,为我们寻找数据背后的客观规律提供了一种有效途径。对潜在消费群体进行深入分析,并进行定制营销基础上的现代广告营销,对数据的规模及精准度有着极高的要求,而大数据的出现无疑为其落地提供了强有力的支撑。

业务场景

类似媒体,新闻类等APP,上面有各种广告位提供给广告主。广告主投放广告,用户点击广告将实时的产生操作日志数据,对这些日志数据进行实时分析,通过每个广告位上不同广告的投放地区、广告ID、设备唯一编码等信息,可以统计点击次数、投放次数等指标,可用于制定更高效的广告投放策略,降低投放成本,提高广告收益。

技术架构

image.png


架构解析:
数据采集:该场景中,APP、Web、Server等服务上会产生大量的广告投放、用户广告点击等操作日志数据,这些日志数据被实时采集至日志服务系统(SLS),作为Flink的数据源。
实时数仓架构:该场景中,整个实时数仓构建,全部通过 Flink完成。Flink读取SLS中的原始日志数据,经过数据清洗、数据处理等操作写出到DataHub,Flink进一步读取DataHub的数据进行实时统计分析,最终输出对应的指标结果到RDS,供业务系统使用。

业务指标

  • 实时数据中间层,对原始日志进行实时数据清洗

    • 获取投放主题及维度打宽
    • 获取点击主题及维度打宽
  • 统计投放指标

    • 某个广告在某个省的当天投放量
    • 某个广告在某个市的当天投放量
    • 某个广告在某个投放终端的当天投放量
  • 统计点击指标

    • 某个广告在某个省的当天点击量
    • 某个广告在某个市的当天点击量
    • 某个广告在某个投放终端的当天点击量
  • 热门广告排行榜    

业务代码

场景一:对原始日志进行实时数据清洗

投放主题

根据业务主题分成投放主题和点击主题,当release_status=1时为投放主题。

输入表

create table ods_release(
  `sid` varchar,           --投放请求ID
  exts varchar,                       --扩展信息
  device_type varchar,     --1 android| 2 ios | 9 其他
  release_status varchar,  --投放状态 1 or 2
  device_num varchar,      --设备唯一编码
  release_session varchar, --投放会话ID
  `date` date              --创建时间
) with (
  type ='sls',
...
);

输出表

create table dw_release_exposure(
  release_session varchar, -- comment '投放会话id'
  release_status varchar,  -- comment '投放状态'
  device_num varchar,      -- comment '设备唯一编码'
  device_type varchar,     -- comment '1 android| 2 ios | 9 其他'
  area_code varchar,       -- comment '地区'
  aid varchar,             -- comment '广告id'
  ct date                  -- comment '创建时间'
)with(
type='datahub',
...
);

业务代码

insert into dw_release_exposure
select
  release_session,
  release_status,
  device_num,
  device_type,
  json_value(exts,'$.area_code'),
  json_value(exts,'$.aid'),
  `date` as ct
from
ods_release
where release_status='1'
;

投放主题关联维度表

投放主题与地区维度表、设备维度表进行聚合,得出宽表

输入表

create table dw_release_exposure(
  release_session varchar, -- comment '投放会话id'
  release_status varchar,  -- comment '投放状态'
  device_num varchar,      -- comment '设备唯一编码'
  device_type varchar,     -- comment '1 android| 2 ios | 9 其他'
  area_code varchar,       -- comment '地区'
  aid varchar,             -- comment '广告id'
  ct date                  -- comment '创建时间'
)with(
type='datahub',
...
);

--dim维度表
--(地区,省市,唯一地区编码,编码和city_id是一一对应的)
create table dim_province(
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  region_id bigint,
  region_name varchar,
 PRIMARY KEY (area_code),
 PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
 )with(
    type= 'rds',
...
);

--(用户设备维度表)
create table dim_device(
  device_type varchar comment '1 android| 2 ios | 9 其他',
  device_name varchar comment '设备名字',
 PRIMARY KEY (device_type),
 PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
)with(
type= 'rds',
...
);

输出表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

业务代码

insert into dm_release_exposure
select
  a.aid,
  count(a.aid) aid_count,
  c.device_name,
  a.area_code,
  b.province_id,
  b.province_name,
  b.city_id,
  b.city_name,
  a.ct
from
dw_release_exposure a
join
dim_province  FOR SYSTEM_TIME AS OF PROCTIME() as b on a.area_code=b.area_code
join
dim_device  FOR SYSTEM_TIME AS OF PROCTIME() as c on a.device_type=c.device_type
group by
a.aid,
a.area_code,
a.ct
;

点击主题

根据业务主题分成投放主题和点击主题,当release_status=2时为点击主题。

输入表

create table ods_release(
  `sid` varchar,           --投放请求ID
  exts varchar,                       --扩展信息
  device_type varchar,     --1 android| 2 ios | 9 其他
  release_status varchar,  --投放状态 1 or 2
  device_num varchar,      --设备唯一编码
  release_session varchar, --投放会话ID
  `date` date              --创建时间
  ) with (
  type ='sls',
...
);

输出表

create table dw_release_click(
  release_session varchar,  -- comment '投放会话id'
  release_status varchar,   -- comment '投放状态'
  device_num varchar,       -- comment '设备唯一编码' 
  device_type varchar,      -- comment '1 android| 2 ios | 9 其他'
  `user_id` varchar,          -- comment '用户id'
  area_code varchar,        -- comment '地区'
  aid varchar,              -- comment '广告id'
  ct date                   -- comment '创建时间'
)with(
type='datahub',
...
);

业务代码

insert into dw_release_click
select
  release_session,
  release_status,
  device_num,
  device_type,
  json_value(exts,'$.user_id') as `user_id`,
  json_value(exts,'$.area_code') as area_code,
  json_value(exts,'$.aid') as aid,
  `date` as ct
from
ods_release
where release_status='2'
;

点击主题关联维度表

点击主题与地区维度表进行聚合,得出宽表

输入表

create table dw_release_click(
  release_session varchar,  -- comment '投放会话id'
  release_status varchar,   -- comment '投放状态'
  device_num varchar,       -- comment '设备唯一编码' 
  device_type varchar,      -- comment '1 android| 2 ios | 9 其他'
  area_code varchar,        -- comment '地区'
  aid varchar,              -- comment '广告id'
  user_id varchar,          -- comment '用户id'
  ct date                   -- comment '创建时间'
)with(
type='datahub',
...
);

--dim维度表
--(地区,省市,唯一地区编码,编码和city_id是一一对应的)
create table dim_province(
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  region_id bigint,
  region_name varchar,
 PRIMARY KEY (area_code),
 PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
 )with(
    type= 'rds',
...
);

--(用户设备维度表)
create table dim_device(
device_type varchar comment '1 android| 2 ios | 9 其他',
device_name varchar comment '设备名字',
 PRIMARY KEY (device_type),
 PERIOD FOR SYSTEM_TIME--定义维表的变化周期。
)with(
type= 'rds',
...
);

输出表

create table dm_release_click(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

业务代码

insert into dm_release_click
select
  a.aid,
  count(a.aid) aid_count,
  c.device_name,
  a.area_code,
  b.province_id,
  b.province_name,
  b.city_id,
  b.city_name,
  a.ct
from
dw_release_click a
join
dim_province  FOR SYSTEM_TIME AS OF PROCTIME() as b
on a.area_code=b.area_code
join
dim_device  FOR SYSTEM_TIME AS OF PROCTIME() as c on
a.device_type=c.device_type
group by
a.aid,
a.area_code,
a.ct
;

场景二:统计投放指标

某个广告在某个省的当天投放量

以aid和province_name分组,统计某个广告在某个省的当天投放量

输入表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

输出表

--某个广告在某个省的当天投放量
CREATE TABLE ads_release_exposure_pro (
    aid                       VARCHAR,
    aid_count                 BIGINT,
    province_name             VARCHAR,
  ct                        DATE,
    primary key(aid,province_name,ct)
) WITH (
    type= 'rds',
...
);

业务代码

insert into ads_release_exposure_pro
select 
  aid,
  sum(aid_count) as aid_count,
  province_name,
  ct
from
dm_release_exposure
group by
aid,
province_name,
ct
;

某个广告在某个市的当天投放量

以aid和city_name分组,统计某个广告在某个市的当天投放量

输入表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

输出表

CREATE TABLE ads_release_exposure_city (
    aid                   VARCHAR,
    aid_count             BIGINT,
    city_name             VARCHAR,
  ct                    DATE,
    primary key(aid,city_name,ct)
) WITH (
    type= 'rds',
...
);

业务代码

insert into ads_release_exposure_city
select 
  aid,
  sum(aid_count) as aid_count,
  city_name,
  ct
from
dm_release_exposure
group by
aid,
city_name,
ct
;

某个广告在某个投放终端的当天投放量

以aid和device_name分组,统计某个广告在某个用户客户端上的当天投放量

输入表

create table dm_release_exposure(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

输出表

CREATE TABLE ads_release_exposure_device (
    aid                     VARCHAR,
    aid_count               BIGINT,
    device_name             VARCHAR,
  ct                      DATE,
    primary key(aid,device_name,ct)
) WITH (
    type= 'rds',
...
);

业务代码

insert into ads_release_exposure_device
select
  aid,
  sum(aid_count),
  device_name,
  ct
from
dm_release_exposure
group by 
aid,
device_name,
ct
;

场景三:统计点击指标

某个广告在某个省的当天点击量

以ct和aid、provice_name分组,统计某个广告在某个省的当天点击量

输入表

create table dm_release_click(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

输出表

CREATE TABLE ads_release_click_pro (
  aid                  VARCHAR,
  aid_count            BIGINT,
  province_name        VARCHAR,
  ct                   DATE,
  primary key(aid,province_name,ct)
) WITH (
  type= 'rds',
...
);

业务代码

insert into ads_release_click_pro
select
  aid,
  count(aid) as aid_count,
  province_name,
  ct
from
dm_release_click
group by
aid,
province_name,
ct
;

某个广告在某个市的当天点击量

以ct和aid、city_name分组,统计某个广告在某个市的当天点击量

输入表

create table dm_release_click(
aid varchar,
aid_count bigint,
device_name varchar,
area_code varchar,
province_id bigint,
province_name varchar,
city_id bigint,
city_name varchar,
ct date
)with(
type='datahub',
...
);

输出表

CREATE TABLE ads_release_click_city (
  aid                  VARCHAR,
  aid_count            BIGINT,
  city_name            VARCHAR,
  ct                   DATE,
  primary key(aid,city_name,ct)
) WITH (
  type= 'rds',
...
);

业务代码

insert into ads_release_click_city
select
aid,
count(aid) as aid_count,
city_name,
ct
from
dm_release_click
group by
aid,
city_name,
ct
;

某个广告在某个投放终端的当天投放量

以aid和device_name分组,统计某个广告在某个用户客户端上的当天投放量

输入表

create table dm_release_click(
  aid varchar,
  aid_count bigint,
  device_name varchar,
  area_code varchar,
  province_id bigint,
  province_name varchar,
  city_id bigint,
  city_name varchar,
  ct date
)with(
type='datahub',
...
);

输出表

CREATE TABLE ads_release_click_device (
  aid                     VARCHAR,
  aid_count               BIGINT,
  device_name             VARCHAR,
  ct                      DATE,
    primary key(aid,device_name,ct)
) WITH (
  type= 'rds',
...
);

业务代码

insert into ads_release_click_device
select
  aid,
  sum(aid_count),
  device_name,
  ct
from
dm_release_exposure
group by
aid,
device_name,
ct
;


场景四:热门广告排行榜

以ct和aid分组,计算当天每个广告的总点击量,对广告ID进行topn排序,得到点击次数最多的三个广告作为最热门广告。根据按天维度的时间字段(ct)和广告ID(aid)分组,计算每天每个广告的总点击量,根据广告ID对点击量进行topn排序,统计得到每天点击次数最多的三个广告,用于数据大屏中的热门广告排行榜。

输入表

create table dm_release_click(
aid varchar,
aid_count bigint,
area_code varchar,
province_id bigint,
province_name varchar,
city_id bigint,
city_name varchar,
ct date
)with(
type='datahub',
...
);

输出表

CREATE TABLE ads_release_click_dtclick (
  Ranking              BIGINT,
    aid                  VARCHAR,
    ct                   DATE,
  aid_count            BIGINT,
  primary key(aid,ct)
) WITH (
    type= 'rds',
...
);

业务代码

INSERT INTO ads_release_click_dtclick
SELECT 
Ranking,
aid,
ct,
aid_count
FROM (
  SELECT *,
     ROW_NUMBER() OVER (PARTITION BY `ct` ORDER BY aid_count desc) AS Ranking
  FROM (
        SELECT 
       `ct` AS `ct`,
        COUNT(aid) AS aid_count,
        aid
        FROM  dm_release_click
        GROUP BY `ct`,aid
    )a
) 
WHERE Ranking <= 3 

实时计算 Flink 版产品交流群

test

阿里云实时计算Flink - 解决方案:
https://developer.aliyun.com/article/765097
阿里云实时计算Flink - 场景案例:
https://ververica.cn/corporate-practice
阿里云实时计算Flink - 产品详情页:
https://www.aliyun.com/product/bigdata/product/sc

相关文章
|
11月前
|
存储 消息中间件 OLAP
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
本文整理自淘天集团高级数据开发工程师朱奥在Flink Forward Asia 2024的分享,围绕实时数仓优化展开。内容涵盖项目背景、核心策略、解决方案、项目价值及未来计划五部分。通过引入Paimon和Hologres技术,解决当前流批存储不统一、实时数据可见性差等痛点,实现流批一体存储与高效近实时数据加工。项目显著提升了数据时效性和开发运维效率,降低了使用门槛与成本,并规划未来在集团内推广湖仓一体架构,探索更多技术创新场景。
1800 3
基于 Flink+Paimon+Hologres 搭建淘天集团湖仓一体数据链路
|
9月前
|
SQL 分布式计算 DataWorks
破界·融合·进化:解码DataWorks与Hologres的湖仓一体实践
基于阿里云DataWorks与实时数仓Hologres,提供统一的大数据开发治理平台与全链路实时分析能力。DataWorks支持多行业数据集成与管理,Hologres实现海量数据的实时写入与高性能查询分析,二者深度融合,助力企业构建高效、实时的数据驱动决策体系,加速数字化升级。
|
12月前
|
消息中间件 存储 监控
Lalamove基于Flink实时湖仓演进之路
本文由货拉拉国际化技术部资深数据仓库工程师林海亮撰写,围绕Flink在实时数仓中的应用展开。文章首先介绍了Lalamove业务背景,随后分析了Flink在实时看板、数据服务API、数据监控及数据分析中的应用与挑战,如多数据中心、时区差异、上游改造频繁及高成本问题。接着阐述了实时数仓架构从无分层到引入Paimon湖仓的演进过程,解决了数据延迟、兼容性及资源消耗等问题。最后展望未来,提出基于Fluss+Paimon优化架构的方向,进一步提升性能与降低成本。
421 11
Lalamove基于Flink实时湖仓演进之路
|
12月前
|
存储 监控 数据挖掘
京东物流基于Flink & StarRocks的湖仓建设实践
本文整理自京东物流高级数据开发工程师梁宝彬在Flink Forward Asia 2024的分享,聚焦实时湖仓的探索与建设、应用实践、问题思考及未来展望。内容涵盖京东物流通过Flink和Paimon等技术构建实时湖仓体系的过程,解决复杂业务场景下的数据分析挑战,如多维OLAP分析、大屏监控等。同时,文章详细介绍了基于StarRocks的湖仓一体方案,优化存储成本并提升查询效率,以及存算分离的应用实践。最后,对未来数据服务的发展方向进行了展望,计划推广长周期数据存储服务和原生数据湖建设,进一步提升数据分析能力。
1090 1
京东物流基于Flink & StarRocks的湖仓建设实践
|
12月前
|
存储 SQL 运维
中国联通网络资源湖仓一体应用实践
本文分享了中国联通技术专家李晓昱在Flink Forward Asia 2024上的演讲,介绍如何借助Flink+Paimon湖仓一体架构解决传统数仓处理百亿级数据的瓶颈。内容涵盖网络资源中心概况、现有挑战、新架构设计及实施效果。新方案实现了数据一致性100%,同步延迟从3小时降至3分钟,存储成本降低50%,为通信行业提供了高效的数据管理范例。未来将深化流式数仓与智能运维融合,推动数字化升级。
587 0
中国联通网络资源湖仓一体应用实践
|
7月前
|
存储 JSON 数据处理
Flink基于Paimon的实时湖仓解决方案的演进
本文源自Apache CommunityOverCode Asia 2025,阿里云专家苏轩楠分享Flink与Paimon构建实时湖仓的演进实践。深度解析Variant数据类型、Lookup Join优化等关键技术,提升半结构化数据处理效率与系统可扩展性,推动实时湖仓在生产环境的高效落地。
837 1
Flink基于Paimon的实时湖仓解决方案的演进
|
12月前
|
存储 消息中间件 分布式计算
Hologres实时数仓在B站游戏的建设与实践
本文介绍了B站游戏业务中实时数据仓库的构建与优化过程。为满足日益增长的数据实时性需求,采用了Hologres作为核心组件优化传统Lambda架构,实现了存储层面的流批一体化及离线-实时数据的无缝衔接。文章详细描述了架构选型、分层设计(ODS、DWD、DIM、ADS)及关键技术挑战的解决方法,如高QPS点查、数据乱序重写等。目前,该实时数仓已广泛应用于运营分析、广告投放等多个场景,并计划进一步完善实时指标体系、扩展明细层应用及研发数据实时解析能力。
Hologres实时数仓在B站游戏的建设与实践
|
存储 分布式计算 MaxCompute
Hologres实时湖仓能力入门实践
本文由武润雪(栩染)撰写,介绍Hologres 3.0版本作为一体化实时湖仓平台的升级特性。其核心能力包括湖仓存储一体、多模式计算一体、分析服务一体及Data+AI一体,极大提升数据开发效率。文章详细解析了两种湖仓架构:MaxCompute + Hologres实现离线实时一体化,以及Hologres + DLF + OSS构建开放湖仓架构,并深入探讨元数据抽象、权限互通等重点功能,同时提供具体使用说明与Demo演示。
|
SQL 弹性计算 运维
Hologres计算组实例&分时弹性入门实践
本文由骆撷冬(Hologres PD)撰写,围绕Hologres计算组实例与分时弹性的入门实践展开。内容分为三部分:第一部分介绍Hologres计算组实例的原理与架构,解决负载隔离、资源浪费、大任务和运维难题;第二部分演示计算组实例的入门实践,包括管理、授权、连接及监控等操作;第三部分讲解分时弹性的使用,涵盖配置方法、成本优化及监控告警。通过具体案例与操作步骤,帮助用户更好地理解和应用Hologres的弹性计算能力。
|
8月前
|
SQL 存储 运维
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践
本文介绍了 Apache Doris 在菜鸟的大规模落地的实践经验,菜鸟为什么选择 Doris,以及 Doris 如何在菜鸟从 0 开始,一步步的验证、落地,到如今上万核的规模,服务于各个业务线,Doris 已然成为菜鸟 OLAP 数据分析的最优选型。
457 2
Apache Doris 在菜鸟的大规模湖仓业务场景落地实践

相关产品

  • 实时计算 Flink版