表格数据集#

pyarrow.dataset 模块提供了高效处理表格数据的功能,这些数据可能大于内存,并且是多文件的。这包括

  • 支持不同来源、文件格式和不同文件系统(本地、云)的统一接口。

  • 源的发现(爬取目录、处理基于目录的分区数据集、基本模式规范化等)

  • 使用谓词下推(过滤行)、投影(选择和派生列)以及可选的并行读取进行优化读取。

当前支持的文件格式为 Parquet、Feather / Arrow IPC、CSV 和 ORC(请注意,ORC 数据集目前只能读取,还不能写入)。目标是在未来将支持扩展到其他文件格式和数据源(例如,数据库连接)。

对于那些熟悉现有的 pyarrow.parquet.ParquetDataset 用于读取 Parquet 数据集的人来说:pyarrow.dataset 的目标类似,但并非专门针对 Parquet 格式,也不局限于 Python:相同的 datasets API 在 R 绑定或 Arrow 中公开。此外,pyarrow.dataset 具有更高的性能和新功能(例如,在文件内而不是仅在分区键上进行过滤)。

读取数据集#

对于下面的示例,让我们创建一个包含两个 parquet 文件的小数据集,该数据集位于一个目录中

In [1]: import tempfile

In [2]: import pathlib

In [3]: import pyarrow as pa

In [4]: import pyarrow.parquet as pq

In [5]: import numpy as np

In [6]: base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))

In [7]: (base / "parquet_dataset").mkdir(exist_ok=True)

# creating an Arrow Table
In [8]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

# writing it into two parquet files
In [9]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")

In [10]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

数据集发现#

可以使用 dataset() 函数创建一个 Dataset 对象。我们可以将包含数据文件的目录路径传递给它。

In [11]: import pyarrow.dataset as ds

In [12]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [13]: dataset
Out[13]: <pyarrow._dataset.FileSystemDataset at 0x7fe41378bb20>

除了搜索基本目录之外,dataset() 还接受单个文件或文件路径列表的路径。

创建 Dataset 对象本身不会开始读取数据。如果需要,它只会爬取目录以查找所有文件

In [14]: dataset.files
Out[14]: 
['/tmp/pyarrow-c54gjofy/parquet_dataset/data1.parquet',
 '/tmp/pyarrow-c54gjofy/parquet_dataset/data2.parquet']

… 并推断数据集的 schema(默认情况下从第一个文件推断)

In [15]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

使用 Dataset.to_table() 方法,我们可以将数据集(或其中的一部分)读取到 pyarrow 表中(请注意,根据数据集的大小,这可能需要大量内存,请参阅下面的过滤/迭代加载)

In [16]: dataset.to_table()
Out[16]: 
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[-1.1423398343185012,-0.9461515760216167,2.0972627853077754,0.5151372091045947,0.249422801290706],[0.9102115779123913,3.634010291375531,0.4046825981197749,-0.9952879610759159,1.5447254069340937]]
c: [[1,2,1,2,1],[2,1,2,1,2]]

# converting to pandas to see the contents of the scanned table
In [17]: dataset.to_table().to_pandas()
Out[17]: 
   a         b  c
0  0 -1.142340  1
1  1 -0.946152  2
2  2  2.097263  1
3  3  0.515137  2
4  4  0.249423  1
5  5  0.910212  2
6  6  3.634010  1
7  7  0.404683  2
8  8 -0.995288  1
9  9  1.544725  2

读取不同的文件格式#

上面的示例使用 Parquet 文件作为数据集来源,但 Dataset API 在多种文件格式和文件系统上提供了统一的接口。目前,支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;未来计划支持更多格式。

如果我们以 Feather 文件而不是 Parquet 文件的形式保存该表

In [18]: import pyarrow.feather as feather

In [19]: feather.write_feather(table, base / "data.feather")

…然后我们可以使用相同的函数读取 Feather 文件,但需要指定 format="feather"

In [20]: dataset = ds.dataset(base / "data.feather", format="feather")

In [21]: dataset.to_table().to_pandas().head()
Out[21]: 
   a         b  c
0  0 -1.142340  1
1  1 -0.946152  2
2  2  2.097263  1
3  3  0.515137  2
4  4  0.249423  1

自定义文件格式#

格式名称,如

ds.dataset(..., format="parquet")

是默认构造的 ParquetFileFormat 的简写形式

ds.dataset(..., format=ds.ParquetFileFormat())

可以使用关键字自定义 FileFormat 对象。例如

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

将在扫描时将列 "a" 配置为字典编码。

过滤数据#

为了避免在只需要一个子集时读取所有数据,可以使用 columnsfilter 关键字。

可以使用 columns 关键字仅读取指定的列

In [22]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [23]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[23]: 
   a         b
0  0 -1.142340
1  1 -0.946152
2  2  2.097263
3  3  0.515137
4  4  0.249423
5  5  0.910212
6  6  3.634010
7  7  0.404683
8  8 -0.995288
9  9  1.544725

使用 filter 关键字,与过滤谓词不匹配的行将不会包含在返回的表中。该关键字需要一个布尔 Expression,该表达式引用至少一列

In [24]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[24]: 
   a         b  c
0  7  0.404683  2
1  8 -0.995288  1
2  9  1.544725  2

In [25]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[25]: 
   a         b  c
0  1 -0.946152  2
1  3  0.515137  2
2  5  0.910212  2
3  7  0.404683  2
4  9  1.544725  2

构造这些 Expression 对象的最简单方法是使用 field() 辅助函数。可以使用 field() 函数(创建一个 FieldExpression)引用任何列 - 不仅仅是分区列。提供了运算符重载来组合过滤器,包括比较(等于、大于/小于等)、集合成员测试和布尔组合(&|~

In [26]: ds.field('a') != 3
Out[26]: <pyarrow.compute.Expression (a != 3)>

In [27]: ds.field('a').isin([1, 2, 3])
Out[27]: 
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
  1,
  2,
  3
], null_matching_behavior=MATCH})>

In [28]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[28]: <pyarrow.compute.Expression ((a > b) and (b > 1))>

请注意,Expression 对象不能通过 python 逻辑运算符 andornot 进行组合。

投影列#

可以通过传递列名称列表,使用 columns 关键字读取数据集的列子集。该关键字还可以与表达式结合使用,用于更复杂的投影。

在这种情况下,我们传递一个字典,其中键是结果列名称,值是用于构造列值的表达式

In [29]: projection = {
   ....:     "a_renamed": ds.field("a"),
   ....:     "b_as_float32": ds.field("b").cast("float32"),
   ....:     "c_1": ds.field("c") == 1,
   ....: }
   ....: 

In [30]: dataset.to_table(columns=projection).to_pandas().head()
Out[30]: 
   a_renamed  b_as_float32    c_1
0          0     -1.142340   True
1          1     -0.946152  False
2          2      2.097263   True
3          3      0.515137  False
4          4      0.249423   True

该字典还确定列选择(只有字典中的键才会作为列出现在结果表中)。如果要在现有列的基础上包含派生列,则可以从数据集 schema 构建字典

In [31]: projection = {col: ds.field(col) for col in dataset.schema.names}

In [32]: projection.update({"b_large": ds.field("b") > 1})

In [33]: dataset.to_table(columns=projection).to_pandas().head()
Out[33]: 
   a         b  c  b_large
0  0 -1.142340  1    False
1  1 -0.946152  2    False
2  2  2.097263  1     True
3  3  0.515137  2    False
4  4  0.249423  1    False

读取分区数据#

上面显示了一个由包含文件的扁平目录组成的数据集。但是,数据集可以利用嵌套的目录结构来定义分区数据集,其中子目录名称包含有关该目录中存储的数据子集的信息。

例如,按年和月分区的数据集在磁盘上的外观可能如下所示

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

上面的分区方案使用“/key=value/”目录名称,如 Apache Hive 中所示。

让我们创建一个小的分区数据集。 write_to_dataset() 函数可以编写此类类似 Hive 的分区数据集。

In [34]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
   ....:                   'part': ['a'] * 5 + ['b'] * 5})
   ....: 

In [35]: pq.write_to_dataset(table, "parquet_dataset_partitioned",
   ....:                     partition_cols=['part'])
   ....: 

上面创建了一个包含两个子目录(“part=a”和“part=b”)的目录,并且写入这些目录中的 Parquet 文件不再包含“part”列。

使用 dataset() 读取此数据集时,我们现在指定数据集应使用 partitioning 关键字使用类似 hive 的分区方案

In [36]: dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
   ....:                      partitioning="hive")
   ....: 

In [37]: dataset.files
Out[37]: 
['parquet_dataset_partitioned/part=a/0830537e25e449b68b3160184c6302df-0.parquet',
 'parquet_dataset_partitioned/part=b/0830537e25e449b68b3160184c6302df-0.parquet']

尽管分区字段未包含在实际的 Parquet 文件中,但在扫描此数据集时,它们将添加回结果表中

In [38]: dataset.to_table().to_pandas().head(3)
Out[38]: 
   a         b  c part
0  0  1.750073  1    a
1  1  1.345434  2    a
2  2  0.413815  1    a

我们现在可以按分区键进行过滤,如果它们与过滤器不匹配,则可以完全避免加载文件

In [39]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[39]: 
   a         b  c part
0  5 -1.002496  2    b
1  6  0.164121  1    b
2  7 -0.598744  2    b
3  8 -0.469405  1    b
4  9  2.311087  2    b

不同的分区方案#

上面的示例使用类似 hive 的目录方案,例如“/year=2009/month=11/day=15”。我们通过传递 partitioning="hive" 关键字来指定这一点。在这种情况下,分区键的类型是从文件路径推断出来的。

也可以使用 partitioning() 函数显式定义分区键的 schema。例如

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

还支持“目录分区”,其中文件路径中的段表示分区键的值,而不包含名称(字段名称在段的索引中是隐式的)。例如,给定字段名称“year”、“month”和“day”,一个路径可能是“/2019/11/15”。

由于名称未包含在文件路径中,因此必须在构造目录分区时指定这些名称

part = ds.partitioning(field_names=["year", "month", "day"])

目录分区还支持提供完整的 schema,而不是从文件路径推断类型。

从云存储读取#

除了本地文件外,pyarrow 还支持从云存储读取。目前,支持 HDFSAmazon S3 兼容存储

当传递文件 URI 时,将推断文件系统。例如,指定 S3 路径

dataset = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/")

通常,您需要自定义连接参数,然后可以创建文件系统对象并将其传递给 filesystem 关键字

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=s3)

目前可用的类是 S3FileSystemHadoopFileSystem。有关更多详细信息,请参阅 文件系统接口 文档。

从 Minio 读取#

除了云存储之外,pyarrow 还支持从模拟 S3 API 的 MinIO 对象存储实例读取数据。与 toxiproxy 搭配使用,这对于测试或基准测试很有用。

from pyarrow import fs

# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=minio)

使用 Parquet 数据集#

虽然数据集 API 为不同的文件格式提供了统一的接口,但对于 Parquet 数据集,还存在一些特定的方法。

一些处理框架(如 Dask,可选)在分区数据集中使用 _metadata 文件,该文件包含关于完整数据集的模式和行组元数据的信息。使用这样的文件可以更有效地创建 Parquet 数据集,因为它不需要推断模式并爬取目录以查找所有 Parquet 文件(对于访问文件成本较高的文件系统尤其如此)。parquet_dataset() 函数允许我们从带有 _metadata 文件的分区数据集创建数据集。

dataset = ds.parquet_dataset("/path/to/dir/_metadata")

默认情况下,为 Parquet 数据集构造的 Dataset 对象将每个片段映射到单个 Parquet 文件。如果希望片段映射到 Parquet 文件的每个行组,可以使用片段的 split_by_row_group() 方法

fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()

此方法返回一个新片段列表,这些片段映射到原始片段(Parquet 文件)的每个行组。get_fragments()split_by_row_group() 都接受一个可选的筛选表达式,以获取筛选后的片段列表。

手动指定数据集#

dataset() 函数可以轻松地创建数据集,该数据集可以查看目录,爬取所有子目录以查找文件和分区信息。但是,有时不需要发现,并且数据集的文件和分区是已知的(例如,当此信息存储在元数据中时)。在这种情况下,可以显式创建数据集,而无需任何自动发现或推断。

在此示例中,我们将使用一个数据集,其中文件名包含额外的分区信息

# creating a dummy dataset: directory with two files
In [40]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

In [41]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)

In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

In [43]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

要从文件列表创建数据集,我们需要手动指定路径、模式、格式、文件系统和分区表达式

In [44]: from pyarrow import fs

In [45]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

In [46]: dataset = ds.FileSystemDataset.from_paths(
   ....:     ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
   ....:     filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
   ....:     partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
   ....: 

由于我们为文件指定了“分区表达式”,因此在读取数据时,此信息将作为列物化,并可用于筛选

In [47]: dataset.to_table().to_pandas()
Out[47]: 
   year  col1      col2
0  2018     0 -1.564310
1  2018     1 -1.397956
2  2018     2 -1.809460
3  2019     0 -1.564310
4  2019     1 -1.397956
5  2019     2 -1.809460

In [48]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[48]: 
   year  col1      col2
0  2019     0 -1.564310
1  2019     1 -1.397956
2  2019     2 -1.809460

手动列出文件的另一个好处是,文件的顺序控制着数据的顺序。当执行有序读取(或读取到表)时,返回的行将与给定的文件顺序匹配。这仅适用于使用文件列表构建数据集时。当通过扫描目录来发现文件时,不保证任何顺序。

迭代(核外或流式)读取#

前面的示例演示了如何使用 to_table() 将数据读取到表中。如果数据集很小或者只需要读取少量数据,则这很有用。数据集 API 包含其他方法,可以流式方式读取和处理大量数据。

最简单的方法是使用方法 Dataset.to_batches()。此方法返回记录批次的迭代器。例如,我们可以使用此方法来计算列的平均值,而无需将整个列加载到内存中

In [49]: import pyarrow.compute as pc

In [50]: col2_sum = 0

In [51]: count = 0

In [52]: for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
   ....:     col2_sum += pc.sum(batch.column("col2")).as_py()
   ....:     count += batch.num_rows
   ....: 

In [53]: mean_a = col2_sum/count

自定义批次大小#

数据集的迭代读取通常称为数据集的“扫描”,而 pyarrow 使用一个称为 Scanner 的对象来执行此操作。to_table()to_batches() 方法会自动为您创建 Scanner。您传递给这些方法的任何参数都将传递给 Scanner 构造函数。

其中一个参数是 batch_size。这控制扫描器返回的批次的最大大小。如果数据集由小文件组成或者这些文件本身由小的行组组成,则批次仍然可以小于 batch_size。例如,除非将 batch_size 设置为较小的值,否则每个行组包含 10,000 行的 Parquet 文件将产生最多 10,000 行的批次。

默认的批次大小为一百万行,这通常是一个很好的默认值,但是如果要读取大量列,则可能需要对其进行自定义。

关于事务和 ACID 保证的说明#

数据集 API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取是可以的。并发写入或与读取同时发生的写入可能会产生意外行为。可以使用各种方法来避免对相同的文件进行操作,例如为每个写入器使用唯一的 basename 模板,为新文件使用临时目录,或者单独存储文件列表,而不是依赖目录发现。

在写入过程中意外终止进程可能会使系统处于不一致的状态。写入调用通常在要写入的字节完全传递到 OS 页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然的电源故障,则仍有可能丢失部分文件。

大多数文件格式在末尾都有魔术数字。这意味着可以安全地检测并丢弃部分文件写入。CSV 文件格式没有任何这样的概念,并且部分写入的 CSV 文件可能会被检测为有效。

写入数据集#

数据集 API 还使用 write_dataset() 简化了将数据写入数据集的操作。当您要对数据进行分区或需要写入大量数据时,这很有用。基本数据集写入类似于写入表,只不过您指定的是目录而不是文件名。

In [54]: table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})

In [55]: ds.write_dataset(table, "sample_dataset", format="parquet")

上面的示例将在我们的 sample_dataset 目录中创建一个名为 part-0.parquet 的文件。

警告

如果您再次运行该示例,它将替换现有的 part-0.parquet 文件。将文件附加到现有数据集需要为每次调用 ds.write_dataset 指定一个新的 basename_template,以避免覆盖。

写入分区数据#

可以使用分区对象来指定应如何对输出数据进行分区。这使用与我们用于读取数据集的分区对象相同的类型。要将上述数据写入分区目录,我们只需要指定要如何对数据集进行分区。例如

In [56]: part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [57]: ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)

这将创建两个文件。我们一半的数据将位于 dataset_root/c=1 目录中,另一半将位于 dataset_root/c=2 目录中。

分区性能注意事项#

分区数据集有两个方面会影响性能:它增加了文件数量,并在文件周围创建了目录结构。这两者都有好处和成本。根据配置和数据集的大小,成本可能会超过收益。

由于分区将数据集拆分为多个文件,因此可以并行读取和写入分区数据集。但是,每个附加的文件都会在文件系统交互的处理中增加少量开销。它还会增加整体数据集大小,因为每个文件都有一些共享的元数据。例如,每个 Parquet 文件都包含模式和组级统计信息。分区的数量是文件数量的下限。如果按日期对一年数据的数据集进行分区,则至少有 365 个文件。如果您进一步按另一个具有 1,000 个唯一值的维度进行分区,则将有多达 365,000 个文件。这种精细的分区通常会导致小文件,这些文件主要由元数据组成。

分区数据集创建嵌套的文件夹结构,这些结构允许我们修剪在扫描中加载的文件。但是,这会增加在数据集中发现文件的开销,因为我们需要递归地“列出目录”以查找数据文件。太精细的分区可能会导致问题:按日期对一年数据的数据集进行分区将需要 365 个列表调用才能找到所有文件;添加另一个具有基数 1,000 的列将使该调用达到 365,365 次。

最理想的分区布局将取决于您的数据、访问模式以及读取数据的系统。大多数系统,包括 Arrow,都应该能在各种文件大小和分区布局下工作,但您应该避免极端情况。以下指南可以帮助您避免一些已知的最坏情况。

  • 避免文件小于 20MB 或大于 2GB。

  • 避免分区布局超过 10,000 个不同的分区。

对于文件格式中具有组概念的,例如 Parquet,也适用类似的指导原则。行组在读取时可以提供并行性,并允许基于统计数据跳过数据,但非常小的组可能会导致元数据成为文件大小的重要组成部分。在大多数情况下,Arrow 的文件写入器为组大小提供了合理的默认值。

配置写入期间打开的文件#

将数据写入磁盘时,有一些参数对于优化写入非常重要,例如每个文件的行数以及写入期间允许的最大打开文件数。

使用 write_dataset()max_open_files 参数设置最大打开文件数。

如果 max_open_files 设置为大于 0 的值,则将限制可以保持打开的最大文件数。这仅适用于写入分区数据集,其中行根据其分区值被分派到相应的文件。如果尝试打开的文件过多,则将关闭最近最少使用的文件。如果此设置设置得太低,您最终可能会将数据碎片化成许多小文件。

如果您的进程同时使用其他文件句柄(无论使用数据集扫描器还是其他方式),您可能会达到系统文件句柄限制。例如,如果您正在扫描包含 300 个文件的数据集,并写入 900 个文件,则总共 1200 个文件可能会超出系统限制。(在 Linux 上,这可能会出现“打开文件过多”的错误。)您可以降低 max_open_files 设置,或增加系统上的文件句柄限制。默认值为 900,允许扫描器在达到 Linux 默认的 1024 文件限制之前打开一定数量的文件。

write_dataset() 中使用的另一个重要配置是 max_rows_per_file

使用 write_dataset()max_rows_per_files 参数设置每个文件中写入的最大行数。

如果 max_rows_per_file 设置为大于 0 的值,则将限制任何单个文件中放置的行数。否则,将没有限制,并且将在每个输出目录中创建一个文件,除非需要关闭文件以满足 max_open_files 的限制。此设置是控制文件大小的主要方式。对于写入大量数据的工作负载,如果没有行数上限,文件可能会变得非常大,从而导致下游读取器出现内存不足错误。行数和文件大小之间的关系取决于数据集模式以及数据压缩程度(如果有的话)。

配置写入期间每个组的行数#

可以配置每个组写入磁盘的数据量。此配置包括下限和上限。构成行组所需的最小行数由 write_dataset()min_rows_per_group 参数定义。

注意

如果 min_rows_per_group 设置为大于 0 的值,则会导致数据集写入器批量处理传入数据,并且仅在累积了足够的行时才将行组写入磁盘。如果其他选项(例如 max_open_filesmax_rows_per_file)强制使用较小的行组大小,则最终的行组大小可能会小于此值。

每个组允许的最大行数由 write_dataset()max_rows_per_group 参数定义。

如果 max_rows_per_group 设置为大于 0 的值,则数据集写入器可能会将大的传入批次拆分为多个行组。如果设置了此值,则还应设置 min_rows_per_group,否则您可能会得到非常小的行组(例如,如果传入的行组大小仅略大于此值)。

行组内置于 Parquet 和 IPC/Feather 格式中,但不影响 JSON 或 CSV。在 Arrow 中读取 Parquet 和 IPC 格式时,行组边界将成为记录批次的边界,从而确定下游读取器的默认批次大小。此外,Parquet 文件中的行组具有列统计信息,可以帮助读取器跳过不相关的数据,但可能会增加文件大小。作为一个极端的例子,如果在 Parquet 中设置 max_rows_per_group=1,则会得到大型文件,因为大多数文件将是行组统计信息。

写入大量数据#

以上示例从表中写入数据。如果要写入大量数据,您可能无法将所有数据加载到单个内存中的表中。幸运的是,write_dataset() 方法还接受记录批次的迭代器。例如,这使得重新分区大型数据集变得非常简单,而无需将整个数据集加载到内存中。

In [58]: old_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [59]: new_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor=None
   ....: )
   ....: 

In [60]: input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)

# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches.  In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
In [61]: scanner = input_dataset.scanner()

In [62]: ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)

在上面的示例运行后,我们的数据将位于 dataset_root/1 和 dataset_root/2 目录中。在这个简单的示例中,我们没有更改数据的结构(仅更改了目录命名方案),但您也可以使用此机制来更改用于对数据集进行分区的列。当您希望以特定方式查询数据时,这非常有用,并且您可以利用分区来减少需要读取的数据量。

自定义和检查写入的文件#

默认情况下,数据集 API 将创建名为“part-i.format”的文件,其中“i”是在写入期间生成的整数,“format”是在 write_dataset 调用中指定的文件格式。对于简单的数据集,可能可以知道将创建哪些文件,但对于较大或分区的数据集,则不太容易。可以使用 file_visitor 关键字来提供一个访问器,该访问器将在创建每个文件时被调用。

In [63]: def file_visitor(written_file):
   ....:     print(f"path={written_file.path}")
   ....:     print(f"size={written_file.size} bytes")
   ....:     print(f"metadata={written_file.metadata}")
   ....: 
In [64]: ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
   ....:                  file_visitor=file_visitor)
   ....: 
path=dataset_visited/c=1/part-0.parquet
size=793 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe412578a90>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=795 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe4138da9d0>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0

这将允许您收集属于数据集的文件名并将其存储在其他位置,这在您希望避免下次需要读取数据时扫描目录时非常有用。它还可以用于生成其他工具(例如 Dask 或 Spark)使用的 _metadata 索引文件,以创建数据集的索引。

配置写入期间特定于格式的参数#

除了所有格式共享的通用选项之外,还有特定于格式的选项,这些选项对于特定格式是唯一的。例如,要允许在写入 Parquet 文件时截断时间戳。

In [65]: parquet_format = ds.ParquetFileFormat()

In [66]: write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)

In [67]: ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
   ....:                  file_options=write_options)
   ....: