PostgreSQL dblink异步调用实践,跑并行多任务 - 例如开N个并行后台任务创建索引, 开N个后台任务跑若干SQL

简介: 标签PostgreSQL , 后台任务 , DBLINK 异步调用背景使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

标签

PostgreSQL , 后台任务 , DBLINK 异步调用


背景

使用DBLINK异步接口,可以非常方便的实现跑后台任务,如果要让数据库执行若干条SQL,开N个并行执行,同样可以使用DBLINK封装成API进行调用。

例如,结合我前面的一些文字,可以实现自动选择索引接口、指定并行度、指定表空间、给所有字段创建索引。

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

并行后台任务接口实现

接口效果:

select run_sqls_parallel (  
  参数1:并行度,  
  参数2:要执行的SQLs(数组呈现)  
);  

实现

1、创建dblink插件

create extension if not exists dblink;    

2、创建一个建立连接函数,不报错

create or replace function conn(      
  name,   -- dblink名字      
  text    -- 连接串,URL      
) returns void as $$        
declare        
begin        
  perform dblink_connect($1, $2);       
  return;        
exception when others then        
  return;        
end;        
$$ language plpgsql strict;    

3、创建跑多任务的接口函数

create or replace function run_sqls_parallel(   
  parallels int,    -- 并行度  
  sqls text[],      -- 需要执行的SQLs  
  conn text    default format('hostaddr=%s port=%s user=%s dbname=%s application_name=', '127.0.0.1', current_setting('port'), current_user, current_database())       -- 连接串     
)  
returns setof record as $$  
declare  
  app_prefix_stat text := md5(random()::text);   -- 用来获取pg_stat_activity的实时内容 (由于pg_stat_activity的函数是stable的,无法在事务中获取到被其他会话变更的内容)  
  app_prefix text := md5(random()::text);        -- application, dblink name prefix   
  i int := 1;       -- 任务ID变量,1累加  
  app_conn_name text;  -- application_name, dblink conn name = app_prefix+i   
  sql text;       -- SQL 元素   
  current_conns int := 0;  -- 当前活跃的异步调用   
begin  
  -- 建立获取实时pg_stat_activity内容连接    
  perform conn(app_prefix_stat,  conn||app_prefix_stat);              
  
  foreach sql in array sqls  
  loop  
    -- 当前是否有空闲异步连接   
    select application_name into app_conn_name from   
      dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
    as t(application_name text);   
    -- 有空闲异步连接  
    if found then  
      -- 消耗掉上一次异步连接的结果,否则会报错。  
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);      
      return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
      -- 发送异步DBLINK调用      
      perform dblink_send_query(app_conn_name, sql);   
      
    -- 无空闲异步连接  
    else  
      -- 当前已建立的异步连接数  
      select cn into current_conns from   
        dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
      as t(cn int);      
      loop  
        -- 达到并行度  
        if current_conns >= parallels then   
          -- 是否有空闲异步连接  
          select application_name into app_conn_name from   
            dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' and state='idle' limit 1 $_$, app_prefix))  
          as t(application_name text);   
          -- 有  
          if found then  
            -- 消耗掉上一次异步连接的结果,否则会报错。  
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
            return query select a from dblink_get_result(app_conn_name, false) as t(a text);    
  
            -- 发送异步DBLINK调用      
            perform dblink_send_query(app_conn_name, sql);   
              
            -- 退出循环  
            exit;  
          -- 没有,等  
          else  
            perform pg_sleep(1);  
            raise notice 'current running tasks: %, waiting idle conns.', current_conns;  
          end if;  
        -- 未达到并行度  
        else  
          -- 建立连接  
          perform conn(app_prefix||i,  conn||app_prefix||i);             -- 建立连接。  
  
          -- 发送异步DBLINK调用      
          perform dblink_send_query(app_prefix||i, sql);   
            
          -- 连接suffix序号 递增  
          i := i+1;  
            
          -- 退出循环  
          exit;  
        end if;  
      end loop;  
    end if;  
  end loop;  
  
  loop  
    -- 当前已建立的异步连接数  
    select cn into current_conns from   
      dblink(app_prefix_stat, format($_$ select count(*) from pg_stat_activity where application_name ~ '^%s' and state <> 'idle' $_$, app_prefix))  
    as t(cn int);      
    if current_conns=0 then   
      raise notice 'whole tasks done.';  
      for app_conn_name in 
        select application_name from   
          dblink(app_prefix_stat, format($_$ select application_name from pg_stat_activity where application_name ~ '^%s' $_$, app_prefix))  
        as t(application_name text)
      loop
        return query select a from dblink_get_result(app_conn_name, false) as t(a text);   
      end loop;
      return;  
    else  
      raise notice 'the last % tasks running.', current_conns;  
      perform pg_sleep(1);  
    end if;  
  end loop;  
  
end;  
$$ language plpgsql strict;  

试用

1、运行5条SQL,开2个并行任务

select * from run_sqls_parallel(  
 2, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   
  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  current running tasks: 2, waiting idle conns.  
NOTICE:  the last 2 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  the last 1 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 30070.275 ms (00:30.070)  

2、运行10个并行任务,跑6条SQL

postgres=# select * from run_sqls_parallel(  
 10, -- 并行度  
 -- 执行如下SQL数组  
 array['select pg_sleep(10)', 'select pg_sleep(10)', 'select pg_sleep(10)', 'select count(*) from pg_class where relname ~ ''t''', 'select pg_sleep(10)', 'select pg_sleep(10)']  
)
as t(a text);   

NOTICE:  the last 6 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  the last 5 tasks running.  
NOTICE:  whole tasks done.  
 run_sqls_parallel   
-------------------  
   
(1 row)  
  
Time: 10050.064 ms (00:10.050)  

完全符合预期。

3、结合前面写的文档,我们如果要创建很多索引,可以使用同样的方法实现并行任务

create table t1(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t2(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
create table t3(id int, c1 int,c2 int, c3 int, c4 int, c5 int,c6 int, c7 int, c8 int);  
do language plpgsql $$    
declare    
  tables name[] := array['t1','t2','t3'];     -- 表名  
  n name;   -- 表名  
  x name;   -- 字段名  
  i int;    -- LOOP值  
  sql text;        
  sqls text[];      
  tbs name := 'tbs1';    -- 索引表空间  
begin    
  set maintenance_work_mem='2GB';    
    
  foreach n in array tables loop    
    i := 1;      
    for x in select attname from pg_attribute where attrelid=n::regclass and attnum>=1 and not attisdropped   
    loop    
      -- 结合自动选择索引接口(btree,hash,gin,gist等)的功能,可以实现更完美的全字段创建索引  
      sql := format('create index IF NOT EXISTS idx_%s__%s on %s (%s) tablespace %s', n, i, n, x, tbs);      -- 封装创建索引的SQL    
      sqls := array_append(sqls, sql);   
      i:=i+1;    
    end loop;    
  end loop;    
  
  perform * from run_sqls_parallel(  
    10,   -- 并行度  
    sqls  -- 执行index SQL数组  
  ) as t(a text);   
  
  foreach n in array tables loop    
    execute format('analyze %s', n);     
  end loop;    
end;    
$$;    

完全符合预期。

小结

本文使用dblink异步调用的功能,增加了一个API函数,可以用于开启N个并行,跑若干条长SQL,例如用来创建索引非常给力。

接口效果:

select * from run_sqls_parallel (  
  参数1:并行度,   
  参数2:要后台并行执行的SQLs(数组呈现)     
)
as t(a text);

参考

《自动选择正确索引访问接口(btree,hash,gin,gist,sp-gist,brin,bitmap...)的方法》

《PostgreSQL 快速给指定表每个字段创建索引》

《阿里云RDS PostgreSQL OSS 外部表实践 - (dblink异步调用封装并行) 从OSS并行导入数据》

《在PostgreSQL中跑后台长任务的方法 - 使用dblink异步接口》

《PostgreSQL AB表切换最佳实践 - 提高切换成功率,杜绝雪崩 - 珍藏级》

相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
目录
相关文章
|
5月前
|
SQL Java 关系型数据库
在 RDB 上跑 SQL------SPL 轻量级多源混算实践 1
SPL 支持通过 JDBC 连接 RDB,可动态生成 SQL 并传参,适用于 Java 与 SQL 结合的各类场景。本文以 MySQL 为例,演示如何配置数据库连接、编写 SPL 脚本查询 2024 年订单数据,并支持参数过滤和 SQL 混合计算。脚本可在 IDE 直接执行或集成至 Java 应用调用。
|
8月前
|
SQL 存储 关系型数据库
SQL优化策略与实践:组合索引与最左前缀原则详解
本文介绍了SQL优化的多种方式,包括优化查询语句(避免使用SELECT *、减少数据处理量)、使用索引(创建合适索引类型)、查询缓存、优化表结构、使用存储过程和触发器、批量处理以及分析和监控数据库性能。同时,文章详细讲解了组合索引的概念及其最左前缀原则,即MySQL从索引的最左列开始匹配条件,若跳过最左列,则索引失效。通过示例代码,展示了如何在实际场景中应用这些优化策略,以提高数据库查询效率和系统响应速度。
344 10
|
4月前
|
SQL 关系型数据库 Java
SQL 移植--SPL 轻量级多源混算实践 7
不同数据库的 SQL 语法存在差异,尤其是函数写法不同,导致 SQL 移植困难。SPL 提供 sqltranslate 函数,可将标准 SQL 转换为特定数据库语法,实现 SQL 语句在不同数据库间的无缝迁移,支持多种数据库函数映射与自定义扩展。
|
8月前
|
SQL 安全 关系型数据库
SQL注入之万能密码:原理、实践与防御全解析
本文深入解析了“万能密码”攻击的运行机制及其危险性,通过实例展示了SQL注入的基本原理与变种形式。文章还提供了企业级防御方案,包括参数化查询、输入验证、权限控制及WAF规则配置等深度防御策略。同时,探讨了二阶注入和布尔盲注等新型攻击方式,并给出开发者自查清单。最后强调安全防护需持续改进,无绝对安全,建议使用成熟ORM框架并定期审计。技术内容仅供学习参考,严禁非法用途。
1252 0
|
7月前
|
SQL 存储 大数据
Dataphin V5.0:支持创建异步调用API,实现慢 SQL 复杂计算的直连消费
本文介绍了数据服务产品中异步调用的应用场景与优势,包括大数据引擎查询、复杂SQL及大规模数据下载等场景,解决了同步调用可能导致的资源浪费和性能问题。通过创建异步API、测试发布以及权限申请等功能,实现高效稳定的服务提供。以电商订单查询为例,展示了如何利用异步调用提升系统性能与用户体验。
325 9
|
9月前
|
SQL 分布式计算 资源调度
Dataphin功能Tips系列(48)-如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
如何根据Hive SQL/Spark SQL的任务优先级指定YARN资源队列
370 4
|
10月前
|
SQL 关系型数据库 分布式数据库
利用 PolarDB PG 版向量化引擎,加速复杂 SQL 查询!完成任务领发财新年抱枕!
利用 PolarDB PG 版向量化引擎,加速复杂 SQL 查询!完成任务领发财新年抱枕!
318 14
|
11月前
|
SQL 机器学习/深度学习 运维
SQL优化有绝招,使用DAS提升工作效率!完成任务可领取保暖手套!
数据库自治服务(Database Autonomy Service,简称DAS)是一种基于机器学习和专家经验实现数据库自感知、自修复、自优化、自运维及自安全的云服务。数据库自治服务DAS支持自动SQL优化,相比传统的优化方式,能够自动识别问题SQL,生成索引优化建议。
|
SQL 关系型数据库 MySQL
体验使用DAS实现数据库SQL优化,完成任务可得羊羔绒加厚坐垫!
本实验介绍如何通过数据库自治服务DAS对RDS MySQL高可用实例进行SQL优化,包含购买RDS实例并创建数据库、数据导入、生成并优化慢SQL、执行优化后的SQL语句等实验步骤。完成任务,即可领取羊羔绒加厚坐垫,限量500个,先到先得。
426 19
|
SQL 运维
Doris同一个SQL任务,前一天执行成功,第二天执行失败
Doris 动态分区 插入数据 同样的代码隔天运行一个成功一个失败

相关产品

  • 云原生数据库 PolarDB
  • 云数据库 RDS PostgreSQL 版
  • 推荐镜像

    更多