PostgreSQL 相似搜索分布式架构设计与实践 - dblink异步调用与多机并行(远程 游标+记录 UDF实例)

本文涉及的产品
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介:

标签

PostgreSQL , 多表并行 , 多机并行 , dblink , 异步调用 , 相似搜索


背景

背景请参考如下:

《PostgreSQL 相似搜索设计与性能 - 地址、QA、POI等文本 毫秒级相似搜索实践》

当需要进行相似搜索的数据量大于单机处理能力时,我们需要水平拆分来提高搜索能力。

或者可以使用阿里云的PolarDB for PG的产品(类似ORACLE RAC,支持增加计算节点)。比水平分库的好处是数据是共享存储的,不需要拆分。

回到水平分库的场景,如果我们把数据库拆成了多个,那么,如何让查询并行起来呢?

用DBLINK异步调用,可以让查询并行起来。架构设计如下:

pic

实际上采用DBLINK异步调用实现并行的例子很多:

《PostgreSQL dblink异步调用实现 并行hash分片JOIN - 含数据交、并、差 提速案例》

进入正题,下面是一个DEMO,按部就班的演示如何使用异步调用实现多库并行相似搜索。

DEMO

1、我们这里使用本地的4个DB来代表远程数据库,这4个DB完全可以安装到远程。这里只是为了测试方便。

本地库名:

postgres  

远程库名:

db0  
db1  
db2  
db3  

2、首先需要创建用户和测试DB

create role test login encrypted password 'secret';  
create database db0 with owner test;  
create database db1 with owner test;  
create database db2 with owner test;  
create database db3 with owner test;  

3、在本地某个库中创建dblink插件

create extension dblink;  

4、创建连接远程库的SERVER

CREATE SERVER db0 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db0');  
CREATE SERVER db1 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db1');  
CREATE SERVER db2 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db2');  
CREATE SERVER db3 FOREIGN DATA WRAPPER dblink_fdw OPTIONS (hostaddr '127.0.0.1', dbname 'db3');  

5、配置连接远程库的用户密码(用户密码都是远程库的,可不是本地的哦,你想用本地用户连远程库,没门)

CREATE USER MAPPING FOR postgres SERVER db0 OPTIONS (user 'test', password 'secret');  
CREATE USER MAPPING FOR postgres SERVER db1 OPTIONS (user 'test', password 'secret');  
CREATE USER MAPPING FOR postgres SERVER db2 OPTIONS (user 'test', password 'secret');  
CREATE USER MAPPING FOR postgres SERVER db3 OPTIONS (user 'test', password 'secret');  

远程库操作

在所有远程库上创建测试表,灌入测试数据,创建相似搜索函数。(注意下面的脚本需要调整好对应调度dbname)

1、必要的插件(注意下面的脚本需要调整好对应调度dbname)

\c db3 postgres  
create extension pg_trgm;    
create extension dblink;  

2、主表和相似搜索依赖的索引(注意下面的脚本需要调整好对应调度dbname)

\c db3 test  
create unlogged table tbl(id int primary key, info text);    
create index idx_tbl_info on tbl using gin (info gin_trgm_ops);  
    
-- alter table tbl set (parallel_workers =64);    

3、创建分区(本文仅做测试,真正的分区表用法请参考: 《PostgreSQL 11 分区表用法及增强 - 增加HASH分区支持 (hash, range, list)》 )

do language plpgsql $$    
declare    
begin    
  for i in 0..63    
  loop    
    execute format('drop table if exists tbl%s ', i);    
    execute format('create unlogged table tbl%s (like tbl including all) inherits(tbl)', i);    
    -- 提前设置好表级并行度,方便后面做并行测试    
    -- execute format('alter table tbl%s set (parallel_workers =64)', i);    
  end loop;    
end;    
$$;    

4、创建连接函数

create or replace function conn(      
  name,   -- dblink名字      
  text    -- 连接串,URL      
) returns void as $$        
declare        
begin        
  perform dblink_connect($1, $2);       
  return;        
exception when others then        
  return;        
end;        
$$ language plpgsql strict;     

5、创建生成随机函数的函数

-- 生成随机汉字符串      
create or replace function gen_hanzi(int) returns text as $$      
declare      
  res text;      
begin      
  if $1 >=1 then      
    select string_agg(chr(19968+(random()*20901)::int), '') into res from generate_series(1,$1);      
    return res;      
  end if;      
  return null;      
end;      
$$ language plpgsql strict;      

6、写入测试数据,随机文本(注意下面的脚本需要调整好对应调度dbname)

do language plpgsql $$    
declare    
  dbname name := 'db3';  
begin    
  for i in 0..63    
  loop    
    perform conn('link'||i,  'hostaddr=127.0.0.1 user=test password=secret dbname='||dbname);     
    perform dblink_send_query('link'||i, format('insert into tbl%s select generate_series(1, 15625), gen_hanzi(64); analyze tbl%s;', i, i));    
  end loop;    
end;    
$$;    

7、创建相似搜索用到的UDF

create or replace function get_res(  
  text,     -- 要按相似搜的文本  
  int8,     -- 限制返回多少条  
  float4 default 0.3,   -- 相似度阈值,低于这个值不再搜搜  
  float4 default 0.1    -- 相似度递减步长,直至阈值  
) returns setof record as $$    
declare    
  lim float4 := 1;    
begin    
  -- 判定  
  if not ($3 <= 1 and $3 > 0) then   
    raise notice '$3 must >0 and <=1';  
    return;  
  end if;  
    
  if not ($4 > 0 and $4 < 1) then  
    raise notice '$4 must >0 and <=1';  
    return;  
  end if;  
  loop    
    -- 设置相似度阈值    
    perform set_limit(lim);    
        
    -- 查看当前阈值下,有没有相似记录    
    perform similarity(info, $1) as sml, * from tbl where info % $1 limit 1;    
        
    -- 如果有,则返回N条    
    if found then    
      return query select similarity(info, $1) as sml, * from tbl where info % $1 order by sml desc limit $2;    
      return;    
    end if;    
    
    -- 否则继续,降低阈值    
    -- 当阈值小于0.3时,不再降阈值搜索,认为没有相似。    
    if lim < $3 then    
      return;    
    else    
      lim := lim - $4;    
    end if;    
  end loop;    
end;    
$$ language plpgsql strict;    

本地库操作

创建建立远程连接的函数

create or replace function conn(        
  name,   -- dblink名字        
  text    -- 连接串,URL        
) returns void as $$          
declare          
begin          
  perform dblink_connect($1, $2);         
  return;          
exception when others then          
  return;          
end;          
$$ language plpgsql strict;        

返回游标

1、定义UDF1 - 返回游标(如果返回记录数很多,建议使用游标,因为PLPGSQL是需要等所有记录都拿到才会开始返回,返回记录的话RT会较高)

例子

create or replace function get_res_cursor(  
  text,     -- 要按相似搜的文本  
  int8,     -- 限制返回多少条  
  float4 default 0.3,   -- 相似度阈值,低于这个值不再搜搜  
  float4 default 0.1    -- 相似度递减步长,直至阈值  
) returns setof refcursor as $$    
declare    
  i int := 1;  
  ref refcursor[];    
  res refcursor;   
  dbname name[] := array['db0', 'db1', 'db2', 'db3'];  -- 定义集群  
  db name;  
begin  
  foreach db in array dbname  
  loop   
    ref[i] := 'link'||i;  
    res := ref[i];  
    perform conn('link'||i,  db);         
    perform dblink_open('link'||i, 'link'||i, format('select * from get_res(%L, %s, %s, %s) as t(sml real, id int, info text)', $1, $2, $3, $4));    
    return next res;  
    i := i+1;  
  end loop;  
end;  
$$ language plpgsql strict;  

使用例子

postgres=# begin;  
BEGIN  
Time: 0.045 ms  
postgres=# select * from get_res_cursor('怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂', 10, 0.1, 0.05);  
 get_res_cursor   
----------------  
 link1  
 link2  
 link3  
 link4  
(4 rows)  
  
Time: 18.624 ms  
postgres=# select * from dblink_fetch('link1','link1',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 219.972 ms  
postgres=# select * from dblink_fetch('link1','link1',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 0.252 ms  
postgres=# select * from dblink_fetch('link2','link2',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 215.891 ms  
postgres=# select * from dblink_fetch('link3','link3',10) as t(sml real, id int, info text);  
 sml | id | info   
-----+----+------  
(0 rows)  
  
Time: 215.188 ms  
postgres=# select * from dblink_fetch('link4','link4',10) as t(sml real, id int, info text);  
   sml    | id |                                                               info                                                                 
----------+----+----------------------------------------------------------------------------------------------------------------------------------  
 0.779412 |  1 | 递陊怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂轪氐洚重銄懟諔  
(1 row)  
  
Time: 106.692 ms  

返回记录

1、定义UDF2 - 返回记录(注意,建议限制返回的条数,因为PLPGSQL是需要等所有记录都拿到才会开始返回)

例子

create or replace function get_res_record(  
  text,     -- 要按相似搜的文本  
  int8,     -- 限制返回多少条  
  float4 default 0.3,   -- 相似度阈值,低于这个值不再搜搜  
  float4 default 0.1    -- 相似度递减步长,直至阈值  
) returns setof record as $$    
declare    
  i int;  
  ref refcursor[];    
  res refcursor;   
  dbname name[] := array['db0', 'db1', 'db2', 'db3'];  -- 定义集群  
  db name;  
begin  
  i := 1;  
  foreach db in array dbname  
  loop   
    perform conn('link'||i,  db);     
    perform 1 from dblink_get_result('link'||i) as t(sml real, id int, info text);       
    perform dblink_send_query('link'||i, format('select * from get_res(%L, %s, %s, %s) as t(sml real, id int, info text)', $1, $2, $3, $4));    
    i := i+1;  
  end loop;  
  
  i := 1;  
  foreach db in array dbname  
  loop   
    return query SELECT * FROM dblink_get_result('link'||i) as t(sml real, id int, info text);     
    i := i+1;  
  end loop;  
end;  
$$ language plpgsql strict;  

使用例子

postgres=# select * from get_res_record('怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂', 10, 0.77, 0.4) as (sml real, id int, info text);  
   sml    | id |                                                               info                                                                 
----------+----+----------------------------------------------------------------------------------------------------------------------------------  
 0.779412 |  1 | 递陊怮媕苸淏倍椡帪暀虻爴荡巒讉輶魂馜虑范噞蠭鲧烳渃麠钸趥剘偣瑴鑪颭蚢佚簀哌內霡擷槧緸褫齈跊甏軙襧漆疅泅睤帍槇驗縐棂轪氐洚重銄懟諔  
(1 row)  
  
Time: 32.329 ms  

小结

使用本文提到的方法,你就可以将多个PostgreSQL当成一个PostgreSQL来使用,实现并行相似搜索的线性扩容。

性能指标,详见:

《PostgreSQL 相似搜索设计与性能 - 地址、QA、POI等文本 毫秒级相似搜索实践》

使用dblink异步调用,实现相似文本搜索的横向线性扩展,性能不衰减。

参考

《PostgreSQL 相似搜索设计与性能 - 地址、QA、POI等文本 毫秒级相似搜索实践》

《PostgreSQL dblink异步调用实现 并行hash分片JOIN - 含数据交、并、差 提速案例》

https://www.postgresql.org/docs/10/static/dblink.html

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
机器学习/深度学习 分布式计算 数据处理
分布式计算框架:并行力量的交响乐章
分布式计算框架如Apache Spark解决单机计算挑战,通过拆分任务到多机并行处理提升效率。Spark以其内存计算加速处理,支持批处理、查询、流处理和机器学习。以下是一个PySpark统计日志中每日UV的示例,展示如何利用SparkContext、map和reduceByKey进行数据聚合分析。这些框架的运用,正改变大数据处理领域,推动数据分析和机器学习的边界。【6月更文挑战第18天】
613 2
|
6月前
|
负载均衡 测试技术 调度
大模型分布式推理:张量并行与流水线并行技术
本文深入探讨大语言模型分布式推理的核心技术——张量并行与流水线并行。通过分析单GPU内存限制下的模型部署挑战,详细解析张量并行的矩阵分片策略、流水线并行的阶段划分机制,以及二者的混合并行架构。文章包含完整的分布式推理框架实现、通信优化策略和性能调优指南,为千亿参数大模型的分布式部署提供全面解决方案。
1626 4
|
6月前
|
存储 监控 算法
117_LLM训练的高效分布式策略:从数据并行到ZeRO优化
在2025年,大型语言模型(LLM)的规模已经达到了数千亿甚至数万亿参数,训练这样的庞然大物需要先进的分布式训练技术支持。本文将深入探讨LLM训练中的高效分布式策略,从基础的数据并行到最先进的ZeRO优化技术,为读者提供全面且实用的技术指南。
675 2
|
7月前
|
并行计算 算法 调度
基于串行并行ADMM算法的主从配电网分布式优化控制研究(Matlab代码实现)
基于串行并行ADMM算法的主从配电网分布式优化控制研究(Matlab代码实现)
436 0
|
机器学习/深度学习 边缘计算 人工智能
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing
第二届边缘计算与并行、分布式计算国际学术会议(ECPDC 2025) 2025 2nd international Conference on Edge Computing, Parallel and Distributed Computing 机器学习 计算学习理论 数据挖掘 科学计算 计算应用 数字图像处理 人工智能
303 6
|
10月前
|
存储 监控 关系型数据库
突破IO瓶颈:PolarDB分布式并行查询(Parallel Query)深度调优手册
在海量数据处理中,I/O瓶颈严重制约数据库性能。本文基于PolarDB MySQL 8.0.32版本,深入解析分布式并行查询技术如何提升CPU利用率至86.7%、IO吞吐达8.5GB/s,并结合20+实战案例,系统讲解并行架构、执行计划优化、资源调优与故障排查方法,助力实现高性能数据分析。
372 6
|
存储 关系型数据库 分布式数据库
PolarDB 并行查询问题之分布式查询执行过程中的数据分发如何解决
PolarDB 并行查询问题之分布式查询执行过程中的数据分发如何解决
221 1
|
关系型数据库 分布式数据库 数据库
PostgreSQL+Citus分布式数据库
PostgreSQL+Citus分布式数据库
624 15
|
分布式计算 并行计算 大数据
NumPy 并行计算与分布式部署
【8月更文第30天】随着数据量的不断增长,传统的单机计算模型已经难以满足对大规模数据集处理的需求。并行和分布式计算成为了处理这些大数据集的关键技术。虽然 NumPy 本身并不直接支持并行计算,但可以通过结合其他库如 Numba 和 Dask 来实现高效的并行和分布式计算。
318 1
|
分布式计算 API 对象存储
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
Ray是一个开源的分布式计算框架,用于构建和扩展分布式应用。它提供了简单的API,使得开发者可以轻松地编写并行和分布式代码,而无需担心底层的复杂性。
2868 11

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多