表格数据集#

pyarrow.dataset 模块提供了高效处理表格、可能大于内存和多文件数据集的功能。这包括:

  • 支持不同来源和文件格式以及不同文件系统(本地、云)的统一接口。

  • 来源发现(爬取目录、处理基于目录的分区数据集、基本模式规范化等)。

  • 通过谓词下推(过滤行)、投影(选择和派生列)以及可选的并行读取进行优化读取。

目前支持的文件格式有 Parquet、Feather / Arrow IPC、CSV 和 ORC(请注意,ORC 数据集目前只能读取,尚不能写入)。目标是在未来将支持扩展到其他文件格式和数据源(例如数据库连接)。

对于熟悉现有用于读取 Parquet 数据集的 pyarrow.parquet.ParquetDataset 的用户:pyarrow.dataset 的目标类似,但它不限于 Parquet 格式,也不与 Python 绑定:相同的 datasets API 在 R 绑定或 Arrow 中也可用。此外,pyarrow.dataset 拥有改进的性能和新功能(例如,在文件内部进行过滤,而不仅仅是基于分区键过滤)。

读取数据集#

对于下面的例子,我们创建一个由两个 parquet 文件组成的目录小数据集:

In [1]: import tempfile

In [2]: import pathlib

In [3]: import pyarrow as pa

In [4]: import pyarrow.parquet as pq

In [5]: import numpy as np

In [6]: base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))

In [7]: (base / "parquet_dataset").mkdir(exist_ok=True)

# creating an Arrow Table
In [8]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

# writing it into two parquet files
In [9]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")

In [10]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

数据集发现#

Dataset 对象可以通过 dataset() 函数创建。我们可以向它传递包含数据文件的目录路径。

In [11]: import pyarrow.dataset as ds

In [12]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [13]: dataset
Out[13]: <pyarrow._dataset.FileSystemDataset at 0x7fe030eacd00>

除了搜索基本目录,dataset() 还接受单个文件路径或文件路径列表。

创建 Dataset 对象并不会立即开始读取数据。如果需要,它只会爬取目录以查找所有文件。

In [14]: dataset.files
Out[14]: 
['/tmp/pyarrow-9_qw6tb_/parquet_dataset/data1.parquet',
 '/tmp/pyarrow-9_qw6tb_/parquet_dataset/data2.parquet']

……并推断数据集的模式(默认从第一个文件推断)。

In [15]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

使用 Dataset.to_table() 方法,我们可以将数据集(或其一部分)读取到 pyarrow Table 中(请注意,根据数据集的大小,这可能需要大量内存,有关过滤/迭代加载,请参见下文)。

In [16]: dataset.to_table()
Out[16]: 
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[1.1373880927195317,-0.5829723903558806,1.0058469280348412,-0.7464153609137238,-0.49764206660895927],[-0.9559528016600878,-2.007285802328415,-0.3980382178636434,1.8734612388530516,0.8336648980261276]]
c: [[1,2,1,2,1],[2,1,2,1,2]]

# converting to pandas to see the contents of the scanned table
In [17]: dataset.to_table().to_pandas()
Out[17]: 
   a         b  c
0  0  1.137388  1
1  1 -0.582972  2
2  2  1.005847  1
3  3 -0.746415  2
4  4 -0.497642  1
5  5 -0.955953  2
6  6 -2.007286  1
7  7 -0.398038  2
8  8  1.873461  1
9  9  0.833665  2

读取不同文件格式#

以上示例使用 Parquet 文件作为数据集源,但 Dataset API 为多种文件格式和文件系统提供了统一的接口。目前,支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;未来计划支持更多格式。

如果我们将表保存为 Feather 文件而不是 Parquet 文件:

In [18]: import pyarrow.feather as feather

In [19]: feather.write_feather(table, base / "data.feather")

……那么我们可以使用相同的函数读取 Feather 文件,但需要指定 format="feather"

In [20]: dataset = ds.dataset(base / "data.feather", format="feather")

In [21]: dataset.to_table().to_pandas().head()
Out[21]: 
   a         b  c
0  0  1.137388  1
1  1 -0.582972  2
2  2  1.005847  1
3  3 -0.746415  2
4  4 -0.497642  1

自定义文件格式#

字符串格式名称,如:

ds.dataset(..., format="parquet")

是默认构造的 ParquetFileFormat 的简写。

ds.dataset(..., format=ds.ParquetFileFormat())

FileFormat 对象可以使用关键字进行自定义。例如:

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

将配置列 "a" 在扫描时进行字典编码。

过滤数据#

为避免在只需要子集时读取所有数据,可以使用 columnsfilter 关键字。

columns 关键字可用于仅读取指定的列:

In [22]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [23]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[23]: 
   a         b
0  0  1.137388
1  1 -0.582972
2  2  1.005847
3  3 -0.746415
4  4 -0.497642
5  5 -0.955953
6  6 -2.007286
7  7 -0.398038
8  8  1.873461
9  9  0.833665

使用 filter 关键字,不匹配过滤谓词的行将不会包含在返回的表中。该关键字需要一个布尔 Expression,引用至少一列。

In [24]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[24]: 
   a         b  c
0  7 -0.398038  2
1  8  1.873461  1
2  9  0.833665  2

In [25]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[25]: 
   a         b  c
0  1 -0.582972  2
1  3 -0.746415  2
2  5 -0.955953  2
3  7 -0.398038  2
4  9  0.833665  2

构建这些 Expression 对象最简单的方法是使用 field() 辅助函数。任何列(而不仅仅是分区列)都可以使用 field() 函数(它创建一个 FieldExpression)进行引用。提供了运算符重载来组合过滤器,包括比较(相等、大于/小于等)、集合成员测试和布尔组合(&|~)。

In [26]: ds.field('a') != 3
Out[26]: <pyarrow.compute.Expression (a != 3)>

In [27]: ds.field('a').isin([1, 2, 3])
Out[27]: 
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
  1,
  2,
  3
], null_matching_behavior=MATCH})>

In [28]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[28]: <pyarrow.compute.Expression ((a > b) and (b > 1))>

请注意,Expression 对象不能通过 Python 逻辑运算符 andornot 组合。

投影列#

columns 关键字可用于通过传递列名列表来读取数据集的列子集。该关键字还可以与表达式结合用于更复杂的投影。

在这种情况下,我们传递一个字典,其中键是结果列名,值是用于构造列值的表达式。

In [29]: projection = {
   ....:     "a_renamed": ds.field("a"),
   ....:     "b_as_float32": ds.field("b").cast("float32"),
   ....:     "c_1": ds.field("c") == 1,
   ....: }
   ....: 

In [30]: dataset.to_table(columns=projection).to_pandas().head()
Out[30]: 
   a_renamed  b_as_float32    c_1
0          0      1.137388   True
1          1     -0.582972  False
2          2      1.005847   True
3          3     -0.746415  False
4          4     -0.497642   True

该字典还决定了列选择(只有字典中的键会作为列存在于结果表中)。如果你想在现有列的基础上**额外**包含一个派生列,你可以根据数据集模式构建字典:

In [31]: projection = {col: ds.field(col) for col in dataset.schema.names}

In [32]: projection.update({"b_large": ds.field("b") > 1})

In [33]: dataset.to_table(columns=projection).to_pandas().head()
Out[33]: 
   a         b  c  b_large
0  0  1.137388  1     True
1  1 -0.582972  2    False
2  2  1.005847  1     True
3  3 -0.746415  2    False
4  4 -0.497642  1    False

读取分区数据#

上面展示了一个由文件组成的扁平目录数据集。然而,数据集可以利用嵌套目录结构来定义分区数据集,其中子目录名称包含有关该目录中存储的数据子集的信息。

例如,按年和月分区的数据集在磁盘上可能看起来像这样:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

上述分区方案使用了“/key=value/”目录名,这在 Apache Hive 中很常见。

让我们创建一个小的分区数据集。write_to_dataset() 函数可以写入这种类似 Hive 的分区数据集。

In [34]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
   ....:                   'part': ['a'] * 5 + ['b'] * 5})
   ....: 

In [35]: pq.write_to_dataset(table, "parquet_dataset_partitioned",
   ....:                     partition_cols=['part'])
   ....: 

上面创建了一个包含两个子目录(“part=a”和“part=b”)的目录,并且写入这些目录的 Parquet 文件不再包含“part”列。

使用 dataset() 读取此数据集时,我们现在使用 partitioning 关键字指定数据集应使用类似 Hive 的分区方案:

In [36]: dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
   ....:                      partitioning="hive")
   ....: 

In [37]: dataset.files
Out[37]: 
['parquet_dataset_partitioned/part=a/1ee1e9045ad24a2ba5a45b7f1b7e3d05-0.parquet',
 'parquet_dataset_partitioned/part=b/1ee1e9045ad24a2ba5a45b7f1b7e3d05-0.parquet']

尽管分区字段未包含在实际的 Parquet 文件中,但在扫描此数据集时,它们将被添加回结果表中。

In [38]: dataset.to_table().to_pandas().head(3)
Out[38]: 
   a         b  c part
0  0 -0.459856  1    a
1  1  1.137802  2    a
2  2  0.160218  1    a

我们现在可以根据分区键进行过滤,如果不匹配过滤器,则完全避免加载文件。

In [39]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[39]: 
   a         b  c part
0  5  0.129500  2    b
1  6  0.754953  1    b
2  7 -1.534345  2    b
3  8 -1.097663  1    b
4  9 -0.558693  2    b

不同的分区方案#

上面的例子使用了类似 Hive 的目录方案,例如“/year=2009/month=11/day=15”。我们通过传递 partitioning="hive" 关键字来指定这一点。在这种情况下,分区键的类型是从文件路径推断的。

也可以使用 partitioning() 函数明确定义分区键的模式。例如:

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

也支持“目录分区”,其中文件路径中的段表示分区键的值,不包含名称(字段名隐含在段的索引中)。例如,给定字段名“year”、“month”和“day”,一个路径可能是“/2019/11/15”。

由于名称未包含在文件路径中,因此在构建目录分区时必须指定这些名称。

part = ds.partitioning(field_names=["year", "month", "day"])

目录分区还支持提供完整的模式,而不是从文件路径推断类型。

从云存储读取#

除了本地文件,pyarrow 还支持从云存储读取。目前,支持 HDFSAmazon S3-compatible storage

当传递文件 URI 时,文件系统将自动推断。例如,指定 S3 路径:

dataset = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/")

通常,您会希望自定义连接参数,然后可以创建一个文件系统对象并将其传递给 filesystem 关键字。

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=s3)

目前可用的类有 S3FileSystemHadoopFileSystem。有关更多详细信息,请参阅 文件系统接口 文档。

从 Minio 读取#

除了云存储,pyarrow 还支持从模拟 S3 API 的 MinIO 对象存储实例读取数据。与 toxiproxy 结合使用,这对于测试或基准测试非常有用。

from pyarrow import fs

# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=minio)

使用 Parquet 数据集#

虽然 Datasets API 为不同的文件格式提供了统一的接口,但 Parquet 数据集存在一些特定方法。

一些处理框架(如 Dask)会(可选地)使用一个 _metadata 文件,其中包含分区数据集的模式和行组元数据信息。使用这样的文件可以更有效地创建 Parquet 数据集,因为它不需要推断模式和爬取所有 Parquet 文件的目录(对于访问文件成本较高的文件系统尤其如此)。parquet_dataset() 函数允许我们从具有 _metadata 文件的分区数据集创建数据集:

dataset = ds.parquet_dataset("/path/to/dir/_metadata")

默认情况下,为 Parquet 数据集构建的 Dataset 对象将每个片段映射到一个 Parquet 文件。如果您希望片段映射到 Parquet 文件的每个行组,可以使用片段的 split_by_row_group() 方法。

fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()

此方法返回一个新片段列表,每个片段映射到原始片段(Parquet 文件)的每个行组。get_fragments()split_by_row_group() 都接受可选的过滤表达式以获取过滤后的片段列表。

数据集的手动指定#

dataset() 函数允许轻松创建数据集,查看目录,爬取所有子目录以获取文件和分区信息。然而,有时不需要发现,并且数据集的文件和分区已经已知(例如,当此信息存储在元数据中时)。在这种情况下,可以显式创建数据集,而无需任何自动发现或推断。

在本例中,我们将使用一个文件名包含额外分区信息的数据集。

# creating a dummy dataset: directory with two files
In [40]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

In [41]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)

In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

In [43]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

要从文件列表创建数据集,我们需要手动指定路径、模式、格式、文件系统和分区表达式。

In [44]: from pyarrow import fs

In [45]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

In [46]: dataset = ds.FileSystemDataset.from_paths(
   ....:     ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
   ....:     filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
   ....:     partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
   ....: 

由于我们为文件指定了“分区表达式”,这些信息在读取数据时被具体化为列,并可用于过滤。

In [47]: dataset.to_table().to_pandas()
Out[47]: 
   year  col1      col2
0  2018     0 -0.977647
1  2018     1  0.200212
2  2018     2  1.112589
3  2019     0 -0.977647
4  2019     1  0.200212
5  2019     2  1.112589

In [48]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[48]: 
   year  col1      col2
0  2019     0 -0.977647
1  2019     1  0.200212
2  2019     2  1.112589

手动列出文件的另一个好处是文件顺序控制着数据的顺序。当执行有序读取(或读取到表中)时,返回的行将与给定文件的顺序匹配。这仅适用于使用文件列表构建数据集的情况。如果文件是通过扫描目录发现的,则不保证顺序。

迭代(内存外或流式)读取#

前面的示例演示了如何使用 to_table() 将数据读取到表中。这在数据集较小或只需要读取少量数据时很有用。数据集 API 包含其他方法,可以以流式方式读取和处理大量数据。

最简单的方法是使用 Dataset.to_batches() 方法。此方法返回记录批次的迭代器。例如,我们可以使用此方法计算列的平均值,而无需将整个列加载到内存中。

In [49]: import pyarrow.compute as pc

In [50]: col2_sum = 0

In [51]: count = 0

In [52]: for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
   ....:     col2_sum += pc.sum(batch.column("col2")).as_py()
   ....:     count += batch.num_rows
   ....: 

In [53]: mean_a = col2_sum/count

自定义批次大小#

数据集的迭代读取通常称为数据集的“扫描”,pyarrow 使用一个名为 Scanner 的对象来完成此操作。to_table()to_batches() 方法会自动为您创建 Scanner 对象。您传递给这些方法的任何参数都将传递给 Scanner 构造函数。

其中一个参数是 batch_size。它控制扫描器返回的批次的最大大小。如果数据集由小文件组成,或者这些文件本身由小行组组成,则批次仍可能小于 batch_size。例如,一个每个行组有 10,000 行的 parquet 文件将生成最多 10,000 行的批次,除非将 batch_size 设置为较小的值。

默认批次大小为一百万行,这通常是一个不错的默认值,但如果您要读取大量列,您可能需要对其进行自定义。

关于事务与 ACID 保证的说明#

数据集 API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取没有问题。并发写入或与读取同时进行的写入可能会出现意外行为。可以使用各种方法来避免对相同文件进行操作,例如为每个写入器使用唯一的基名模板、为新文件使用临时目录,或者单独存储文件列表而不是依赖目录发现。

在写入过程中意外终止进程可能导致系统处于不一致状态。写入调用通常在要写入的字节完全交付给操作系统页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然断电,文件的一部分仍可能丢失。

大多数文件格式都有在末尾写入的“魔数”。这意味着可以安全地检测和丢弃部分文件写入。CSV 文件格式没有这样的概念,部分写入的 CSV 文件可能会被检测为有效文件。

写入数据集#

数据集 API 还通过 write_dataset() 简化了向数据集写入数据。当您想要分区数据或需要写入大量数据时,这会很有用。基本的数据集写入类似于写入表,只是您指定一个目录而不是文件名。

In [54]: table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})

In [55]: ds.write_dataset(table, "sample_dataset", format="parquet")

上面的示例将在我们的 sample_dataset 目录中创建一个名为 part-0.parquet 的文件。

警告

如果您再次运行该示例,它将替换现有的 part-0.parquet 文件。将文件附加到现有数据集需要为每次调用 ds.write_dataset 指定新的 basename_template 以避免覆盖。

写入分区数据#

分区对象可用于指定输出数据应如何分区。这使用了我们用于读取数据集的相同类型的分区对象。要将上述数据写入分区目录,我们只需要指定我们希望数据集如何分区。例如:

In [56]: part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [57]: ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)

这将创建两个文件。我们数据的一半将位于 dataset_root/c=1 目录中,另一半将位于 dataset_root/c=2 目录中。

分区性能考量#

分区数据集有两个影响性能的方面:它增加了文件数量,并在文件周围创建了目录结构。这两个方面都有优点也有缺点。根据配置和数据集的大小,成本可能超过收益。

由于分区将数据集分成多个文件,因此分区数据集可以并行读写。然而,每个额外的文件都会在文件系统交互的处理中增加一点开销。它还会增加整个数据集的大小,因为每个文件都包含一些共享元数据。例如,每个 Parquet 文件都包含模式和组级统计信息。分区数量是文件数量的下限。如果您将数据集按日期(一年数据)分区,您将至少有 365 个文件。如果您再按另一个维度(有 1,000 个唯一值)分区,您将有多达 365,000 个文件。这种精细的分区通常会导致文件很小,主要由元数据组成。

分区数据集创建嵌套的文件夹结构,这些结构允许我们在扫描时修剪要加载的文件。然而,这会增加发现数据集中文件的开销,因为我们需要递归地“列出目录”才能找到数据文件。过于精细的分区可能会导致问题:按日期分区一年的数据集将需要 365 次列出调用才能找到所有文件;再添加一个基数为 1,000 的列,这将变为 365,365 次调用。

最优化分区布局将取决于您的数据、访问模式以及将要读取数据的系统。大多数系统,包括 Arrow,应该都能在各种文件大小和分区布局下工作,但您应该避免一些极端情况。以下准则可以帮助避免一些已知的最坏情况:

  • 避免文件小于 20MB 和大于 2GB。

  • 避免分区布局中包含超过 10,000 个不同分区。

对于文件中包含组概念的文件格式,例如 Parquet,也适用类似的准则。行组可以在读取时提供并行性,并允许根据统计数据跳过数据,但非常小的组可能会导致元数据在文件大小中占据很大一部分。Arrow 的文件写入器在大多数情况下都为组大小提供了合理的默认值。

写入时配置打开文件#

将数据写入磁盘时,有一些参数对于优化写入可能很重要,例如每个文件的行数和写入期间允许的最大打开文件数。

使用 write_dataset()max_open_files 参数设置最大打开文件数。

如果 max_open_files 设置大于 0,则这将限制可以保持打开的文件最大数量。这仅适用于写入分区数据集,其中行根据其分区值调度到适当的文件。如果尝试打开过多文件,则最近最少使用的文件将被关闭。如果此设置过低,您最终可能会将数据碎片化为许多小文件。

如果您的进程同时使用其他文件处理程序,无论是通过数据集扫描器还是其他方式,您可能会达到系统文件处理程序限制。例如,如果您正在扫描一个包含 300 个文件的数据集并写入 900 个文件,那么总共 1200 个文件可能超出系统限制。(在 Linux 上,这可能是“打开文件过多”错误。)您可以降低此 max_open_files 设置,或增加系统上的文件处理程序限制。默认值为 900,这允许扫描器在达到默认 Linux 限制 1024 之前打开一定数量的文件。

write_dataset() 中使用的另一个重要配置是 max_rows_per_file

使用 write_dataset()max_rows_per_files 参数设置每个文件中写入的最大行数。

如果 max_rows_per_file 设置大于 0,则这将限制任何单个文件中放置的行数。否则将没有限制,每个输出目录中将创建一个文件,除非需要关闭文件以遵守 max_open_files。此设置是控制文件大小的主要方式。对于写入大量数据的工作负载,如果没有行数上限,文件可能会变得非常大,导致下游读取器出现内存不足错误。行数与文件大小之间的关系取决于数据集模式以及数据压缩(如果压缩了)的程度。

写入时配置每个组的行数#

可以配置每个组写入磁盘的数据量。此配置包括下限和上限。形成行组所需的最小行数由 write_dataset()min_rows_per_group 参数定义。

注意

如果 min_rows_per_group 设置大于 0,这将导致数据集写入器批量处理传入数据,并且仅在累积了足够的行时才将行组写入磁盘。如果其他选项(例如 max_open_filesmax_rows_per_file)强制较小的行组大小,则最终的行组大小可能小于此值。

每个组允许的最大行数由 write_dataset()max_rows_per_group 参数定义。

如果 max_rows_per_group 设置大于 0,则数据集写入器可能会将大量传入批次拆分为多个行组。如果设置了此值,则还应设置 min_rows_per_group,否则您可能会得到非常小的行组(例如,如果传入的行组大小仅略大于此值)。

行组内置于 Parquet 和 IPC/Feather 格式中,但不影响 JSON 或 CSV。在 Arrow 中回读 Parquet 和 IPC 格式时,行组边界成为记录批次边界,决定了下游读取器的默认批次大小。此外,Parquet 文件中的行组具有列统计信息,这可以帮助读取器跳过不相关的数据,但会增加文件大小。作为一个极端示例,如果将 Parquet 中的 max_rows_per_group=1,则文件会很大,因为大部分文件将是行组统计信息。

写入大量数据#

上述示例从表中写入数据。如果要写入大量数据,您可能无法将所有内容加载到单个内存表。幸运的是,write_dataset() 方法也接受记录批次的迭代器。这使得重新分区大型数据集而无需将整个数据集加载到内存中变得非常简单,例如:

In [58]: old_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [59]: new_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor=None
   ....: )
   ....: 

In [60]: input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)

# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches.  In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
In [61]: scanner = input_dataset.scanner()

In [62]: ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)

在上述示例运行后,我们的数据将位于 dataset_root/1 和 dataset_root/2 目录中。在这个简单的示例中,我们没有更改数据的结构(只更改了目录命名方案),但您也可以使用此机制来更改用于分区数据集的列。当您希望以特定方式查询数据并且可以利用分区来减少需要读取的数据量时,这会很有用。

自定义和检查写入的文件#

默认情况下,数据集 API 将创建名为“part-i.format”的文件,其中“i”是在写入过程中生成的整数,“format”是 write_dataset 调用中指定的文件格式。对于简单数据集,可能可以知道将创建哪些文件,但对于较大或分区的数据集则不那么容易。file_visitor 关键字可用于提供一个访问器,该访问器将在每次创建文件时被调用:

In [63]: def file_visitor(written_file):
   ....:     print(f"path={written_file.path}")
   ....:     print(f"size={written_file.size} bytes")
   ....:     print(f"metadata={written_file.metadata}")
   ....: 
In [64]: ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
   ....:                  file_visitor=file_visitor)
   ....: 
path=dataset_visited/c=1/part-0.parquet
size=815 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe02cd5c090>
  created_by: parquet-cpp-arrow version 22.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=817 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe02cd07830>
  created_by: parquet-cpp-arrow version 22.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0

这将允许您收集属于数据集的文件名并将其存储在其他地方,这在您下次需要读取数据时想要避免扫描目录时非常有用。它还可以用于生成其他工具(如 Dask 或 Spark)用于创建数据集索引的 _metadata 索引文件。

写入时配置特定于格式的参数#

除了所有格式共享的通用选项外,还有特定于特定格式的格式特定选项。例如,在写入 Parquet 文件时允许截断时间戳:

In [65]: parquet_format = ds.ParquetFileFormat()

In [66]: write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)

In [67]: ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
   ....:                  file_options=write_options)
   ....: