表格数据集#
pyarrow.dataset 模块提供了高效处理表格型、可能大于内存的以及多文件数据集的功能。其中包括:
一个支持不同来源、不同文件格式及不同文件系统(本地、云端)的统一接口。
源发现(遍历目录、处理基于目录的分区数据集、基础模式规范化等)。
优化读取,支持谓词下推(过滤行)、投影(选择和派生列)以及可选的并行读取。
目前支持的文件格式包括 Parquet、Feather / Arrow IPC、CSV 和 ORC(注意:ORC 数据集目前仅支持读取,暂不支持写入)。未来的目标是将支持扩展到其他文件格式和数据源(例如数据库连接)。
对于熟悉现有的 pyarrow.parquet.ParquetDataset 来读取 Parquet 数据集的用户:pyarrow.dataset 的目标相似,但它不局限于 Parquet 格式,也不绑定于 Python:相同的 Dataset API 也在 R 绑定或 Arrow 中提供。此外,pyarrow.dataset 拥有更高的性能和新功能(例如在文件内部进行过滤,而不仅仅是在分区键上过滤)。
读取数据集#
对于下面的示例,我们先创建一个小型数据集,包含一个包含两个 Parquet 文件的目录。
>>> import tempfile
>>> import pathlib
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> import numpy as np
>>>
>>> base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))
>>> (base / "parquet_dataset").mkdir(exist_ok=True)
>>>
>>> # creating an Arrow Table
>>> np.random.seed(0)
>>> table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
>>>
>>> # writing it into two parquet files
>>> pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")
>>> pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")
数据集发现#
Dataset 对象可以使用 dataset() 函数创建。我们可以将包含数据文件的目录路径传递给它。
>>> import pyarrow.dataset as ds
>>> dataset = ds.dataset(base / "parquet_dataset", format="parquet")
>>> dataset
<pyarrow._dataset.FileSystemDataset object at ...>
除了搜索基目录外,dataset() 还接受单个文件路径或文件路径列表。
创建 Dataset 对象本身不会开始读取数据。如果需要,它只会遍历目录以找到所有文件
>>> dataset.files
['.../parquet_dataset/data1.parquet', '.../parquet_dataset/data2.parquet']
……并推断数据集的模式(默认从第一个文件推断)。
>>> print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64
使用 Dataset.to_table() 方法,我们可以将数据集(或其一部分)读取为 pyarrow Table(注意:根据数据集的大小,这可能需要大量内存,请参阅下文关于过滤/迭代加载的内容)。
>>> dataset.to_table()
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[...],[...]]
c: [[1,2,1,2,1],[2,1,2,1,2]]
>>> # converting to pandas to see the contents of the scanned table
>>> dataset.to_table().to_pandas()
a b c
0 0 1.764052 1
1 1 0.400157 2
2 2 0.978738 1
3 3 2.240893 2
4 4 1.867558 1
5 5 -0.977278 2
6 6 0.950088 1
7 7 -0.151357 2
8 8 -0.103219 1
9 9 0.410599 2
读取不同的文件格式#
上述示例使用 Parquet 文件作为数据集源,但 Dataset API 为多种文件格式和文件系统提供了统一的接口。目前支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;未来计划支持更多格式。
如果我们改用 Feather 文件而不是 Parquet 文件来保存表格……
>>> import pyarrow.feather as feather
>>>
>>> feather.write_feather(table, base / "data.feather")
……那么我们可以使用相同的函数读取 Feather 文件,只需指定 format="feather"。
>>> dataset = ds.dataset(base / "data.feather", format="feather")
>>> dataset.to_table().to_pandas().head()
a b c
0 0 1.764052 1
1 1 0.400157 2
2 2 0.978738 1
3 3 2.240893 2
4 4 1.867558 1
自定义文件格式#
字符串格式名称,如
ds.dataset(..., format="parquet")
是默认构建的 ParquetFileFormat 的简写。
ds.dataset(..., format=ds.ParquetFileFormat())
FileFormat 对象可以使用关键字进行自定义。例如:
parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)
将配置在扫描时对列 "a" 进行字典编码。
过滤数据#
为了在仅需要数据集子集时避免读取所有数据,可以使用 columns 和 filter 关键字。
columns 关键字可用于仅读取指定的列。
>>> dataset = ds.dataset(base / "parquet_dataset", format="parquet")
>>> dataset.to_table(columns=['a', 'b']).to_pandas()
a b
0 0 1.764052
1 1 0.400157
2 2 0.978738
3 3 2.240893
4 4 1.867558
5 5 -0.977278
6 6 0.950088
7 7 -0.151357
8 8 -0.103219
9 9 0.410599
使用 filter 关键字,不符合过滤谓词的行将不会包含在返回的表中。该关键字期望一个引用至少一个列的布尔型 Expression。
>>> dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
a b c
0 7 -0.151357 2
1 8 -0.103219 1
2 9 0.410599 2
>>> dataset.to_table(filter=ds.field('c') == 2).to_pandas()
a b c
0 1 0.400157 2
1 3 2.240893 2
2 5 -0.977278 2
3 7 -0.151357 2
4 9 0.410599 2
构建这些 Expression 对象最简单的方法是使用 field() 辅助函数。任何列(不仅是分区列)都可以使用 field() 函数进行引用(该函数会创建一个 FieldExpression)。系统提供了运算符重载来组合过滤器,包括比较(相等、大于/小于等)、集合成员资格测试以及布尔组合(&、|、~)。
>>> ds.field('a') != 3
<pyarrow.compute.Expression (a != 3)>
>>> ds.field('a').isin([1, 2, 3])
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
1,
2,
3
], null_matching_behavior=MATCH})>
>>> (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
<pyarrow.compute.Expression ((a > b) and (b > 1))>
请注意,Expression 对象不能使用 Python 逻辑运算符 and、or 和 not 进行组合。
投影列#
columns 关键字可通过传入列名列表来读取数据集列的子集。该关键字还可以与表达式组合使用,进行更复杂的投影。
在这种情况下,我们传入一个字典,其中键是结果列的名称,值是用于构造列值的表达式。
>>> projection = {
... "a_renamed": ds.field("a"),
... "b_as_float32": ds.field("b").cast("float32"),
... "c_1": ds.field("c") == 1,
... }
>>> dataset.to_table(columns=projection).to_pandas().head()
a_renamed b_as_float32 c_1
0 0 1.764052 True
1 1 0.400157 False
2 2 0.978738 True
3 3 2.240893 False
4 4 1.867558 True
该字典还决定了列的选择(只有字典中的键才会作为列出现在结果表中)。如果您想在现有列的基础上包含派生列,可以根据数据集模式构建字典。
>>> projection = {col: ds.field(col) for col in dataset.schema.names}
>>> projection.update({"b_large": ds.field("b") > 1})
>>> dataset.to_table(columns=projection).to_pandas().head()
a b c b_large
0 0 1.764052 1 True
1 1 0.400157 2 False
2 2 0.978738 1 False
3 3 2.240893 2 True
4 4 1.867558 1 True
读取分区数据#
上面展示了一个由扁平目录结构组成的数据集。然而,数据集可以利用定义分区数据集的嵌套目录结构,其中子目录名称包含了关于该目录中存储了哪些数据子集的信息。
例如,一个按年和月分区的数据集在磁盘上可能看起来像这样:
dataset_name/
year=2007/
month=01/
data0.parquet
data1.parquet
...
month=02/
data0.parquet
data1.parquet
...
month=03/
...
year=2008/
month=01/
...
...
上述分区方案使用了 Apache Hive 中常见的 “/key=value/” 目录名称。
让我们创建一个小型的分区数据集。write_to_dataset() 函数可以写入这种类似 Hive 的分区数据集。
>>> table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
... 'part': ['a'] * 5 + ['b'] * 5})
>>> pq.write_to_dataset(table, "parquet_dataset_partitioned",
... partition_cols=['part'])
上述操作创建了一个包含两个子目录(“part=a” 和 “part=b”)的目录,并且这些目录中写入的 Parquet 文件不再包含 “part” 列。
使用 dataset() 读取此数据集时,我们现在通过 partitioning 关键字指定该数据集应使用类似 Hive 的分区方案。
>>> dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
... partitioning="hive")
>>> dataset.files
['parquet_dataset_partitioned/part=a/...-0.parquet', 'parquet_dataset_partitioned/part=b/...-0.parquet']
虽然分区字段未包含在实际的 Parquet 文件中,但当扫描此数据集时,它们会被添加回结果表中。
>>> dataset.to_table().to_pandas().head(3)
a b c part
0 0 0.144044 1 a
1 1 1.454274 2 a
2 2 0.761038 1 a
我们现在可以在分区键上进行过滤,如果文件不符合过滤条件,这完全避免了加载这些文件。
>>> dataset.to_table(filter=ds.field("part") == "b").to_pandas()
a b c part
0 5 0.333674 2 b
1 6 1.494079 1 b
2 7 -0.205158 2 b
3 8 0.313068 1 b
4 9 -0.854096 2 b
不同的分区方案#
上面的示例使用了类似 Hive 的目录方案,如 “/year=2009/month=11/day=15”。我们通过传入 partitioning="hive" 关键字来指定这一点。在这种情况下,分区键的类型是从文件路径中推断出来的。
还可以使用 partitioning() 函数显式定义分区键的模式。例如:
>>> part = ds.partitioning(
... pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
... flavor="hive"
... )
>>> dataset = ds.dataset(..., partitioning=part)
“目录分区”也受到支持,其中文件路径中的段表示分区键的值,而不包含名称(字段名称在段的索引中是隐含的)。例如,给定字段名 “year”、“month” 和 “day”,一个路径可能是 “/2019/11/15”。
由于名称未包含在文件路径中,因此在构建目录分区时必须指定它们。
>>> part = ds.partitioning(field_names=["year", "month", "day"])
目录分区还支持提供完整的模式,而不是从文件路径推断类型。
从云存储读取#
除了本地文件外,pyarrow 还支持从云存储读取。目前支持 HDFS 和 兼容 Amazon S3 的存储。
当传入文件 URI 时,文件系统会被自动推断。例如,指定一个 S3 路径:
>>> dataset = ds.dataset("s3://arrow-datasets/nyc-taxi/")
通常情况下,您需要自定义连接参数,然后可以创建一个文件系统对象并将其传递给 filesystem 关键字。
>>> from pyarrow import fs
>>> s3 = fs.S3FileSystem(region="us-east-1")
>>> dataset = ds.dataset("arrow-datasets/nyc-taxi/", filesystem=s3)
目前可用的类包括 S3FileSystem 和 HadoopFileSystem。有关更多详细信息,请参阅 文件系统接口 文档。
从 Minio 读取#
除了云存储,pyarrow 还支持从模拟 S3 API 的 MinIO 对象存储实例读取。与 toxiproxy 配合使用,这对于测试或基准测试非常有用。
>>> # By default, MinIO will listen for unencrypted HTTP traffic.
>>> minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
>>> dataset = ds.dataset("arrow-datasets/nyc-taxi/", filesystem=minio)
使用 Parquet 数据集#
虽然 Datasets API 为不同的文件格式提供了统一的接口,但针对 Parquet 数据集仍存在一些特定的方法。
一些处理框架(如 Dask)在分区数据集(可选)中使用 _metadata 文件,其中包含有关完整数据集的模式和行组元数据的信息。使用此类文件可以更高效地创建 Parquet 数据集,因为它不需要推断模式并遍历目录来查找所有 Parquet 文件(这对于访问文件成本较高的文件系统尤其重要)。parquet_dataset() 函数允许我们从带有 _metadata 文件的分区数据集创建数据集。
>>> dataset = ds.parquet_dataset("/path/to/dir/_metadata")
默认情况下,为 Parquet 数据集构建的 Dataset 对象将每个片段映射到一个单独的 Parquet 文件。如果您希望片段映射到 Parquet 文件的每个行组,可以使用片段的 split_by_row_group() 方法。
>>> fragments = list(dataset.get_fragments())
>>> fragments[0].split_by_row_group()
此方法返回一组映射到原始片段(Parquet 文件)每个行组的新片段。 get_fragments() 和 split_by_row_group() 都接受一个可选的过滤表达式以获取过滤后的片段列表。
手动指定数据集#
dataset() 函数允许轻松创建查看目录的数据集,遍历所有子目录以查找文件和分区信息。然而,有时不需要发现,且数据集的文件和分区已经是已知的(例如,当这些信息存储在元数据中时)。在这种情况下,可以显式创建一个没有任何自动发现或推断的数据集。
对于这里的示例,我们将使用一个文件名包含额外分区信息的数据集。
>>> # creating a dummy dataset: directory with two files
>>> table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})
>>> (base / "parquet_dataset_manual").mkdir(exist_ok=True)
>>> pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")
>>> pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")
要从文件列表创建数据集,我们需要手动指定路径、模式、格式、文件系统和分区表达式。
>>> schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])
>>>
>>> dataset = ds.FileSystemDataset.from_paths(
... ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
... filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
... partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
由于我们为文件指定了“分区表达式”,当读取数据时,这些信息会作为列具体化,并可用于过滤。
>>> dataset.to_table().to_pandas()
year col1 col2
0 2018 0 -2.552990
1 2018 1 0.653619
2 2018 2 0.864436
3 2019 0 -2.552990
4 2019 1 0.653619
5 2019 2 0.864436
>>> dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
year col1 col2
0 2019 0 -2.552990
1 2019 1 0.653619
2 2019 2 0.864436
手动列出文件的另一个好处是文件的顺序控制了数据的顺序。当执行有序读取(或读取到表格)时,返回的行将与给定的文件顺序匹配。这仅在数据集使用文件列表构建时适用。如果文件是通过扫描目录发现的,则不提供顺序保证。
迭代(外存或流式)读取#
前面的示例演示了如何使用 to_table() 将数据读取到表中。如果数据集很小,或者只需要读取少量数据,这很有用。Dataset API 包含其他方法,可以以流式方式读取和处理大量数据。
最简单的方法是使用 Dataset.to_batches() 方法。此方法返回记录批次的迭代器。例如,我们可以使用此方法计算列的平均值,而无需将整列加载到内存中。
>>> import pyarrow.compute as pc
>>>
>>> col2_sum = 0
>>> count = 0
>>> for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
... col2_sum += pc.sum(batch.column("col2")).as_py()
... count += batch.num_rows
>>> mean_a = col2_sum/count
自定义批次大小#
对数据集的迭代读取通常称为数据集的“扫描”,pyarrow 使用名为 Scanner 的对象来执行此操作。Scanner 由数据集的 to_table() 和 to_batches() 方法自动为您创建。您传递给这些方法的任何参数都将传递给 Scanner 构造函数。
这些参数之一是 batch_size。这控制扫描程序返回的批次的最大大小。如果数据集由小文件组成,或者这些文件本身由小行组组成,批次仍然可能小于 batch_size。例如,每个行组有 10,000 行的 parquet 文件产生的批次最多为 10,000 行,除非 batch_size 设置为更小的值。
默认批次大小为一百万行,通常这是一个很好的默认值,但如果您要读取大量列,可能需要自定义它。
关于事务和 ACID 保证的说明#
Dataset API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取没有问题。并发写入或写入与读取同时发生可能会导致意外行为。可以使用多种方法来避免对同一文件进行操作,例如为每个写入器使用唯一的基名模板、为新文件使用临时目录,或单独存储文件列表,而不是依赖目录发现。
在写入过程中意外终止进程可能导致系统处于不一致状态。写入调用通常在要写入的字节完全交付给操作系统页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然断电,文件的一部分仍可能丢失。
大多数文件格式都有在末尾写入的“魔数”。这意味着可以安全地检测和丢弃部分文件写入。CSV 文件格式没有这样的概念,部分写入的 CSV 文件可能会被检测为有效文件。
写入数据集#
Dataset API 还简化了使用 write_dataset() 将数据写入数据集的过程。当您想要分区数据或需要写入大量数据时,这非常有用。基础数据集写入类似于写入表格,只是您指定目录而不是文件名。
>>> table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
>>> ds.write_dataset(table, "sample_dataset", format="parquet")
上述示例将在我们的 sample_dataset 目录中创建一个名为 part-0.parquet 的单个文件。
警告
如果您再次运行该示例,它将替换现有的 part-0.parquet 文件。要向现有数据集追加文件,需要在每次调用 ds.write_dataset 时指定一个新的 basename_template 以避免覆盖。
写入分区数据#
分区对象可用于指定应如何对输出数据进行分区。这使用与我们读取数据集时使用的相同类型的分区对象。要将上述数据写入分区目录,我们只需要指定我们希望如何对数据集进行分区。例如:
>>> part = ds.partitioning(
... pa.schema([("c", pa.int16())]), flavor="hive"
... )
>>> ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)
这将创建两个文件。我们一半的数据将位于 dataset_root/c=1 目录中,另一半将位于 dataset_root/c=2 目录中。
分区性能考量#
分区数据集有两个影响性能的方面:它增加了文件数量,并围绕文件创建了目录结构。这两者既有好处也有代价。根据配置和数据集大小,代价可能超过收益。
因为分区将数据集拆分为多个文件,所以分区数据集可以并行读取和写入。然而,每个附加文件都会增加一点文件系统交互处理开销。它还增加了整体数据集大小,因为每个文件都有一些共享元数据。例如,每个 parquet 文件都包含模式和组级统计信息。分区数量是文件数量的下限。如果您按日期对一年的数据进行分区,您将拥有至少 365 个文件。如果您按另一个具有 1,000 个唯一值的维度进一步分区,您将拥有多达 365,000 个文件。这种细粒度的分区通常会导致大量由元数据组成的小文件。
分区数据集创建嵌套文件夹结构,这允许我们在扫描时修剪加载哪些文件。然而,这增加了发现数据集中文件的开销,因为我们需要递归地“列出目录”以找到数据文件。太细的分区可能会在这里导致问题:按日期对一年的数据进行分区需要 365 次列出调用才能找到所有文件;增加另一个基数为 1,000 的列将使该数字达到 365,365 次。
最优化分区布局将取决于您的数据、访问模式以及将要读取数据的系统。大多数系统,包括 Arrow,应该都能在各种文件大小和分区布局下工作,但您应该避免一些极端情况。以下准则可以帮助避免一些已知的最坏情况:
避免文件小于 20MB 和大于 2GB。
避免分区布局中包含超过 10,000 个不同分区。
对于文件中包含组概念的文件格式,例如 Parquet,也适用类似的准则。行组可以在读取时提供并行性,并允许根据统计数据跳过数据,但非常小的组可能会导致元数据在文件大小中占据很大一部分。Arrow 的文件写入器在大多数情况下都为组大小提供了合理的默认值。
配置写入期间打开的文件#
将数据写入磁盘时,有一些参数对于优化写入非常重要,例如每文件的行数和写入期间允许的最大打开文件数。
使用 write_dataset() 的 max_open_files 参数设置最大打开文件数。
如果 max_open_files 设置大于 0,这将限制可以同时保持打开状态的最大文件数。这仅适用于写入分区数据集,其中行根据其分区值分发到相应的文件。如果尝试打开太多文件,最近最少使用的文件将被关闭。如果此设置设置得太低,您可能会将数据碎片化为许多小文件。
如果您的进程同时在使用其他文件句柄(无论是使用数据集扫描程序还是其他方式),您可能会达到系统文件句柄限制。例如,如果您正在扫描包含 300 个文件的数据集并写入 900 个文件,总共 1200 个文件可能会超过系统限制。(在 Linux 上,这可能是“Too Many Open Files”错误。)您可以降低此 max_open_files 设置或增加系统上的文件句柄限制。默认值为 900,允许扫描程序在达到 Linux 默认限制 1024 之前打开一定数量的文件。
write_dataset() 中使用的另一个重要配置是 max_rows_per_file。
使用 write_dataset() 的 max_rows_per_files 参数设置每个文件中写入的最大行数。
如果 max_rows_per_file 设置大于 0,这将限制任何单个文件中的行数。否则,将没有限制,并且除非需要关闭文件以遵守 max_open_files,否则每个输出目录中将仅创建一个文件。此设置是控制文件大小的主要方式。对于写入大量数据的工作负载,如果没有行数上限,文件可能会变得非常大,导致下游读取器出现内存不足错误。行数与文件大小之间的关系取决于数据集模式以及数据的压缩程度(如果有的话)。
配置写入期间的每组行数#
可以配置每组写入磁盘的数据量。此配置包含下限和上限。形成行组所需的最少行数通过 write_dataset() 的 min_rows_per_group 参数定义。
注意
如果 min_rows_per_group 设置大于 0,这将导致数据集写入器批量处理传入数据,并且仅在累积了足够的行后才将行组写入磁盘。如果其他选项(例如 max_open_files 或 max_rows_per_file)强制要求较小的行组大小,则最终行组大小可能小于此值。
每组允许的最大行数通过 write_dataset() 的 max_rows_per_group 参数定义。
如果 max_rows_per_group 设置大于 0,数据集写入器可能会将大型传入批次拆分为多个行组。如果设置了此值,也应设置 min_rows_per_group,否则您可能会得到非常小的行组(例如,如果传入行组的大小刚刚大于此值)。
行组内置于 Parquet 和 IPC/Feather 格式中,但不影响 JSON 或 CSV。在 Arrow 中读取 Parquet 和 IPC 格式时,行组边界成为记录批次边界,决定了下游读取器的默认批次大小。此外,Parquet 文件中的行组具有列统计信息,可以帮助读取器跳过不相关的数据,但会增加文件的大小。作为一个极端的例子,如果一个人在 Parquet 中设置 max_rows_per_group=1,他们将得到很大的文件,因为大多数文件将是行组统计信息。
写入大量数据#
上面的示例从表格中写入数据。如果您要写入大量数据,可能无法将所有内容加载到单个内存中表格中。幸运的是,write_dataset() 方法也接受记录批次的迭代。这使得例如在不将整个数据集加载到内存中的情况下重新分区大型数据集变得非常简单。
>>> old_part = ds.partitioning(
... pa.schema([("c", pa.int16())]), flavor="hive"
... )
>>> new_part = ds.partitioning(
... pa.schema([("c", pa.int16())]), flavor=None
... )
>>> input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)
>>> # A scanner can act as an iterator of record batches but you could also receive
>>> # data from the network (e.g. via flight), from your own scanning, or from any
>>> # other method that yields record batches. In addition, you can pass a dataset
>>> # into write_dataset directly but this method is useful if you want to customize
>>> # the scanner (e.g. to filter the input dataset or set a maximum batch size)
>>> scanner = input_dataset.scanner()
>>>
>>> ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)
在上述示例运行后,我们的数据将位于 dataset_root/1 和 dataset_root/2 目录中。在这个简单的示例中,我们没有更改数据的结构(仅更改了目录命名模式),但您也可以使用此机制来更改用于对数据集进行分区的列。当您期望以特定方式查询数据,并且可以利用分区来减少需要读取的数据量时,这非常有用。
自定义和检查写入的文件#
默认情况下,Dataset API 将创建名为 “part-i.format” 的文件,其中 “i” 是在写入期间生成的整数,“format” 是在 write_dataset 调用中指定的文件格式。对于简单的数据集,可能可以知道将创建哪些文件,但对于较大或已分区的数据集,这并不容易。file_visitor 关键字可用于提供一个在创建每个文件时都会被调用的访问者。
>>> def file_visitor(written_file):
... print(f"path={written_file.path}")
... print(f"size={written_file.size} bytes")
... print(f"metadata={written_file.metadata}")
>>> ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
... file_visitor=file_visitor)
path=dataset_visited/c=.../part-0.parquet
size=... bytes
metadata=<pyarrow._parquet.FileMetaData object at ...>
created_by: parquet-cpp-arrow version ...
num_columns: 2
num_rows: 5
num_row_groups: 1
format_version: 2.6
serialized_size: 0
path=dataset_visited/c=.../part-0.parquet
size=... bytes
metadata=<pyarrow._parquet.FileMetaData object at ...>
created_by: parquet-cpp-arrow version ...
num_columns: 2
num_rows: 5
num_row_groups: 1
format_version: 2.6
serialized_size: 0
这将允许您收集属于数据集的文件名并将它们存储在其他地方,当您希望在下次需要读取数据时避免扫描目录时,这非常有用。它还可用于生成由其他工具(如 Dask 或 Spark)使用的 _metadata 索引文件,以创建数据集的索引。
在写入期间配置特定于格式的参数#
除了所有格式共享的通用选项外,还有特定于某种格式的独特选项。例如,要在写入 Parquet 文件时允许截断的时间戳:
>>> parquet_format = ds.ParquetFileFormat()
>>> write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)
>>> ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
... file_options=write_options)