Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age 论文解读

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS AI 助手,专业版
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: 这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。

这篇paper介绍了TUM的内存数据库系统HyPer中使用的,基于小块数据(morsel)来驱动的并行查询执行框架。用来解决many-cores(NUMA)环境下,分析型查询的scalability问题。

背景

现代cpu的体系架构下,core在不断增多,memory的容量也在持续增大,因此单点的memory controller逐渐出现了性能的瓶颈,为了解决这个问题引入了decentralized的多controller结构,也就是NUMA,但由于跨NUMA socket的非均匀访问特性,导致即使单机内也显现出了集群的特点:数据的访问性能取决于访问线程和目标内存是否存在于同一socker内。

传统的Volcano执行框架采用了一种优美而简单的模型,使得各个算子之间充分解耦,算子完全不感知并行执行的存在,在planning期间会决定计划的并行度,并为其分配固定的线程数,各个算子的执行和并行无关,只有exchange operator负责并行的数据交互,各个pipeline之间,并不共享数据,而是分发数据。

而HyPer思路则完全不同,使用了一种自适应的数据驱动的调度执行方案:系统使用固定数量的线程池,各个query的执行过程中的数据,被切分为细粒度的单元(morsel),然后结合对其进行处理的operator pipeline,封装为task,交给worker thread pool去执行。

通过动态的调度策略,尽量保证数据的本地化,这样每个query的并行度是可以动态调整的,所有worker执行所有query的tasks,使吞吐量最大化,并且结合调度策略可以实现load banlance,没有动态的thread创建/销毁,可以做到NUMA-aware,也就是尽可能倾向于NUMA-local的执行,worker尽可能只处理本socket内的局部数据,除非其发生空闲,从而最小化跨socket的数据访问。

总体架构

从全局角度,存在一个dispatcher来保存各个query传递过来的任务(pipeline job),每个任务是一个query的一个subplan,对应着底层要被处理的数据(在各个NUMA node memory中),有一个全局的线程池,主动向dispatcher请求task来执行。

dispatcher根据调度策略,分配给其某个pipeline job中的NUMA-local的(morsel数据+job code),整体作为一个task,交给thread执行,thread执行完成后再继续请求任务,此外如果完成了一个pipeline job后,中间结果会进行完全的物化,来等待下一个pipeline job的处理,而下一个job在处理这些输入数据时仍然会做全局的“逻辑上”动态的morsel切分,保证数据仍然均匀被处理。

image.png

上图中不同颜色代表不同的NUMA-node内的cores和memory,一个thread请求task时,尽量分配给其local morsel的task,当不再有task可执行时,就从其他node中steal一个继续处理,避免idle worker thread,保证负载均衡。

对于一个pipeline job,当有一组threads去执行它时,这种细粒度的morsel,可以使各个worker争抢任务,类似PolarDB中parallel scan的机制,让workload均衡,减少data skew的影响。

对于任一个query,其执行过程中就可以基于调度策略动态调整dop,而且由于morsel很小,这种调整也会比较平滑。

从整体来看,所有query交错的执行,保证了整体的吞吐量,而且可以集成不同的调度策略(基于优先级..)。

Dispatcher

dispatcher本身是一个逻辑的概念,并不存在一个物理线程,本质就是一个全局的pipeline job队列(无锁),当worker向其要task时,根据其所在NUMA node,分配对应node的job,并动态”切分“出一个本地的morsel数据,封装为task交给worker执行。

dispatcher并没有一个独立的全局线程,因为可能成为contention点,还会占用其他worker thread的执行资源,而是把调度逻辑实现在了wokrer thread的请求处理中。

Worker thread

Worker thread在系统初始化时就创建了固定的数量,不会再修改,每个worker thread会绑定特定的core,worker thread拿到一个task后(多数是local morsel,steal时是remote morsel)开始处理,将结果物化到local storage area中,或者是写入全局的share structure中(hash table…),worker的调度时机就是在一个morsel处理完成时。

全局shared data structure是以interleaved方式分配在多个socket之间,由kernel来控制具体分配策略。

单个query

每个query在由query compiler将plan拆分为若干了plan segment,每个subplan是最大化的pipeline operators集合,直到遇到pipeline breaker,然后将各个pipeline operators进行CodeGen,生成machine code来保证高效执行。

每个query会生成一个QEPobject,负责管理pipeline segment之间的依赖关系,并将segment转换为pipeline job,在一个pipeline job执行完成时,会触发QEPobject的被动状态机,选择下一个可以执行的pipeline job,传递给dispatcher,也就是挂到全局的job队列中。

每个pipeline输入是一个单元的morsel数据,输出的结果也要物化在NUMA本地的存储area中,由后续pipeline处理。

并行化算子

除了这种全新的调度执行框架,HyPer中还实现了各个算子的高效并行化。

hash join

Hash join是多个线程并行build,并写入到一个全局shared hash table中,这个过程分为2个步骤。

  1. 各个线程读取numa-local数据并进行filtering,结果物化到本地的临时存储中
  2. 第1步完成后整个hash table的大小可以精确得知,因此可以采用更加高效的lock-free hash table结构,避免了动态grow的开销,然后各个thread并发的把每个record的pointer,插入到hash table中

lock-free hash table 使用了一种early filtering的优化,也就是在每个bucket list的header ptr中,加入一个tag,通过查看这个tag就可以知道是否有能命中的data item(具体原理没细讲,但应该类似bloom filter)

image.png

这个tag和list pointer集成到一个64bit的单元中,这样每次插入元素时,对header pointer+tag的修改,就可以通过一个CAS操作原子性的完成,如果失败了则重试即可,实现了高效的并发insert,算法如下:

image.png

完成build后,probe就可以流水线的进行了,而且可以跨越多个hash table,如下图中,R的一条记录可以先probe S,再probe T从而完成join。

image.png

对于hash table/storage area,HyPer使用了large page(2MB)的方式,这样可以减小page table大小,尽量放入cache中,此外由于page少了page fault也会大大减少,减少TLB miss的次数。具体的分配使用了mmap,由kernel来管理内存策略。

table scan

为了实现morsel-driven,需要将data 均匀分布在各个NUMA node中,最简单的就是round-robin。也可以根据query情况,按照join列进行hash分布,这样实现co-located join,尽量让数据保持本地化处理。(当然还是有可能有steal的)

Note: 即使没有hash分布,也不影响morsel-driven这种NUMA-aware的特性,本地的数据仍然是尽量在本地处理的。

group by + aggregation

分组聚集总体上采用了local aggregation -> repartition -> parallel aggregation的策略。

image.png

每个worker thread先做numa-local的局部聚集,然后将结果放入一个本地的partition结构中,后续会把这些paritition进行分发,各个worker thread接收到对应的partitions,做二次的hash聚集。

Sort

对于main-memory数据库系统,研究 表明hash的执行方式比sort会更加高效,因此HyPer中只有Order by/TopN这样的算子使用了排序操作。

其策略是 local order by -> 并行mergesort。

首先各个worker thread 在本地进行order by或者维护一个top k的heap,然后在本地排序后,根据等距key的原则记录一些local separators,然后将所有local separators汇总+排序,并计算出global separators(具体算法没细讲。。)

基于global separators,各个local run可以找到对应的点,然后各自分发到目标thread,做并行的merge sort。

image.png

总结

HyPer的性能数据是非常亮眼的:

image.png

对于TPC-H 100G,没有任何索引(只有主键)的情况下,所有查询都在3s以内执行完成,是当时最快的single-server执行结果。

总结下其基本思路,这些是其他database系统也可以借鉴的:

  • fine-grained 动态调度
  • NUMA-aware尽量保证本地化处理
  • 线程间的高效synchronization
  • full operator parallelism,各个算子都有不错的并行执行策略
目录
相关文章
|
NoSQL Linux Apache
brpc最新安装上手指南
brpc最新安装上手指南
1439 1
brpc最新安装上手指南
|
SQL 存储 缓存
OceanBase查询优化器
本文整理自OceanBase团队高级技术专家王国平,在深入浅出 OceanBase线上技术沙龙第二期的分享。
OceanBase查询优化器
|
SQL 存储 调度
从 Volcano 火山模型到 Pipeline 执行模型,阿里云数据库 SelectDB 内核 Apache Doris 执行模型的迭代
一个合适的执行模型对于提高查询效率和系统性能至关重要。本文全面剖析 Apache Doris Pipeline 执行模型的设计与改造历程,并在 2.1 版本对并发执行模式与调度模式进一步优化,解决了执行并发受限、执行及调度开销大等问题。
63931 3
从 Volcano 火山模型到 Pipeline 执行模型,阿里云数据库 SelectDB 内核 Apache Doris 执行模型的迭代
|
关系型数据库 开发工具 git
Greenplum ORCA 优化器的编译安装与使用
背景 ORCA 是PostgreSQL的下一代优化器,在QUERY的优化上比自带的优化器有长足的进步。 安装ORCA cmake wget https://cmake.org/files/v3.5/cmake-3.5.2.tar.gz tar -zxvf cmake-3.5.2.tar
11167 0
|
9月前
|
存储 BI Shell
Doris基础-架构、数据模型、数据划分
Apache Doris 是一款高性能、实时分析型数据库,基于MPP架构,支持高并发查询与复杂分析。其前身是百度的Palo项目,现为Apache顶级项目。Doris适用于报表分析、数据仓库构建、日志检索等场景,具备存算一体与存算分离两种架构,灵活适应不同业务需求。它提供主键、明细和聚合三种数据模型,便于高效处理更新、存储与统计汇总操作,广泛应用于大数据分析领域。
894 2
|
存储 关系型数据库 分布式数据库
|
7月前
|
存储 人工智能 关系型数据库
阿里云AnalyticDB for PostgreSQL 入选VLDB 2025:统一架构破局HTAP,Beam+Laser引擎赋能Data+AI融合新范式
在数据驱动与人工智能深度融合的时代,企业对数据仓库的需求早已超越“查得快”这一基础能力。面对传统数仓挑战,阿里云瑶池数据库AnalyticDB for PostgreSQL(简称ADB-PG)创新性地构建了统一架构下的Shared-Nothing与Shared-Storage双模融合体系,并自主研发Beam混合存储引擎与Laser向量化执行引擎,全面解决HTAP场景下性能、弹性、成本与实时性的矛盾。 近日,相关研究成果发表于在英国伦敦召开的数据库领域顶级会议 VLDB 2025,标志着中国自研云数仓技术再次登上国际舞台。
750 1
|
存储 SQL 人工智能
Hologres 4.0全新发布:AI时代的一站式多模态分析平台
2025年云栖大会,Hologres发布全新4.0版本升级,以“AI时代的一站式多模态分析平台”为核心理念,全面展示了Hologres在结构化、半结构化与非结构化数据分析能力上的重大突破,特别是在OLAP分析、点查、向量检索、全文检索、湖仓协同及AI Function集成等方面的领先优势,刷新ClickBench、JSONBench、VectorDBBench等多项榜单,登顶第一。
|
12月前
|
存储 运维 Serverless
千万级数据秒级响应!碧桂园基于 EMR Serverless StarRocks 升级存算分离架构实践
碧桂园服务通过引入 EMR Serverless StarRocks 存算分离架构,解决了海量数据处理中的资源利用率低、并发能力不足等问题,显著降低了硬件和运维成本。实时查询性能提升8倍,查询出错率减少30倍,集群数据 SLA 达99.99%。此次技术升级不仅优化了用户体验,还结合AI打造了“一看”和“—问”智能场景助力精准决策与风险预测。
1083 69