在 Apache Arrow DataFusion 28.0.0 中快速聚合数百万个组
已发布 2023 年 8 月 5 日
作者 alamb, Dandandan, tustvold
在 Apache Arrow DataFusion 中快速聚合数百万个组
Andrew Lamb, Daniël Heres, Raphael Taylor-Davies,
注意:本文最初发布在 InfluxData 博客
TLDR(太长不看)
分组聚合是任何分析工具的核心部分,可以创建对海量数据的易于理解的摘要。 Apache Arrow DataFusion 的并行聚合能力在 新发布的 28.0.0
版本中,对于具有大量(10,000 或更多)组的查询,速度提高了 2-3 倍。
提高聚合性能对 DataFusion 的所有用户都很重要。 例如,时序数据平台 InfluxDB 和 全栈可观测性平台 Coralogix 都会聚合大量的原始数据,以监控并为我们的客户创建见解。 提高 DataFusion 的性能使我们能够以更少的资源更快地生成见解,从而提供更好的用户体验。 由于 DataFusion 是开源的,并根据宽松的 Apache 2.0 许可证发布,因此整个 DataFusion 社区也能从中受益。
通过新的优化,DataFusion 的分组速度现在接近 DuckDB,该系统经常报告 出色的 分组基准测试性能数据。 图 1 包含 ClickBench 在单个 Parquet 文件上的代表性示例,完整结果在本文章末尾。
图 1:DataFusion 27.0.0
、DataFusion 28.0.0
和 DuckDB 0.8.1
在单个 Parquet 文件上针对 ClickBench 查询 16、17、18 和 19 的查询性能。
高基数分组介绍
聚合是一个花哨的词,用于计算在多个列中具有相同值的许多行上的摘要统计信息。 我们将具有相同值的行称为组,“高基数”意味着数据集中有大量不同的组。 在撰写本文时,分析引擎中“大量”的组约为 10,000。
例如,ClickBench hits 数据集包含 1 亿次跨一组网站的匿名用户点击。 ClickBench 查询 17 是
SELECT "UserID", "SearchPhrase", COUNT(*)
FROM hits
GROUP BY "UserID", "SearchPhrase"
ORDER BY COUNT(*)
DESC LIMIT 10;
用英语来说,此查询查找“所有点击中最热门的十个(用户,搜索词组)组合”,并产生以下结果(前十名用户没有搜索词组)
+---------------------+--------------+-----------------+
| UserID | SearchPhrase | COUNT(UInt8(1)) |
+---------------------+--------------+-----------------+
| 1313338681122956954 | | 29097 |
| 1907779576417363396 | | 25333 |
| 2305303682471783379 | | 10597 |
| 7982623143712728547 | | 6669 |
| 7280399273658728997 | | 6408 |
| 1090981537032625727 | | 6196 |
| 5730251990344211405 | | 6019 |
| 6018350421959114808 | | 5990 |
| 835157184735512989 | | 5209 |
| 770542365400669095 | | 4906 |
+---------------------+--------------+-----------------+
ClickBench 数据集包含
- 99,997,497 行总数1
- 17,630,976 个不同的用户(不同的 UserIDs)2
- 6,019,103 个不同的搜索词组3
- (UserID,SearchPhrase)的 24,070,560 个不同组合4 因此,要回答查询,DataFusion 必须将 1 亿个不同的输入行中的每一个映射到 2400 万个不同的组中的一个,并记录每个组中有多少个这样的行。
解决方案
与数据库和其他分析系统中的大多数概念一样,此算法的基本思想很简单,并在计算机科学入门课程中教授。 您可以使用如下程序来计算查询5
import pandas as pd
from collections import defaultdict
from operator import itemgetter
# read file
hits = pd.read_parquet('hits.parquet', engine='pyarrow')
# build groups
counts = defaultdict(int)
for index, row in hits.iterrows():
group = (row['UserID'], row['SearchPhrase']);
# update the dict entry for the corresponding key
counts[group] += 1
# Print the top 10 values
print (dict(sorted(counts.items(), key=itemgetter(1), reverse=True)[:10]))
这种方法虽然简单,但速度慢且内存效率极低。 它需要 40 多秒才能计算出不到 1% 的数据集的结果6。 DataFusion 28.0.0
和 DuckDB 0.8.1
都在 10 秒内计算出整个数据集的结果。
要快速有效地回答此查询,您必须编写代码,使其能够
- 通过并行计算保持所有核心处于繁忙状态进行聚合
- 快速更新聚合值,使用可向量化的循环,编译器很容易将其转换为现代 CPU 中可用的高性能 SIMD 指令。
本文的其余部分解释了 DataFusion 中分组的工作原理以及我们在 28.0.0
中所做的改进。
两阶段并行分区分组
DataFusion 27.0.
和 28.0.0
都使用最先进的两阶段并行哈希分区分组,类似于其他高性能向量化引擎,例如 DuckDB 的并行分组聚合。 在图中,它看起来像
▲ ▲
│ │
│ │
│ │
┌───────────────────────┐ ┌───────────────────┐
│ GroupBy │ │ GroupBy │ Step 4
│ (Final) │ │ (Final) │
└───────────────────────┘ └───────────────────┘
▲ ▲
│ │
└────────────┬───────────┘
│
│
┌─────────────────────────┐
│ Repartition │ Step 3
│ HASH(x) │
└─────────────────────────┘
▲
│
┌────────────┴──────────┐
│ │
│ │
┌────────────────────┐ ┌─────────────────────┐
│ GroupyBy │ │ GroupBy │ Step 2
│ (Partial) │ │ (Partial) │
└────────────────────┘ └─────────────────────┘
▲ ▲
┌──┘ └─┐
│ │
.─────────. .─────────.
,─' '─. ,─' '─.
; Input : ; Input : Step 1
: Stream 1 ; : Stream 2 ;
╲ ╱ ╲ ╱
'─. ,─' '─. ,─'
`───────' `───────'
图 2:两阶段重新分区分组:数据从底部(源)到顶部(结果)分两个阶段流动。 首先(步骤 1 和 2),每个核心将数据读入核心特定的哈希表,计算中间聚合,而无需任何跨核心协调。 然后(步骤 3 和 4),DataFusion 按组值将数据划分为不同的子集(“重新分区”),并且每个子集都发送到特定的核心,该核心计算最终聚合。
这两个阶段对于保持多核系统中核心的繁忙至关重要。 两个阶段都使用相同的哈希表方法(在下一节中说明),但不同之处在于组的分布方式以及从累加器发出的部分结果。 第一阶段在生成数据后尽快聚合数据。 但是,如图 2 所示,这些组可以位于任何输入中的任何位置,因此经常在许多不同的核心上找到同一组。 第二阶段使用哈希函数将数据均匀地重新分配到各个核心上,因此每个组值都由一个核心进行处理,该核心发出该组的最终结果。
┌─────┐ ┌─────┐
│ 1 │ │ 3 │
│ 2 │ │ 4 │ 2. After Repartitioning: each
└─────┘ └─────┘ group key appears in exactly
┌─────┐ ┌─────┐ one partition
│ 1 │ │ 3 │
│ 2 │ │ 4 │
└─────┘ └─────┘
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
┌─────┐ ┌─────┐
│ 2 │ │ 2 │
│ 1 │ │ 2 │
│ 3 │ │ 3 │
│ 4 │ │ 1 │
└─────┘ └─────┘ 1. Input Stream: groups
... ... values are spread
┌─────┐ ┌─────┐ arbitrarily over each input
│ 1 │ │ 4 │
│ 4 │ │ 3 │
│ 1 │ │ 1 │
│ 4 │ │ 3 │
│ 3 │ │ 2 │
│ 2 │ │ 2 │
│ 2 │ └─────┘
└─────┘
Core A Core B
图 3:聚合阶段期间跨 2 个核心的组值分布。 在第一阶段,每个组值 1
、2
、3
、4
都存在于每个核心处理的输入流中。 在重新分区后的第二阶段,组值 1
和 2
由核心 A 处理,值 3
和 4
仅由核心 B 处理。
由于篇幅限制,上面未提及 DataFusion 实现中的一些其他细节,例如
- 何时从第一阶段的哈希表中发出数据的策略(例如,因为数据是部分排序的)
- 处理每个聚合的特定过滤器(由于
FILTER
SQL 子句) - 中间值的数据类型(对于某些聚合(例如
AVG
),可能与最终输出不同)。 - 当内存使用量超过其预算时采取的措施。
哈希分组
DataFusion 查询可以为每个组计算许多不同的聚合函数,包括内置和/或用户定义的AggregateUDFs
。 每个聚合函数的状态(称为累加器)都使用哈希表进行跟踪(DataFusion 使用优秀的 HashBrown RawTable API),该 API 从逻辑上存储标识特定组值的“索引”。
27.0.0
中的哈希分组
如图 3 所示,DataFusion 27.0.0
将数据存储在 GroupState
结构中,顾名思义,它跟踪每个组的状态。 每个组的状态包括:
- 组列的实际值,采用 Arrow Row 格式。
- 每个组的正在进行的累积(例如,
COUNT
聚合的运行计数),采用两种可能的格式之一(Accumulator
或RowAccumulator
)。 - 用于跟踪每个批次中哪些行与每个聚合匹配的暂存空间。
┌──────────────────────────────────────┐
│ │
│ ... │
│ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ │
│ ┃ ┃ │
┌─────────┐ │ ┃ ┌──────────────────────────────┐ ┃ │
│ │ │ ┃ │group values: OwnedRow │ ┃ │
│ ┌─────┐ │ │ ┃ └──────────────────────────────┘ ┃ │
│ │ 5 │ │ │ ┃ ┌──────────────────────────────┐ ┃ │
│ ├─────┤ │ │ ┃ │Row accumulator: │ ┃ │
│ │ 9 │─┼────┐ │ ┃ │Vec<u8> │ ┃ │
│ ├─────┤ │ │ │ ┃ └──────────────────────────────┘ ┃ │
│ │ ... │ │ │ │ ┃ ┌──────────────────────┐ ┃ │
│ ├─────┤ │ │ │ ┃ │┌──────────────┐ │ ┃ │
│ │ 1 │ │ │ │ ┃ ││Accumulator 1 │ │ ┃ │
│ ├─────┤ │ │ │ ┃ │└──────────────┘ │ ┃ │
│ │ ... │ │ │ │ ┃ │┌──────────────┐ │ ┃ │
│ └─────┘ │ │ │ ┃ ││Accumulator 2 │ │ ┃ │
│ │ │ │ ┃ │└──────────────┘ │ ┃ │
└─────────┘ │ │ ┃ │ Box<dyn Accumulator> │ ┃ │
Hash Table │ │ ┃ └──────────────────────┘ ┃ │
│ │ ┃ ┌─────────────────────────┐ ┃ │
│ │ ┃ │scratch indices: Vec<u32>│ ┃ │
│ │ ┃ └─────────────────────────┘ ┃ │
│ │ ┃ GroupState ┃ │
└─────▶ │ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │
│ │
Hash table tracks an │ ... │
index into group_states │ │
└──────────────────────────────────────┘
group_states: Vec<GroupState>
There is one GroupState PER GROUP
图 4:DataFusion 27.0.0
中的哈希分组运算符结构。 哈希表将每个组映射到 GroupState,其中包含所有组的状态。
为了计算聚合,DataFusion 为每个输入批次执行以下步骤:
- 使用 高效的向量化代码计算哈希,针对每种数据类型进行了专门优化。
- 使用哈希表确定每个输入行的组索引(为新看到的组创建新条目)。
- 为每个具有输入行的组更新累加器,如果数量足够,将行组装成连续范围以进行向量化累加。
DataFusion 还会将哈希值存储在表中,以避免在调整哈希表大小时可能产生的高昂的哈希重新计算。
这种方案对于相对较少数量的不同组非常有效:所有累加器都可以使用大的连续行批次有效地更新。
但是,由于以下原因,此方案对于高基数分组并不理想:
- 每个组的多次分配,用于组值行格式,以及
RowAccumulator
和每个Accumulator
。Accumulator
中也可能存在其他分配。 - 非向量化更新: 由于每个输入批次中不同组的数量很大(因此每个组的值数量很小),累加器更新通常会回退到较慢的非向量化形式。
28.0.0
中的哈希分组
对于 28.0.0
,我们遵循传统的系统优化原则重写了核心分组实现:减少分配、类型专业化和积极的向量化。
DataFusion 28.0.0
使用相同的 RawTable 并仍然存储组索引。 主要区别如图 4 所示:
- 组值存储在:
- 内联在
RawTable
中(对于原始类型的单个列),其中转换为 Row 格式的成本高于其带来的好处。 - 在一个单独的 Rows 结构中,为所有组值进行单个连续分配,而不是每个组分配一次。 累加器在内部管理所有组的状态,因此更新中间值的代码是一个紧密的类型专用循环。 新的
GroupsAccumulator
接口产生了高效的类型累加器更新循环。
- 内联在
┌───────────────────────────────────┐ ┌───────────────────────┐
│ ┌ ─ ─ ─ ─ ─ ┐ ┌─────────────────┐│ │ ┏━━━━━━━━━━━━━━━━━━━┓ │
│ │ ││ │ ┃ ┌──────────────┐ ┃ │
│ │ │ │ ┌ ─ ─ ┐┌─────┐ ││ │ ┃ │┌───────────┐ │ ┃ │
│ │ X │ 5 │ ││ │ ┃ ││ value1 │ │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │└───────────┘ │ ┃ │
│ │ Q │ 9 │──┼┼──┐ │ ┃ │ ... │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ └──┼─╋─▶│ │ ┃ │
│ │ ... │ ... │ ││ │ ┃ │┌───────────┐ │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ ││ valueN │ │ ┃ │
│ │ H │ 1 │ ││ │ ┃ │└───────────┘ │ ┃ │
│ │ │ │ ├ ─ ─ ┤├─────┤ ││ │ ┃ │values: Vec<T>│ ┃ │
│ Rows │ ... │ ... │ ││ │ ┃ └──────────────┘ ┃ │
│ │ │ │ └ ─ ─ ┘└─────┘ ││ │ ┃ ┃ │
│ ─ ─ ─ ─ ─ ─ │ ││ │ ┃ GroupsAccumulator ┃ │
│ └─────────────────┘│ │ ┗━━━━━━━━━━━━━━━━━━━┛ │
│ Hash Table │ │ │
│ │ │ ... │
└───────────────────────────────────┘ └───────────────────────┘
GroupState Accumulators
Hash table value stores group_indexes One GroupsAccumulator
and group values. per aggregate. Each
stores the state for
Group values are stored either inline *ALL* groups, typically
in the hash table or in a single using a native Vec<T>
allocation using the arrow Row format
图 5:DataFusion 28.0.0
中的哈希分组运算符结构。 组值可以直接存储在哈希表中,也可以使用 arrow Row 格式存储在单个分配中。 哈希表包含组索引。 单个 GroupsAccumulator
存储所有组的每个聚合状态。
这种新结构显着提高了高基数组的性能,这是由于:
- 减少分配:不再有每个组的单独分配。
- 连续的本机累加器状态:类型专用累加器使用某些本机类型的 Rust Vec<T> 在单个连续分配中存储所有组的值。
- 向量化状态更新:内部聚合更新循环(类型专用并以本机
Vec
的形式)由 Rust 编译器很好地向量化(感谢 LLVM!)。
备注
一些向量化分组实现将累加器状态按行直接存储在哈希表中,这通常可以有效地利用现代 CPU 缓存。 以列方式管理累加器状态可能会牺牲一些缓存局部性,但是它可以确保哈希表的大小保持较小,即使存在大量组和聚合时,也更容易让编译器向量化累加器更新。
DataFusion 28.0.0
可能会或可能不会将哈希值存储在表中,具体取决于重新计算哈希值的成本。 这优化了计算哈希值(例如,对于字符串而言很昂贵)与将其存储在哈希表中的成本之间的权衡。
将状态更新推送到 GroupsAccumulators 产生的一个微妙之处是,每个累加器都必须处理类似的变化,包括有/无过滤以及输入中有/无 null。 DataFusion 28.0.0
使用模板化的 NullState
,它封装了跨累加器的这些常见模式。
代码结构很大程度上受到 DataFusion 使用 Rust(一种专注于速度和安全性的新系统编程语言)实现的事实的影响。 Rust 强烈反对 C/C++ 哈希分组实现中使用的许多传统指针强制转换“技巧”。 DataFusion 聚合代码几乎完全是 safe
,只有在必要时才偏离到 unsafe
。(Rust 是一个绝佳的选择,因为它使 DataFusion 快速、易于嵌入,并且可以防止许多与多线程 C/C++ 代码相关的崩溃和安全问题)。
ClickBench 结果
以下是针对具有 DataFusion 27.0.0
、DataFusion 28.0.0
和 DuckDB 0.8.1
的单个 Parquet 文件运行 ClickBench 查询的完整结果。 这些数字是在具有 8 个内核和 32 GB RAM 的 GCP e2-standard-8 machine
上运行的,使用了 此处的脚本。
随着行业朝着由组件组装而成的数据系统发展,使用 Apache Arrow 和 Parquet 等开放标准交换数据而不是自定义存储和内存格式变得越来越重要。 因此,此基准测试使用单个输入 Parquet 文件,该文件代表许多 DataFusion 用户,并且符合当前分析趋势,即避免在查询之前将数据加载/转换为自定义存储格式中。
DataFusion 现在达到了接近 DuckDB 的速度来查询 Parquet 数据。 虽然我们不打算与一个实际上编写了 公平基准测试被认为很困难的团队进行基准测试对决,但希望每个人都同意 DataFusion 28.0.0
是一个显着的改进。
图 6:DataFusion 27.0.0
、DataFusion 28.0.0
和 DuckDB 0.8.1
在所有 43 个 ClickBench 查询中针对单个 hits.parquet
文件的性能。 越低越好。
备注
由于计划器错误(Q9、Q11、Q12、14)或内存不足(Q33),DataFusion 27.0.0
无法运行多个查询。 DataFusion 28.0.0
解决了这些问题。
DataFusion 在查询 21 和 22 上比 DuckDB 更快,这可能是由于字符串模式匹配的优化实现。
结论:性能很重要
将聚合性能提高两倍以上,使使用 DataFusion 构建产品和项目的开发人员可以将更多时间花在增值领域特定功能上。 我们相信使用 DataFusion 构建系统比从头开始构建类似系统要快得多。 DataFusion 提高了生产力,因为它消除了重建易于理解但实现成本高昂的分析数据库技术的需要。 虽然我们对 DataFusion 28.0.0
的改进感到满意,但我们绝不会止步于此,并且正在寻求 (更多)聚合性能。 性能的未来是光明的。
致谢
DataFusion 是一项 社区努力,如果没有社区中许多人的贡献,这项工作是不可能完成的。 特别感谢 sunchao、yjshen、yahoNanJing、mingmwang、ozankabak、mustafasrepo 以及所有贡献想法、评论和鼓励的人 期间 这项 工作。
关于 DataFusion
Apache Arrow DataFusion 是一个可扩展的查询引擎和数据库工具包,用 Rust 编写,它使用 Apache Arrow 作为其内存格式。 DataFusion 与 Apache Calcite、Facebook 的 Velox 和类似技术一起,是下一代“解构数据库”架构的一部分,在这种架构中,新系统建立在快速、模块化组件的基础上,而不是作为单个紧密集成的系统。
备注
-
SELECT COUNT(*) FROM 'hits.parquet';
↩ -
SELECT COUNT(DISTINCT "UserID") as num_users FROM 'hits.parquet';
↩ -
SELECT COUNT(DISTINCT "SearchPhrase") as num_phrases FROM 'hits.parquet';
↩ -
SELECT COUNT(*) FROM (SELECT DISTINCT "UserID", "SearchPhrase" FROM 'hits.parquet')
↩ -
hits_0.parquet,这是分区 ClickBench 数据集中的一个文件,具有
100,000
行,大小为 117 MB。 整个数据集在一个 14 GB Parquet 文件中有100,000,000
行。 脚本在 40 分钟后未完成整个数据集,并且在峰值时使用了 212 GB RAM。 ↩