PolarDB-X AP 引擎与 ClickBench 数据集
PolarDB-X作为一款云原生分布式数据库,具有在线事务及分析的处理能力(HTAP)、计算存储分离、全局二级索引等重要特性。在 HTAP 方面,PolarDB-X 实现了向量化执行引擎,列存多级查询缓存的能力,在先前 TPC-H 的报告中已经显示出其查询速度处于业内领先水平(参考:PolarDB-X TPC-H 100G 白皮书)。
ClickBench 是由 Clickhouse. Inc 推出的一款用于测试 OLAP 系统在大宽表场景下性能表现的测试集。数据规模为单表 70G,共105列,数据行数为 99997497(约1亿行),具有 43 条查询 SQL,由于 ClickBench 测试集的 SQL 语句涵盖了大宽表场景下的各类场景,因此近年来国内外诸多 OLAP 厂商都竞相参与打榜。
本文讲述了 PolarDB-X 在 ClickBench 测试集上的探索与实践,包括了通过增加优化器规则来对 SQL 进行改写以实现更优的执行计划,执行器层面通过 HashSet 来优化 DISTINCT 算子,同时实现了自适应的两阶段 AGG 以根据 workload 自动决定是否进行预聚合。由于 ClickBench 十分考验底层的实现细节,PolarDB-X 还对 NoGroupBy 的 Agg 函数以及 Group By Long, Long 的场景进行了单独的优化。
经过优化,PolarDB-X 在大宽表场景下的性能表现已经到达第一梯队,具体性能细节可参考 ClickBench 性能白皮书。
在本文的最后部分,我们对 ClickBench 测试集的瓶颈点进行了分析,并且对 Swiss Hash Table 以及 HyperScan 两种优化方向进行了讨论,相信未来 PolarDB-X 的成绩会得到更进一步的提升。
优化器规则
Group By 列消除
对于 Q36 来说,虽然 GROUP BY 的列数有 4 列,但是他们都是 ClientIP 与某个常量相减的形式,考虑到如果确定了 ClientIP,那么 ClientIP, ClientIP - 1, ClientIP - 2, ClientIP - 3 的值也会被唯一确定,因此在 GROUP BY 运算中 groupId 一定相同,因此我们可以将聚合列裁剪为 GROUP BY ClientIP 来省去额外的计算。
对于 Q35 也是同样的逻辑,GROUP BY 1, URL 指向的第一列为常量 1,因此将其改写为 GROUP BY URL 并不会影响最终结果。
SELECT 1, URL, COUNT(*) AS c FROM hits GROUP BY 1, URL ORDER BY c DESC LIMIT 10;
PolarDB-X 的优化器基于 Cascade 模型实现,得益于其强大的Rule扩展性,在优化器中添加一个 RBO 规则是十分便捷的。以 Q36 为例,简化后的逻辑如下所示
多 AGG Call 的常量提取
在 ClickBench 中,有部分 SQL 语句可以通过优化器 RBO 规则进行改写而不改变原有的语意,例如Q30 需要计算 90 次 ResolutionWidth 与某一个常量的和,这在执行器需要保存 90 列数据,但如果我们使用分配律进行简单的拆解就可以将 SQL 改为如下形式
# Q30 优化前 SELECT SUM(ResolutionWidth), SUM(ResolutionWidth + 1), SUM(ResolutionWidth + 2), SUM(ResolutionWidth + 3), SUM(ResolutionWidth + 4), ..., SUM(ResolutionWidth + 89) FROM hits; # Q30 优化后 SELECT SUM(ResolutionWidth), SUM(ResolutionWidth) + 1 * count(*), SUM(ResolutionWidth) + 2 * count(*), SUM(ResolutionWidth) + 3 * count(*), SUM(ResolutionWidth) + 4 * count(*), ..., UM(ResolutionWidth) + 89 * count(*) FROM hits;
这样执行器就可以只计算 SUM(ResolutionWidth)列和 count(*)列的结果。
执行器优化
DISTINCT 优化
在 ClickBench 中有大量的 COUNT(DISTINCT())查询语句,例如 Q5
SELECT COUNT(DISTINCT UserID) FROM hits;
PolarDB-X 对于 DISTINCT 的处理是将 groupID 与 DISTINCT 的 column 一起放入 Hash 表中去重。
- 对于NoGroupBy的SQL,例如SELECT COUNT(DISTINCT(UserID)) FROM hits,输入 DISTINCT MAP 的是UserID这一列
- 对于含有GroupBy的SQL,例如SELECT COUNT(DISTINCT(UserID)) FROM hits GROUP By RegionID,输入 DISTINCT MAP 的是GroupId, UserID两列
在 PolarDB-X 的原有实现中,DISTINCT 与 GROUP BY 语句使用了相同的 aggOpenHashMap 来实现。
然而由于 DISTINCT 的 HashMap 只需要检查是否重复,而不需要记录 groupID 的信息,因此我们为 DISTINCT 设计了独立的 HashSet,即在HashMap中,需要 key 和 value 两个数组, key维护了hash -> groupByKey的映射,value维护了hash -> groupId的映射。而在 HashSet 中只需要维护 key 数组即可。
同时由于HashSet只需要判断每一行是否是distinct,因此不需要计算groupId相关的信息,可以省去相关运算 (下图中左侧为 HashSet 的代码示例,右侧是 HashTable 的代码示例)。
以 ClickBench Q5 SELECT COUNT(DISTINCT UserID) FROM hits;为例,使用 HashSet 相比 HashMap 获得了 20%的性能提升。
自适应两阶段 AGG
两阶段 AGG 对于 ClickBench 中的绝大多数 SQL 都有加速效果,例如 Q10 中的 GROUP BY RegionID,这是因为 RegionID 这一列的 cardinality 只有 15w,因此通过预聚合可以大幅度减少 Shuffle 的数据量。
但是对于 Q32 和 Q33,其需要对 Long 类型的 WatchID 和 Int 类型的 ClientIP 做 GroupBy,这两列的组合基数接近于表的大小,此时如果执行预聚合不但需要承担 hash 表的开销,同时网络 shuffle 的数据量也并没有减少,在我们的测试中 Q33 两阶段 agg 比一阶段 agg 的速度下降了两倍。
在 PolarDB-X 原有的实现中,若开启 ENABLE_PREFER_AGG,则优化器会根据表的 cardinality 统计信息来决定是否启用两阶段 agg,然而统计信息的估计往往是不准确的,尤其是对于多列的 cardinality 估计与事实偏差可能达到一到两个数量级。
因此我们将自适应两阶段 AGG 的逻辑在执行器层面进行了实现,具体来说有如下几步改造
- 将 HashAgg 拆为一阶段的 PreHashAgg 与第二阶段的 FinalHashAgg
- PreHashAgg 改为流式执行,即每当预聚合出一定数量的 Chunk 就会 buildHashTable 并通知上游 Driver 来拉取数据。FinalHashAgg 仍保持阻塞执行(即将全部数据都写入 Hash 表才会 BuildHashTable)。
- PreHashAgg 在消费 Chunk 的同时记录 HashTable 的大小,以及前 n 批数据的 cardinality。若当前 HashTable 的大小超过了 L3 cache(此时再进行预聚合性能会严重下降),若 n 批数据的 cardinality 都满足 则PreHashAgg 回退为 PlainHashAgg 算子。
- PlainHashAgg 算子只会对输入的 Chunk 进行简单的 aggregator 列初始化,而不需要写入 hashTable,当完成 aggregator 初始化时其会将 Chunk 放入 ResultIterator 中等待上游 Driver 获取结果。
在我们的测试中,自适应两阶段 Agg 能够对所有 SQL 进行正确的判断来决定是否启用两阶段 Agg,在保证其余 SQL 性能不回退的情况下将 Q33 的耗时由 5.5s 减小至 3.4s。
MPP 压缩
对于 Q34,其 SQL 语句为
SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10
由于 URL 列的 byte 数量较大(平均为 100 bytes),我们发现其在节点 Shuffle 时网络 IO 成为瓶颈。在 2*16c64g 的规格下,实测 shuffle 数据量达到了 2.4GB。因此我们对网络传输时的 Chunk 使用了 LZ4 压缩机制。
然而考虑到当压缩率较低时网络数据 Shuffle 量并没有减少,并且会增加解压缩的开销,因此我们会动态的对每个 Chunk 进行压缩率的比较,当压缩率超过某个 threshold 时才向上游返回压缩后的 Chunk。
其主要流程为
在开启压缩后,Q34 的网络传输数据量由 2.4GB 减小为 300MB,速度提升 2s。
private SerializedChunk serializeForce(Chunk page) { int maxCompressedLength = maxCompressedLength(serializationBuffer.size()); byte[] compressionBuffer = new byte[maxCompressedLength]; int actualCompressedLength = compressor.get() .compress(serializationBuffer.slice().getBytes(), 0, serializationBuffer.size(), compressionBuffer, 0, maxCompressedLength); //MINIMUM_COMPRESSION_RATIO默认为0.8 if (((1.0 * actualCompressedLength) / serializationBuffer.size()) > MINIMUM_COMPRESSION_RATIO) { return new SerializedChunk(serializationBuffer.slice(), ChunkCompression.UNCOMPRESSED, page.getPositionCount(), serializationBuffer.size()); } return new SerializedChunk( Slices.copyOf(Slices.wrappedBuffer(compressionBuffer, 0, actualCompressedLength)), ChunkCompression.COMPRESSED, page.getPositionCount(), serializationBuffer.size()); }
GroupBy Long, Long
在 GroupBy 时对 inputChunk 进行类型特定的优化可以大幅度提升 Agg 的性能,然而由于 Java 语言本身不支持 Int128 类型,因此需要用 long 数组来模拟实现。
具体来说由两种实现方式
- 使用 2 个 long 数组 high 和 low 分别表示 Int128 的高位和低位。这种实现方式的缺点是对于 low 和 high 数组会有 2 次随机访存,但是没有额外的计算开销。
- 使用 1 个 long 数组,第 i*2 个位置表示第 i 个数字的高位,第 i*2+1 个位置表示第 i 个数字的低位。这种实现方式的缺点是会有额外的 index 计算开销,但优点是 i*2 和 i*2+1 一定在同一个 cacheline 中,减少了一次访存的开销。
在经过测试对比,我们发现方式 1 的性能略微好于方式 2。
同时在实现时,serializedInputBlock 只需要持有对Chunk中两个long数组的引用,不需要占用额外的内存空间。(但是key数组仍然需要创建新对象,因为hash表需要常驻内存)。
// only maintain reference serializedBlock = new Int128Array(longBlock1, longBlock2);
以 ClickBench Q9 SELECT RegionID, COUNT(DISTINCT UserID) AS u FROM hits GROUP BY RegionID ORDER BY u DESC LIMIT 10; 为例,对比默认的 GroupBy 实现,Int128GroupBy 实现了 3 倍的性能提升。
NoGroupBy 优化
对于类似 Q3 SELECT MIN(EventDate), MAX(EventDate) FROM hits;
这种不含有 Group By 语句的 agg 函数,在处理时不需要计算 groupId 的值,也不需要对每个 group 进行 accmulate 聚合操作,因此可以为其设计独立的 agg 语义。
对于 Q3 而言,我们可以设计出一个独立的 DateMaxMinNoGroupByAccumulator,这个 Accumulator 只需要枚举每个 input 的 row 取最大最小值即可。
@Override public void accumulate(Chunk chunk, Chunk inputChunk) { Block block = chunk.getBlock(0); if (block instanceof DateBlock) { int position = block.getPositionCount(); if (!isVisited) { beforeValue = ((DateBlock) block).getPackedLong(0); isVisited = true; } for (int i = 0; i < position; i++) { long newValue = ((DateBlock) block).getPackedLong(i); boolean cmp = newValue > beforeValue; if ((isMin && !cmp) || (!isMin && cmp)) { beforeValue = newValue; } } } }
Dynamic DecimalAccumulator
对于 ClickBench Q4 涉及到 AVG 函数的计算
SELECT AVG(UserID) FROM hits;
在 PolarDB-X 的实现中,AVG 函数的计算方式为 ,由于 UserID 的列类型为 long,因此 SUM(UserID)可能会超过 LONG_MAX,所以需要用 Decimal 类型来进行存储。
对于朴素的 Decimal 实现,底层维护了一个 byte 数组表示各个位的值,在加减时需要考虑到进位/借位操作。
但是对于 byte 数组的操作显然是比较耗时的,因此 PolarDB-X 设计了 Decimal64 和 Decimal128 类型,对于 Decimal64,其由一个 long 变量实现,对于 Decimal128 使用 2 个 long 变量分别表示 低位和高位。
在计算 SUM 时优先采用 Decimal64 计算,在 Decimal64 即将溢出时转为 Decimal128 表示,在 Decimal128 即将溢出时转为朴素的 Decimal 表示。
@Override public void accumulate(int groupId, Chunk inputChunk, int[] groupIdSelection, int selSize) { Block inputBlock = inputChunk.getBlock(0); DecimalBlock decimalBlock = inputBlock.cast(DecimalBlock.class); // Prepare result array and execute summary in vectorization mode. results[0] = results[1] = results[2] = 0; decimalBlock.cast(Block.class).sum(groupIdSelection, selSize, results); // Check sum result state and try to directly append sum result. if (results[2] == E_DEC_DEC64) { rescale(decimalBlock.getScale()); long sumResult = results[0]; accumulateDecimal64(groupId, sumResult); } else if (results[2] == E_DEC_DEC128) { rescale(decimalBlock.getScale()); long decimal128Low = results[0]; long decimal128High = results[1]; accumulateDecimal128(groupId, decimal128Low, decimal128High); } else { // Fall back to row-by-row mode for (int i = 0; i < selSize; i++) { int position = groupIdSelection[i]; accumulate(groupId, inputBlock, position); } } }
未来规划
经过不断的优化,目前 PolarDB-X 在 ClickBench 测试集上的性能表现已经十分优秀,目前的性能瓶颈主要落在 Q29 的 REGEXP_REPLACE 正则表达式替换以及 Q33/Q34 的 GROUP BY String。
而对于 Q33/Q34 和 Q29,团队内部已有解决方案,目前正在试验与开发当中。
Swiss Hash Table
对于 Q33,其性能瓶颈在于 GROUP BY STRING 算子,其底层实现是一个 String 类型的开放寻址法 HashTable。针对这一场景业界也有特定的算法优化,例如 ClickHouse 的论文《SAHA: A String Adaptive Hash Table for Analytical Databases》中介绍了一种根据字符串长度做 dispatch 对不同场景分别解决的方案。
SELECT URL, COUNT(*) AS c FROM hits GROUP BY URL ORDER BY c DESC LIMIT 10;
PolarDB-X 作为一款面对通用场景的分布式 HTAP 数据库,面向类型特定场景的优化固然重要,但同时我们也希望着眼于更高的层面,在更通用的场景上进行优化。
在团队内部的调研中,我们发现在 CppCon 2017 中提出的 Swiss Hash Table 给我们带来了巨大的启发,这种 Hash Table 不仅能够对全类型的场景进行优化,而且充分考虑了 CPU 层面的 SIMD 加速,结合 PolarDB-X 先前对 SIMD 指令探索(参考文章: 深度优化 | PolarDB-X 基于向量化SIMD指令的探索),我们认为这种算法能够在通用场景下产生最大的收益。
具体来说,Swiss Hash Table 具有以下优点
- 支持全类型任意列向量化,支持Group By任意字段,底层全部采用byte类型存储,充分利用CPU SIMD能力
- 对于Long/Int等定长字段存放在RecordTable中,采用byte[][1024]存储,对于String变长字段存放在独立的VariableData区域。
- 探测hash表时使用ControlTable来进行hash值粗糙过滤,将byte数组一次进行 n 个位置的比较 (这里的 n 取决于底层的 compare 指令,在 AVX512 指令下 n = 64,若采用 long 类型比较,则 n = 8)。
考虑到文章篇幅有限,在这里我们不会详细介绍 Swiss Hash Table,这里直接给出一种在 PolarDB-X 中可行的设计方案。
Swiss Hash Table的设计分为3层
- 1. ControlTable:用于探测时根据待匹配数据的hash值进行粗糙过滤
- 2. RecordTable:用于存放定长数据
- 底层实现为byte[][1024],第二维宽度为1MB,尽可能保证数据被load到L3 cache
- Entity的设计为:
- POINT(12Bytes,可选): 若Group By字段中有String类型,则指向对应的VariableData区域
- GroupID(4Bytes, 必须):记录entity对应的GroupID
- PrecomputedHash(8 Bytes, 可选):在两阶段agg中若pre agg聚合度较差,可通过预先计算hash值避免额外的hash值计算开销
- FixedLentg:记录固定长度的数据,其中每个数据项被记录为IsNull(1Bytes) + Data(8Bytes)
- 3. VariableData:用于存放变长数据
其 Probe 过程的主要流程为
- 对于任意的待匹配的元素,首先获取 hash 值的低 8 位,在 Control Table 中进行粗糙筛选。这里筛选的粒度取决于底层 compare 指令的位宽,若使用 unsafe 类做比较,可以一次过滤 8 个 slot,若使用 JDK17 中的 Vector API,则在理想情况下一次可过滤 64 个 Slot
- 若第一步中存在匹配的元素,则比较真实值。
- 若存在可变类型,则通过 Point 来找到 VariableData 中的可变数据。
通过这种方式,可以在第一步中过滤掉大量的 invalid 数据,从而加速 Probe 过程。
HyperScan
对于 Q29,其瓶颈在于 REGEXP_REPLACE 这个正则表达式实现,对于正则表达式我们并不陌生,其存在两种实现方案:非确定性有限状态自动机(NFA) 和确定性有限状态自动机(DFA),区别在于是否允许从一个状态出发通过多条路径到达下一个状态。
目前在 Java 生态下有 3 种主流的正则表达式实现流派。
- JONI:首先在 JRuby 项目中被开发,其基于 Ruby 的 Oniguruma项目演化而来,底层采用基于 NFA 的匹配策略,在绝大部分场景下会有良好的性能,但是由于 NFA 的匹配方式是以正则串作为驱动,在匹配失败时会对待匹配串进行回溯,因此在某些极端场景下会回退至指数级复杂度。
- Re2J:Re2J 基于 Google 的 Re2 项目演化而来,基于 DFA 的匹配方式,其以待匹配串作为驱动,在枚举待匹配串时对不合法的正则串片段进行过滤,因此能够达到与待匹配串线形相关的复杂度。然而由于 Re2J 项目并不完善,其存在一些匹配功能的缺失,且在绝大多数场景下性能劣于 JONI 库。
- HyperScan-Java:基于 Intel 的 HyperScan 项目实现,HyperScan 最早在 NSDI'19 年的 paper《Hyperscan: A Fast Multi-pattern Regex Matcher for Modern CPUs》被提出,HyperScan 相较于 JONI 或者 Re2J 的优势在于两点:1. HyperScan 使用一种特殊的 FA 算法来实现正则表达式的匹配,其主要思想在于对正则串进行拆分,采用图算法将模式串拆分为没有交集的 string 和 sub-regex(FA), 对于独立的 string 采用字符串匹配算法进行匹配(这里采用 CPU 友好的 Shift-Or 算法而不是理论更快的 AC 自动机),对于部分独立构造 NFA,因此这种算法的性能和 DFA 相当,而内存占用又和 NFA 一样小。2. 底层使用了大量的 SIMD 指令(当然同时也牺牲了一定的系统兼容性,HyperScan 仅能在 X86 平台上发挥出其极致性能)。
目前主流的 OLAP 引擎(例如 ClickHouse)均采用了 HyperScan 及其变体(例如 VectorScan)。
然而目前的 HyperScan-Java 实现效率较差,Java-Cpp 对象的转化消耗了大量的 CPU 资源,后续我们计划结合 PolarDB-X 的向量化表达式能力,通过修改 HyperScan-Java 的源码来实现批量的正则替换来进行加速。
总结
通过上述优化方法,PolarDB-X 在 ClickBench 大宽表测试集上的表现显著提升。优化器规则的改进显著减少了不必要的计算开销,执行器层面的优化如独立的 HashSet、动态两阶段聚合以及高效的压缩机制,大幅提升了 DISTINCT 操作、自适应聚合以及网络传输效率。尽管当前在正则表达式处理和字符串 Group By 上仍存在瓶颈,PolarDB-X 团队已制定了包括 Swiss Hash Table 和 HyperScan 优化在内的解决方案,预计将在未来进一步提升系统性能。整体而言,PolarDB-X 的持续优化和创新,确保其在分布式 HTAP 数据库领域保持竞争力,并为用户提供高效、可靠的分析处理能力。