读写 Apache Parquet 格式#

Apache Parquet 项目提供了一种标准化的开源列式存储格式,用于数据分析系统。它最初是为了在 Apache Hadoop 中使用而创建的,随后被 Apache DrillApache HiveApache ImpalaApache Spark 等系统采用,作为高性能数据 IO 的共享标准。

Apache Arrow 是处理 Parquet 文件读取或写入时理想的内存中传输层。我们一直在同步开发 Apache Parquet 的 C++ 实现,其中包括一个原生的、支持多线程的 C++ 适配器,用于实现与内存中 Arrow 数据之间的互转。PyArrow 包含了针对该代码的 Python 绑定,因此也可以通过 pandas 读取和写入 Parquet 文件。

获取支持 Parquet 的 pyarrow#

如果您通过 pip 或 conda 安装了 pyarrow,它应该已经内置了对 Parquet 的支持。

>>> import pyarrow.parquet as pq

如果您是从源码构建 pyarrow,则在编译 C++ 库时必须使用 -DARROW_PARQUET=ON,并在构建 pyarrow 时启用 Parquet 扩展。如果您想使用 Parquet 加密,则在编译 C++ 库时还必须使用 -DPARQUET_REQUIRE_ENCRYPTION=ON。有关详细信息,请参阅 Python 开发 页面。

读写单个文件#

函数 read_table()write_table() 分别用于读取和写入 pyarrow.Table 对象。

让我们看一个简单的表格

>>> import numpy as np
>>> import pandas as pd
>>> import pyarrow as pa
>>> df = pd.DataFrame({'one': [-1, np.nan, 2.5],
...                    'two': ['foo', 'bar', 'baz'],
...                    'three': [True, False, True]},
...                    index=list('abc'))
>>> table = pa.Table.from_pandas(df)

我们使用 write_table 将其写入 Parquet 格式

>>> import pyarrow.parquet as pq
>>> pq.write_table(table, 'example.parquet')

这会创建一个单一的 Parquet 文件。在实践中,Parquet 数据集可能由多个目录中的许多文件组成。我们可以使用 read_table 读取单个文件

>>> table2 = pq.read_table('example.parquet')
>>> table2.to_pandas()
   one  two  three
a -1.0  foo   True
b  NaN  bar  False
c  2.5  baz   True

您可以传递要读取的列的子集,这比读取整个文件要快得多(得益于列式布局)

>>> pq.read_table('example.parquet', columns=['one', 'three'])
pyarrow.Table
one: double
three: bool
----
one: [[-1,null,2.5]]
three: [[true,false,true]]

当从以 Pandas 数据框作为源的文件中读取列的子集时,我们使用 read_pandas 来保留任何额外的索引列数据

>>> pq.read_pandas('example.parquet', columns=['two']).to_pandas()
   two
a  foo
b  bar
c  baz

我们不需要使用字符串来指定文件的来源。它可以是以下任意一种:

  • 作为字符串的文件路径

  • 来自 PyArrow 的 NativeFile

  • 一个 Python 文件对象

通常情况下,Python 文件对象的读取性能最差,而字符串文件路径或 NativeFile 实例(尤其是内存映射文件)的性能最佳。

读取 Parquet 与内存映射#

由于 Parquet 数据需要从 Parquet 格式和压缩中解码,因此不能直接从磁盘进行映射。因此,memory_map 选项在某些系统上可能会表现更好,但对常驻内存的消耗帮助不大。

>>> pq_array = pa.parquet.read_table(path, memory_map=True)
>>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 4299MB

>>> pq_array = pa.parquet.read_table(path, memory_map=False)
>>> print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 4299MB

如果您需要处理比内存更大的 Parquet 数据,表格数据集和分区功能可能是您需要的。

Parquet 文件写入选项#

write_table() 有许多选项来控制写入 Parquet 文件时的各种设置。

  • version:要使用的 Parquet 格式版本。'1.0' 确保与旧版读取器兼容,而 '2.4' 及更高版本启用了更多的 Parquet 类型和编码。

  • data_page_size:控制列块内编码数据页的近似大小。目前默认为 1MB。

  • flavor:设置特定于 Parquet 消费者的兼容性选项,例如 'spark' 用于 Apache Spark。

有关更多详细信息,请参阅 write_table() 的文档字符串。

下面描述了一些额外的数据类型处理专用选项。

省略 DataFrame 索引#

当使用 pa.Table.from_pandas 转换为 Arrow 表时,默认情况下会添加一列或多列特殊列来跟踪索引(行标签)。存储索引会占用额外空间,因此如果您的索引没有价值,您可以传递 preserve_index=False 来选择省略它。

>>> df = pd.DataFrame({'one': [-1, np.nan, 2.5],
...                    'two': ['foo', 'bar', 'baz'],
...                    'three': [True, False, True]},
...                    index=list('abc'))
>>> table = pa.Table.from_pandas(df, preserve_index=False)

然后我们有

>>> pq.write_table(table, 'example_noindex.parquet')
>>> t = pq.read_table('example_noindex.parquet')
>>> t.to_pandas()
   one  two  three
0 -1.0  foo   True
1  NaN  bar  False
2  2.5  baz   True

在这里您可以看到索引在往返过程中没有保留下来。

更细粒度的读写#

read_table 使用 ParquetFile 类,该类具有其他功能。

>>> parquet_file = pq.ParquetFile('example.parquet')
>>> parquet_file.metadata
<pyarrow._parquet.FileMetaData object at ...>
  created_by: parquet-cpp-arrow version ...
  num_columns: 4
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: ...
>>> parquet_file.schema
<pyarrow._parquet.ParquetSchema object at ...>
required group field_id=-1 schema {
  optional double field_id=-1 one;
  optional binary field_id=-1 two (String);
  optional boolean field_id=-1 three;
  optional binary field_id=-1 __index_level_0__ (String);
}

正如您可以在 Apache Parquet 格式中了解到的那样,一个 Parquet 文件由多个行组组成。read_table 将读取所有行组并将它们连接成一个表。您可以使用 read_row_group 读取单个行组。

>>> parquet_file.num_row_groups
1
>>> parquet_file.read_row_group(0)
pyarrow.Table
one: double
two: large_string
three: bool
__index_level_0__: large_string
----
one: [[-1,null,2.5]]
two: [["foo","bar","baz"]]
three: [[true,false,true]]
__index_level_0__: [["a","b","c"]]

同样,我们可以使用 ParquetWriter 编写一个具有多个行组的 Parquet 文件。

>>> with pq.ParquetWriter('example2.parquet', table.schema) as writer:
...     for i in range(3):
...         writer.write_table(table)
>>> pf2 = pq.ParquetFile('example2.parquet')
>>> pf2.num_row_groups
3

检查 Parquet 文件元数据#

Parquet 文件的 FileMetaData 可以通过如上所示的 ParquetFile 访问。

>>> parquet_file = pq.ParquetFile('example.parquet')
>>> metadata = parquet_file.metadata
>>> metadata
<pyarrow._parquet.FileMetaData object at ...>
  created_by: parquet-cpp-arrow version ...
  num_columns: 4
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: ...

或者也可以使用 read_metadata() 直接读取。

>>> metadata = pq.read_metadata('example.parquet')
>>> metadata
<pyarrow._parquet.FileMetaData object at ...>
  created_by: parquet-cpp-arrow version ...
  num_columns: 4
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: ...

返回的 FileMetaData 对象允许检查 Parquet 文件元数据,例如行组和列块元数据及统计信息。

>>> metadata.row_group(0)
<pyarrow._parquet.RowGroupMetaData object at ...>
  num_columns: 4
  num_rows: 3
  total_byte_size: 290
  sorting_columns: ()
>>> metadata.row_group(0).column(0)
<pyarrow._parquet.ColumnChunkMetaData object at ...>
  file_offset: 0
  file_path:...
  physical_type: DOUBLE
  num_values: 3
  path_in_schema: one
  is_stats_set: True
  statistics:
    <pyarrow._parquet.Statistics object at ...>
      has_min_max: True
      min: -1.0
      max: 2.5
      null_count: 1
      distinct_count: None
      num_values: 2
      physical_type: DOUBLE
      logical_type: None
      converted_type (legacy): NONE
  geo_statistics:
    None
  compression: SNAPPY
  encodings: ('PLAIN', 'RLE', 'RLE_DICTIONARY')
  has_dictionary_page: True
  dictionary_page_offset: 4
  data_page_offset: 36
  total_compressed_size: 106
  total_uncompressed_size: 102

数据类型处理#

将类型读取为 DictionaryArray#

read_tableParquetDataset 中的 read_dictionary 选项将导致列被读取为 DictionaryArray,在转换为 pandas 时将变为 pandas.Categorical。此选项仅对字符串和二进制列类型有效,对于具有许多重复字符串值的列,它可以显著降低内存使用并提高性能。

>>> pq.read_table('example.parquet', read_dictionary=['two'])
pyarrow.Table
one: double
two: dictionary<values=string, indices=int32, ordered=0>
three: bool
__index_level_0__: large_string
----
one: [[-1,null,2.5]]
two: [  -- dictionary:
["foo","bar","baz"]  -- indices:
[0,1,2]]
three: [[true,false,true]]
__index_level_0__: [["a","b","c"]]

存储时间戳#

某些 Parquet 读取器可能仅支持以毫秒 ('ms') 或微秒 ('us') 分辨率存储的时间戳。由于 pandas 使用纳秒来表示时间戳,这有时会造成麻烦。默认情况下(写入 1.0 版本 Parquet 文件时),纳秒将被转换为微秒('us')。

此外,我们提供 coerce_timestamps 选项,允许您选择所需的分辨率。

>>> pq.write_table(table, 'example.parquet', coerce_timestamps='ms')

如果转换为较低分辨率的值可能导致数据丢失,则默认会引发异常。可以通过传递 allow_truncated_timestamps=True 来禁止此操作。

>>> pq.write_table(table, 'example.parquet', coerce_timestamps='ms',
...                allow_truncated_timestamps=True)

使用较新的 Parquet 格式版本 2.6 时,可以不经过转换直接存储纳秒时间戳。

>>> pq.write_table(table, 'example.parquet', version='2.6')

然而,许多 Parquet 读取器尚不支持此较新的格式版本,因此默认值为写入 1.0 版本的文件。当需要跨不同处理框架的兼容性时,建议使用默认的 1.0 版本。

较旧的 Parquet 实现使用基于 INT96 的时间戳存储,但这现在已被弃用。这包括一些旧版本的 Apache Impala 和 Apache Spark。要以这种格式写入时间戳,请在 write_table 中将 use_deprecated_int96_timestamps 选项设置为 True

>>> pq.write_table(table, 'example.parquet', use_deprecated_int96_timestamps=True)

压缩、编码和文件兼容性#

最常用的 Parquet 实现会在写入文件时使用字典编码;如果字典增长过大,它们会“回退”到普通编码。是否使用字典编码可以通过 use_dictionary 选项进行切换。

>>> pq.write_table(table, 'example.parquet', use_dictionary=False)

行组中列内的数据页可以在编码过程(字典、RLE 编码)之后进行压缩。在 PyArrow 中,我们默认使用 Snappy 压缩,但也支持 Brotli、Gzip、ZSTD、LZ4 和未压缩格式。

>>> pq.write_table(table, 'example.parquet', compression='snappy')
>>> pq.write_table(table, 'example.parquet', compression='gzip')
>>> pq.write_table(table, 'example.parquet', compression='brotli')
>>> pq.write_table(table, 'example.parquet', compression='zstd')
>>> pq.write_table(table, 'example.parquet', compression='lz4')
>>> pq.write_table(table, 'example.parquet', compression='none')

Snappy 通常会带来更好的性能,而 Gzip 可能会产生更小的文件。

'lz4_raw' 也被接受作为 'lz4' 的别名。两者均使用 Parquet 规范中定义的 LZ4_RAW 编解码器。

这些设置也可以按列进行设置。

>>> pq.write_table(table, 'example.parquet', compression={'one': 'snappy', 'two': 'gzip'},
...                use_dictionary=['one', 'two'])

分区数据集(多个文件)#

多个 Parquet 文件构成一个 Parquet 数据集。它们可能以多种方式呈现:

  • Parquet 绝对文件路径的列表

  • 包含定义分区数据集的嵌套目录的目录名称

按年和月分区的数据集在磁盘上看起来可能是这样的

dataset_name/
  year=2007/
    month=01/
       0.parq
       1.parq
       ...
    month=02/
       0.parq
       1.parq
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

写入分区数据集#

您可以为任何属于文件存储的 pyarrow 文件系统(例如本地、HDFS、S3)编写分区数据集。未添加文件系统时的默认行为是使用本地文件系统。

>>> # Local dataset write
>>> pq.write_to_dataset(table, root_path='dataset_name',
...                     partition_cols=['one', 'two'])

在这种情况下,根路径指定了保存数据的父目录。分区列是用于对数据集进行分区的列名。列按照给定的顺序进行分区。分区拆分由分区列中的唯一值决定。

要使用其他文件系统,您只需要添加 filesystem 参数,单个表格写入使用 with 语句进行封装,因此 pq.write_to_dataset 函数无需这样做。

>>> # Remote file-system example
>>> from pyarrow.fs import HadoopFileSystem
>>> fs = HadoopFileSystem(host, port, user=user, kerb_ticket=ticket_cache_path)
>>> pq.write_to_dataset(table, root_path='dataset_name',
...                     partition_cols=['one', 'two'], filesystem=fs)

兼容性说明:如果使用 pq.write_to_dataset 创建一个稍后将由 HIVE 使用的表,则分区列值必须与您运行的 HIVE 版本允许的字符集兼容。

写入 _metadata_common_metadata 文件#

某些处理框架(如 Spark 或 Dask)会(可选地)对分区数据集使用 _metadata_common_metadata 文件。

这些文件包含有关整个数据集架构的信息(对于 _common_metadata),以及可能包含分区数据集中所有文件的所有行组元数据(对于 _metadata)。实际的文件是仅包含元数据的 Parquet 文件。请注意,这并非 Parquet 标准,而是这些框架在实践中设定的一种约定。

使用这些文件可以更有效地创建 Parquet 数据集,因为它可以使用存储的架构和所有行组的文件路径,而不是推断架构并为所有 Parquet 文件爬取目录(对于文件访问昂贵的文件系统尤其如此)。

write_to_dataset() 函数不会自动写入此类元数据文件,但您可以使用它来收集元数据并手动组合和写入它们。

>>> # Write a dataset and collect metadata information of all written files
>>> metadata_collector = []
>>> root_path = "dataset_name_1"
>>> pq.write_to_dataset(table, root_path, metadata_collector=metadata_collector)

>>> # Write the ``_common_metadata`` parquet file without row groups statistics
>>> pq.write_metadata(table.schema, root_path + '/_common_metadata')

>>> # Write the ``_metadata`` parquet file with row groups statistics of all files
>>> pq.write_metadata(
...     table.schema, root_path + '/_metadata',
...     metadata_collector=metadata_collector
... )

当不使用 write_to_dataset() 函数,而是使用 write_table()ParquetWriter 编写分区数据集的单个文件时,也可以使用 metadata_collector 关键字来收集已写入文件的 FileMetaData。在这种情况下,您需要确保在合并元数据之前自己设置包含在行组元数据中的文件路径,并且所有不同文件的架构和收集的 FileMetaData 对象应该是相同的。

>>> import os
>>> os.mkdir("year=2017")

>>> metadata_collector = []
>>> pq.write_table(
...     table, "year=2017/data1.parquet",
...     metadata_collector=metadata_collector
... )

>>> # set the file path relative to the root of the partitioned dataset
>>> metadata_collector[-1].set_file_path("year=2017/data1.parquet")

>>> # combine and write the metadata
>>> metadata = metadata_collector[0]
>>> for _meta in metadata_collector[1:]:
...     metadata.append_row_groups(_meta)
>>> metadata.write_metadata_file("_metadata")

>>> # or use pq.write_metadata to combine and write in a single step
>>> pq.write_metadata(
...     table.schema, "_metadata",
...     metadata_collector=metadata_collector
... )

>>> pq.read_metadata("_metadata")
<pyarrow._parquet.FileMetaData object at ...>
  created_by: parquet-cpp-arrow version ...
  num_columns: 3
  num_rows: 3
  num_row_groups: 1
  format_version: 2.6
  serialized_size: ...

从分区数据集读取#

ParquetDataset 类接受目录名称或文件路径列表,并且可以发现和推断一些常见的分区结构,例如 Hive 产生的分区结构。

>>> dataset = pq.ParquetDataset('dataset_name/')
>>> table = dataset.read()
>>> table
pyarrow.Table
three: bool
one: dictionary<values=string, indices=int32, ordered=0>
two: dictionary<values=string, indices=int32, ordered=0>
----
three: [[true],[true],[false]]
one: [  -- dictionary:
["-1","2.5"]  -- indices:
[0],  -- dictionary:
["-1","2.5"]  -- indices:
[1],  -- dictionary:
[null]  -- indices:
[0]]
two: [  -- dictionary:
["foo","baz","bar"]  -- indices:
[0],  -- dictionary:
["foo","baz","bar"]  -- indices:
[1],  -- dictionary:
["foo","baz","bar"]  -- indices:
[2]]

您也可以使用 pyarrow.parquet 提供的便捷函数 read_table,它避免了额外创建数据集对象的需要。

>>> table = pq.read_table('dataset_name')

注意:原始表中的分区列在加载时其类型将转换为 Arrow 字典类型(pandas 分类)。分区列的顺序在保存/加载过程中不会被保留。如果从远程文件系统读取到 pandas 数据框中,您可能需要运行 sort_index 来保持行顺序(前提是在写入时启用了 preserve_index 选项)。

其他功能

  • 对所有列进行过滤(使用行组统计信息),而不仅仅是分区键。

  • 细粒度分区:支持除 Hive 风格分区之外的目录分区方案(例如 “/2019/11/15/” 而不是 “/year=2019/month=11/day=15/”),并能够为分区键指定架构。

注意

  • 当您希望在读取列子集时将分区键包含在结果中时,需要将分区键显式包含在 columns 关键字中。

与 Spark 一起使用#

Spark 对其读取的 Parquet 文件类型设置了一些限制。选项 flavor='spark' 将自动设置这些选项,并清理 Spark SQL 不支持的字段字符。

多线程读取#

每个读取函数默认都使用多线程来并行读取列。根据 IO 的速度以及解码特定文件中列的开销(特别是使用 GZIP 压缩时),这可以显著提高数据吞吐量。

可以通过指定 use_threads=False 来禁用此功能。

注意

并发使用的线程数由 Arrow 自动推断,可以使用 cpu_count() 函数进行检查。

从云存储读取#

除了本地文件外,pyarrow 还通过 filesystem 关键字支持其他文件系统,例如云文件系统。

>>> from pyarrow import fs

>>> s3  = fs.S3FileSystem(region="us-east-2")
>>> table = pq.read_table("bucket/object/key/prefix", filesystem=s3)

目前,支持 HDFSAmazon S3 兼容存储。有关更多详细信息,请参阅 文件系统接口 文档。对于这些内置文件系统,如果指定为 URI,文件系统也可以从文件路径推断出来。

>>> table = pq.read_table("s3://bucket/object/key/prefix")

如果存在可用的 fsspec 兼容实现,其他文件系统仍然可以被支持。有关更多详细信息,请参阅 将 fsspec 兼容文件系统与 Arrow 结合使用。一个例子是 Azure Blob 存储,它可以通过 adlfs 包进行接口连接。

>>> from adlfs import AzureBlobFileSystem
>>> abfs = AzureBlobFileSystem(account_name="XXXX", account_key="XXXX", container_name="XXXX")
>>> table = pq.read_table("file.parquet", filesystem=abfs)

Parquet 模块化加密(列式加密)#

C++ 从 Apache Arrow 4.0.0 开始支持 Parquet 文件的列式加密,PyArrow 从 Apache Arrow 6.0.0 开始支持。

Parquet 使用信封加密实践,其中文件部分使用“数据加密密钥 (DEKs)”加密,而 DEKs 使用“主加密密钥 (MEKs)”加密。DEKs 由 Parquet 为每个加密文件/列随机生成。MEKs 在用户选择的密钥管理服务 (KMS) 中生成、存储和管理。

读取和写入加密的 Parquet 文件涉及分别向 ParquetWriterParquetFile 传递文件加密和解密属性。

写入加密的 Parquet 文件

>>> encryption_properties = crypto_factory.file_encryption_properties(
...                                  kms_connection_config, encryption_config)
>>> with pq.ParquetWriter(filename, schema,
...                      encryption_properties=encryption_properties) as writer:
...    writer.write_table(table)

读取加密的 Parquet 文件

>>> decryption_properties = crypto_factory.file_decryption_properties(
...                                                  kms_connection_config)
>>> parquet_file = pq.ParquetFile(filename,
...                               decryption_properties=decryption_properties)

为了创建加密和解密属性,应创建并使用 KMS 客户端详细信息初始化 pyarrow.parquet.encryption.CryptoFactory,如下所述。

KMS 客户端#

主加密密钥应保存在部署在用户组织中的生产级密钥管理系统 (KMS) 中并进行管理。使用 Parquet 加密需要为 KMS 服务器实现客户端类。任何 KmsClient 实现都应实现由 pyarrow.parquet.encryption.KmsClient 定义的非正式接口,如下所示。

>>> import pyarrow.parquet.encryption as pe
>>> class MyKmsClient(pe.KmsClient):
...
...    """An example KmsClient implementation skeleton"""
...    def __init__(self, kms_connection_configuration):
...       pe.KmsClient.__init__(self)
...       # Any KMS-specific initialization based on
...       # kms_connection_configuration comes here
...
...    def wrap_key(self, key_bytes, master_key_identifier):
...       wrapped_key = ... # call KMS to wrap key_bytes with key specified by
...                         # master_key_identifier
...       return wrapped_key
...
...    def unwrap_key(self, wrapped_key, master_key_identifier):
...       key_bytes = ... # call KMS to unwrap wrapped_key with key specified by
...                       # master_key_identifier
...       return key_bytes

具体实现将在运行时由用户提供的工厂函数加载。此工厂函数将用于初始化 pyarrow.parquet.encryption.CryptoFactory,以创建文件加密和解密属性。

例如,为了使用上面定义的 MyKmsClient

>>> def kms_client_factory(kms_connection_configuration):
...    return MyKmsClient(kms_connection_configuration)

>>> crypto_factory = pe.CryptoFactory(kms_client_factory)

可以在 Apache Arrow GitHub 存储库中找到这种用于开源 KMS 的类的 示例。生产 KMS 客户端应在与组织的安全管理员协作下进行设计,并由具有访问控制管理经验的开发人员构建。一旦创建了此类,它就可以通过工厂方法传递给应用程序,并像上面的加密 Parquet 读/写示例那样被一般 PyArrow 用户所利用。

KMS 连接配置#

连接到 KMS 的配置(创建文件加密和解密属性时使用的 pyarrow.parquet.encryption.KmsConnectionConfig)包括以下选项:

  • kms_instance_url:KMS 实例的 URL。

  • kms_instance_id:将用于加密的 KMS 实例 ID(如果有多个 KMS 实例可用)。

  • key_access_token:将传递给 KMS 的授权令牌。

  • custom_kms_conf:具有 KMS 类型特定配置的字符串字典。

加密配置#

pyarrow.parquet.encryption.EncryptionConfiguration(创建文件加密属性时使用)包括以下选项:

  • footer_key:用于页脚加密/签名的主密钥 ID。

  • column_keys:哪些列使用哪个密钥加密。字典,其中主密钥 ID 为键,列名列表为值,例如 {key1: [col1, col2], key2: [col3]}。请参阅下面关于嵌套字段的说明。

  • encryption_algorithm:Parquet 加密算法。可以是 AES_GCM_V1(默认)或 AES_GCM_CTR_V1

  • plaintext_footer:是否以纯文本格式写入文件页脚(否则将被加密)。

  • double_wrapping:是否使用双重包装——即数据加密密钥 (DEKs) 使用密钥加密密钥 (KEKs) 加密,而 KEKs 依次使用主加密密钥 (MEKs) 加密。如果设置为 false,则使用单一包装——DEKs 直接使用 MEKs 加密。

  • cache_lifetime:缓存实体(密钥加密密钥、本地包装密钥、KMS 客户端对象)的生存期,表示为 datetime.timedelta

  • internal_key_material:是否将密钥材料存储在 Parquet 文件页脚中;此模式不会生成额外文件。如果设置为 false,密钥材料将存储在同一文件夹中的单独文件中,这使得可以对不可变的 Parquet 文件进行密钥轮换。

  • data_key_length_bits:由 Parquet 密钥管理工具随机生成的数据加密密钥 (DEKs) 的长度。可以是 128、192 或 256 位。

注意

double_wrapping 为 true 时,Parquet 实现了一种“双重信封加密”模式,最大限度地减少了程序与 KMS 服务器的交互。在此模式下,DEKs 使用 KEKs 加密(由 Parquet 随机生成)。KEKs 在 KMS 中使用 MEKs 加密;结果和 KEK 本身被缓存在进程内存中。

示例加密配置

>>> encryption_config = pe.EncryptionConfiguration(
...    footer_key="footer_key_name",
...    column_keys={
...       "column_key_name": ["Column1", "Column2"],
...    },
... )

注意

具有嵌套字段(结构或映射数据类型)的列可以作为一个整体进行加密,或者仅对单个字段进行加密。为根列名配置加密密钥以使用该密钥加密所有嵌套字段,或为单个叶嵌套字段配置密钥。

按照惯例,映射列 m 的键和值字段名称分别为 m.key_value.keym.key_value.value。结构列 s 的内部字段 f 的名称为 s.f

在上述示例中,通过分别为列 ms 配置该密钥,所有内部字段都使用相同的密钥加密。

具有嵌套字段的列的示例加密配置,其中所有列都使用由 column_key_id 标识的相同密钥加密

>>> import pyarrow.parquet.encryption as pe

schema = pa.schema([
  ("MapColumn", pa.map_(pa.string(), pa.int32())),
  ("StructColumn", pa.struct([("f1", pa.int32()), ("f2", pa.string())])),
])

encryption_config = pe.EncryptionConfiguration(
   footer_key="footer_key_name",
   column_keys={
      "column_key_id": [ "MapColumn", "StructColumn" ],
   },
)

具有嵌套字段的列的示例加密配置,其中一些内部字段使用由 column_key_id 标识的相同密钥加密

>>> import pyarrow.parquet.encryption as pe

>>> schema = pa.schema([
...   ("MapColumn", pa.map_(pa.string(), pa.int32())),
...   ("StructColumn", pa.struct([("f1", pa.int32()), ("f2", pa.string())])),
... ])

>>> encryption_config = pe.EncryptionConfiguration(
...    footer_key="footer_key_name",
...    column_keys={
...       "column_key_id": [ "MapColumn.key_value.value", "StructColumn.f1" ],
...    },
... )

解密配置#

pyarrow.parquet.encryption.DecryptionConfiguration(创建文件解密属性时使用)是可选的,它包括以下选项:

  • cache_lifetime:缓存实体(密钥加密密钥、本地包装密钥、KMS 客户端对象)的生存期,表示为 datetime.timedelta

内容定义分块 (CDC)#

注意

此功能是实验性的,在未来的版本中可能会发生变化。

PyArrow 引入了一项实验性功能,用于使用内容定义分块 (CDC) 为内容寻址存储 (CAS) 系统优化 Parquet 文件。此功能实现了跨文件的高效数据去重,从而改善网络传输和存储效率。

启用后,数据页将根据内容定义的分块边界进行写入,这些边界由基于数据实际内容识别块边界的滚动哈希算法确定。当列中的数据被修改(例如插入、删除或更新)时,此方法最大限度地减少了更改的数据页数量。

可以通过在 Parquet 写入器中设置 use_content_defined_chunking 参数来启用此功能。它接受布尔值或字典进行配置。

  • True:使用默认配置:
    • 最小块大小:256 KiB

    • 最大块大小:1024 KiB

    • 归一化级别:0

  • dict:允许自定义分块参数:
    • min_chunk_size:最小块大小(以字节为单位,默认:256 KiB)。

    • max_chunk_size:最大块大小(以字节为单位,默认:1024 KiB)。

    • norm_level:调整块大小分布的归一化级别(默认:0)。

请注意,块大小是在应用任何编码或压缩之前根据逻辑值计算的。数据页的实际大小可能会根据所使用的编码和压缩而有所不同。

注意

为了充分利用此功能,您应确保 Parquet 写入选项在不同的写入和文件之间保持一致。对不同的文件使用不同的写入选项(如压缩、编码或行组大小)可能会阻止正确的去重并导致存储效率低下。

>>> table = pa.Table.from_pandas(df)

>>> # Enable content-defined chunking with default settings
>>> pq.write_table(table, 'example.parquet', use_content_defined_chunking=True)

>>> # Enable content-defined chunking with custom settings
>>> pq.write_table(
...     table,
...     'example_custom.parquet',
...     use_content_defined_chunking={
...         'min_chunk_size': 128 * 1024,  # 128 KiB
...         'max_chunk_size': 512 * 1024,  # 512 KiB
...     }
... )