HybridDB PostgreSQL "Sort、Group、distinct 聚合、JOIN" 不惧怕数据倾斜的黑科技和原理 - 多阶段聚合

RDS PostgreSQL Serverless,0.5-4RCU 50GB 3个月
RDS SQL Server Serverless,2-4RCU 50GB 3个月
云原生数据库 PolarDB 分布式版,标准版 2核8GB


PostgreSQL , Greenplum , JOIN , group by , distinct , 聚合 , 非分布键 , 数据倾斜 , 多阶段聚合







JOIN,排序,group by,distinct。



3、group by非分布键字段




( HybridDB for PostgreSQL基于GPDB开源版本改进而来,已包含这个功能。 )

非分布键 JOIN,排序,group by,distinct

1、非分布键 group by


tbl_ao_col表是c1的分布键,但是我们group by使用了c398字段,因此看看它是怎么做的呢?请看执行计划的解释。

postgres=# explain analyze select c398,count(*),sum(c399),avg(c399),min(c399),max(c399) from tbl_ao_col group by c398;      
                                                                       QUERY PLAN                                                                             
 Gather Motion 48:1  (slice2; segments: 48)  (cost=123364.18..123582.28 rows=9693 width=96)      
 // 返回结果    
   Rows out:  10001 rows at destination with 120 ms to end, start offset by 1.921 ms.      
   ->  HashAggregate  (cost=123364.18..123582.28 rows=202 width=96)      
   // 重分布后再次聚合。    
         Group By: tbl_ao_col.c398      
         Rows out:  Avg 208.4 rows x 48 workers.  Max 223 rows (seg17) with 0.001 ms to first row, 54 ms to end, start offset by 35 ms.      
         ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=122928.00..123121.86 rows=202 width=96)      
         // 第一次聚合后,记录数以及降低到了几千行,因此重分布后即使出现倾斜,关系也不大。    
               Hash Key: tbl_ao_col.c398      
               Rows out:  Avg 8762.2 rows x 48 workers at destination.  Max 9422 rows (seg46) with 31 ms to end, start offset by 63 ms.      
               ->  HashAggregate  (cost=122928.00..122928.00 rows=202 width=96)      
               // 这一步是在segment节点聚合    
                     Group By: tbl_ao_col.c398      
                     Rows out:  Avg 8762.2 rows x 48 workers.  Max 8835 rows (seg2) with 0.004 ms to first row, 8.004 ms to end, start offset by 82 ms.      
                     ->  Append-only Columnar Scan on tbl_ao_col  (cost=0.00..107928.00 rows=20834 width=16)      
                           Rows out:  0 rows (seg0) with 28 ms to end, start offset by 64 ms.      
 Slice statistics:      
   (slice0)    Executor memory: 377K bytes.      
   (slice1)    Executor memory: 1272K bytes avg x 48 workers, 1272K bytes max (seg0).      
   (slice2)    Executor memory: 414K bytes avg x 48 workers, 414K bytes max (seg0).      
 Statement statistics:      
   Memory used: 128000K bytes      
 Settings:  optimizer=off      
 Optimizer status: legacy query optimizer      
 Total runtime: 122.173 ms      
(22 rows)      


非分布键 GROUP BY,首先会在本地节点group by,然后按GROUP BY字段进行数据重分布,然后再在本地节点GROUP BY,最后返回GROUP BY结果给master节点,返回给用户。

Greenplum会根据group by的字段的distinct值的比例,考虑是直接重分布数据,还是先在本地聚合后再重分布数据(减少重分布的数据量)。

2、非分布键 distinct


tbl 为 随机分布

postgres=# explain analyze select count(distinct c2) from tbl;  
                                                                         QUERY PLAN                                                                           
 Aggregate  (cost=1549462.55..1549462.56 rows=1 width=8)  
   Rows out:  1 rows with 0.002 ms to first row, 0.645 ms to end, start offset by 1.681 ms.  
   ->  Gather Motion 48:1  (slice2; segments: 48)  (cost=1548947.03..1549450.04 rows=1001 width=4)  
         Rows out:  1001 rows at destination with 498 ms to end, start offset by 1.684 ms.  
         ->  HashAggregate  (cost=1548947.03..1548959.55 rows=21 width=4)  
               Group By: tbl.c2  
               Rows out:  Avg 20.9 rows x 48 workers.  Max 31 rows (seg17) with 0.002 ms to first row, 152 ms to end, start offset by 39 ms.  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=1548912.00..1548932.02 rows=21 width=4)  
                     Hash Key: tbl.c2  
                     Rows out:  Avg 1001.0 rows x 48 workers at destination.  Max 1488 rows (seg17) with 309 ms to end, start offset by 39 ms.  
                     ->  HashAggregate  (cost=1548912.00..1548912.00 rows=21 width=4)  
                           Group By: tbl.c2  
                           Rows out:  Avg 1001.0 rows x 48 workers.  Max 1001 rows (seg0) with 0.006 ms to first row, 271 ms to end, start offset by 42 ms.  
                           ->  Append-only Columnar Scan on tbl  (cost=0.00..1048912.00 rows=2083334 width=4)  
                                 Rows out:  0 rows (seg0) with 25 ms to end, start offset by 42 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 327K bytes.  
   (slice1)    Executor memory: 764K bytes avg x 48 workers, 764K bytes max (seg0).  
   (slice2)    Executor memory: 292K bytes avg x 48 workers, 292K bytes max (seg0).  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=off; enable_seqscan=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 502.576 ms  
(24 rows)  


非分布键 求distinct,首先会在本地节点hash 聚合,然后按distinct字段进行数据重分布,然后再在本地节点hash 聚合,最后返回结果给master节点,返回给用户。


3、非分布键 distinct + 非分布键 group by

tbl 为 随机分布

postgres=# explain analyze select count(distinct c2) from tbl group by c3;  
                                                                           QUERY PLAN                                                                              
 Gather Motion 48:1  (slice2; segments: 48)  (cost=1805483.56..1805484.83 rows=101 width=12)  
   Rows out:  101 rows at destination with 990 ms to end, start offset by 519 ms.  
   ->  HashAggregate  (cost=1805483.56..1805484.83 rows=3 width=12)  
         Group By: partial_aggregation.c3  
         Rows out:  Avg 2.5 rows x 41 workers.  Max 4 rows (seg9) with 0.005 ms to first row, 0.284 ms to end, start offset by 577 ms.  
         ->  HashAggregate  (cost=1802703.29..1803967.05 rows=2107 width=8)  
               Group By: tbl.c3, tbl.c2  
               Rows out:  Avg 2465.9 rows x 41 workers.  Max 4004 rows (seg9) with 0.001 ms to first row, 260 ms to end, start offset by 577 ms.  
               ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=1798912.00..1800934.02 rows=2107 width=8)  
                     Hash Key: tbl.c3  
                     Rows out:  Avg 118362.0 rows x 41 workers at destination.  Max 192192 rows (seg9) with 663 ms to end, start offset by 577 ms.  
                     ->  HashAggregate  (cost=1798912.00..1798912.00 rows=2107 width=8)  
                           Group By: tbl.c3, tbl.c2  
                           Rows out:  Avg 101100.9 rows x 48 workers.  Max 101101 rows (seg0) with 0.005 ms to first row, 747 ms to end, start offset by 562 ms.  
                           ->  Append-only Columnar Scan on tbl  (cost=0.00..1048912.00 rows=2083334 width=8)  
                                 Rows out:  0 rows (seg0) with 40 ms to end, start offset by 560 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 327K bytes.  
   (slice1)    Executor memory: 1117K bytes avg x 48 workers, 1117K bytes max (seg0).  
   (slice2)    Executor memory: 435K bytes avg x 48 workers, 452K bytes max (seg0).  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=off; enable_seqscan=off; optimizer=off  
 Optimizer status: legacy query optimizer  
 Total runtime: 1511.120 ms  
(25 rows)  

distinct和group by都是非分布键,Greenplum分布式执行计划优雅的解决了非分布键group by与distinct数据重分布带来的网络传输的问题。

4、非分布键 join





postgres=# explain analyze select a.c1,count(*) from a join b on (a.id=b.id) group by a.c1;  
                                                                                          QUERY PLAN                                                                                             
 Gather Motion 48:1  (slice3; segments: 48)  (cost=0.00..2730.45 rows=1 width=12)  
   Rows out:  1 rows at destination with 7190 ms to end, start offset by 2.357 ms.  
   ->  GroupAggregate  (cost=0.00..2730.45 rows=1 width=12)  
         Group By: a.c1  
         Rows out:  1 rows (seg22) with 0.001 ms to first row, 0.320 ms to end, start offset by 54 ms.  
         ->  Sort  (cost=0.00..2730.44 rows=1 width=12)  
               Sort Key: a.c1  
               Rows out:  1 rows (seg22) with 0.001 ms to end, start offset by 54 ms.  
               Executor memory:  33K bytes avg, 33K bytes max (seg0).  
               Work_mem used:  33K bytes avg, 33K bytes max (seg0). Workfile: (0 spilling, 0 reused)  
               ->  Redistribute Motion 48:48  (slice2; segments: 48)  (cost=0.00..2730.44 rows=1 width=12)  
                     Hash Key: a.c1  
                     Rows out:  1 rows at destination (seg22) with 7138 ms to end, start offset by 54 ms.  
                     ->  Result  (cost=0.00..2730.44 rows=1 width=12)  
                           Rows out:  1 rows (seg42) with 0.003 ms to end, start offset by 77 ms.  
                           ->  GroupAggregate  (cost=0.00..2730.44 rows=1 width=12)  
                                 Group By: a.c1  
                                 Rows out:  1 rows (seg42) with 0.002 ms to first row, 1054 ms to end, start offset by 77 ms.  
                                 ->  Sort  (cost=0.00..2730.44 rows=1 width=4)  
                                       Sort Key: a.c1  
                                       Rows out:  10000000 rows (seg42) with 0.003 ms to end, start offset by 77 ms.  
                                       Executor memory:  1400K bytes avg, 65676K bytes max (seg42).  
                                       Work_mem used:  1400K bytes avg, 65676K bytes max (seg42). Workfile: (1 spilling, 0 reused)  
                                       Work_mem wanted: 481337K bytes avg, 481337K bytes max (seg42) to lessen workfile I/O affecting 1 workers.  
                                       ->  Hash Join  (cost=0.00..2730.44 rows=1 width=4)  
                                             Hash Cond: b.id = a.id  
                                             Rows out:  10000000 rows (seg42) with 0.014 ms to first row, 4989 ms to end, start offset by 77 ms.  
                                             Executor memory:  6511K bytes avg, 6513K bytes max (seg18).  
                                             Work_mem used:  6511K bytes avg, 6513K bytes max (seg18). Workfile: (0 spilling, 0 reused)  
                                             ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..471.72 rows=208130 width=4)  
                                                   Hash Key: b.id  
                                                   Rows out:  10000000 rows at destination (seg42) with 0.004 ms to end, start offset by 77 ms.  
                                                   ->  Table Scan on b  (cost=0.00..436.27 rows=208130 width=4)  
                                                         Rows out:  Avg 208333.3 rows x 48 workers.  Max 208430 rows (seg17) with 4.815 ms to first row, 824 ms to end, start offset by 92 ms.  
                                             ->  Hash  (cost=436.27..436.27 rows=208475 width=8)  
                                                   Rows in:  (No row requested) 0 rows (seg0) with 0 ms to end.  
                                                   ->  Table Scan on a  (cost=0.00..436.27 rows=208475 width=8)  
                                                         Rows out:  Avg 208333.3 rows x 48 workers.  Max 208401 rows (seg18) with 34 ms to first row, 46 ms to end, start offset by 63 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 330K bytes.  
   (slice1)    Executor memory: 1129K bytes avg x 48 workers, 1129K bytes max (seg0).  
   (slice2)  * Executor memory: 2139K bytes avg x 48 workers, 66504K bytes max (seg42).  Work_mem: 65676K bytes max, 481337K bytes wanted.  
   (slice3)    Executor memory: 372K bytes avg x 48 workers, 388K bytes max (seg22).  Work_mem: 33K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
   Memory wanted: 1444908K bytes  
 Settings:  enable_bitmapscan=on; enable_seqscan=on; optimizer=on  
 Optimizer status: PQO version 1.602  
 Total runtime: 7193.902 ms  
(49 rows)  



postgres=# explain analyze select a.c1,count(*) from a join b on (a.id=b.id) group by a.c1;  
                                                                                               QUERY PLAN                                                                                                  
 Gather Motion 48:1  (slice4; segments: 48)  (cost=0.00..990.85 rows=101 width=12)  
   Rows out:  101 rows at destination with 752 ms to first row, 753 ms to end, start offset by 732 ms.  
   ->  GroupAggregate  (cost=0.00..990.85 rows=3 width=12)  
         Group By: a.c1  
         Rows out:  Avg 2.5 rows x 41 workers.  Max 4 rows (seg9) with 746 ms to end, start offset by 738 ms.  
         ->  Sort  (cost=0.00..990.85 rows=3 width=12)  
               Sort Key: a.c1  
               Rows out:  Avg 118.2 rows x 41 workers.  Max 192 rows (seg9) with 746 ms to end, start offset by 738 ms.  
               Executor memory:  58K bytes avg, 58K bytes max (seg0).  
               Work_mem used:  58K bytes avg, 58K bytes max (seg0). Workfile: (0 spilling, 0 reused)  
               ->  Redistribute Motion 48:48  (slice3; segments: 48)  (cost=0.00..990.85 rows=3 width=12)  
                     Hash Key: a.c1  
                     Rows out:  Avg 118.2 rows x 41 workers at destination.  Max 192 rows (seg9) with 594 ms to first row, 746 ms to end, start offset by 738 ms.  
                     ->  Result  (cost=0.00..990.85 rows=3 width=12)  
                           Rows out:  Avg 101.0 rows x 48 workers.  Max 101 rows (seg0) with 675 ms to first row, 676 ms to end, start offset by 740 ms.  
                           ->  HashAggregate  (cost=0.00..990.85 rows=3 width=12)  
                                 Group By: a.c1  
                                 Rows out:  Avg 101.0 rows x 48 workers.  Max 101 rows (seg0) with 675 ms to first row, 676 ms to end, start offset by 740 ms.  
                                 Executor memory:  4185K bytes avg, 4185K bytes max (seg0).  
                                 ->  Hash Join  (cost=0.00..964.88 rows=208191 width=4)  
                                       Hash Cond: a.id = b.id  
                                       Rows out:  Avg 208333.3 rows x 48 workers.  Max 208401 rows (seg18) with 282 ms to first row, 661 ms to end, start offset by 767 ms.  
                                       Executor memory:  4883K bytes avg, 4885K bytes max (seg18).  
                                       Work_mem used:  4883K bytes avg, 4885K bytes max (seg18). Workfile: (0 spilling, 0 reused)  
                                       (seg18)  Hash chain length 1.3 avg, 4 max, using 159471 of 262151 buckets.  
                                       ->  Redistribute Motion 48:48  (slice1; segments: 48)  (cost=0.00..444.59 rows=208378 width=8)  
                                             Hash Key: a.id  
                                             Rows out:  Avg 208333.3 rows x 48 workers at destination.  Max 208401 rows (seg18) with 0.112 ms to first row, 104 ms to end, start offset by 1048 ms.  
                                             ->  Table Scan on a  (cost=0.00..436.27 rows=208378 width=8)  
                                                   Rows out:  Avg 208333.3 rows x 48 workers.  Max 208422 rows (seg31) with 0.117 ms to first row, 64 ms to end, start offset by 749 ms.  
                                       ->  Hash  (cost=440.42..440.42 rows=208191 width=4)  
                                             Rows in:  Avg 208333.3 rows x 48 workers.  Max 208401 rows (seg18) with 250 ms to end, start offset by 798 ms.  
                                             ->  Redistribute Motion 48:48  (slice2; segments: 48)  (cost=0.00..440.42 rows=208191 width=4)  
                                                   Hash Key: b.id  
                                                   Rows out:  Avg 208333.3 rows x 48 workers at destination.  Max 208401 rows (seg18) with 0.219 ms to first row, 132 ms to end, start offset by 798 ms.  
                                                   ->  Table Scan on b  (cost=0.00..436.27 rows=208191 width=4)  
                                                         Rows out:  Avg 208333.3 rows x 48 workers.  Max 208388 rows (seg3) with 0.146 ms to first row, 77 ms to end, start offset by 760 ms.  
 Slice statistics:  
   (slice0)    Executor memory: 313K bytes.  
   (slice1)    Executor memory: 1096K bytes avg x 48 workers, 1096K bytes max (seg0).  
   (slice2)    Executor memory: 1096K bytes avg x 48 workers, 1096K bytes max (seg0).  
   (slice3)    Executor memory: 25518K bytes avg x 48 workers, 25518K bytes max (seg0).  Work_mem: 4885K bytes max.  
   (slice4)    Executor memory: 374K bytes avg x 48 workers, 382K bytes max (seg0).  Work_mem: 58K bytes max.  
 Statement statistics:  
   Memory used: 128000K bytes  
 Settings:  enable_bitmapscan=on; enable_seqscan=on; optimizer=on  
 Optimizer status: PQO version 1.602  
 Total runtime: 1486.335 ms  
(48 rows)  

非分布键 排序

1、merge sort


Greenplum使用了merge sort,首先在数据节点本地排序(所有节点并行),然后master节点向segment请求数据,在master节点merge sort合并。


非分布键 group by 和 distinct 的原理





第四阶段,返回结果给master,有必要的话master节点调用聚合函数的final func(已经是很少的记录数和运算量)。

非分布键 JOIN 的原理




《Greenplum 行存、列存,堆表、AO表的原理和选择》

《分布式DB(Greenplum)中数据倾斜的原因和解法 - 阿里云HybridDB for PostgreSQL最佳实践》


《日增量万亿+级 实时分析、数据规整 - 阿里云HybridDB for PostgreSQL最佳实践》

阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
JSON 关系型数据库 PostgreSQL
PostgreSQL 9种索引的原理和应用场景
PostgreSQL 支持九种主要索引类型,包括 B-Tree、Hash、GiST、SP-GiST、GIN、BRIN、Bitmap、Partial 和 Unique 索引。每种索引适用于不同场景,如 B-Tree 适合范围查询和排序,Hash 仅用于等值查询,GiST 支持全文搜索和几何数据查询,GIN 适用于多值列和 JSON 数据,BRIN 适合非常大的表,Bitmap 适用于低基数列,Partial 只对部分数据创建索引,Unique 确保列值唯一。
关系型数据库 数据库 PostgreSQL
深入理解 PostgreSQL 的 JOIN 连接
深入理解 PostgreSQL 的 JOIN 连接
204 4
关系型数据库 数据管理 Go
225 0
关系型数据库 大数据 PostgreSQL
161 0
缓存 运维 关系型数据库
PostgreSQL技术大讲堂 - 第43讲:流复制原理
PostgreSQL技术大讲堂 - 第43讲:流复制原理
307 2
存储 关系型数据库 Go
795 0
存储 关系型数据库 数据库
深入理解 PostgreSQL 的架构和内部工作原理
深入理解 PostgreSQL 的架构和内部工作原理
713 0
存储 关系型数据库 数据库
文章来自: 朱贤文 | 成都文武信息技术有限公司 分析
357 1
存储 对象存储 块存储
SQL 存储 Cloud Native
《阿里云认证的解析与实战-数据仓库ACP认证》——云原生数据仓库AnalyticDB PostgreSQL版解析与实践(上)——二、产品架构及原理
《阿里云认证的解析与实战-数据仓库ACP认证》——云原生数据仓库AnalyticDB PostgreSQL版解析与实践(上)——二、产品架构及原理


  • 云原生数据库 PolarDB