毫秒级延迟查询 Parquet


已发布 2022年12月26日
作者 tustvold 和 alamb

毫秒级延迟查询 Parquet

注意:本文最初发表在 InfluxData 博客上。

我们认为,直接查询 Apache Parquet 文件中的数据可以实现与大多数专用文件格式相似或更好的存储效率和查询性能。虽然这需要大量的工程工作,但 Parquet 的开放格式和广泛的生态系统支持使其成为各种数据系统的显而易见的选择。

在本文中,我们将解释快速查询 Parquet 格式存储的数据所需的几种先进技术,这些技术我们在 Apache Arrow Rust Parquet 读取器中实现。 这些技术共同使 Rust 实现成为查询 Parquet 文件最快的实现之一(即使不是最快的)——无论是在本地磁盘还是远程对象存储上。 它能够在 毫秒级的时间内查询 GB 级的 Parquet 数据。

我们要感谢并感谢 InfluxData 对这项工作的支持。 InfluxData 对开源软件有着深刻和持续的承诺,它赞助了我们撰写这篇博文的大部分时间,以及作为构建 InfluxDB IOx 存储引擎的一部分的许多贡献。

背景

Apache Parquet 是一种越来越流行的开放格式,用于存储 分析数据集,并且由于其引人注目的以下组合,已成为具有成本效益的、与 DBMS 无关的数据存储的事实标准:

  • 高压缩率
  • 适合 S3 等商品 blob 存储
  • 广泛的生态系统和工具支持
  • 跨许多不同平台和工具的可移植性
  • 支持 任意结构化数据

越来越多的其他系统,例如 DuckDBRedshift 允许直接查询存储在 Parquet 中的数据,但与它们的本机(自定义)文件格式相比,支持通常仍然是次要考虑因素。 这种格式包括 DuckDB .duckdb 文件格式、Apache IOT TsFileGorilla 格式等。

以前仅在闭源商业实现中提供的相同的复杂查询技术,现在首次作为开源提供。 所需的工程能力来自大型、运行良好的开源项目,这些项目拥有全球贡献者社区,例如 Apache ArrowApache Impala

Parquet 文件格式

在深入研究如何有效读取 Parquet 的细节之前,了解文件布局非常重要。 文件格式经过精心设计,可以快速定位所需信息、跳过无关部分并有效解码剩余内容。

  • Parquet 文件中的数据被分成称为 RowGroup 的水平切片
  • 每个 RowGroup 包含 schema 中每一列的单个 ColumnChunk

例如,下图说明了一个 Parquet 文件,其中包含存储在两个 RowGroup 中的三列“A”、“B”和“C”,总共 6 个 ColumnChunk

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓          ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃ RowGroup ┃
┃┃             │                            │ ┃     1    ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃ColumnChunk 1  ColumnChunk 2 ColumnChunk 3  ┃          ┃
┃┃ (Column "A")   (Column "B")  (Column "C")  ┃          ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛          ┃
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓          ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃ RowGroup ┃
┃┃             │                            │ ┃     2    ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃ColumnChunk 4  ColumnChunk 5 ColumnChunk 6  ┃          ┃
┃┃ (Column "A")   (Column "B")  (Column "C")  ┃          ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛          ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

ColumnChunk 的逻辑值使用许多 可用编码之一写入到一个或多个按顺序附加在文件中的数据页中。 在 Parquet 文件的末尾是一个页脚,其中包含重要的元数据,例如

  • 文件的 schema 信息,例如列名和类型
  • 文件中 RowGroupColumnChunk 的位置

页脚可能还包含其他专门的数据结构

  • 每个 ColumnChunk 的可选统计信息,包括最小值/最大值和空值计数
  • 指向 OffsetIndexes 的可选指针,其中包含每个单独页面的位置
  • 指向 ColumnIndex 的可选指针,其中包含每个页面的行数和摘要统计信息
  • 指向 BloomFilterData 的可选指针,可以快速检查值是否存在于 ColumnChunk

例如,前一个图中的 2 个行组和 6 个 ColumnChunk 的逻辑结构可以存储在 Parquet 文件中,如下图所示(未按比例)。 ColumnChunk 的页面首先出现,然后是页脚。 数据、编码方案的有效性以及 Parquet 编码器的设置决定了每个 ColumnChunk 所需的页面数量和大小。 在这种情况下,ColumnChunk 1 需要 2 个页面,而 ColumnChunk 6 只需要 1 个页面。 除了其他信息外,页脚还包含每个数据页面的位置和列的类型。

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 1 ("A")             ◀─┃─ ─ ─│
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 1 ("A")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 2 ("B")               ┃     │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 3 ("C")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 3 ("C")               ┃     │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 3 ("C")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 4 ("A")             ◀─┃─ ─ ─│─ ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │  │
┃ Data Page for ColumnChunk 5 ("B")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 5 ("B")               ┃     │  │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │  │
┃ Data Page for ColumnChunk 5 ("B")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 6 ("C")               ┃     │  │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃     │  │
┃┃Footer                                        ┃ ┃
┃┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃     │  │
┃┃ ┃File Metadata                             ┃ ┃ ┃
┃┃ ┃ Schema, etc                              ┃ ┃ ┃     │  │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓     ┃ ┃ ┃
┃┃ ┃ ┃Row Group 1 Metadata              ┃     ┃ ┃ ┃     │  │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓             ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ Location of ┃     ┃ ┃ ┃     │  │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ first Data  ┣ ─ ─ ╋ ╋ ╋ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Page, row   ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┃Column "B" Metadata┃ counts,     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ sizes,      ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ min/max     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃ values, etc ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛             ┃     ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛     ┃ ┃ ┃        │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓     ┃ ┃ ┃
┃┃ ┃ ┃Row Group 2 Metadata              ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Location of ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ first Data  ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ Page, row   ┣ ─ ─ ╋ ╋ ╋ ─ ─ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ counts,     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "B" Metadata┃ sizes,      ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ min/max     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ values, etc ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃             ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛             ┃     ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛     ┃ ┃ ┃
┃┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ ┃
┃┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

创建 Parquet 文件时,需要考虑许多重要的标准,例如如何最佳地对数据进行排序/聚类并将其构建到 RowGroup 和数据页面中。 这种“物理设计”方面的考虑非常复杂,值得单独写一系列文章,并且本文不会涉及。 相反,我们专注于如何使用可用的结构来使查询非常快。

优化查询

在任何查询处理系统中,以下技术通常可以提高性能

  1. 减少必须从辅助存储传输以进行处理的数据(减少 I/O)
  2. 减少解码数据的计算负载(减少 CPU)
  3. 交错/流水线化数据的读取和解码(提高并行性)

相同的原则适用于查询 Parquet 文件,如下所述

解码优化

Parquet 通过使用 复杂的编码技术(例如游程压缩、字典编码、增量编码等)实现了令人印象深刻的压缩率。 因此,CPU 绑定的解码任务可能会占据查询延迟的主导地位。 Parquet 读取器可以使用多种技术来提高此任务的延迟和吞吐量,就像我们在 Rust 实现中所做的那样。

向量化解码

大多数分析系统一次解码多个值到列式内存格式(例如 Apache Arrow),而不是逐行处理数据。 这通常称为向量化或列式处理,并且有益,因为它

  • 分摊了根据要解码的列的类型进行切换的调度开销
  • 通过从 ColumnChunk 读取连续值来提高缓存局部性
  • 通常允许在单个指令中解码多个值。
  • 通过单个大的分配避免了许多小的堆分配,从而为可变长度类型(例如字符串和字节数组)节省了大量资金

因此,Rust Parquet 读取器实现了专门的解码器,用于将 Parquet 直接读取到 列式内存格式(Arrow 数组)中。

流式解码

哪些行存储在 ColumnChunk 的哪些页面中没有关系。 例如,第 10,000 行的逻辑值可能在 A 列的第一页中,而在 B 列的第三页中。

向量化解码最简单的方法,也是 Parquet 解码器最初实现的方法,是一次解码整个 RowGroup(或 ColumnChunk)。

但是,考虑到 Parquet 的高压缩率,单个 RowGroup 可能包含数百万行。 一次解码这么多行不是最佳的,因为它

  • 需要大量的中间 RAM:针对处理优化的典型内存格式(例如 Apache Arrow)比其 Parquet 编码格式需要更多。
  • 增加查询延迟:只有在解码整个 RowGroup(或 ColumnChunk)之后,后续处理步骤(如过滤或聚合)才能开始。

因此,最好的 Parquet 读取器支持“流式传输”数据,方法是按需生成可配置大小的行批次。批次大小必须足够大,以摊销解码开销,但又足够小,以实现高效的内存使用,并允许下游处理在后续批次解码的同时并发开始。

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
┃ Data Page for ColumnChunk 1 │◀┃─                   ┌── ─── ─── ─── ─── ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │   ┏━━━━━━━┓        ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃     ┃       ┃      │                   │
┃ Data Page for ColumnChunk 1 │ ┃ │   ┃       ┃   ─ ▶│ │   │ │   │ │   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃  ─ ─┃       ┃─ ┤   │  ─ ─   ─ ─   ─ ─  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │   ┃       ┃           A    B     C   │
┃ Data Page for ColumnChunk 2 │◀┃─    ┗━━━━━━━┛  │   └── ─── ─── ─── ─── ┘
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │    Parquet
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃      Decoder   │            ...
┃ Data Page for ColumnChunk 3 │ ┃ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                │   ┌── ─── ─── ─── ─── ┐
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │                    ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃ Data Page for ColumnChunk 3 │◀┃─               │   │                   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                 ─ ▶│ │   │ │   │ │   │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                    │  ─ ─   ─ ─   ─ ─  │
┃ Data Page for ColumnChunk 3 │ ┃                         A    B     C   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                    └── ─── ─── ─── ─── ┘
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

      Parquet file                                    Smaller in memory
                                                         batches for
                                                         processing

虽然流式传输不是一个难以解释的功能,但解码的stateful性质,尤其是在多个列和任意嵌套数据上,其中行和值之间的关系不是固定的,需要复杂的中间缓冲和大量的工程工作才能正确处理。

字典保留

字典编码,也称为分类编码,是一种技术,其中列中的每个值不是直接存储,而是存储在名为“字典”的单独列表中的索引。这种技术实现了第三范式的许多好处,适用于具有重复值(低基数)的列,并且对于字符串列(例如“城市”)尤其有效。

ColumnChunk中的第一个页面可以选择是一个字典页面,包含该列类型的值列表。然后,此ColumnChunk中的后续页面可以编码到此字典的索引,而不是直接编码值。

鉴于这种编码的有效性,如果 Parquet 解码器只是将字典数据解码为本机类型,它将低效地重复复制相同的值,这对于字符串数据来说尤其糟糕。为了有效地处理字典编码的数据,必须在解码期间保留编码。方便的是,许多列式格式(例如 Arrow DictionaryArray)都支持这种兼容的编码。

保留字典编码可以显著提高读取到 Arrow 数组时的性能,在某些情况下超过60 倍,并且使用的内存也明显更少。

保留字典的主要复杂因素是字典存储在每个ColumnChunk中,因此字典在RowGroup之间会发生变化。读取器必须自动为跨多个RowGroup的批次重新计算字典,同时优化批次大小均匀划分为每个RowGroup的行数的情况。此外,一列可能只是部分字典编码,这进一步复杂化了实现。有关此技术及其复杂性的更多信息,请参见关于将此技术应用于 C++ Parquet 读取器的博客文章

投影下推

最基本的 Parquet 优化,也是 Parquet 文件最常描述的优化是投影下推,它减少了 I/O 和 CPU 需求。上下文中的投影意味着“选择某些但不是全部的列”。鉴于 Parquet 组织数据的方式,仅读取和解码引用列所需的ColumnChunk很简单。

例如,考虑以下形式的 SQL 查询

SELECT B from table where A > 35

此查询仅需要列 A 和 B 的数据(而不是 C),并且可以将投影“下推”到 Parquet 读取器。

具体来说,使用页脚中的信息,Parquet 读取器可以完全跳过提取 (I/O) 和解码 (CPU) 存储列 C 数据的 Data Pages(在我们示例中为ColumnChunk 3 和ColumnChunk 6)。

                             ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
                             ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ┌─────▶ Data Page for ColumnChunk 1 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 1 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 2 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       │     ┃ Data Page for ColumnChunk 3 ("C") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
   A query that        │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
  accesses only        │     ┃ Data Page for ColumnChunk 3 ("C") ┃
 columns A and B       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
can read only the      │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
 relevant pages,  ─────┤     ┃ Data Page for ColumnChunk 3 ("C") ┃
skipping any Data      │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
Page for column C      │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 4 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 5 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 5 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       └─────▶ Data Page for ColumnChunk 5 ("B") ┃
                             ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                             ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                             ┃ Data Page for ColumnChunk 6 ("C") ┃
                             ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                             ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

谓词下推

与投影下推类似,谓词下推也避免了从 Parquet 文件中提取和解码数据,但使用过滤器表达式来实现这一点。此技术通常需要与查询引擎(例如DataFusion)更紧密的集成,以确定有效的谓词并在扫描期间对其进行评估。不幸的是,如果没有仔细的 API 设计,Parquet 解码器和查询引擎最终可能会紧密耦合,从而阻止重用(例如,Cloudera Parquet Predicate Pushdown 文档中有不同的 Impala 和 Spark 实现)。Rust Parquet 读取器使用RowSelection API 来避免这种耦合。

RowGroup 剪枝

许多基于 Parquet 的查询引擎支持的最简单的谓词下推形式是使用存储在页脚中的统计信息来跳过整个RowGroup。我们称此操作为RowGroup剪枝,它类似于许多经典数据仓库系统中的分区剪枝

对于上面的示例查询,如果特定RowGroup中 A 的最大值小于 35,则解码器可以跳过从该整个RowGroup中提取和解码任何ColumnChunk

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃Row Group 1 Metadata                      ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "A" Metadata    Min:0 Max:15   ┃◀╋ ┐
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃       Using the min
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ │     and max values
┃ ┃Column "B" Metadata                   ┃ ┃       from the
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ │     metadata,
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃       RowGroup 1  can
┃ ┃Column "C" Metadata                   ┃ ┃ ├ ─ ─ be entirely
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃       skipped
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │     (pruned) when
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓       searching for
┃Row Group 2 Metadata                      ┃ │     rows with A >
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃       35,
┃ ┃Column "A" Metadata   Min:10 Max:50   ┃◀╋ ┘
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "B" Metadata                   ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "C" Metadata                   ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

请注意,对最小值和最大值进行剪枝对于许多数据布局和列类型有效,但并非全部有效。具体来说,对于具有许多不同的伪随机值(例如,标识符或 uuid)的列,它不如有效。值得庆幸的是,对于此用例,Parquet 还支持每个ColumnChunk Bloom Filters。我们正在积极致力于在 Apache Rust 的实现中添加 bloom 过滤器支持。

页面剪枝

一种更复杂的谓词下推形式是使用页脚元数据中的可选页面索引来排除整个 Data Pages。解码器仅解码来自其他列的相应行,通常跳过整个页面。

由于各种原因,不同ColumnChunk中的页面通常包含不同数量的行,这一事实使此优化变得复杂。虽然页面索引可能从一列中标识所需的页面,但从一列中剪枝页面并不能立即排除其他列中的整个页面。

页面剪枝按如下方式进行

  • 使用谓词与页面索引结合使用来标识要跳过的页面
  • 使用偏移索引来确定哪些行范围对应于非跳过页面
  • 计算非跳过页面上的范围的交集,并且仅解码这些行

最后一点的实现非常重要,特别是对于嵌套列表,其中单行可能对应于多个值。幸运的是,Rust Parquet 读取器在内部隐藏了这种复杂性,并且可以解码任意的RowSelections

例如,要扫描存储在 5 个 Data Pages 中的列 A 和 B,如下图所示

如果谓词是A > 35

  • 使用页面索引剪枝 Page 1(最大值为20),留下 RowSelection [200->onwards],
  • Parquet 读取器完全跳过 Page 3(因为它的最后一行索引为99
  • 通过读取页面 2、4 和 5 来读取(仅)相关行。

如果谓词改为A > 35 AND B = "F",则页面索引会更有效

  • 使用A > 35,产生 RowSelection [200->onwards],如前所述
  • 在 B 的剩余页面 4 和页面 5 上使用B = "F",产生 RowSelection [100-244]
  • 相交的两个 RowSelection 留下组合的 RowSelection [200-244]
  • Parquet 读取器仅解码页面 2 和页面 4 中的 50 行。
┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
┃  │  │              │     │  │              │     ┃
┃     │              │  │     │     Page     │  │
   │  │              │     │  │      3       │     ┃
┃     │              │  │     │   min: "A"   │  │  ┃
┃  │  │              │     │  │   max: "C"   │     ┃
┃     │     Page     │  │     │ first_row: 0 │  │
   │  │      1       │     │  │              │     ┃
┃     │   min: 10    │  │     └──────────────┘  │  ┃
┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
┃     │ first_row: 0 │  │     │              │  │
   │  │              │     │  │     Page     │     ┃
┃     │              │  │     │      4       │  │  ┃
┃  │  │              │     │  │   min: "D"   │     ┃
┃     │              │  │     │   max: "G"   │  │
   │  │              │     │  │first_row: 100│     ┃
┃     └──────────────┘  │     │              │  │  ┃
┃  │  ┌──────────────┐     │  │              │     ┃
┃     │              │  │     └──────────────┘  │
   │  │     Page     │     │  ┌──────────────┐     ┃
┃     │      2       │  │     │              │  │  ┃
┃  │  │   min: 30    │     │  │     Page     │     ┃
┃     │   max: 40    │  │     │      5       │  │
   │  │first_row: 200│     │  │   min: "H"   │     ┃
┃     │              │  │     │   max: "Z"   │  │  ┃
┃  │  │              │     │  │first_row: 250│     ┃
┃     └──────────────┘  │     │              │  │
   │                       │  └──────────────┘     ┃
┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃       ColumnChunk            ColumnChunk         ┃
┃            A                      B
 ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛

PARQUET-1404中跟踪了对从 Arrow C++ 以及 pyarrow/pandas 读取和写入这些索引的支持。

延迟物化

前两种形式的谓词下推仅在解码值之前对存储在RowGroupColumnChunk和 Data Pages 中的元数据进行操作。但是,相同的技术也扩展到在一个或多个列的值解码之后但在解码其他列之前,这通常称为“延迟物化”。

当以下情况时,此技术尤其有效

  • 谓词非常具有选择性,即过滤掉大量的行
  • 每行都很大,要么是由于宽行(例如,JSON blob),要么是由于有很多列
  • 所选数据聚集在一起
  • 谓词所需的列解码起来相对便宜,例如 PrimitiveArray / DictionaryArray

SPARK-36527Impala中还有关于此技术优点的其他讨论。

例如,给定上面的谓词A > 35 AND B = "F",其中引擎使用页面索引来确定在 RowSelection [100-244] 中只有 50 行可能匹配,使用延迟物化,Parquet 解码器

  • 解码列 A 的 50 个值
  • 在这些 50 个值上评估A > 35
  • 在这种情况下,只有 5 行通过,从而导致 RowSelection
    • RowSelection[205-206]
    • RowSelection[238-240]
  • 仅解码这些选定的 5 行 Column B
  Row Index
             ┌────────────────────┐            ┌────────────────────┐
       200   │         30         │            │        "F"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
       205   │         37         │─ ─ ─ ─ ─ ─▶│        "F"         │
             ├────────────────────┤            ├────────────────────┤
       206   │         36         │─ ─ ─ ─ ─ ─▶│        "G"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
       238   │         36         │─ ─ ─ ─ ─ ─▶│        "F"         │
             ├────────────────────┤            ├────────────────────┤
       239   │         36         │─ ─ ─ ─ ─ ─▶│        "G"         │
             ├────────────────────┤            ├────────────────────┤
       240   │         40         │─ ─ ─ ─ ─ ─▶│        "G"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
      244    │         26         │            │        "D"         │
             └────────────────────┘            └────────────────────┘


                   Column A                          Column B
                    Values                            Values

在某些情况下,例如我们的示例,其中 B 存储单个字符值,延迟物化机制的成本可能超过解码中的节省。但是,当满足上面列出的一些条件时,节省可能是巨大的。查询引擎必须决定要下推哪些谓词以及以何种顺序应用它们以获得最佳结果。

虽然这超出了本文档的范围,但相同的技术可以应用于多个谓词以及多个列上的谓词。有关更多信息,请参阅 Parquet crate 中的 RowFilter 接口,以及 DataFusion 中的 row_filter 实现。

I/O 下推

虽然 Parquet 被设计用于高效访问 HDFS 分布式文件系统,但它与商品 blob 存储系统(如 AWS S3)配合得非常好,因为它们具有非常相似的特性

  • 相对较慢的“随机访问”读取:每次请求读取大的(MB 级)数据块比对较小部分发出许多请求效率更高
  • 检索第一个字节之前存在明显的延迟
  • 高单次请求成本:通常按请求收费,无论读取的字节数是多少,这鼓励减少请求次数,每次读取较大的连续数据块。

为了以最佳方式从这些系统读取数据,Parquet 读取器必须

  1. 最大限度地减少 I/O 请求的数量,同时应用各种下推技术,以避免获取大量未使用的数据。
  2. 与适当的任务调度机制集成,以便交错处理 I/O 和获取的数据,以避免管道瓶颈。

由于这些都是重大的工程和集成挑战,许多 Parquet 读取器仍然需要将文件完整地提取到本地存储。

为了处理而提取整个文件并非理想选择,原因如下

  1. 高延迟:在提取整个文件之前无法开始解码(Parquet 元数据位于文件末尾,因此解码器必须先看到末尾才能解码其余部分)
  2. 浪费工作:提取整个文件会提取所有必要的数据,但同时也可能提取许多不必要的数据,这些数据在读取页脚后将被跳过。这不必要地增加了成本。
  3. 需要昂贵的“本地连接”存储(或内存):许多云环境不提供具有本地连接存储的计算资源 – 它们要么依赖昂贵的网络块存储(如 AWS EBS),要么将本地存储限制为某些类别的 VM。

避免需要缓冲整个文件需要一个复杂的 Parquet 解码器,该解码器与 I/O 子系统集成,可以首先提取和解码元数据,然后对相关数据块进行范围提取,并与 Parquet 数据的解码交错进行。此优化需要仔细的工程设计,才能从对象存储中提取足够大的数据块,以使每次请求的开销不会超过减少传输字节数带来的收益。SPARK-36529 更详细地描述了顺序处理的挑战。

                       ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
                                                                │
                       │
               Step 1: Fetch                                    │
 Parquet       Parquet metadata
 file on ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▼━━━━━━━┓
 Remote  ┃      ▒▒▒▒▒▒▒▒▒▒          ▒▒▒▒▒▒▒▒▒▒               ░░░░░░░░░░ ┃
 Object  ┃      ▒▒▒data▒▒▒          ▒▒▒data▒▒▒               ░metadata░ ┃
  Store  ┃      ▒▒▒▒▒▒▒▒▒▒          ▒▒▒▒▒▒▒▒▒▒               ░░░░░░░░░░ ┃
         ┗━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                     │                     └ ─ ─ ─
                                                  │
                     │                   Step 2: Fetch only
                      ─ ─ ─ ─ ─ ─ ─ ─ ─ relevant data blocks

此图示中未包括诸如合并请求和确保实际实现所需的最小请求大小之类的详细信息。

Rust Parquet crate 提供了一个异步 Parquet 读取器,可以有效地从任何 AsyncFileReader 读取数据,该读取器

  • 可以有效地从任何支持范围请求的存储介质读取数据
  • 与 Rust 的 futures 生态系统集成,以避免阻塞线程等待网络 I/O 并且可以轻松地交错 CPU 和网络
  • 同时请求多个范围,允许实现合并相邻范围、并行获取范围等。
  • 使用之前描述的下推技术来消除尽可能多地获取数据的情况
  • 可以轻松地与 Apache Arrow object_store crate 集成,您可以在 此处 阅读更多相关信息

为了让您了解可能实现的效果,下图显示了从远程文件提取页脚元数据、使用该元数据确定要读取的数据页,然后同时提取数据和解码的时间线。为了匹配网络延迟、带宽和可用的 CPU,通常必须一次对多个文件执行此过程。

                           begin
          metadata        read of   end read
            read  ─ ─ ─ ┐   data    of data          │
 begin    complete         block     block
read of                 │   │        │               │
metadata  ─ ─ ─ ┐                                       At any time, there are
             │          │   │        │               │     multiple network
             │  ▼       ▼   ▼        ▼                  requests outstanding to
  file 1     │ ░░░░░░░░░░   ▒▒▒read▒▒▒   ▒▒▒read▒▒▒  │    hide the individual
             │ ░░░read░░░   ▒▒▒data▒▒▒   ▒▒▒data▒▒▒        request latency
             │ ░metadata░                         ▓▓decode▓▓
             │ ░░░░░░░░░░                         ▓▓▓data▓▓▓
             │                                       │
             │
             │ ░░░░░░░░░░  ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒read▒▒▒▒│▒▒▒▒▒▒▒▒▒▒▒▒▒▒
   file 2    │ ░░░read░░░  ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒data▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
             │ ░metadata░                            │              ▓▓▓▓▓decode▓▓▓▓▓▓
             │ ░░░░░░░░░░                                           ▓▓▓▓▓▓data▓▓▓▓▓▓▓
             │                                       │
             │
             │                                     ░░│░░░░░░░  ▒▒▒read▒▒▒  ▒▒▒▒read▒▒▒▒▒
   file 3    │                                     ░░░read░░░  ▒▒▒data▒▒▒  ▒▒▒▒data▒▒▒▒▒      ...
             │                                     ░m│tadata░            ▓▓decode▓▓
             │                                     ░░░░░░░░░░            ▓▓▓data▓▓▓
             └───────────────────────────────────────┼──────────────────────────────▶Time


                                                     │

结论

我们希望您喜欢阅读有关 Parquet 文件格式以及用于快速查询 Parquet 文件的各种技术的文章。

我们认为,大多数 Parquet 开源实现没有本文中描述的广泛功能的原因是,它需要付出巨大的努力,而以前只有资金充足的商业企业才能做到,这些企业保持其实现闭源。

但是,随着 Apache Arrow 社区、Rust 从业者和更广泛的 Arrow 社区的增长和质量,我们协作并构建尖端开源实现的能力令人振奋和非常满意。此博客中描述的技术是许多工程师在跨公司、爱好者和全球多个存储库中的贡献的结果,特别是 Apache Arrow DataFusionApache ArrowApache Arrow Ballista.

如果您有兴趣加入 DataFusion 社区,请联系我们