阿里云RDS PG实践 - 流式标签 - 万亿级,实时任意标签圈人

本文涉及的产品
RDS SQL Server Serverless,2-4RCU 50GB 3个月
推荐场景:
云数据库 RDS SQL Server,基础系列 2核4GB
云原生数据库 PolarDB 分布式版,标准版 2核8GB
简介:

标签

PostgreSQL , 阅后即焚 , 流计算 , 标签


背景

varbitx是阿里云RDS PG提供的一个BIT操作插件,使用这个插件已经成功的帮助用户提供了万亿级的毫秒级实时圈人功能。

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

结合阅后即焚的流式批量处理,schemaless UDF,可以实现高效的增、删标签,以及毫秒级别的按标签圈人。

《PostgreSQL 在铁老大订单系统中的schemaless设计和性能压测》

目标是百亿级用户体量,千万级标签。

架构图

pic

阿里云varbitx 添加标签的函数接口

1. set_bit_array

set_bit_array (  
  varbit,  
  int,   -- 目标BIT (0|1)  
  int,   -- 填充BIT (0|1)  
  int[]  -- 目标位置  
) returns varbit  
   
  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1  
  例如 set_bit_array('111100001111', 0, 1, array[1,15]) 返回 1011000011111110  

正在添加这个函数,用于一次性处理添加和删除的BIT的设置

2. set_bit_array

set_bit_array (  
  varbit,  
  int,     -- 1目标BIT (0|1)  
  int[]    -- 1目标位置  
  int,     -- 2目标BIT (0|1)  
  int[],   -- 2目标位置  
  int      -- 填充BIT (0|1)  
) returns varbit  
  
  将指定位置的BIT设置为0|1,(起始位=0),超出原始长度的部分填充0|1  
  例如 set_bit_array('111100001111', 0, array[1,15], 1, array[0,4], 0) 返回 1011100011111110  

阿里云varbitx 从bitmap得到字典ID

1. bit_posite

bit_posite (    
  varbit,     
  int,      -- (0|1)    
  boolean     
) returns int[]      
    
  返回 0|1 的位置,(起始位=0), true时正向返回,false时反向返回        
  例如 bit_posite ('11110010011', 1, true) 返回 [0,1,2,3,6,9,10]      
       bit_posite ('11110010011', 1, false) 返回 [10,9,6,3,2,1,0]     

从字典ID得到USERID

select uid from dict where id = any ( bit_posite(x,x,x) );      

demo

1 字典表

将USERID转换为ARRAY下标,即字典表

首先需要用到无缝自增ID

《PostgreSQL 无缝自增ID的实现 - by advisory lock》

1、字典表如下

create table dict(id int8 primary key, uid int8 not null);    
  
create unique index idx_dict_uid on dict (uid);    

2 生成已有用户

create table t_uid (  
  uid int8 primary key  -- 已有用户的USER ID  
);  

插入一批 USERID (一亿)

insert into t_uid select id from generate_series(1,100000000) t(id) order by random() ;  

3 已有用户,一次性生成mapping下标

create sequence seq minvalue 0 start 0;    
  
  
insert into dict select nextval('seq'), uid from t_uid;    
  
  
select min(id),max(id),count(*) from dict;    
 min |   max    |   count     
-----+----------+-----------  
   0 | 99999999 | 100000000  
(1 row)  

4 无缝自增下标 函数

新增的用户,通过这个函数写入,确保无缝自增下标:

create or replace function f_uniq(i_uid int8) returns int as $$  
declare  
  newid int;    
  i int := 0;    
  res int;    
begin  
  loop   
    if i>0 then   
      perform pg_sleep(0.2*random());  
    else  
      i := i+1;  
    end if;  
  
    -- 获取已有的最大ID+1 (即将插入的ID)  
    select max(id)+1 into newid from dict;    
    if newid is not null then  
      -- 获取AD LOCK  
      if pg_try_advisory_xact_lock(newid) then  
        -- 插入  
        insert into dict (id,uid) values (newid,i_uid) ;    
        -- 返回此次获取到的UID  
        return newid;    
      else  
        -- 没有获取到AD LOCK则继续循环  
        continue;    
      end if;    
    else  
      -- 表示这是第一条记录,获取AD=0 的LOCK  
      if pg_try_advisory_xact_lock(0) then  
        insert into dict (id, uid) values (0, i_uid) ;    
        return 0;    
      else  
        continue;     
      end if;    
    end if;    
  end loop;    
    
  -- 如果因为瞬态导致PK冲突了,继续调用  
  exception when others then  
    select f_uniq(i_uid) into res;    
    return res;    
end;  
$$ language plpgsql strict;  

例如,新增一个USERID。

select f_uniq(?);  

5 从ID或者USERID或从USERID获得ID

create or replace function get_id_from_uid (int8) returns int8 as $$  
  select id from dict where uid=$1;  
$$ language sql strict;  
  
create or replace function get_uid_from_id (int8) returns int8 as $$  
  select uid from dict where id=$1;  
$$ language sql strict;  

6 标签描述表

create table t_tags(tagid int primary key, desc text);    

7 流式标签表

1、UID范围映射表,落在什么区间的ID,对应什么后缀的表名

建议在写入时,自动写到不同的t_tag_log表,这样的话,不需要在消费时过滤。

create table t_mapping (  
  dict_id int8range,      -- 字典ID区间  
  suffix text unique             -- 表名后缀  
);  
  
alter table t_mapping add constraint ck_exclude_dict_id exclude using gist(dict_id with &&);  -- 防止交叉区间  
insert into t_mapping values (int8range(0,100000000), '0');  
insert into t_mapping values (int8range(100000000,200000000), '1');  
insert into t_mapping values (int8range(200000000,300000000), '2');  
insert into t_mapping values (int8range(300000000,400000000), '3');  

2、流式标签主表

create table t_tag_log (  
  dict_id int8,        -- 用户下标ID    
  action int2,         -- 删除0、新增1    
  tagid int,           -- 标签ID   
  crt_time timestamp   -- 时间    
);  
  
create index idx_t_tag_log on t_tag_log (crt_time);  

3、创建流式标签子表

按tagid分区,按t_mapping后缀对应关系分区。

do language plpgsql $$  
declare  
  x text;  
begin  
  for i in 0..63 loop  
    for x in select suffix from t_mapping loop  
      execute format('create table t_tag_log_%s_%s (like t_tag_log including all) inherits (t_tag_log)', i, x);  
    end loop;  
  end loop;  
end;  
$$;  
  
postgres=# \dt t_tag_log_*  
             List of relations  
 Schema |      Name      | Type  |  Owner     
--------+----------------+-------+----------  
 public | t_tag_log_0_0  | table | postgres  
 public | t_tag_log_0_1  | table | postgres  
 public | t_tag_log_0_2  | table | postgres  
 public | t_tag_log_0_3  | table | postgres  
 public | t_tag_log_10_0 | table | postgres  
 public | t_tag_log_10_1 | table | postgres  
 public | t_tag_log_10_2 | table | postgres  
 public | t_tag_log_10_3 | table | postgres  
 public | t_tag_log_11_0 | table | postgres  
 public | t_tag_log_11_1 | table | postgres  
.....................

4、使用schema lessUDF写入标签信息

create or replace function ins_tag(v_uid int8, v_action int2, v_tagid int, v_crt_time timestamp) returns void as $$  
declare  
  i int := mod(v_tagid, 64);  
  x text;   
begin  
  select suffix into x from t_mapping where dict_id @> $1;  
  execute format ('insert into t_tag_log_%s_%s (dict_id, action, tagid, crt_time) values (%s, %s, %s, %L)', i, x, get_id_from_uid(v_uid), v_action, v_tagid, v_crt_time);  
end;  
$$ language plpgsql strict;  

8 高速写入贴标签、删除标签

vi test.sql  
  
\set uid random(1,100000000)  
\set tagid random(1,100000)  
\set action random(0,1)  
select ins_tag (:uid::int8, :action::int2, :tagid, now()::timestamp);  
  
pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120  

单表单条写入性能:

单表或多表 批量写入 性能可以超过100万行/s。

transaction type: ./test.sql  
scaling factor: 1  
query mode: prepared  
number of clients: 32  
number of threads: 32  
duration: 120 s  
number of transactions actually processed: 19013298  
latency average = 0.202 ms  
latency stddev = 0.267 ms  
tps = 158442.952245 (including connections establishing)  
tps = 158449.386772 (excluding connections establishing)  
script statistics:  
 - statement latencies in milliseconds:  
         0.001  \set uid random(1,100000000)  
         0.000  \set tagid random(1,100000)  
         0.000  \set action random(0,1)  
         0.200  insert into t_tag_log (dict_id, action, tagid, crt_time) values (get_id_from_uid(:uid), :action, :tagid, now());  

9 标签反转表

对应到前面的拆分模式,有两种模式,一种模式,使用多表存储。另一种模式使用单表存储。

1、多表存储。

标签反转主表

create table tag_userbitmap (  
  tagid int primary key,             -- 标签ID  
  usrebitmap varbit                  -- 用户bitmap  
);  

标签反转子表

do language plpgsql $$  
declare  
  x text;  
begin  
  for x in select suffix from t_mapping loop  
    execute format('create table tag_userbitmap_%s (like tag_userbitmap including all) inherits (tag_userbitmap)', x);  
  end loop;  
end;  
$$;  

tag不需要分区,因为增量更新时,没有行锁冲突,而且表大小也足够。

postgres=# \dt tag_userbitmap*  
              List of relations  
 Schema |       Name       | Type  |  Owner     
--------+------------------+-------+----------  
 public | tag_userbitmap   | table | postgres  
 public | tag_userbitmap_0 | table | postgres  
 public | tag_userbitmap_1 | table | postgres  
 public | tag_userbitmap_2 | table | postgres  
 public | tag_userbitmap_3 | table | postgres  
(5 rows)  

2、单表存储,则加一列区分OFFSET。

create table tag_userbitmap_single (
  tagid int ,               -- 标签ID
  bitmap_offset int ,       -- bit号段offset 值
  usrebitmap varbit,        -- 当前号段的bitmap
  primary key (tagid, bitmap_offset)
);

10 阅后即焚-流式消费标签,合并到标签反转表

不考虑分区表时,这样来进行消费。

with tmp as (delete from t_tag_log     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log order by crt_time limit 100000       -- 批量处理10万条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)  -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               -- 聚合为下标数组  
;  

11 schemaless UDF, 阅后即焚-流式消费标签,合并到标签反转表

对应tag_userbimap表的设计(单表、或多表),schemaless UDF也有不同。

1、多表:在UDF中自动拼接表名,将BITMAP合并到对应子表。

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$  
declare  
  v_tagid int;  
  v_action int2;  
  v_dict_id int[];  
  v_offset int;  
  v_fixed_dict_id int[];  
begin  
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   
  
  for v_tagid, v_action, v_dict_id in   
    execute format   
('  
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               
',   
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    
  
loop  
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;    
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;  
  execute format('insert into tag_userbitmap_%s (tagid, usrebitmap) values (%s, set_bit_array(''0'', %s, %s, %L))   
                  on conflict (tagid)   
                  do update set usrebitmap=set_bit_array(tag_userbitmap_%s.usrebitmap, %s, %s, %L)',   
                  v_suffix2, v_tagid, v_action, 0, v_fixed_dict_id, v_suffix2, v_action, 0, v_fixed_dict_id);  
end loop;  
end;  
$$ language plpgsql strict;  

2、单表:

create or replace function merge_tags(v_limit int, v_suffix1 int, v_suffix2 int) returns void as $$  
declare  
  v_tagid int;  
  v_action int2;  
  v_dict_id int[];  
  v_offset int;  
  v_fixed_dict_id int[];  
begin  
  select substring(dict_id::text,'(\d+),') into v_offset from t_mapping where suffix=v_suffix2::text;   
  
  for v_tagid, v_action, v_dict_id in   
    execute format   
('  
with tmp as (delete from t_tag_log_%s_%s     -- 查询哪个表  
  where ctid = any ( array (  
    select ctid from t_tag_log_%s_%s order by crt_time limit %s       -- 批量处理N条  
  )) returning *  
)   
select tagid, action, array_agg(dict_id) as dict_id from    
(  
  select row_number() over w1 as rn, *   
  from tmp   
  window w1 as (partition by dict_id, tagid order by crt_time desc)   -- 每个ID,每个标签,取最后一条  
) t where rn=1  
group by tagid, action                                               
',   
v_suffix1, v_suffix2, v_suffix1, v_suffix2, v_limit)    
  
loop  
  select array(select unnest(v_dict_id)-v_offset) into v_fixed_dict_id;    
  -- raise notice '% ,% ,% ,%', v_tagid, v_action, v_dict_id, v_fixed_dict_id;  
  execute format('insert into tag_userbitmap (tagid, bitmap_offset, usrebitmap) values (%s, %s, set_bit_array(''0'', %s, %s, %L))   
                  on conflict (tagid, bitmap_offset)   
                  do update set usrebitmap=set_bit_array(tag_userbitmap.usrebitmap, %s, %s, %L)',   
                  v_suffix2, v_suffix1, v_action, 0, v_fixed_dict_id, v_action, 0, v_fixed_dict_id);  
end loop;  
end;  
$$ language plpgsql strict;  

3、单表和多表的函数接口是一样的,都是调用merge_tags函数,增量更新TAG,不同组合,允许并行调用。

以下QUERY可以并行调用:

select merge_tags(100000, 0, 0);  
select merge_tags(100000, 0, 1);  
select merge_tags(100000, 0, 2);  
select merge_tags(100000, 0, 3);  
.....  
select merge_tags(100000, 63, 0);  
select merge_tags(100000, 63, 1);  
select merge_tags(100000, 63, 2);  
select merge_tags(100000, 63, 3);  

12 查询包含哪些TAG的USEID

这个SQL可以略微修改,包括coalesce,补齐varbit长度到固定长度等。

full outer JOIN 得到最终的varbit。

根据bitmap求dict_id, 再根据dict_id求user_id。

多表:

select bit_posite(t1.usrebitmap||t2.usrebitmap||t3.usrebitmap||t4.usrebitmap, '1', true) 
from tag_userbitmap_0 t1 , ....
where tx.tagid in (....);

或单表:

create aggregate bit_agg (varbit) (sfunc = bitcat, stype=varbit) ;
select bit_posite(bit_and(tag), '1', true) from (
  select bit_agg(usrebitmap) from tag_userbitmap where tagid in (...) group by tagid
) t

详见用法

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

其他

1、维护字典表(删除僵尸用户)

2、同时收缩VARBITX

查询某个用户有哪些标签

这种点查,建议使用以下结构,数组作为标签。与本文相反。

create table t_user_tags(   
  uid int8 primary key,   -- 用户ID  
  tagid int[]             -- 标签ID  
);  

相似案例

《PostgreSQL手机行业经营分析、决策系统设计 - 实时圈选、透视、估算》

《阿里云RDS for PostgreSQL varbitx插件与实时画像应用场景介绍》

《基于 阿里云 RDS PostgreSQL 打造实时用户画像推荐系统(varbitx)》

《HTAP数据库 PostgreSQL 场景与性能测试之 32 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(JSON + 函数流式计算)》

《HTAP数据库 PostgreSQL 场景与性能测试之 31 - (OLTP) 高吞吐数据进出(堆存、行扫、无需索引) - 阅后即焚(读写大吞吐并测)》

《HTAP数据库 PostgreSQL 场景与性能测试之 27 - (OLTP) 物联网 - FEED日志, 流式处理 与 阅后即焚 (CTE)》

《在PostgreSQL中实现update | delete limit - CTID扫描实践 (高效阅后即焚)》

《PostgreSQL 异步消息实践 - Feed系统实时监测与响应(如 电商主动服务) - 分钟级到毫秒级的实现》

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
5月前
|
存储 关系型数据库 MySQL
MySQL——数据库备份上传到阿里云OSS存储
MySQL——数据库备份上传到阿里云OSS存储
216 0
|
4天前
|
运维 关系型数据库 MySQL
体验领礼啦!体验自建数据库迁移到阿里云数据库RDS,领取桌面置物架!
「技术解决方案【Cloud Up 挑战赛】」上线!本方案介绍如何将自建数据库平滑迁移至云数据库RDS,解决业务增长带来的运维难题。通过使用RDS MySQL,您可获得稳定、可靠和安全的企业级数据库服务,专注于核心业务发展。完成任务即可领取桌面置物架,每个工作日限量50个,先到先得。
|
30天前
|
关系型数据库 开发者 RDS
【实践】体验RDS通用云盘核心能力
这些图片展示了阿里巴巴云开发生态的不同方面,包括开发者工具、平台服务、技术文档、社区支持等,旨在为开发者提供全面的支持和便利,促进技术创新和应用开发。
|
6月前
|
人工智能 关系型数据库 MySQL
基于阿里云的PolarDB MySQL版实现AI增强数据管理
本文将介绍如何利用阿里云的PolarDB MySQL版结合AI技术,实现数据管理的自动化和智能化。
407 0
|
3月前
|
容灾 关系型数据库 数据库
阿里云RDS服务巴黎奥运会赛事系统,助力云上奥运稳定运行
2024年巴黎奥运会,阿里云作为官方云服务合作伙伴,提供了稳定的技术支持。云数据库RDS通过备份恢复、实时监控、容灾切换等产品能力,确保了赛事系统的平稳运行。
 阿里云RDS服务巴黎奥运会赛事系统,助力云上奥运稳定运行
|
2月前
|
SQL DataWorks 关系型数据库
阿里云 DataWorks 正式支持 SelectDB & Apache Doris 数据源,实现 MySQL 整库实时同步
阿里云数据库 SelectDB 版是阿里云与飞轮科技联合基于 Apache Doris 内核打造的现代化数据仓库,支持大规模实时数据上的极速查询分析。通过实时、统一、弹性、开放的核心能力,能够为企业提供高性价比、简单易用、安全稳定、低成本的实时大数据分析支持。SelectDB 具备世界领先的实时分析能力,能够实现秒级的数据实时导入与同步,在宽表、复杂多表关联、高并发点查等不同场景下,提供超越一众国际知名的同类产品的优秀性能,多次登顶 ClickBench 全球数据库分析性能排行榜。
|
6月前
|
缓存 运维 关系型数据库
数据库容灾 | MySQL MGR与阿里云PolarDB-X Paxos的深度对比
经过深入的技术剖析与性能对比,PolarDB-X DN凭借其自研的X-Paxos协议和一系列优化设计,在性能、正确性、可用性及资源开销等方面展现出对MySQL MGR的多项优势,但MGR在MySQL生态体系内也占据重要地位,但需要考虑备库宕机抖动、跨机房容灾性能波动、稳定性等各种情况,因此如果想用好MGR,必须配备专业的技术和运维团队的支持。 在面对大规模、高并发、高可用性需求时,PolarDB-X存储引擎以其独特的技术优势和优异的性能表现,相比于MGR在开箱即用的场景下,PolarDB-X基于DN的集中式(标准版)在功能和性能都做到了很好的平衡,成为了极具竞争力的数据库解决方案。
|
5月前
|
弹性计算 关系型数据库 MySQL
新一期陪跑班开课啦!阿里云专家手把手带你体验RDS通用云盘核心能力
本次课程将手把手带领用户创建一个云数据库RDS MySQL(通用云盘),并通过云服务器ECS对RDS MySQL实例进行压测,体验IO加速和IO突发带来的性能提升;并通过DMS执行DDL,将数据归档到OSS,再结合云盘缩容,体验数据归档带来的成本优势。
|
5月前
|
关系型数据库 MySQL 网络安全
阿里云安装Mysql
阿里云安装Mysql
386 1
|
5月前
|
关系型数据库 数据库 数据安全/隐私保护
"告别繁琐!Python大神揭秘:如何一键定制阿里云RDS备份策略,让数据安全与效率并肩飞,轻松玩转云端数据库!"
【8月更文挑战第14天】在云计算时代,数据库安全至关重要。阿里云RDS提供自动备份,但标准策略难以适应所有场景。传统手动备份灵活性差、管理成本高且恢复效率低。本文对比手动备份,介绍使用Python自定义阿里云RDS备份策略的方法,实现动态调整备份频率、集中管理和智能决策,提升备份效率与数据安全性。示例代码演示如何创建自动备份任务。通过自动化与智能化备份管理,支持企业数字化转型。
120 2

热门文章

最新文章

相关产品

  • 云数据库 RDS MySQL 版
  • 云数据库 RDS