读取和写入 Apache Parquet 格式#

Apache Parquet 项目提供了一种标准化的开源列式存储格式,用于数据分析系统。它最初是为了在 Apache Hadoop 中使用而创建的,并且像 Apache Drill、Apache Hive、Apache Impala 和 Apache Spark 这样的系统都将它作为高性能数据 IO 的共享标准。

Apache Arrow 是用于读取或写入 Parquet 文件数据的理想内存内传输层。我们一直在同时开发 Apache Parquet 的 C++ 实现,其中包括一个本地、多线程的 C++ 适配器,用于在内存内 Arrow 数据之间进行转换。PyArrow 包含此代码的 Python 绑定,因此它也能够使用 pandas 读取和写入 Parquet 文件。

获取支持 Parquet 的 pyarrow#

如果您使用 pip 或 conda 安装了 pyarrow,它应该与捆绑的 Parquet 支持一起构建。

In [1]: import pyarrow.parquet as pq

如果您是从源代码构建 pyarrow,则在编译 C++ 库时必须使用 -DARROW_PARQUET=ON,并在构建 pyarrow 时启用 Parquet 扩展。如果您想使用 Parquet 加密,那么在编译 C++ 库时还必须使用 -DPARQUET_REQUIRE_ENCRYPTION=ON。有关更多详细信息,请参阅 Python 开发页面。

读取和写入单个文件#

函数 read_table() 和 write_table() 分别读取和写入 pyarrow.Table 对象。

让我们看一个简单的表格

In [2]: import numpy as np

In [3]: import pandas as pd

In [4]: import pyarrow as pa

In [5]: df = pd.DataFrame({'one': [-1, np.nan, 2.5],
   ...:                    'two': ['foo', 'bar', 'baz'],
   ...:                    'three': [True, False, True]},
   ...:                    index=list('abc'))
   ...: 

In [6]: table = pa.Table.from_pandas(df)

我们使用 write_table 将其写入 Parquet 格式

In [7]: import pyarrow.parquet as pq

In [8]: pq.write_table(table, 'example.parquet')

这将创建一个单个 Parquet 文件。在实践中,Parquet 数据集可能包含许多目录中的许多文件。我们可以使用 read_table 读取单个文件

In [9]: table2 = pq.read_table('example.parquet')

In [10]: table2.to_pandas()
Out[10]: 
   one  two  three
a -1.0  foo   True
b  NaN  bar  False
c  2.5  baz   True

您可以传递要读取的列的子集,这可能比读取整个文件快得多(由于列式布局)

In [11]: pq.read_table('example.parquet', columns=['one', 'three'])
Out[11]: 
pyarrow.Table
one: double
three: bool
----
one: [[-1,null,2.5]]
three: [[true,false,true]]

当从使用 Pandas 数据框作为源的文件中读取列的子集时,我们使用 read_pandas 来维护任何额外的索引列数据

In [12]: pq.read_pandas('example.parquet', columns=['two']).to_pandas()
Out[12]: 
   two
a  foo
b  bar
c  baz

我们不需要使用字符串来指定文件的来源。它可以是以下任何一种

  • 字符串形式的文件路径

  • 来自 PyArrow 的 NativeFile

  • Python 文件对象

通常,Python 文件对象将具有最差的读取性能,而字符串文件路径或 NativeFile 的实例(特别是内存映射)将具有最佳性能。

读取 Parquet 和内存映射#

由于 Parquet 数据需要从 Parquet 格式和压缩中解码,因此无法直接从磁盘映射。因此,memory_map 选项可能在某些系统上性能更好,但不会对驻留内存消耗有很大帮助。

>>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=True)
>>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 4299MB

>>> pq_array = pa.parquet.read_table("area1.parquet", memory_map=False)
>>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 4299MB

如果您需要处理大于内存的 Parquet 数据,表格数据集和分区可能是您正在寻找的内容。

Parquet 文件写入选项#

write_table() 有许多选项可以控制写入 Parquet 文件时的各种设置。

  • version,要使用的 Parquet 格式版本。'1.0' 确保与旧的读取器兼容,而 '2.4' 和更大的值将启用更多 Parquet 类型和编码。

  • data_page_size,用于控制列块内编码数据页的近似大小。这目前默认为 1MB。

  • flavor,用于设置特定于 Parquet 使用者的兼容性选项,例如 Apache Spark 的 'spark'。

有关更多详细信息,请参阅 write_table() 文档字符串。

下面描述了一些额外的特定于数据类型处理的选项。

省略 DataFrame 索引#

当使用 pa.Table.from_pandas 转换为 Arrow 表格时,默认情况下会添加一个或多个特殊列来跟踪索引(行标签)。存储索引会占用额外的空间,因此如果您的索引没有价值,您可以选择通过传递 preserve_index=False 来省略它

In [13]: df = pd.DataFrame({'one': [-1, np.nan, 2.5],
   ....:                    'two': ['foo', 'bar', 'baz'],
   ....:                    'three': [True, False, True]},
   ....:                    index=list('abc'))
   ....: 

In [14]: df
Out[14]: 
   one  two  three
a -1.0  foo   True
b  NaN  bar  False
c  2.5  baz   True

In [15]: table = pa.Table.from_pandas(df, preserve_index=False)

然后我们有

In [16]: pq.write_table(table, 'example_noindex.parquet')

In [17]: t = pq.read_table('example_noindex.parquet')

In [18]: t.to_pandas()
Out[18]: 
   one  two  three
0 -1.0  foo   True
1  NaN  bar  False
2  2.5  baz   True

在这里您看到索引没有在往返过程中存活下来。

更细粒度的读取和写入#

read_table 使用 ParquetFile 类,它具有其他功能

In [19]: parquet_file = pq.ParquetFile('example.parquet')

In [20]: parquet_file.metadata
Out[20]: 
<pyarrow._parquet.FileMetaData object at 0x7f15a07eec50>
  created_by: parquet-cpp-arrow version 18.0.0-SNAPSHOT
  num_columns: 4
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2581

In [21]: parquet_file.schema
Out[21]: 
<pyarrow._parquet.ParquetSchema object at 0x7f15ee75edc0>
required group field_id=-1 schema {
  optional double field_id=-1 one;
  optional binary field_id=-1 two (String);
  optional boolean field_id=-1 three;
  optional binary field_id=-1 __index_level_0__ (String);
}

正如您可以在 Apache Parquet 格式中了解更多的那样,Parquet 文件包含多个行组。read_table 将读取所有行组并将它们连接到一个表中。您可以使用 read_row_group 读取单个行组

In [22]: parquet_file.num_row_groups
Out[22]: 1

In [23]: parquet_file.read_row_group(0)
Out[23]: 
pyarrow.Table
one: double
two: string
three: bool
__index_level_0__: string
----
one: [[-1,null,2.5]]
two: [["foo","bar","baz"]]
three: [[true,false,true]]
__index_level_0__: [["a","b","c"]]

我们可以类似地使用 ParquetWriter 以多个行组的形式写入 Parquet 文件

In [24]: with pq.ParquetWriter('example2.parquet', table.schema) as writer:
   ....:    for i in range(3):
   ....:       writer.write_table(table)
   ....: 

In [25]: pf2 = pq.ParquetFile('example2.parquet')

In [26]: pf2.num_row_groups
Out[26]: 3

检查 Parquet 文件元数据#

Parquet 文件的 FileMetaData 可以通过 ParquetFile 访问,如上所示

In [27]: parquet_file = pq.ParquetFile('example.parquet')

In [28]: metadata = parquet_file.metadata

也可以使用 read_metadata() 直接读取

In [29]: metadata = pq.read_metadata('example.parquet')

In [30]: metadata
Out[30]: 
<pyarrow._parquet.FileMetaData object at 0x7f15a060a480>
  created_by: parquet-cpp-arrow version 18.0.0-SNAPSHOT
  num_columns: 4
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 2581

返回的 FileMetaData 对象允许检查 Parquet 文件元数据,例如行组和列块元数据以及统计信息

In [31]: metadata.row_group(0)
Out[31]: 
<pyarrow._parquet.RowGroupMetaData object at 0x7f15ee7eb2e0>
  num_columns: 4
  num_rows: 3
  total_byte_size: 282
  sorting_columns: ()

In [32]: metadata.row_group(0).column(0)
Out[32]: 
<pyarrow._parquet.ColumnChunkMetaData object at 0x7f15ee7eb5b0>
  file_offset: 0
  file_path: 
  physical_type: DOUBLE
  num_values: 3
  path_in_schema: one
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at 0x7f15ee7eb600>
      has_min_max: True
      min: -1.0
      max: 2.5
      null_count: 1
      distinct_count: None
      num_values: 2
      physical_type: DOUBLE
      logical_type: None
      converted_type (legacy): NONE
  compression: SNAPPY
  encodings: ('PLAIN', 'RLE', 'RLE_DICTIONARY')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 36
  total_compressed_size: 104
  total_uncompressed_size: 100

数据类型处理#

将类型读取为 DictionaryArray#

read_table 和 ParquetDataset 中的 read_dictionary 选项将导致列被读取为 DictionaryArray,这在转换为 pandas 时将变为 pandas.Categorical。此选项仅对字符串和二进制列类型有效,对于具有许多重复字符串值的列,它可以显着降低内存使用量并提高性能。

pq.read_table(table, where, read_dictionary=['binary_c0', 'stringb_c2'])

存储时间戳#

一些 Parquet 阅读器可能只支持以毫秒 ('ms') 或微秒 ('us') 分辨率存储的时间戳。由于 pandas 使用纳秒来表示时间戳,这偶尔会造成麻烦。默认情况下(在写入版本 1.0 的 Parquet 文件时),纳秒将被转换为微秒 ('us')。

此外,我们提供了 coerce_timestamps 选项,允许您选择所需的分辨率

pq.write_table(table, where, coerce_timestamps='ms')

如果转换为较低分辨率值会导致数据丢失,默认情况下将引发异常。这可以通过传递 allow_truncated_timestamps=True 来抑制

pq.write_table(table, where, coerce_timestamps='ms',
               allow_truncated_timestamps=True)

使用较新的 Parquet 格式版本 2.6 时,可以存储具有纳秒的时间戳而无需转换

pq.write_table(table, where, version='2.6')

但是,许多 Parquet 阅读器尚不支持此较新的格式版本,因此默认情况下是写入版本 1.0 文件。当需要跨不同处理框架的兼容性时,建议使用默认版本 1.0。

较旧的 Parquet 实现使用基于 INT96 的时间戳存储,但这现在已弃用。这包括 Apache Impala 和 Apache Spark 的一些旧版本。要在这种格式中写入时间戳,请在 write_table 中将 use_deprecated_int96_timestamps 选项设置为 True。

pq.write_table(table, where, use_deprecated_int96_timestamps=True)

压缩、编码和文件兼容性#

最常用的 Parquet 实现使用字典编码来写入文件;如果字典变得太大,则会“回退”到纯编码。是否使用字典编码可以通过 use_dictionary 选项来切换。

pq.write_table(table, where, use_dictionary=False)

在行组中,列中的数据页可以在编码过程(字典编码、RLE 编码)完成后进行压缩。在 PyArrow 中,我们默认使用 Snappy 压缩,但也支持 Brotli、Gzip、ZSTD、LZ4 和未压缩。

pq.write_table(table, where, compression='snappy')
pq.write_table(table, where, compression='gzip')
pq.write_table(table, where, compression='brotli')
pq.write_table(table, where, compression='zstd')
pq.write_table(table, where, compression='lz4')
pq.write_table(table, where, compression='none')

Snappy 通常能带来更好的性能,而 Gzip 可能生成更小的文件。

这些设置也可以在列级别进行设置。

pq.write_table(table, where, compression={'foo': 'snappy', 'bar': 'gzip'},
               use_dictionary=['foo', 'bar'])

分区数据集(多个文件)#

多个 Parquet 文件构成一个 Parquet *数据集*。这些文件可能以多种方式呈现。

  • 一个 Parquet 绝对文件路径列表。

  • 一个目录名称,包含定义分区数据集的嵌套目录。

按年份和月份分区的 dataset 在磁盘上的样子可能如下所示。

dataset_name/
  year=2007/
    month=01/
       0.parq
       1.parq
       ...
    month=02/
       0.parq
       1.parq
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

写入分区数据集#

您可以为任何 pyarrow 文件系统写入分区数据集,该文件系统是文件存储(例如本地文件系统、HDFS、S3)。当未添加任何文件系统时,默认行为是使用本地文件系统。

# Local dataset write
pq.write_to_dataset(table, root_path='dataset_name',
                    partition_cols=['one', 'two'])

在这种情况下,根路径指定了要保存数据的父目录。分区列是用于对数据集进行分区的列名。列按给定的顺序进行分区。分区拆分由分区列中的唯一值决定。

要使用其他文件系统,您只需要添加文件系统参数,单个表写入使用 with 语句进行包装,因此无需使用 pq.write_to_dataset 函数。

# Remote file-system example
from pyarrow.fs import HadoopFileSystem
fs = HadoopFileSystem(host, port, user=user, kerb_ticket=ticket_cache_path)
pq.write_to_dataset(table, root_path='dataset_name',
                    partition_cols=['one', 'two'], filesystem=fs)

兼容性说明:如果使用 pq.write_to_dataset 创建一个将被 HIVE 使用的表,则分区列值必须与您运行的 HIVE 版本的允许字符集兼容。

写入 _metadata_common_metadata 文件#

一些处理框架,例如 Spark 或 Dask(可选)使用 _metadata_common_metadata 文件来处理分区数据集。

这些文件包含有关完整数据集架构的信息(对于 _common_metadata),并可能包含分区数据集中所有文件的所有行组元数据(对于 _metadata)。实际的文件只是元数据 Parquet 文件。请注意,这不是 Parquet 标准,而是这些框架在实践中制定的约定。

使用这些文件可以更有效地创建 Parquet 数据集,因为它可以使用存储的架构和所有行组的文件路径,而不是推断架构和遍历所有 Parquet 文件的目录(对于访问文件成本高昂的文件系统尤其如此)。

write_to_dataset() 函数不会自动写入此类元数据文件,但您可以使用它来收集元数据,并手动组合和写入这些元数据。

# Write a dataset and collect metadata information of all written files
metadata_collector = []
pq.write_to_dataset(table, root_path, metadata_collector=metadata_collector)

# Write the ``_common_metadata`` parquet file without row groups statistics
pq.write_metadata(table.schema, root_path / '_common_metadata')

# Write the ``_metadata`` parquet file with row groups statistics of all files
pq.write_metadata(
    table.schema, root_path / '_metadata',
    metadata_collector=metadata_collector
)

当不使用 write_to_dataset() 函数,而是使用 write_table()ParquetWriter 来写入分区数据集的单个文件时,metadata_collector 关键字也可以用于收集写入文件的 FileMetaData。在这种情况下,您需要在组合元数据之前自己确保设置行组元数据中包含的文件路径,并且所有不同文件和收集的 FileMetaData 对象的架构应该相同。

metadata_collector = []
pq.write_table(
    table1, root_path / "year=2017/data1.parquet",
    metadata_collector=metadata_collector
)

# set the file path relative to the root of the partitioned dataset
metadata_collector[-1].set_file_path("year=2017/data1.parquet")

# combine and write the metadata
metadata = metadata_collector[0]
for _meta in metadata_collector[1:]:
    metadata.append_row_groups(_meta)
metadata.write_metadata_file(root_path / "_metadata")

# or use pq.write_metadata to combine and write in a single step
pq.write_metadata(
    table1.schema, root_path / "_metadata",
    metadata_collector=metadata_collector
)

从分区数据集读取#

ParquetDataset 类接受目录名称或文件路径列表,并且可以发现并推断一些常见的分区结构,例如由 Hive 生成的结构。

dataset = pq.ParquetDataset('dataset_name/')
table = dataset.read()

您还可以使用由 pyarrow.parquet 公开的便利函数 read_table,它可以避免额外的 Dataset 对象创建步骤。

table = pq.read_table('dataset_name')

注意:原始表中的分区列在加载时将被转换为 Arrow 字典类型(pandas 分类)。分区列的顺序不会在保存/加载过程中保留。如果从远程文件系统读取到 pandas 数据帧,您可能需要运行 sort_index 来维护行顺序(只要在写入时启用了 preserve_index 选项)。

其他功能

  • 对所有列(使用行组统计信息)进行过滤,而不是只对分区键进行过滤。

  • 细粒度分区:除了类似 Hive 的分区之外,还支持目录分区方案(例如“/2019/11/15/”而不是“/year=2019/month=11/day=15/”),以及指定分区键架构的能力。

注意

  • 当您希望在读取列子集时将分区键包含在结果中时,需要在 columns 关键字中显式包含分区键。

与 Spark 配合使用#

Spark 对其将读取的 Parquet 文件类型有一些限制。选项 flavor='spark' 将自动设置这些选项,并还会清除 Spark SQL 不支持的字段字符。

多线程读取#

默认情况下,每个读取函数都使用多线程来并行读取列。根据 IO 的速度以及解码特定文件中的列的成本(特别是使用 GZIP 压缩时),这可以显着提高数据吞吐量。

可以通过指定 use_threads=False 来禁用此功能。

注意

要使用的线程数量由 Arrow 自动推断,可以使用 cpu_count() 函数进行检查。

从云存储读取#

除了本地文件之外,pyarrow 还支持其他文件系统,例如云文件系统,通过 filesystem 关键字。

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")
table = pq.read_table("bucket/object/key/prefix", filesystem=s3)

目前,支持 HDFSAmazon S3-compatible storage。有关更多详细信息,请参阅 文件系统接口 文档。对于这些内置文件系统,如果文件路径指定为 URI,则也可以从文件路径推断出文件系统。

table = pq.read_table("s3://bucket/object/key/prefix")

如果存在与 fsspec 兼容的实现,则仍然可以支持其他文件系统。有关更多详细信息,请参阅 使用与 Arrow 兼容的 fsspec 文件系统。一个例子是 Azure Blob 存储,它可以通过 adlfs 包进行接口。

from adlfs import AzureBlobFileSystem

abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", container_name="XXXX")
table = pq.read_table("file.parquet", filesystem=abfs)

Parquet 模块化加密(列加密)#

从 Apache Arrow 4.0.0 开始,C++ 中的 Parquet 文件支持列加密,从 Apache Arrow 6.0.0 开始,PyArrow 中也支持列加密。

Parquet 使用信封加密方法,其中文件部分使用“数据加密密钥”(DEK)进行加密,而 DEK 使用“主加密密钥”(MEK)进行加密。DEK 由 Parquet 为每个加密文件/列随机生成。MEK 由用户选择的密钥管理服务 (KMS) 生成、存储和管理。

读取和写入加密的 Parquet 文件涉及将文件加密和解密属性分别传递给 ParquetWriterParquetFile

写入加密的 Parquet 文件

encryption_properties = crypto_factory.file_encryption_properties(
                                 kms_connection_config, encryption_config)
with pq.ParquetWriter(filename, schema,
                     encryption_properties=encryption_properties) as writer:
   writer.write_table(table)

读取加密的 Parquet 文件

decryption_properties = crypto_factory.file_decryption_properties(
                                                 kms_connection_config)
parquet_file = pq.ParquetFile(filename,
                              decryption_properties=decryption_properties)

为了创建加密和解密属性,应该创建一个 pyarrow.parquet.encryption.CryptoFactory 并使用 KMS 客户端详细信息进行初始化,如下所示。

KMS 客户端#

主加密密钥应该保存在用户组织中部署的生产级密钥管理系统 (KMS) 中并由其管理。使用 Parquet 加密需要为 KMS 服务器实现一个客户端类。任何 KmsClient 实现都应该实现由 pyarrow.parquet.encryption.KmsClient 定义的非正式接口,如下所示。

import pyarrow.parquet.encryption as pe

class MyKmsClient(pe.KmsClient):

   """An example KmsClient implementation skeleton"""
   def __init__(self, kms_connection_configuration):
      pe.KmsClient.__init__(self)
      # Any KMS-specific initialization based on
      # kms_connection_configuration comes here

   def wrap_key(self, key_bytes, master_key_identifier):
      wrapped_key = ... # call KMS to wrap key_bytes with key specified by
                        # master_key_identifier
      return wrapped_key

   def unwrap_key(self, wrapped_key, master_key_identifier):
      key_bytes = ... # call KMS to unwrap wrapped_key with key specified by
                      # master_key_identifier
      return key_bytes

具体实现将由用户提供的工厂函数在运行时加载。此工厂函数将用于初始化 pyarrow.parquet.encryption.CryptoFactory 以创建文件加密和解密属性。

例如,要使用上面定义的 MyKmsClient

def kms_client_factory(kms_connection_configuration):
   return MyKmsClient(kms_connection_configuration)

crypto_factory = CryptoFactory(kms_client_factory)

可以在 Apache Arrow GitHub 存储库中找到此类类的一个 示例,它适用于开源 KMS。生产 KMS 客户端应与组织的安全管理员合作设计,并由具有访问控制管理经验的开发人员构建。创建此类类后,可以通过工厂方法将其传递给应用程序,并由一般的 PyArrow 用户使用,如上面加密的 parquet 写入/读取示例所示。

KMS 连接配置#

KMS 连接的配置(在创建文件加密和解密属性时使用 pyarrow.parquet.encryption.KmsConnectionConfig)包括以下选项。

  • kms_instance_url,KMS 实例的 URL。

  • kms_instance_id,用于加密的 KMS 实例 ID(如果有多个 KMS 实例可用)。

  • key_access_token,将传递给 KMS 的授权令牌。

  • custom_kms_conf,包含特定于 KMS 类型配置的字符串字典。

加密配置#

pyarrow.parquet.encryption.EncryptionConfiguration(用于创建文件加密属性)包含以下选项

  • footer_key,用于页脚加密/签名的主密钥 ID。

  • column_keys,使用哪些密钥对哪些列进行加密。字典,以主密钥 ID 为键,以列名列表为值,例如 {key1: [col1, col2], key2: [col3]}

  • encryption_algorithm,Parquet 加密算法。可以是 AES_GCM_V1(默认)或 AES_GCM_CTR_V1

  • plaintext_footer,是否以纯文本方式写入文件页脚(否则会加密)。

  • double_wrapping,是否使用双重包装 - 其中数据加密密钥 (DEK) 使用密钥加密密钥 (KEK) 加密,而 KEK 又使用主加密密钥 (MEK) 加密。如果设置为 false,则使用单重包装 - 其中 DEK 直接使用 MEK 加密。

  • cache_lifetime,缓存实体(密钥加密密钥、本地包装密钥、KMS 客户端对象)的生命周期,表示为 datetime.timedelta

  • internal_key_material,是否将密钥材料存储在 Parquet 文件页脚中;此模式不会生成其他文件。如果设置为 false,密钥材料将存储在同一文件夹中的单独文件中,这使得能够对不可变的 Parquet 文件进行密钥轮换。

  • data_key_length_bits,数据加密密钥 (DEK) 的长度,由 Parquet 密钥管理工具随机生成。可以是 128、192 或 256 位。

注意

double_wrapping 为 true 时,Parquet 实现了一种“双重信封加密”模式,最大程度地减少了程序与 KMS 服务器的交互。在此模式下,DEK 使用“密钥加密密钥”(KEK,由 Parquet 随机生成)加密。KEK 使用 KMS 中的“主加密密钥”(MEK)加密;结果和 KEK 本身被缓存到进程内存中。

加密配置示例

encryption_config = pq.EncryptionConfiguration(
   footer_key="footer_key_name",
   column_keys={
      "column_key_name": ["Column1", "Column2"],
   },
)

解密配置#

pyarrow.parquet.encryption.DecryptionConfiguration(用于创建文件解密属性)是可选的,它包含以下选项

  • cache_lifetime,缓存实体(密钥加密密钥、本地包装密钥、KMS 客户端对象)的生命周期,表示为 datetime.timedelta