表格数据集#

The pyarrow.dataset 模块提供功能以有效地处理表格数据,这些数据可能大于内存,并且是多文件数据集。 这包括

  • 支持不同来源和文件格式以及不同文件系统(本地、云)的统一接口。

  • 源的发现(爬取目录,处理基于目录的分区数据集,基本模式规范化,..)

  • 通过谓词下推(过滤行)、投影(选择和派生列)以及可选的并行读取来优化读取。

目前支持的文件格式包括 Parquet、Feather/Arrow IPC、CSV 和 ORC(注意 ORC 数据集目前只能读取,还不能写入)。目标是在未来将支持扩展到其他文件格式和数据源(例如数据库连接)。

对于熟悉现有 pyarrow.parquet.ParquetDataset 用于读取 Parquet 数据集的人来说:pyarrow.dataset 的目标类似,但并不局限于 Parquet 格式,也不绑定到 Python:R 绑定或 Arrow 中公开了相同的 Dataset API。此外,pyarrow.dataset 具有改进的性能和新功能(例如,在文件内部进行过滤,而不仅仅是在分区键上进行过滤)。

读取数据集#

对于以下示例,让我们创建一个包含两个 parquet 文件的目录的小型数据集

In [1]: import tempfile

In [2]: import pathlib

In [3]: import pyarrow as pa

In [4]: import pyarrow.parquet as pq

In [5]: import numpy as np

In [6]: base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))

In [7]: (base / "parquet_dataset").mkdir(exist_ok=True)

# creating an Arrow Table
In [8]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

# writing it into two parquet files
In [9]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")

In [10]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

数据集发现#

可以使用 Dataset 对象来创建 dataset() 函数。我们可以将包含数据文件的目录路径传递给它

In [11]: import pyarrow.dataset as ds

In [12]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [13]: dataset
Out[13]: <pyarrow._dataset.FileSystemDataset at 0x7f15a07d4d00>

除了搜索基本目录外,dataset() 还接受单个文件的路径或文件路径列表。

创建 Dataset 对象不会开始读取数据本身。如果需要,它只会爬行目录以找到所有文件

In [14]: dataset.files
Out[14]: 
['/tmp/pyarrow-d81qxgsw/parquet_dataset/data1.parquet',
 '/tmp/pyarrow-d81qxgsw/parquet_dataset/data2.parquet']

…并推断数据集的模式(默认情况下从第一个文件推断)

In [15]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

使用 Dataset.to_table() 方法,我们可以将数据集(或其一部分)读取到 pyarrow 表中(注意,根据数据集的大小,这可能需要大量的内存,请参阅下面的关于过滤/迭代加载的部分)

In [16]: dataset.to_table()
Out[16]: 
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[0.36317825277041943,-0.6529558209491978,-1.9468641250617245,-0.3431508797097218,0.6113909630332509],[0.6886666660186878,-0.8817081403361976,-0.6477052267946208,0.2459231184425388,0.04826541256583963]]
c: [[1,2,1,2,1],[2,1,2,1,2]]

# converting to pandas to see the contents of the scanned table
In [17]: dataset.to_table().to_pandas()
Out[17]: 
   a         b  c
0  0  0.363178  1
1  1 -0.652956  2
2  2 -1.946864  1
3  3 -0.343151  2
4  4  0.611391  1
5  5  0.688667  2
6  6 -0.881708  1
7  7 -0.647705  2
8  8  0.245923  1
9  9  0.048265  2

读取不同的文件格式#

上面的示例使用 Parquet 文件作为数据集源,但 Dataset API 在多种文件格式和文件系统之间提供一致的接口。目前,支持 Parquet、ORC、Feather/Arrow IPC 和 CSV 文件格式;未来计划支持更多格式。

如果我们将表保存为 Feather 文件而不是 Parquet 文件

In [18]: import pyarrow.feather as feather

In [19]: feather.write_feather(table, base / "data.feather")

…那么我们可以使用相同的函数读取 Feather 文件,但要指定 format="feather"

In [20]: dataset = ds.dataset(base / "data.feather", format="feather")

In [21]: dataset.to_table().to_pandas().head()
Out[21]: 
   a         b  c
0  0  0.363178  1
1  1 -0.652956  2
2  2 -1.946864  1
3  3 -0.343151  2
4  4  0.611391  1

自定义文件格式#

格式名称作为字符串,例如

ds.dataset(..., format="parquet")

是默认构造的 ParquetFileFormat 的简写

ds.dataset(..., format=ds.ParquetFileFormat())

可以使用关键字自定义 FileFormat 对象。例如

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

将配置列 "a" 在扫描时进行字典编码。

过滤数据#

为了避免在只需要子集时读取所有数据,可以使用 columnsfilter 关键字。

可以使用 columns 关键字仅读取指定的列

In [22]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [23]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[23]: 
   a         b
0  0  0.363178
1  1 -0.652956
2  2 -1.946864
3  3 -0.343151
4  4  0.611391
5  5  0.688667
6  6 -0.881708
7  7 -0.647705
8  8  0.245923
9  9  0.048265

使用 filter 关键字,不匹配过滤谓词的行将不会包含在返回的表中。该关键字期望一个布尔 Expression,该表达式引用至少一个列

In [24]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[24]: 
   a         b  c
0  7 -0.647705  2
1  8  0.245923  1
2  9  0.048265  2

In [25]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[25]: 
   a         b  c
0  1 -0.652956  2
1  3 -0.343151  2
2  5  0.688667  2
3  7 -0.647705  2
4  9  0.048265  2

构造这些 Expression 对象的最简单方法是使用 field() 辅助函数。可以使用 field() 函数引用任何列(不仅仅是分区列)(该函数会创建一个 FieldExpression)。提供了运算符重载来组合过滤器,包括比较(等于、大于/小于等)、集合成员测试和布尔组合(&|~

In [26]: ds.field('a') != 3
Out[26]: <pyarrow.compute.Expression (a != 3)>

In [27]: ds.field('a').isin([1, 2, 3])
Out[27]: 
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
  1,
  2,
  3
], null_matching_behavior=MATCH})>

In [28]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[28]: <pyarrow.compute.Expression ((a > b) and (b > 1))>

注意,Expression 对象不能通过 python 逻辑运算符 andornot 来组合。

投影列#

可以使用 columns 关键字通过向其传递列名称列表来读取数据集的子集。该关键字还可与表达式结合使用,用于更复杂的投影。

在这种情况下,我们将向其传递一个字典,其中键是结果列名,值是用于构造列值的表达式

In [29]: projection = {
   ....:     "a_renamed": ds.field("a"),
   ....:     "b_as_float32": ds.field("b").cast("float32"),
   ....:     "c_1": ds.field("c") == 1,
   ....: }
   ....: 

In [30]: dataset.to_table(columns=projection).to_pandas().head()
Out[30]: 
   a_renamed  b_as_float32    c_1
0          0      0.363178   True
1          1     -0.652956  False
2          2     -1.946864   True
3          3     -0.343151  False
4          4      0.611391   True

该字典还确定列选择(结果表中只会存在字典中的键)。如果你想在现有列中添加派生列,可以从数据集模式构建字典

In [31]: projection = {col: ds.field(col) for col in dataset.schema.names}

In [32]: projection.update({"b_large": ds.field("b") > 1})

In [33]: dataset.to_table(columns=projection).to_pandas().head()
Out[33]: 
   a         b  c  b_large
0  0  0.363178  1    False
1  1 -0.652956  2    False
2  2 -1.946864  1    False
3  3 -0.343151  2    False
4  4  0.611391  1    False

读取分区数据#

上面显示了由包含文件的平面目录组成的数据集。但是,数据集可以利用嵌套的目录结构来定义分区数据集,其中子目录名称包含有关哪个数据子集存储在该目录中的信息。

例如,按年份和月份分区的数据集在磁盘上可能看起来像这样

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

上面的分区方案使用的是“/key=value/”目录名,与 Apache Hive 中的目录名一致。

让我们创建一个小型分区数据集。write_to_dataset() 函数可以写入这种 hive 类型的分区数据集。

In [34]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
   ....:                   'part': ['a'] * 5 + ['b'] * 5})
   ....: 

In [35]: pq.write_to_dataset(table, "parquet_dataset_partitioned",
   ....:                     partition_cols=['part'])
   ....: 

上面创建了一个包含两个子目录(“part=a”和“part=b”)的目录,写入这些目录中的 Parquet 文件不再包含“part”列。

使用 dataset() 读取此数据集时,我们现在指定数据集应该使用 hive 类型的分区方案,并使用 partitioning 关键字

In [36]: dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
   ....:                      partitioning="hive")
   ....: 

In [37]: dataset.files
Out[37]: 
['parquet_dataset_partitioned/part=a/f5716274a114412ebd1be42cef616bb5-0.parquet',
 'parquet_dataset_partitioned/part=b/f5716274a114412ebd1be42cef616bb5-0.parquet']

虽然分区字段未包含在实际的 Parquet 文件中,但它们将在扫描此数据集时被添加回结果表中

In [38]: dataset.to_table().to_pandas().head(3)
Out[38]: 
   a         b  c part
0  0  0.715366  1    a
1  1  0.207441  2    a
2  2  0.922522  1    a

我们现在可以在分区键上进行过滤,如果它们与过滤器不匹配,则完全避免加载文件

In [39]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[39]: 
   a         b  c part
0  5 -0.034542  2    b
1  6  0.403384  1    b
2  7 -1.503216  2    b
3  8 -1.517377  1    b
4  9 -0.137676  2    b

不同的分区方案#

上面的示例使用 hive 类型的目录方案,例如“/year=2009/month=11/day=15”。我们通过传递 partitioning="hive" 关键字来指定这一点。在这种情况下,分区键的类型是从文件路径推断出来的。

还可以使用 partitioning() 函数显式定义分区键的模式。例如

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

还支持“目录分区”,其中文件路径中的段表示分区键的值,不包括名称(字段名称隐含在段的索引中)。例如,给定字段名称“year”、“month”和“day”,一个路径可能是“/2019/11/15”。

由于名称未包含在文件路径中,因此在构建目录分区时必须指定这些名称

part = ds.partitioning(field_names=["year", "month", "day"])

目录分区还支持提供完整的模式,而不是从文件路径推断类型。

从云存储读取#

除了本地文件之外,pyarrow 还支持从云存储读取。目前,支持 HDFSAmazon S3-compatible storage

传递文件 URI 时,将推断文件系统。例如,指定 S3 路径

dataset = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/")

通常,你需要自定义连接参数,然后可以创建文件系统对象并将其传递给 filesystem 关键字

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=s3)

目前可用的类是 S3FileSystemHadoopFileSystem。有关更多详细信息,请参阅 文件系统接口 文档。

从 Minio 读取#

除了云存储之外,pyarrow 还支持从模拟 S3 API 的 MinIO 对象存储实例读取。与 toxiproxy 配合使用,这对于测试或基准测试非常有用。

from pyarrow import fs

# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=minio)

使用 Parquet 数据集#

虽然 Datasets API 为不同的文件格式提供了统一的接口,但一些特定方法适用于 Parquet 数据集。

一些处理框架,例如 Dask(可选),使用 _metadata 文件来存储分区数据集,其中包含关于整个数据集的模式和行组元数据的相关信息。使用此类文件可以更有效地创建 Parquet 数据集,因为它不需要推断模式并遍历所有 Parquet 文件的目录(对于访问文件成本较高的文件系统来说尤其如此)。parquet_dataset() 函数允许我们从具有 _metadata 文件的分区数据集中创建数据集。

dataset = ds.parquet_dataset("/path/to/dir/_metadata")

默认情况下,为 Parquet 数据集构建的 Dataset 对象将每个片段映射到单个 Parquet 文件。如果您希望片段映射到 Parquet 文件中的每个行组,可以使用片段的 split_by_row_group() 方法。

fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()

此方法返回一个新的片段列表,这些片段映射到原始片段(Parquet 文件)的每个行组。 get_fragments()split_by_row_group() 都接受可选的过滤器表达式,用于获取过滤后的片段列表。

数据集的手动指定#

dataset() 函数允许轻松创建数据集,它查看目录,遍历所有子目录以查找文件和分区信息。但是,有时不需要发现,并且数据集的文件和分区已经知道(例如,当此信息存储在元数据中时)。在这种情况下,可以显式创建数据集,而无需任何自动发现或推断。

对于此处的示例,我们将使用一个数据集,其中文件名包含额外的分区信息。

# creating a dummy dataset: directory with two files
In [40]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

In [41]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)

In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

In [43]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

要从文件列表创建数据集,我们需要手动指定路径、模式、格式、文件系统和分区表达式。

In [44]: from pyarrow import fs

In [45]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

In [46]: dataset = ds.FileSystemDataset.from_paths(
   ....:     ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
   ....:     filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
   ....:     partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
   ....: 

由于我们为文件指定了“分区表达式”,因此此信息在读取数据时被物化为列,可用于过滤。

In [47]: dataset.to_table().to_pandas()
Out[47]: 
   year  col1      col2
0  2018     0  0.320099
1  2018     1  1.592948
2  2018     2  0.338350
3  2019     0  0.320099
4  2019     1  1.592948
5  2019     2  0.338350

In [48]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[48]: 
   year  col1      col2
0  2019     0  0.320099
1  2019     1  1.592948
2  2019     2  0.338350

手动列出文件的另一个好处是,文件的顺序控制数据的顺序。在执行有序读取(或读取到表中)时,返回的行将与给定的文件顺序匹配。这仅适用于使用文件列表构建数据集的情况。如果文件是通过扫描目录发现的,则不保证顺序。

迭代(内存外或流式)读取#

之前的示例演示了如何使用 to_table() 将数据读取到表中。如果数据集很小或只需要读取少量数据,这将非常有用。数据集 API 包含其他方法以流式方式读取和处理大量数据。

最简单的方法是使用 Dataset.to_batches() 方法。此方法返回一个记录批次的迭代器。例如,我们可以使用此方法计算一列的平均值,而无需将整个列加载到内存中。

In [49]: import pyarrow.compute as pc

In [50]: col2_sum = 0

In [51]: count = 0

In [52]: for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
   ....:     col2_sum += pc.sum(batch.column("col2")).as_py()
   ....:     count += batch.num_rows
   ....: 

In [53]: mean_a = col2_sum/count

自定义批次大小#

对数据集的迭代读取通常被称为数据集的“扫描”,而 pyarrow 使用名为 Scanner 的对象来执行此操作。扫描器由数据集的 to_table()to_batches() 方法自动创建。您传递给这些方法的任何参数都将传递给扫描器构造函数。

其中一个参数是 batch_size。这控制扫描器返回的批次的最大大小。如果数据集由小文件组成,或者这些文件本身由小行组组成,那么批次仍然可以小于 batch_size。例如,一个每个行组有 10,000 行的 Parquet 文件将产生批次,最多包含 10,000 行,除非将 batch_size 设置为更小的值。

默认批次大小为一百万行,这通常是一个不错的默认值,但如果您正在读取大量列,您可能需要自定义它。

关于事务和 ACID 保证的说明#

数据集 API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取是可以的。并发写入或与读取并发写入可能会产生意外行为。可以使用各种方法来避免对相同文件进行操作,例如为每个写入程序使用唯一的基名模板、为新文件使用临时目录,或将文件列表单独存储,而不是依赖目录发现。

在写入过程中意外终止进程可能会使系统处于不一致状态。写入调用通常在要写入的字节完全传递到操作系统页缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然断电,则文件的一部分可能会丢失。

大多数文件格式在文件末尾都有魔数。这意味着可以安全地检测到部分文件写入并将其丢弃。CSV 文件格式没有任何这样的概念,部分写入的 CSV 文件可能会被检测为有效。

写入数据集#

数据集 API 还使用 write_dataset() 简化了将数据写入数据集的操作。当您想对数据进行分区或需要写入大量数据时,这很有用。基本的 dataset 写入类似于写入表,只是您指定目录而不是文件名。

In [54]: table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})

In [55]: ds.write_dataset(table, "sample_dataset", format="parquet")

上面的示例将在我们的 sample_dataset 目录中创建一个名为 part-0.parquet 的单个文件。

警告

如果您再次运行此示例,它将替换现有的 part-0.parquet 文件。将文件追加到现有数据集需要为每次对 ds.write_dataset 的调用指定一个新的 basename_template,以避免覆盖。

写入分区数据#

分区对象可用于指定输出数据的分区方式。这使用了与我们用于读取数据集的相同类型的分区对象。要将上述数据写入分区目录,我们只需要指定希望数据集如何进行分区。例如

In [56]: part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [57]: ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)

这将创建两个文件。我们一半的数据将位于 dataset_root/c=1 目录中,另一半将位于 dataset_root/c=2 目录中。

分区性能考虑因素#

分区数据集有两个影响性能的方面:它增加了文件数量,并在文件周围创建目录结构。这两个方面都有利有弊。根据配置和数据集的大小,成本可能超过收益。

因为分区将数据集拆分为多个文件,所以分区数据集可以并行读取和写入。但是,每个附加文件都会为文件系统交互增加一些处理开销。它还会增加数据集的总大小,因为每个文件都有一些共享的元数据。例如,每个 parquet 文件都包含模式和组级统计信息。分区数是文件数的下限。如果您按日期对一年的数据进行分区,您将至少有 365 个文件。如果您按另一个具有 1,000 个唯一值的维度进一步分区,您将最多有 365,000 个文件。这种精细的分区通常会导致小文件,这些文件主要由元数据组成。

分区数据集会创建嵌套的文件夹结构,这些结构允许我们修剪扫描中加载的文件。但是,这会增加发现数据集中的文件所需的开销,因为我们需要递归地“列出目录”才能找到数据文件。过于精细的分区会导致问题:按日期对一年内的數據进行分区将需要 365 次列表调用才能找到所有文件;添加另一个基数为 1,000 的列将使之达到 365,365 次调用。

最优的分区布局将取决于您的数据、访问模式以及哪些系统将读取数据。大多数系统(包括 Arrow)应该可以在各种文件大小和分区布局中工作,但有一些极端情况应该避免。这些指南可以帮助避免一些已知的 worst cases。

  • 避免小于 20MB 和大于 2GB 的文件。

  • 避免分区布局具有超过 10,000 个不同分区的布局。

对于具有文件内组概念的文件格式(例如 Parquet),类似的准则也适用。行组在读取时可以提供并行性,并允许基于统计信息跳过数据,但非常小的组会导致元数据成为文件大小的很大一部分。在大多数情况下,Arrow 的文件写入器为组大小提供了合理的默认值。

配置写入期间打开的文件#

将数据写入磁盘时,有一些参数可以优化写入操作,例如每个文件的行数和写入期间允许的最大打开文件数。

使用 max_open_files 参数设置最大打开文件数 write_dataset()

如果 max_open_files 设置大于 0,则它将限制可以保持打开状态的最大文件数。这仅适用于写入分区数据集,其中行根据其分区值分派到相应的文件。如果尝试打开太多文件,则最近最少使用 (LRU) 的文件将被关闭。如果此设置设置过低,你可能会将数据分散到许多小文件中。

如果你的进程同时使用其他文件句柄(无论是使用数据集扫描器还是其他方式),你可能会遇到系统文件句柄限制。例如,如果你正在扫描包含 300 个文件的数据集并写入 900 个文件,那么总共 1200 个文件可能会超过系统限制。(在 Linux 上,这可能是“打开文件过多”错误。)你可以减少 max_open_files 设置或增加系统上的文件句柄限制。默认值为 900,允许扫描器在达到 Linux 默认限制 1024 之前打开一定数量的文件。

另一个在 write_dataset() 中使用的重要配置是 max_rows_per_file

使用 max_rows_per_files 参数设置每个文件中写入的最大行数 write_dataset()

如果 max_rows_per_file 设置大于 0,则它将限制放置在任何单个文件中的行数。否则将没有限制,并且将在每个输出目录中创建一个文件,除非需要关闭文件以尊重 max_open_files。此设置是控制文件大小的主要方式。对于写入大量数据的负载,文件在没有行计数限制的情况下会变得非常大,从而导致下游读取器出现内存不足错误。行计数和文件大小之间的关系取决于数据集模式以及数据压缩程度(如果有)。

配置写入期间每个组的行数#

可以配置写入磁盘的每个组的数据量。此配置包括一个下限和一个上限。使用 min_rows_per_group 参数定义形成行组所需的最小行数 write_dataset()

注意

如果 min_rows_per_group 设置大于 0,则它将导致数据集写入器对传入数据进行批处理,并且仅在累积了足够多的行时才将行组写入磁盘。如果其他选项(例如 max_open_filesmax_rows_per_file)强制使用更小的行组大小,则最终的行组大小可能小于此值。

使用 max_rows_per_group 参数定义每个组允许的最大行数 write_dataset()

如果 max_rows_per_group 设置大于 0,则数据集写入器可能会将大型传入批处理拆分为多个行组。如果设置了此值,则还应设置 min_rows_per_group,否则你可能会最终得到非常小的行组(例如,如果传入行组大小仅略大于此值)。

行组内置于 Parquet 和 IPC/Feather 格式中,但不影响 JSON 或 CSV。在 Arrow 中读回 Parquet 和 IPC 格式时,行组边界将成为记录批处理边界,从而确定下游读取器的默认批处理大小。此外,Parquet 文件中的行组具有列统计信息,可以帮助读取器跳过不相关的數據,但会增加文件大小。举个极端的例子,如果在 Parquet 中将 max_rows_per_group=1,则文件会很大,因为大多数文件将是行组统计信息。

写入大量数据#

上面的示例从表格中写入数据。如果你正在写入大量数据,你可能无法将所有数据加载到单个内存中表格中。幸运的是,write_dataset() 方法还接受记录批处理的迭代器。这使得重新分区大型数据集变得非常简单,例如,无需将整个数据集加载到内存中。

In [58]: old_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [59]: new_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor=None
   ....: )
   ....: 

In [60]: input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)

# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches.  In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
In [61]: scanner = input_dataset.scanner()

In [62]: ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)

在上面的示例运行后,我们的数据将位于 dataset_root/1 和 dataset_root/2 目录中。在这个简单的示例中,我们没有改变数据的结构(只有目录命名方案),但你也可以使用这种机制来改变用于对数据集进行分区的列。当你期望以特定方式查询你的数据时,这很有用,你可以利用分区来减少需要读取的数据量。

自定义和检查写入的文件#

默认情况下,数据集 API 将创建名为“part-i.format”的文件,其中“i”是在写入过程中生成的整数,“format”是在 write_dataset 调用中指定的文件格式。对于简单的数据集,你可能可以知道将创建哪些文件,但对于较大的或分区的数据集,这并不容易。可以使用 file_visitor 关键字来提供一个访问器,该访问器将在创建每个文件时被调用。

In [63]: def file_visitor(written_file):
   ....:     print(f"path={written_file.path}")
   ....:     print(f"size={written_file.size} bytes")
   ....:     print(f"metadata={written_file.metadata}")
   ....: 
In [64]: ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
   ....:                  file_visitor=file_visitor)
   ....: 
path=dataset_visited/c=1/part-0.parquet
size=802 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7f15a0639080>
  created_by: parquet-cpp-arrow version 18.0.0-SNAPSHOT
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=804 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7f15a0639080>
  created_by: parquet-cpp-arrow version 18.0.0-SNAPSHOT
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0

这将允许你收集属于数据集的文件名并将它们存储在其他位置,这在你下次需要读取数据时想要避免扫描目录时非常有用。它还可以用于生成 _metadata 索引文件,该文件由其他工具(如 Dask 或 Spark)使用来创建数据集的索引。

在写入期间配置特定于格式的参数#

除了所有格式共享的常见选项之外,还有一些特定于格式的选项,这些选项是特定格式所独有的。例如,要允许在写入 Parquet 文件时截断时间戳。

In [65]: parquet_format = ds.ParquetFileFormat()

In [66]: write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)

In [67]: ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
   ....:                  file_options=write_options)
   ....: