毫秒级延迟查询 Parquet 数据


已发布 2022 年 12 月 26 日
作者 tustvold 和 alamb

毫秒级延迟查询 Parquet 数据

注意:本文最初发表于 InfluxData 博客

我们认为,直接查询 Apache Parquet 文件中的数据可以实现与大多数专用文件格式相似或更高的存储效率和查询性能。虽然这需要大量的工程投入,但 Parquet 的开放格式和广泛的生态系统支持带来的好处使其成为众多数据系统的明智选择。

在本文中,我们将解释在 Apache Arrow Rust Parquet 读取器 中实现的快速查询 Parquet 格式数据所需的几种高级技术。这些技术共同使 Rust 实现成为最快(如果不是最快)的 Parquet 文件查询实现之一,无论是在本地磁盘还是远程对象存储上。它能够在几毫秒内查询 GB 量级的 Parquet 数据。

我们要感谢并致谢 InfluxData 对这项工作的支持。InfluxData 对开源软件有着深厚而持续的承诺,它赞助了我们撰写这篇博文的大部分时间,以及在构建 InfluxDB IOx 存储引擎 过程中做出的许多贡献。

背景

Apache Parquet 是一种越来越流行的用于存储分析数据集的开放格式,并且已成为经济高效、与数据库管理系统无关的数据存储的事实标准。Parquet 最初是为 Hadoop 生态系统创建的,现在由于其引人注目的组合,其影响力已广泛扩展到数据分析生态系统:

  • 高压缩率
  • 适用于 S3 等商用 Blob 存储
  • 广泛的生态系统和工具支持
  • 可移植到许多不同的平台和工具
  • 支持任意结构化数据

越来越多的其他系统,例如 DuckDBRedshift,允许直接查询存储在 Parquet 中的数据,但与它们的原生(自定义)文件格式相比,支持仍然通常是次要考虑因素。此类格式包括 DuckDB .duckdb 文件格式、Apache IOT TsFileGorilla 格式等。

以前只有闭源商业实现中才可用的相同复杂查询技术,现在首次作为开源提供。所需的工程能力来自拥有全球贡献者社区的大型、运作良好的开源项目,例如 Apache ArrowApache Impala

Parquet 文件格式

在深入研究高效读取Parquet 的细节之前,了解文件布局非常重要。该文件格式经过精心设计,可以快速定位所需信息,跳过不相关的部分,并高效地解码剩余部分。

  • Parquet 文件中的数据被分成称为 RowGroup 的水平切片
  • 每个 RowGroup 包含模式中每个列的一个 ColumnChunk

例如,下图说明了一个 Parquet 文件,其中包含三列“A”、“B”和“C”,存储在两个 RowGroup 中,总共 6 个 ColumnChunk

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓          ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃ RowGroup ┃
┃┃             │                            │ ┃     1    ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃ColumnChunk 1  ColumnChunk 2 ColumnChunk 3  ┃          ┃
┃┃ (Column "A")   (Column "B")  (Column "C")  ┃          ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛          ┃
┃┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┓          ┃
┃┃┌ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ┐┌ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃│             │             ││              ┃ RowGroup ┃
┃┃             │                            │ ┃     2    ┃
┃┃│             │             ││              ┃          ┃
┃┃             │                            │ ┃          ┃
┃┃└ ─ ─ ─ ─ ─ ─ └ ─ ─ ─ ─ ─ ─ ┘└ ─ ─ ─ ─ ─ ─  ┃          ┃
┃┃ColumnChunk 4  ColumnChunk 5 ColumnChunk 6  ┃          ┃
┃┃ (Column "A")   (Column "B")  (Column "C")  ┃          ┃
┃┗━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━┛          ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

ColumnChunk 的逻辑值使用多种可用编码 之一写入文件中顺序追加的一个或多个数据页。Parquet 文件的末尾是一个页脚,其中包含重要的元数据,例如

  • 文件的模式信息,例如列名和类型
  • 文件中 RowGroupColumnChunk 的位置

页脚还可以包含其他专用数据结构

  • 每个 ColumnChunk 的可选统计信息,包括最小值/最大值和空值计数
  • 指向包含每个单独页面位置的 OffsetIndexes 的可选指针
  • 指向包含每页行数和摘要统计信息的 ColumnIndex 的可选指针
  • 指向 BloomFilterData 的可选指针,可以快速检查值是否存在于 ColumnChunk

例如,上图中 2 个 Row Group 和 6 个 ColumnChunk 的逻辑结构可能存储在 Parquet 文件中,如下图所示(未按比例)。ColumnChunk 的页面首先出现,然后是页脚。数据、编码方案的有效性和 Parquet 编码器的设置决定了每个 ColumnChunk 所需的页面数量和大小。在本例中,ColumnChunk 1 需要 2 页,而 ColumnChunk 6 只需要 1 页。除其他信息外,页脚还包含每个数据页的位置和列的类型。

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 1 ("A")             ◀─┃─ ─ ─│
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 1 ("A")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 2 ("B")               ┃     │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 3 ("C")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 3 ("C")               ┃     │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │
┃ Data Page for ColumnChunk 3 ("C")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 4 ("A")             ◀─┃─ ─ ─│─ ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │  │
┃ Data Page for ColumnChunk 5 ("B")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 5 ("B")               ┃     │  │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃     │  │
┃ Data Page for ColumnChunk 5 ("B")               ┃
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃     │  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐  ┃
┃ Data Page for ColumnChunk 6 ("C")               ┃     │  │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃     │  │
┃┃Footer                                        ┃ ┃
┃┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ ┃     │  │
┃┃ ┃File Metadata                             ┃ ┃ ┃
┃┃ ┃ Schema, etc                              ┃ ┃ ┃     │  │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓     ┃ ┃ ┃
┃┃ ┃ ┃Row Group 1 Metadata              ┃     ┃ ┃ ┃     │  │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓             ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ Location of ┃     ┃ ┃ ┃     │  │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ first Data  ┣ ─ ─ ╋ ╋ ╋ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Page, row   ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┃Column "B" Metadata┃ counts,     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ sizes,      ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ min/max     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃ values, etc ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛             ┃     ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛     ┃ ┃ ┃        │
┃┃ ┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓     ┃ ┃ ┃
┃┃ ┃ ┃Row Group 2 Metadata              ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ Location of ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "A" Metadata┃ first Data  ┃     ┃ ┃ ┃        │
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ Page, row   ┣ ─ ─ ╋ ╋ ╋ ─ ─ ─ ─
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ counts,     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "B" Metadata┃ sizes,      ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛ min/max     ┃     ┃ ┃ ┃
┃┃ ┃ ┃┏━━━━━━━━━━━━━━━━━━━┓ values, etc ┃     ┃ ┃ ┃
┃┃ ┃ ┃┃Column "C" Metadata┃             ┃     ┃ ┃ ┃
┃┃ ┃ ┃┗━━━━━━━━━━━━━━━━━━━┛             ┃     ┃ ┃ ┃
┃┃ ┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛     ┃ ┃ ┃
┃┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ ┃
┃┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

创建 Parquet 文件时,有许多重要的标准需要考虑,例如如何优化数据顺序/集群并将数据结构化为 RowGroup 和数据页。这种“物理设计”的考虑很复杂,值得用一系列文章来阐述,本文不作讨论。相反,我们专注于如何使用可用的结构来快速进行查询。

优化查询

在任何查询处理系统中,以下技术通常可以提高性能

  1. 减少必须从辅助存储传输以进行处理的数据量(减少 I/O)
  2. 减少解码数据的计算负载(减少 CPU)
  3. 交错/流水线化数据的读取和解码(提高并行性)

同样的原则也适用于查询 Parquet 文件,如下所述

解码优化

Parquet 通过使用复杂的编码技术(例如行程长度编码、字典编码、增量编码等)来实现令人印象深刻的压缩率。因此,CPU 密集型解码任务可能会主导查询延迟。Parquet 读取器可以使用多种技术来提高此任务的延迟和吞吐量,正如我们在 Rust 实现中所做的那样。

向量化解码

大多数分析系统一次解码多个值到列式内存格式(例如 Apache Arrow),而不是逐行处理数据。这通常称为向量化或列式处理,它是有益的,因为它

  • 摊销调度开销以切换正在解码的列的类型
  • 通过从 ColumnChunk 读取连续值来提高缓存局部性
  • 通常允许在一条指令中解码多个值。
  • 避免使用单个大分配进行许多小的堆分配,从而为字符串和字节数组等可变长度类型节省大量成本

因此,Rust Parquet 读取器实现了专门的解码器,用于将 Parquet 直接读取到列式内存格式(Arrow 数组)中。

流式解码

ColumnChunk 之间哪些行存储在哪些页面中没有关系。例如,第 10,000 行的逻辑值可能在 A 列的第一页和 B 列的第三页中。

最简单的向量化解码方法(通常在 Parquet 解码器中最初实现的方法)是一次解码整个 RowGroup(或 ColumnChunk)。

然而,鉴于 Parquet 的高压缩率,单个 RowGroup 很可能包含数百万行。一次解码这么多行不是最佳的,因为它

  • **需要大量的中间 RAM**:针对处理进行优化的典型内存格式(例如 Apache Arrow)需要的内存比其 Parquet 编码格式多得多。
  • **增加查询延迟**:后续处理步骤(如过滤或聚合)只能在解码整个 RowGroup(或 ColumnChunk)后才能开始。

因此,最好的 Parquet 读取器支持通过按需生成可配置大小的行批次来“流式传输”数据。批大小必须足够大以摊销解码开销,但又要足够小以实现高效的内存使用,并允许下游处理在解码后续批次的同时并发开始。

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃
┃ Data Page for ColumnChunk 1 │◀┃─                   ┌── ─── ─── ─── ─── ┐
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │   ┏━━━━━━━┓        ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃     ┃       ┃      │                   │
┃ Data Page for ColumnChunk 1 │ ┃ │   ┃       ┃   ─ ▶│ │   │ │   │ │   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃  ─ ─┃       ┃─ ┤   │  ─ ─   ─ ─   ─ ─  │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │   ┃       ┃           A    B     C   │
┃ Data Page for ColumnChunk 2 │◀┃─    ┗━━━━━━━┛  │   └── ─── ─── ─── ─── ┘
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │    Parquet
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃      Decoder   │            ...
┃ Data Page for ColumnChunk 3 │ ┃ │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                │   ┌── ─── ─── ─── ─── ┐
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃ │                    ┌ ─ ┐ ┌ ─ ┐ ┌ ─ ┐ │
┃ Data Page for ColumnChunk 3 │◀┃─               │   │                   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                 ─ ▶│ │   │ │   │ │   │
┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                    │  ─ ─   ─ ─   ─ ─  │
┃ Data Page for ColumnChunk 3 │ ┃                         A    B     C   │
┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─  ┃                    └── ─── ─── ─── ─── ┘
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

      Parquet file                                    Smaller in memory
                                                         batches for
                                                         processing

尽管流式传输并不是一个难以解释的功能,但解码的有状态特性,尤其是在跨越多列和任意嵌套数据的情况下,行与值之间的关系并不固定,这需要复杂的中间缓冲和大量的工程工作来正确处理。

字典保留

字典编码,也称为分类编码,是一种技术,其中列中的每个值不是直接存储的,而是存储在一个称为“字典”的单独列表中的索引。这种技术实现了第三范式对于具有重复值(低基数)的列的许多好处,并且对于诸如“城市”之类的字符串列特别有效。

ColumnChunk中的第一页可以选择是字典页,其中包含列类型的值列表。此ColumnChunk中的后续页面可以对该字典中的索引进行编码,而不是直接对值进行编码。

鉴于这种编码的有效性,如果Parquet解码器简单地将字典数据解码为原生类型,它将低效地一遍又一遍地复制相同的值,这对字符串数据来说尤其灾难性。为了高效地处理字典编码的数据,必须在解码过程中保留编码。方便的是,许多列式格式,例如Arrow DictionaryArray,都支持这种兼容的编码。

保留字典编码可以极大地提高读取Arrow数组时的性能,在某些情况下超过60倍,并且使用的内存也大大减少。

保留字典的主要复杂因素是字典是按ColumnChunk存储的,因此字典在RowGroup之间会发生变化。读取器必须自动为跨越多个RowGroup的批次重新计算字典,同时还要针对批次大小均匀划分每个RowGroup的行数的情况进行优化。此外,一列可能只部分字典编码,这进一步增加了实现的复杂性。有关此技术及其复杂性的更多信息,请参阅关于将此技术应用于C++ Parquet读取器的博客文章

投影下推

最基本的Parquet优化,也是Parquet文件中描述最多的优化,是*投影下推*,它可以减少I/O和CPU需求。在这种情况下,投影意味着“选择部分而非全部列”。考虑到Parquet如何组织数据,只读取和解码引用列所需的ColumnChunk是很简单的。

例如,考虑以下形式的SQL查询

SELECT B from table where A > 35

此查询只需要A列和B列(而不是C列)的数据,并且可以将投影“下推”到Parquet读取器。

具体来说,使用页脚中的信息,Parquet读取器可以完全跳过获取(I/O)和解码(CPU)存储C列数据的Data Pages(在我们的示例中是ColumnChunk 3和ColumnChunk 6)。

                             ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
                             ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ┌─────▶ Data Page for ColumnChunk 1 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 1 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 2 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       │     ┃ Data Page for ColumnChunk 3 ("C") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
   A query that        │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
  accesses only        │     ┃ Data Page for ColumnChunk 3 ("C") ┃
 columns A and B       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
can read only the      │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
 relevant pages,  ─────┤     ┃ Data Page for ColumnChunk 3 ("C") ┃
skipping any Data      │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
Page for column C      │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 4 ("A") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 5 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       ├─────▶ Data Page for ColumnChunk 5 ("B") ┃
                       │     ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                       │     ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                       └─────▶ Data Page for ColumnChunk 5 ("B") ┃
                             ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                             ┃┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐┃
                             ┃ Data Page for ColumnChunk 6 ("C") ┃
                             ┃└ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘┃
                             ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

谓词下推

与投影下推类似,**谓词**下推也避免了从Parquet文件获取和解码数据,但它使用的是过滤表达式。这种技术通常需要与查询引擎(如DataFusion)更紧密地集成,以便确定有效的谓词并在扫描期间对其进行评估。不幸的是,如果没有仔细的API设计,Parquet解码器和查询引擎可能会紧密耦合,从而阻止重用(例如,在Cloudera Parquet谓词下推文档中有不同的Impala和Spark实现)。Rust Parquet读取器使用RowSelection API来避免这种耦合。

RowGroup修剪

最简单的谓词下推形式,由许多基于Parquet的查询引擎支持,使用存储在页脚中的统计信息来跳过整个RowGroup。我们将此操作称为RowGroup *修剪*,它类似于许多经典数据仓库系统中的分区修剪

对于上面的示例查询,如果特定RowGroup中A的最大值小于35,则解码器可以跳过从该**整个**RowGroup获取和解码任何ColumnChunk

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃Row Group 1 Metadata                      ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "A" Metadata    Min:0 Max:15   ┃◀╋ ┐
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃       Using the min
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃ │     and max values
┃ ┃Column "B" Metadata                   ┃ ┃       from the
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃ │     metadata,
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃       RowGroup 1  can
┃ ┃Column "C" Metadata                   ┃ ┃ ├ ─ ─ be entirely
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃       skipped
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ │     (pruned) when
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓       searching for
┃Row Group 2 Metadata                      ┃ │     rows with A >
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃       35,
┃ ┃Column "A" Metadata   Min:10 Max:50   ┃◀╋ ┘
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "B" Metadata                   ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┃ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┃
┃ ┃Column "C" Metadata                   ┃ ┃
┃ ┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛

请注意,对最小值和最大值进行修剪对于许多数据布局和列类型都是有效的,但并非所有类型都有效。具体来说,它对具有许多不同的伪随机值(例如标识符或uuid)的列不太有效。幸运的是,对于这种用例,Parquet还支持每个ColumnChunk布隆过滤器。我们正在积极致力于在Apache Rust的实现中添加布隆过滤器支持。

页面修剪

一种更复杂的谓词下推形式使用页脚元数据中的可选页面索引来排除整个Data Pages。解码器只解码其他列中相应的行,通常会跳过整个页面。

由于各种原因,不同ColumnChunk中的页面通常包含不同数量的行,这一事实使这种优化变得复杂。虽然页面索引可以识别一列中所需的页面,但从一列中修剪页面并不能立即排除其他列中的整个页面。

页面修剪过程如下

  • 结合使用谓词和页面索引来识别要跳过的页面
  • 使用偏移量索引来确定哪些行范围对应于未跳过的页面
  • 计算未跳过页面 across 的范围的交集,并仅解码这些行

最后一点实现起来非常复杂,尤其是对于嵌套列表,其中一行可能对应于多个值。幸运的是,Rust Parquet读取器在内部隐藏了这种复杂性,并且可以解码任意的RowSelections

例如,要扫描存储在5个Data Pages中的A列和B列,如下图所示

如果谓词是A > 35

  • 使用页面索引(最大值是20)修剪页面1,留下[200->onwards]的RowSelection,
  • Parquet读取器完全跳过页面3(因为它的最后一行索引是99
  • 通过读取页面2、4和5来读取(仅)相关行。

如果谓词是A > 35 AND B = "F",则页面索引更加有效

  • 使用A > 35,与之前一样,产生[200->onwards]的RowSelection
  • 在B的剩余页面4和页面5上使用B = "F",产生[100-244]的RowSelection
  • 将两个RowSelection相交,得到组合的RowSelection [200-244]
  • Parquet读取器只解码页面2和页面4中的这50行。
┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
┃  │  │              │     │  │              │     ┃
┃     │              │  │     │     Page     │  │
   │  │              │     │  │      3       │     ┃
┃     │              │  │     │   min: "A"   │  │  ┃
┃  │  │              │     │  │   max: "C"   │     ┃
┃     │     Page     │  │     │ first_row: 0 │  │
   │  │      1       │     │  │              │     ┃
┃     │   min: 10    │  │     └──────────────┘  │  ┃
┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
┃     │ first_row: 0 │  │     │              │  │
   │  │              │     │  │     Page     │     ┃
┃     │              │  │     │      4       │  │  ┃
┃  │  │              │     │  │   min: "D"   │     ┃
┃     │              │  │     │   max: "G"   │  │
   │  │              │     │  │first_row: 100│     ┃
┃     └──────────────┘  │     │              │  │  ┃
┃  │  ┌──────────────┐     │  │              │     ┃
┃     │              │  │     └──────────────┘  │
   │  │     Page     │     │  ┌──────────────┐     ┃
┃     │      2       │  │     │              │  │  ┃
┃  │  │   min: 30    │     │  │     Page     │     ┃
┃     │   max: 40    │  │     │      5       │  │
   │  │first_row: 200│     │  │   min: "H"   │     ┃
┃     │              │  │     │   max: "Z"   │  │  ┃
┃  │  │              │     │  │first_row: 250│     ┃
┃     └──────────────┘  │     │              │  │
   │                       │  └──────────────┘     ┃
┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃       ColumnChunk            ColumnChunk         ┃
┃            A                      B
 ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛

在Arrow C++以及扩展的pyarrow/pandas中读取和写入这些索引的支持在PARQUET-1404中进行跟踪。

延迟物化

前两种谓词下推形式仅对解码值之前为RowGroupColumnChunk和Data Pages存储的元数据进行操作。然而,同样的技术也扩展到解码一列或多列的值*之后*但在解码其他列之前的值,这通常被称为“延迟物化”。

当满足以下条件时,此技术特别有效

  • 谓词的选择性非常强,即过滤掉大量行
  • 由于宽行(例如JSON blob)或多列,每行都很大
  • 所选数据聚集在一起
  • 谓词所需的列解码成本相对较低,例如PrimitiveArray / DictionaryArray

SPARK-36527Impala中有关于此技术优势的更多讨论。

例如,给定上面的谓词A > 35 AND B = "F",其中引擎使用页面索引来确定RowSelection [100-244]中只有50行可以匹配,使用延迟物化,Parquet解码器

  • 解码A列的50个值
  • 对这50个值评估A > 35
  • 在这种情况下,只有5行通过, resulting in the RowSelection
    • RowSelection[205-206]
    • RowSelection[238-240]
  • 只解码这些选择中B列的5行
  Row Index
             ┌────────────────────┐            ┌────────────────────┐
       200   │         30         │            │        "F"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
       205   │         37         │─ ─ ─ ─ ─ ─▶│        "F"         │
             ├────────────────────┤            ├────────────────────┤
       206   │         36         │─ ─ ─ ─ ─ ─▶│        "G"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
       238   │         36         │─ ─ ─ ─ ─ ─▶│        "F"         │
             ├────────────────────┤            ├────────────────────┤
       239   │         36         │─ ─ ─ ─ ─ ─▶│        "G"         │
             ├────────────────────┤            ├────────────────────┤
       240   │         40         │─ ─ ─ ─ ─ ─▶│        "G"         │
             └────────────────────┘            └────────────────────┘
                      ...                               ...
             ┌────────────────────┐            ┌────────────────────┐
      244    │         26         │            │        "D"         │
             └────────────────────┘            └────────────────────┘


                   Column A                          Column B
                    Values                            Values

在某些情况下,例如在我们的示例中,B存储单字符值,延迟物化机制的成本可能超过解码节省的成本。但是,当满足上面列出的某些条件时,节省的成本可能非常可观。查询引擎必须决定要下推哪些谓词以及按什么顺序应用它们才能获得最佳结果。

虽然这超出了本文档的范围,但相同的技术也可以应用于多个谓词以及多列上的谓词。有关更多信息,请参阅Parquet crate中的RowFilter接口,以及DataFusion中的row_filter实现。

I/O 下推

虽然 Parquet 的设计初衷是为了在 HDFS 分布式文件系统 上实现高效访问,但它与 AWS S3 等商用对象存储系统也能很好地配合使用,因为它们具有非常相似的特性。

  • “随机访问”读取速度相对较慢:每次请求读取大块数据(MB 级)比发出多个请求读取较小部分数据效率更高。
  • 检索第一个字节之前的延迟很大。
  • 每次请求成本高:通常按请求计费,无论读取的字节数多少,这促使减少请求次数,每次读取一大块连续的数据。

为了从这样的系统中进行最佳读取,Parquet 读取器必须:

  1. 最大限度地减少 I/O 请求次数,同时应用各种下推技术,避免获取大量未使用的数据。
  2. 与适当的任务调度机制集成,将 I/O 和对已获取数据的处理交错进行,避免流水线瓶颈。

由于这些是巨大的工程和集成挑战,许多 Parquet 读取器仍然需要将文件完整地获取到本地存储。

为了处理文件而获取整个文件并非理想选择,原因如下:

  1. 高延迟:在获取整个文件之前无法开始解码(Parquet 元数据位于文件的末尾,因此解码器必须先看到文件末尾才能解码其余部分)。
  2. 浪费工作:获取整个文件会获取所有必要的数据,但也可能获取大量不必要的数据,这些数据在读取页脚后会被跳过。这会不必要地增加成本。
  3. 需要昂贵的“本地连接”存储(或内存):许多云环境不提供具有本地连接存储的计算资源——它们要么依赖昂贵的网络块存储(如 AWS EBS),要么将本地存储限制在某些类型的虚拟机上。

避免缓冲整个文件的需求需要一个复杂的 Parquet 解码器,它与 I/O 子系统集成,可以首先获取和解码元数据,然后对相关数据块进行范围获取,并与 Parquet 数据的解码交错进行。这种优化需要谨慎的工程设计,以便从对象存储中获取足够大的数据块,从而使每次请求的开销不会超过减少传输字节数带来的收益。SPARK-36529 更详细地描述了顺序处理的挑战。

                       ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
                                                                │
                       │
               Step 1: Fetch                                    │
 Parquet       Parquet metadata
 file on ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▼━━━━━━━┓
 Remote  ┃      ▒▒▒▒▒▒▒▒▒▒          ▒▒▒▒▒▒▒▒▒▒               ░░░░░░░░░░ ┃
 Object  ┃      ▒▒▒data▒▒▒          ▒▒▒data▒▒▒               ░metadata░ ┃
  Store  ┃      ▒▒▒▒▒▒▒▒▒▒          ▒▒▒▒▒▒▒▒▒▒               ░░░░░░░░░░ ┃
         ┗━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━▲━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
                     │                     └ ─ ─ ─
                                                  │
                     │                   Step 2: Fetch only
                      ─ ─ ─ ─ ─ ─ ─ ─ ─ relevant data blocks

此图中未包含合并请求和确保实际实现所需的最小请求大小等细节。

Rust Parquet crate 提供了一个异步 Parquet 读取器,可以高效地从任何 AsyncFileReader 读取,该读取器可以:

  • 高效地从任何支持范围请求的存储介质读取。
  • 与 Rust 的 futures 生态系统集成,避免线程阻塞等待网络 I/O,并且可以轻松地交错 CPU 和网络操作
  • 同时请求多个范围,允许实现合并相邻范围、并行获取范围等。
  • 使用前面描述的下推技术,尽可能避免获取数据。
  • 与 Apache Arrow object_store crate 轻松集成,您可以在此处阅读更多相关信息。

为了了解什么是可能的,下图显示了从远程文件获取页脚元数据、使用该元数据确定要读取的数据页,然后同时获取数据和解码的时间线。为了匹配网络延迟、带宽和可用 CPU,通常必须对多个文件同时执行此过程。

                           begin
          metadata        read of   end read
            read  ─ ─ ─ ┐   data    of data          │
 begin    complete         block     block
read of                 │   │        │               │
metadata  ─ ─ ─ ┐                                       At any time, there are
             │          │   │        │               │     multiple network
             │  ▼       ▼   ▼        ▼                  requests outstanding to
  file 1     │ ░░░░░░░░░░   ▒▒▒read▒▒▒   ▒▒▒read▒▒▒  │    hide the individual
             │ ░░░read░░░   ▒▒▒data▒▒▒   ▒▒▒data▒▒▒        request latency
             │ ░metadata░                         ▓▓decode▓▓
             │ ░░░░░░░░░░                         ▓▓▓data▓▓▓
             │                                       │
             │
             │ ░░░░░░░░░░  ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒read▒▒▒▒│▒▒▒▒▒▒▒▒▒▒▒▒▒▒
   file 2    │ ░░░read░░░  ▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒data▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒
             │ ░metadata░                            │              ▓▓▓▓▓decode▓▓▓▓▓▓
             │ ░░░░░░░░░░                                           ▓▓▓▓▓▓data▓▓▓▓▓▓▓
             │                                       │
             │
             │                                     ░░│░░░░░░░  ▒▒▒read▒▒▒  ▒▒▒▒read▒▒▒▒▒
   file 3    │                                     ░░░read░░░  ▒▒▒data▒▒▒  ▒▒▒▒data▒▒▒▒▒      ...
             │                                     ░m│tadata░            ▓▓decode▓▓
             │                                     ░░░░░░░░░░            ▓▓▓data▓▓▓
             └───────────────────────────────────────┼──────────────────────────────▶Time


                                                     │

结论

希望您喜欢阅读有关 Parquet 文件格式以及用于快速查询 Parquet 文件的各种技术的内容。

我们认为,大多数开源 Parquet 实现没有本文所述的广泛功能的原因是,这需要付出巨大的努力,而这在以前只有资金雄厚的商业企业才能做到,而这些企业将其开发的Parquet一直闭源。

然而,随着 Apache Arrow 社区的发展壮大以及质量的提高,Rust 从业者和更广泛的 Arrow 社区,我们合作构建尖端开源实现的能力令人振奋,并且获得了巨大的满足感。本博客中描述的技术是许多工程师在多个代码库(特别是 Apache Arrow DataFusionApache ArrowApache Arrow Ballista)中共同努力的成果。

如果您有兴趣加入 DataFusion 社区,请联系我们