毫秒级延迟查询 Parquet
已发布 2022年12月26日
作者: tustvold 和 alamb
毫秒级延迟查询 Parquet
注意:本文最初发布在InfluxData 博客。
我们相信,直接查询 Apache Parquet 文件可以实现与大多数专用文件格式相似或更好的存储效率和查询性能。虽然这需要大量的工程投入,但 Parquet 开放格式和广泛的生态系统支持的优势使其成为各种数据系统的明显选择。
在本文中,我们将解释在 Apache Arrow Rust Parquet 读取器中实现的一些快速查询 Parquet 格式数据的先进技术。这些技术共同使 Rust 实现成为查询 Parquet 文件(无论是本地磁盘还是远程对象存储)最快的实现之一,甚至是最快的。它能够在几毫秒内查询数 GB 的 Parquet。
我们要感谢 InfluxData 对这项工作的支持。InfluxData 对开源软件有着深刻而持续的承诺,它资助了我们撰写这篇博客文章的大部分时间,以及作为构建 InfluxDB IOx 存储引擎一部分的许多贡献。
背景
Apache Parquet 是一种越来越流行的开放格式,用于存储分析数据集,并已成为经济高效、与 DBMS 无关的数据存储事实标准。Parquet 最初是为 Hadoop 生态系统创建的,由于其引人注目的组合,其影响力现已广泛扩展到数据分析生态系统:
- 高压缩比
- 适用于 S3 等商用 Blob 存储
- 广泛的生态系统和工具支持
- 跨多个不同平台和工具的可移植性
- 支持任意结构化数据
越来越多的其他系统,例如 DuckDB 和 Redshift 允许直接查询存储在 Parquet 中的数据,但与它们的本机(自定义)文件格式相比,支持通常仍然是次要考虑因素。此类格式包括 DuckDB 的 .duckdb 文件格式、Apache IOT TsFile、Gorilla 格式等。
以前只有闭源商业实现才能使用的相同复杂查询技术,现在首次作为开源提供。所需的工程能力来自大型、运行良好的开源项目,拥有全球贡献者社区,例如 Apache Arrow 和 Apache Impala。
Parquet 文件格式
在深入了解高效读取 Parquet 的细节之前,了解文件布局很重要。文件格式经过精心设计,可以快速定位所需信息,跳过不相关部分,并高效解码剩余部分。
- Parquet 文件中的数据被分解为称为
RowGroup的水平切片 - 每个
RowGroup包含模式中每个列的单个ColumnChunk
例如,下图说明了一个 Parquet 文件,其中包含“A”、“B”和“C”三列,存储在两个 RowGroup 中,总共有 6 个 ColumnChunk。
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓ ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─ ┃ ┃
┃┃ │ │ ┃ ┃
┃┃│ │ ││ ┃ ┃
┃┃ │ │ ┃ ┃
┃┃│ │ ││ ┃ RowGroup ┃
┃┃ │ │ ┃ 1 ┃
┃┃│ │ ││ ┃ ┃
┃┃ │ │ ┃ ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─ ┃ ┃
┃┃ColumnChunk 1 ColumnChunk 2 ColumnChunk 3 ┃ ┃
┃┃ (Column "A") (Column "B") (Column "C") ┃ ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛ ┃
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓ ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─ ┃ ┃
┃┃ │ │ ┃ ┃
┃┃│ │ ││ ┃ ┃
┃┃ │ │ ┃ ┃
┃┃│ │ ││ ┃ RowGroup ┃
┃┃ │ │ ┃ 2 ┃
┃┃│ │ ││ ┃ ┃
┃┃ │ │ ┃ ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─ ┃ ┃
┃┃ColumnChunk 4 ColumnChunk 5 ColumnChunk 6 ┃ ┃
┃┃ (Column "A") (Column "B") (Column "C") ┃ ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
ColumnChunk 的逻辑值使用多种可用编码之一写入到一个或多个数据页中,这些数据页按顺序附加在文件中。Parquet 文件的末尾是页脚,其中包含重要的元数据,例如
- 文件的模式信息,例如列名和类型
- 文件中
RowGroup和ColumnChunk的位置
页脚还可以包含其他专用数据结构
- 每个
ColumnChunk的可选统计信息,包括最小值/最大值和空值计数 - 指向包含每个独立页面位置的 OffsetIndexes 的可选指针
- 指向包含每个页面的行计数和摘要统计信息的 ColumnIndex 的可选指针
- 指向 BloomFilterData 的可选指针,它可以快速检查
ColumnChunk中是否存在某个值
例如,前图中 2 个行组和 6 个 ColumnChunk 的逻辑结构可能以如下图所示的方式(未按比例)存储在 Parquet 文件中。ColumnChunk 的页排在前面,后面是页脚。数据、编码方案的有效性以及 Parquet 编码器的设置决定了每个 ColumnChunk 所需的页数和大小。在此示例中,ColumnChunk 1 需要 2 页,而 ColumnChunk 6 只需要 1 页。除了其他信息,页脚还包含每个数据页的位置和列的类型。
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃
┃ Data Page for ColumnChunk 1 ("A") ◀─┃─ ─ ─│
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃ │
┃ Data Page for ColumnChunk 1 ("A") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃
┃ Data Page for ColumnChunk 2 ("B") ┃ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃ │
┃ Data Page for ColumnChunk 3 ("C") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃
┃ Data Page for ColumnChunk 3 ("C") ┃ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃ │
┃ Data Page for ColumnChunk 3 ("C") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃
┃ Data Page for ColumnChunk 4 ("A") ◀─┃─ ─ ─│─ ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃ │ │
┃ Data Page for ColumnChunk 5 ("B") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ │ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃
┃ Data Page for ColumnChunk 5 ("B") ┃ │ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃ │ │
┃ Data Page for ColumnChunk 5 ("B") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃ │ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┃
┃ Data Page for ColumnChunk 6 ("C") ┃ │ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ │ │
┃┃Footer ┃ ┃
┃┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃ │ │
┃┃ ┃File Metadata ┃ ┃ ┃
┃┃ ┃ Schema, etc ┃ ┃ ┃ │ │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃ ┃
┃┃ ┃ ┃Row Group 1 Metadata ┃ ┃ ┃ ┃ │ │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ ┃ ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ Location of ┃ ┃ ┃ ┃ │ │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ first Data ┣ ─ ─ ╋ ╋ ╋ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Page, row ┃ ┃ ┃ ┃ │
┃┃ ┃ ┃┃Column "B" Metadata┃ counts, ┃ ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ sizes, ┃ ┃ ┃ ┃ │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ min/max ┃ ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃ values, etc ┃ ┃ ┃ ┃ │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ ┃ ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ ┃ ┃ │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃ ┃
┃┃ ┃ ┃Row Group 2 Metadata ┃ ┃ ┃ ┃ │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Location of ┃ ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ first Data ┃ ┃ ┃ ┃ │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ Page, row ┣ ─ ─ ╋ ╋ ╋ ─ ─ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ counts, ┃ ┃ ┃ ┃
┃┃ ┃ ┃┃Column "B" Metadata┃ sizes, ┃ ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ min/max ┃ ┃ ┃ ┃
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ values, etc ┃ ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃ ┃ ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ ┃ ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ ┃ ┃
┃┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ ┃
┃┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
创建 Parquet 文件时需要考虑许多重要标准,例如如何最佳地排序/聚类数据以及将其结构化为 RowGroup 和数据页。此类“物理设计”考虑因素复杂,值得单独写一系列文章,本文不予赘述。相反,我们专注于如何利用可用结构使查询非常快。
优化查询
在任何查询处理系统中,以下技术通常会提高性能
- 减少必须从辅助存储传输进行处理的数据量(减少 I/O)
- 减少解码数据的计算负载(减少 CPU)
- 交错/流水线化数据的读取和解码(提高并行性)
同样的原则也适用于查询 Parquet 文件,如下所述
解码优化
Parquet 通过使用复杂的编码技术(如行程长度压缩、字典编码、增量编码等)实现了令人印象深刻的压缩比。因此,CPU 密集型的解码任务可能会主导查询延迟。Parquet 读取器可以使用多种技术来提高此任务的延迟和吞吐量,正如我们在 Rust 实现中所做的那样。
向量化解码
大多数分析系统一次将多个值解码为列式内存格式(例如 Apache Arrow),而不是逐行处理数据。这通常称为向量化或列式处理,因为它具有以下优点:
- 分摊根据解码列的类型进行切换的调度开销
- 通过从
ColumnChunk读取连续值来提高缓存局部性 - 通常允许在一条指令中解码多个值。
- 通过一次性分配大块内存,避免许多小的堆分配,从而为字符串和字节数组等可变长度类型节省大量开销
因此,Rust Parquet Reader 实现了专门的解码器,用于将 Parquet 直接读取到列式内存格式(Arrow Arrays)。
流式解码
在不同的 ColumnChunk 中,哪些行存储在哪个页面中没有关系。例如,第 10,000 行的逻辑值可能在列 A 的第一个页面中,而在列 B 的第三个页面中。
向量化解码最简单的方法,也是 Parquet 解码器通常最初实现的方法,是一次性解码整个 RowGroup(或 ColumnChunk)。
然而,鉴于 Parquet 的高压缩比,一个 RowGroup 可能包含数百万行。一次解码如此多行并非最佳,因为它
- 需要大量中间 RAM:优化用于处理的典型内存格式(如 Apache Arrow)比其 Parquet 编码形式需要更多内存。
-
增加查询延迟:后续处理步骤(如过滤或聚合)只有在整个
RowGroup(或ColumnChunk)解码完成后才能开始。
因此,最好的 Parquet 读取器通过按需生成可配置大小的行批次来支持“流式传输”数据。批次大小必须足够大以分摊解码开销,但又要足够小以实现高效内存使用,并允许下游处理在后续批次解码时同时开始。
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
┃ Data Page for ColumnChunk 1 │◀┃─ ┌── ─── ─── ─── ─── ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ │ ┏━━━━━━━┓ ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ ┃ ┃ │ │
┃ Data Page for ColumnChunk 1 │ ┃ │ ┃ ┃ ─ ▶│ │ │ │ │ │ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ ─ ─┃ ┃─ ┤ │ ─ ─ ─ ─ ─ ─ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ │ ┃ ┃ A B C │
┃ Data Page for ColumnChunk 2 │◀┃─ ┗━━━━━━━┛ │ └── ─── ─── ─── ─── ┘
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ │ Parquet
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ Decoder │ ...
┃ Data Page for ColumnChunk 3 │ ┃ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ │ ┌── ─── ─── ─── ─── ┐
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ │ ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃ Data Page for ColumnChunk 3 │◀┃─ │ │ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ ─ ▶│ │ │ │ │ │ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ │ ─ ─ ─ ─ ─ ─ │
┃ Data Page for ColumnChunk 3 │ ┃ A B C │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃ └── ─── ─── ─── ─── ┘
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
Parquet file Smaller in memory
batches for
processing
虽然流式传输是一个不难解释的特性,但解码的状态性质,特别是在多列和任意嵌套数据中,行与值之间的关系不固定,需要复杂的中间缓冲和大量的工程工作才能正确处理。
字典保留
字典编码,也称为分类编码,是一种技术,其中列中的每个值不直接存储,而是存储在单独列表(称为“字典”)中的索引。这种技术实现了第三范式的许多优点,适用于具有重复值(低基数)的列,并且对于“城市”等字符串列特别有效。
ColumnChunk 中的第一个页面可以选择是字典页面,其中包含列类型的值列表。此 ColumnChunk 中的后续页面可以编码此字典中的索引,而不是直接编码值。
鉴于此编码的有效性,如果 Parquet 解码器只是将字典数据解码为本机类型,它将低效地重复相同的值,这对字符串数据尤其灾难性。为了高效处理字典编码数据,在解码期间必须保留编码。方便的是,许多列式格式,例如 Arrow DictionaryArray,都支持这种兼容的编码。
保留字典编码在读取到 Arrow 数组时显着提高了性能,在某些情况下超过60 倍,同时显着减少了内存使用。
保留字典的主要复杂因素是字典是按 ColumnChunk 存储的,因此字典在 RowGroup 之间会发生变化。读取器必须自动为跨多个 RowGroup 的批次重新计算字典,同时还要优化批次大小能被每个 RowGroup 的行数整除的情况。此外,一个列可能只部分字典编码,这进一步复杂了实现。有关此技术及其复杂性的更多信息,请参阅关于将此技术应用于 C++ Parquet 读取器的博客文章。
投影下推
最基本的 Parquet 优化,也是 Parquet 文件最常描述的优化,是投影下推,它同时减少了 I/O 和 CPU 需求。此处的投影意味着“选择部分而非全部列”。鉴于 Parquet 组织数据的方式,读取和解码仅参考列所需的 ColumnChunk 非常简单。
例如,考虑以下形式的 SQL 查询:
SELECT B from table where A > 35
此查询仅需要列 A 和 B 的数据(而不是 C),并且可以将投影“下推”到 Parquet 读取器。
具体来说,使用页脚中的信息,Parquet 读取器可以完全跳过获取 (I/O) 和解码 (CPU) 存储列 C 数据的数据页(在我们示例中为 ColumnChunk 3 和 ColumnChunk 6)。
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
┌─────▶ Data Page for ColumnChunk 1 ("A") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
│ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
├─────▶ Data Page for ColumnChunk 1 ("A") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
│ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
├─────▶ Data Page for ColumnChunk 2 ("B") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
│ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
│ ┃ Data Page for ColumnChunk 3 ("C") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
A query that │ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
accesses only │ ┃ Data Page for ColumnChunk 3 ("C") ┃
columns A and B │ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
can read only the │ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
relevant pages, ─────┤ ┃ Data Page for ColumnChunk 3 ("C") ┃
skipping any Data │ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
Page for column C │ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
├─────▶ Data Page for ColumnChunk 4 ("A") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
│ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
├─────▶ Data Page for ColumnChunk 5 ("B") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
│ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
├─────▶ Data Page for ColumnChunk 5 ("B") ┃
│ ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
│ ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
└─────▶ Data Page for ColumnChunk 5 ("B") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
┃ Data Page for ColumnChunk 6 ("C") ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
谓词下推
与投影下推类似,谓词下推也避免从 Parquet 文件中获取和解码数据,但通过使用过滤表达式来实现。此技术通常需要与查询引擎(例如 DataFusion)更紧密地集成,以在扫描期间确定有效谓词并对其进行评估。不幸的是,如果没有仔细的 API 设计,Parquet 解码器和查询引擎最终可能会紧密耦合,从而阻碍重用(例如,Cloudera Parquet 谓词下推文档中有不同的 Impala 和 Spark 实现)。Rust Parquet 读取器使用 RowSelection API 来避免这种耦合。
RowGroup 剪枝
谓词下推最简单的形式,许多基于 Parquet 的查询引擎都支持,是利用页脚中存储的统计信息来跳过整个 RowGroup。我们将此操作称为 RowGroup 剪枝,它类似于许多传统数据仓库系统中的分区剪枝。
对于上面的示例查询,如果特定 RowGroup 中 A 的最大值小于 35,则解码器可以跳过获取和解码该整个 RowGroup 中的任何 ColumnChunk。
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃Row Group 1 Metadata ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "A" Metadata Min:0 Max:15 ┃◀╋ ┐
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ Using the min
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ │ and max values
┃ ┃Column "B" Metadata ┃ ┃ from the
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ │ metadata,
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ RowGroup 1 can
┃ ┃Column "C" Metadata ┃ ┃ ├ ─ ─ be entirely
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ skipped
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │ (pruned) when
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ searching for
┃Row Group 2 Metadata ┃ │ rows with A >
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ 35,
┃ ┃Column "A" Metadata Min:10 Max:50 ┃◀╋ ┘
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "B" Metadata ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "C" Metadata ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
请注意,基于最小值和最大值的剪枝对许多数据布局和列类型有效,但并非全部。具体而言,对于具有许多不同的伪随机值(例如标识符或 UUID)的列,它效果不佳。值得庆幸的是,对于这种用例,Parquet 还支持每个 ColumnChunk 的 Bloom 过滤器。我们正在积极努力在 Apache Rust 的实现中添加 Bloom 过滤器支持。
页面剪枝
一种更复杂的谓词下推形式是使用页脚元数据中可选的页面索引来排除整个数据页。解码器仅解码其他列中相应的行,通常跳过整个页面。
由于各种原因,不同 ColumnChunk 中的页面通常包含不同数量的行,这一事实使此优化复杂化。虽然页面索引可以识别一列中所需的页面,但从一列中剪枝一个页面并不能立即排除其他列中的整个页面。
页面剪枝按以下步骤进行:
- 结合谓词和页面索引来识别要跳过的页面
- 使用偏移量索引确定非跳过页面对应的行范围
- 计算非跳过页面之间范围的交集,并仅解码这些行
最后一点的实现非常复杂,特别是对于嵌套列表,其中一行可能对应多个值。幸运的是,Rust Parquet 读取器在内部隐藏了这种复杂性,并且可以解码任意行选择。
例如,扫描下图所示的存储在 5 个数据页中的列 A 和 B
如果谓词是 A > 35,
- 页面 1 使用页面索引剪枝(最大值为
20),留下[200->onwards]的 RowSelection, - Parquet 读取器完全跳过页面 3(因为其最后一行索引为
99) - (仅)通过读取页面 2、4 和 5 来读取相关行。
如果谓词改为 A > 35 AND B = "F",则页面索引更加有效。
- 使用
A > 35,像之前一样得到[200->onwards]的 RowSelection - 对 B 的剩余页面 4 和页面 5 使用
B = "F",得到[100-244]的 RowSelection - 两个 RowSelection 取交集得到合并的 RowSelection
[200-244] - Parquet 读取器仅从页面 2 和页面 4 解码这 50 行。
┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
┃ ┌──────────────┐ │ ┌──────────────┐ │ ┃
┃ │ │ │ │ │ │ ┃
┃ │ │ │ │ Page │ │
│ │ │ │ │ 3 │ ┃
┃ │ │ │ │ min: "A" │ │ ┃
┃ │ │ │ │ │ max: "C" │ ┃
┃ │ Page │ │ │ first_row: 0 │ │
│ │ 1 │ │ │ │ ┃
┃ │ min: 10 │ │ └──────────────┘ │ ┃
┃ │ │ max: 20 │ │ ┌──────────────┐ ┃
┃ │ first_row: 0 │ │ │ │ │
│ │ │ │ │ Page │ ┃
┃ │ │ │ │ 4 │ │ ┃
┃ │ │ │ │ │ min: "D" │ ┃
┃ │ │ │ │ max: "G" │ │
│ │ │ │ │first_row: 100│ ┃
┃ └──────────────┘ │ │ │ │ ┃
┃ │ ┌──────────────┐ │ │ │ ┃
┃ │ │ │ └──────────────┘ │
│ │ Page │ │ ┌──────────────┐ ┃
┃ │ 2 │ │ │ │ │ ┃
┃ │ │ min: 30 │ │ │ Page │ ┃
┃ │ max: 40 │ │ │ 5 │ │
│ │first_row: 200│ │ │ min: "H" │ ┃
┃ │ │ │ │ max: "Z" │ │ ┃
┃ │ │ │ │ │first_row: 250│ ┃
┃ └──────────────┘ │ │ │ │
│ │ └──────────────┘ ┃
┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃ ColumnChunk ColumnChunk ┃
┃ A B
━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
从 Arrow C++(以及扩展的 pyarrow/pandas)读写这些索引的支持已在 PARQUET-1404 中进行跟踪。
延迟物化
前两种谓词下推形式仅在解码值之前对为 RowGroup、ColumnChunk 和数据页存储的元数据进行操作。然而,相同的技术也适用于解码一个或多个列的值之后,但在解码其他列之前,这通常称为“延迟物化”。
此技术在以下情况下特别有效:
- 谓词非常具有选择性,即过滤掉大量行
- 每行很大,无论是由于宽行(例如 JSON blobs)还是许多列
- 所选数据聚集在一起
- 谓词所需的列解码成本相对较低,例如 PrimitiveArray / DictionaryArray
SPARK-36527 和 Impala 中有关于此技术优点的额外讨论。
例如,给定上面的谓词 A > 35 AND B = "F",其中引擎使用页面索引来确定 RowSelection [100-244] 中只有 50 行可以匹配,使用延迟物化,Parquet 解码器
- 解码列 A 的 50 个值
- 对这 50 个值评估
A > 35 - 在这种情况下,只有 5 行通过,得到 RowSelection
- RowSelection[205-206]
- RowSelection[238-240]
- 仅解码列 B 的这 5 行
Row Index
┌────────────────────┐ ┌────────────────────┐
200 │ 30 │ │ "F" │
└────────────────────┘ └────────────────────┘
... ...
┌────────────────────┐ ┌────────────────────┐
205 │ 37 │─ ─ ─ ─ ─ ─▶│ "F" │
├────────────────────┤ ├────────────────────┤
206 │ 36 │─ ─ ─ ─ ─ ─▶│ "G" │
└────────────────────┘ └────────────────────┘
... ...
┌────────────────────┐ ┌────────────────────┐
238 │ 36 │─ ─ ─ ─ ─ ─▶│ "F" │
├────────────────────┤ ├────────────────────┤
239 │ 36 │─ ─ ─ ─ ─ ─▶│ "G" │
├────────────────────┤ ├────────────────────┤
240 │ 40 │─ ─ ─ ─ ─ ─▶│ "G" │
└────────────────────┘ └────────────────────┘
... ...
┌────────────────────┐ ┌────────────────────┐
244 │ 26 │ │ "D" │
└────────────────────┘ └────────────────────┘
Column A Column B
Values Values
在某些情况下,例如我们的示例中 B 存储单个字符值,延迟物化的开销可能超过解码的节省。然而,当满足上述某些条件时,节省可能是巨大的。查询引擎必须决定下推哪些谓词以及以何种顺序应用它们才能获得最佳结果。
虽然这超出了本文的范围,但相同的技术可以应用于多个谓词以及多个列上的谓词。有关更多信息,请参阅 Parquet crate 中的 RowFilter 接口,以及 DataFusion 中的 row_filter 实现。
I/O 下推
虽然 Parquet 是为在 HDFS 分布式文件系统上高效访问而设计的,但它与 AWS S3 等商用 blob 存储系统配合得很好,因为它们具有非常相似的特性。
- 相对较慢的“随机访问”读取:在每个请求中读取大块(MBs)数据比对小块数据发出许多请求效率高得多
- 检索第一个字节之前的显著延迟
- 高每请求成本: 通常按请求计费,无论读取字节数多少,这鼓励减少请求数量,每个请求读取一个大的连续数据段。
为了从这些系统高效读取,Parquet 读取器必须
- 最小化 I/O 请求的数量,同时应用各种下推技术以避免获取大量未使用的数据。
- 与适当的任务调度机制集成,以交错 I/O 和对获取数据的处理,以避免管道瓶颈。
由于这些是重大的工程和集成挑战,许多 Parquet 读取器仍然要求文件完全获取到本地存储。
为了处理文件而获取整个文件并不理想,原因有以下几点:
- 高延迟:直到整个文件被获取后才能开始解码(Parquet 元数据位于文件末尾,因此解码器必须在解码其余部分之前看到文件末尾)
- 浪费工作:获取整个文件会获取所有必要数据,但也可能获取大量不必要的数据,这些数据在读取页脚后将被跳过。这不必要地增加了成本。
- 需要昂贵的“本地附加”存储(或内存):许多云环境不提供带有本地附加存储的计算资源——它们要么依赖昂贵的网络块存储(如 AWS EBS),要么将本地存储限制在某些类别的 VM。
避免缓冲整个文件的需要,需要一个复杂的 Parquet 解码器,与 I/O 子系统集成,能够首先获取和解码元数据,然后针对相关数据块进行范围获取,并与 Parquet 数据的解码交错进行。这种优化需要仔细的工程设计,以从对象存储中获取足够大的数据块,从而使每请求开销不会抵消减少传输字节所带来的收益。SPARK-36529 更详细地描述了顺序处理的挑战。
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
│
│
Step 1: Fetch │
Parquet Parquet metadata
file on ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▼━━━━━━━┓
Remote ┃ ▒▒▒▒▒▒▒▒▒▒ ▒▒▒▒▒▒▒▒▒▒ ░░░░░░░░░░ ┃
Object ┃ ▒▒▒data▒▒▒ ▒▒▒data▒▒▒ ░metadata░ ┃
Store ┃ ▒▒▒▒▒▒▒▒▒▒ ▒▒▒▒▒▒▒▒▒▒ ░░░░░░░░░░ ┃
┗━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
│ └ ─ ─ ─
│
│ Step 2: Fetch only
─ ─ ─ ─ ─ ─ ─ ─ ─ relevant data blocks
此图中未包含请求合并和确保实际实现所需的最小请求大小等细节。
Rust Parquet crate 提供了一个异步 Parquet 读取器,可以高效地从任何支持范围请求的 AsyncFileReader 读取,它具有以下特点:
- 高效地从任何支持范围请求的存储介质读取
- 与 Rust 的 futures 生态系统集成,避免阻塞等待网络 I/O 的线程,并可以轻松交错 CPU 和网络
- 同时请求多个范围,允许实现合并相邻范围,并行获取范围等。
- 使用前面描述的下推技术,尽可能消除数据获取
- 与 Apache Arrow object_store crate 轻松集成,您可以在此处了解更多信息
为了说明可能性,下图显示了从远程文件获取页脚元数据,使用该元数据确定要读取哪些数据页,然后同时获取数据和解码的时间线。为了匹配网络延迟、带宽和可用 CPU,通常需要同时对多个文件执行此过程。
begin
metadata read of end read
read ─ ─ ─ ┐ data of data │
begin complete block block
read of │ │ │ │
metadata ─ ─ ─ ┐ At any time, there are
│ │ │ │ │ multiple network
│ ▼ ▼ ▼ ▼ requests outstanding to
file 1 │ ░░░░░░░░░░ ▒▒▒read▒▒▒ ▒▒▒read▒▒▒ │ hide the individual
│ ░░░read░░░ ▒▒▒data▒▒▒ ▒▒▒data▒▒▒ request latency
│ ░metadata░ ▓▓decode▓▓
│ ░░░░░░░░░░ ▓▓▓data▓▓▓
│ │
│
│ ░░░░░░░░░░ ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒read▒▒▒▒│▒▒▒▒▒▒▒▒▒▒▒▒▒▒
file 2 │ ░░░read░░░ ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒data▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
│ ░metadata░ │ ▓▓▓▓▓decode▓▓▓▓▓▓
│ ░░░░░░░░░░ ▓▓▓▓▓▓data▓▓▓▓▓▓▓
│ │
│
│ ░░│░░░░░░░ ▒▒▒read▒▒▒ ▒▒▒▒read▒▒▒▒▒
file 3 │ ░░░read░░░ ▒▒▒data▒▒▒ ▒▒▒▒data▒▒▒▒▒ ...
│ ░m│tadata░ ▓▓decode▓▓
│ ░░░░░░░░░░ ▓▓▓data▓▓▓
└───────────────────────────────────────┼──────────────────────────────▶Time
│
结论
我们希望您喜欢阅读关于 Parquet 文件格式以及用于快速查询 Parquet 文件的各种技术。
我们相信,大多数开源 Parquet 实现之所以没有本文所述的广泛功能,是因为这需要巨大的努力,而这在以前只有资金雄厚的商业企业才能做到,而这些企业又将其实现闭源。
然而,随着 Apache Arrow 社区(包括 Rust 实践者和更广泛的 Arrow 社区)的成长和质量提升,我们能够协作构建尖端的开源实现,这令人振奋且非常令人满意。本文所述的技术是许多工程师(来自公司、业余爱好者和全球各地)在多个存储库中贡献的结果,尤其是 Apache Arrow DataFusion、Apache Arrow 和 Apache Arrow Ballista。
如果您有兴趣加入 DataFusion 社区,请联系我们。