表格数据集#
pyarrow.dataset
模块提供了高效处理表格数据、可能大于内存的数据和多文件数据集的功能。这包括:
支持不同来源、文件格式和文件系统(本地、云)的统一接口。
发现来源(爬取目录、处理基于目录的分区数据集、基本模式规范化等)
通过谓词下推(过滤行)、投影(选择和派生列)以及可选的并行读取进行优化读取。
目前支持的文件格式包括 Parquet、Feather / Arrow IPC、CSV 和 ORC(请注意,ORC 数据集目前只能读取,还不能写入)。未来的目标是将支持扩展到其他文件格式和数据源(例如,数据库连接)。
对于那些熟悉现有的用于读取 Parquet 数据集的 pyarrow.parquet.ParquetDataset
的人:pyarrow.dataset
的目标类似,但不特定于 Parquet 格式,也不绑定到 Python:相同的 datasets API 在 R 绑定或 Arrow 中公开。此外,pyarrow.dataset
具有改进的性能和新功能(例如,在文件内进行过滤,而不仅仅是在分区键上进行过滤)。
读取数据集#
对于下面的示例,让我们创建一个由包含两个 parquet 文件的目录组成的小型数据集
In [1]: import tempfile
In [2]: import pathlib
In [3]: import pyarrow as pa
In [4]: import pyarrow.parquet as pq
In [5]: import numpy as np
In [6]: base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))
In [7]: (base / "parquet_dataset").mkdir(exist_ok=True)
# creating an Arrow Table
In [8]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})
# writing it into two parquet files
In [9]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")
In [10]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")
数据集发现#
可以使用 Dataset
对象通过 dataset()
函数创建。我们可以将包含数据文件的目录路径传递给它
In [11]: import pyarrow.dataset as ds
In [12]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")
In [13]: dataset
Out[13]: <pyarrow._dataset.FileSystemDataset at 0x7f9db4111240>
除了搜索基本目录之外,dataset()
还接受单个文件或文件路径列表的路径。
创建 Dataset
对象不会开始读取数据本身。如果需要,它只会爬取目录以查找所有文件
In [14]: dataset.files
Out[14]:
['/tmp/pyarrow-8b_l1h__/parquet_dataset/data1.parquet',
'/tmp/pyarrow-8b_l1h__/parquet_dataset/data2.parquet']
... 并推断数据集的模式(默认情况下从第一个文件)
In [15]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64
使用 Dataset.to_table()
方法,我们可以将数据集(或其中的一部分)读取到 pyarrow Table 中(请注意,根据数据集的大小,这可能需要大量内存,请参阅下面的过滤/迭代加载)
In [16]: dataset.to_table()
Out[16]:
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[1.9394317933005774,0.7962843149363266,1.6151988276864138,0.8025323995068575,1.7699682337534164],[1.557490334058008,1.659216859118146,1.5597929274748277,-0.078461972813819,-0.3181662401630221]]
c: [[1,2,1,2,1],[2,1,2,1,2]]
# converting to pandas to see the contents of the scanned table
In [17]: dataset.to_table().to_pandas()
Out[17]:
a b c
0 0 1.939432 1
1 1 0.796284 2
2 2 1.615199 1
3 3 0.802532 2
4 4 1.769968 1
5 5 1.557490 2
6 6 1.659217 1
7 7 1.559793 2
8 8 -0.078462 1
9 9 -0.318166 2
读取不同的文件格式#
上面的示例使用 Parquet 文件作为数据集来源,但 Dataset API 提供了跨多个文件格式和文件系统的一致接口。目前,支持 Parquet、ORC、Feather / Arrow IPC 和 CSV 文件格式;未来计划支持更多格式。
如果我们把表格保存为 Feather 文件而不是 Parquet 文件
In [18]: import pyarrow.feather as feather
In [19]: feather.write_feather(table, base / "data.feather")
... 那么我们可以使用相同的函数读取 Feather 文件,但需要指定 format="feather"
In [20]: dataset = ds.dataset(base / "data.feather", format="feather")
In [21]: dataset.to_table().to_pandas().head()
Out[21]:
a b c
0 0 1.939432 1
1 1 0.796284 2
2 2 1.615199 1
3 3 0.802532 2
4 4 1.769968 1
自定义文件格式#
像这样的字符串格式名称
ds.dataset(..., format="parquet")
是默认构造的 ParquetFileFormat
的简写
ds.dataset(..., format=ds.ParquetFileFormat())
可以使用关键字自定义 FileFormat
对象。例如
parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)
将配置列 "a"
在扫描时进行字典编码。
过滤数据#
为了避免在只需要子集时读取所有数据,可以使用 columns
和 filter
关键字。
columns
关键字可用于仅读取指定的列
In [22]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")
In [23]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[23]:
a b
0 0 1.939432
1 1 0.796284
2 2 1.615199
3 3 0.802532
4 4 1.769968
5 5 1.557490
6 6 1.659217
7 7 1.559793
8 8 -0.078462
9 9 -0.318166
使用 filter
关键字,不符合过滤谓词的行将不会包含在返回的表中。该关键字需要一个布尔 Expression
,该表达式至少引用一个列
In [24]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[24]:
a b c
0 7 1.559793 2
1 8 -0.078462 1
2 9 -0.318166 2
In [25]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[25]:
a b c
0 1 0.796284 2
1 3 0.802532 2
2 5 1.557490 2
3 7 1.559793 2
4 9 -0.318166 2
构造这些 Expression
对象的最简单方法是使用 field()
辅助函数。可以使用 field()
函数引用任何列 - 不仅仅是分区列(该函数创建一个 FieldExpression
)。提供了运算符重载来组合过滤器,包括比较(等于、大于/小于等)、集合成员测试和布尔组合(&
、|
、~
)
In [26]: ds.field('a') != 3
Out[26]: <pyarrow.compute.Expression (a != 3)>
In [27]: ds.field('a').isin([1, 2, 3])
Out[27]:
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
1,
2,
3
], null_matching_behavior=MATCH})>
In [28]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[28]: <pyarrow.compute.Expression ((a > b) and (b > 1))>
请注意,Expression
对象不能通过 python 逻辑运算符 and
、or
和 not
组合。
投影列#
columns
关键字可用于通过传递列名列表来读取数据集的列的子集。该关键字还可以与表达式结合使用,用于更复杂的投影。
在这种情况下,我们传递一个字典,其中键是生成的列名,值是用于构造列值的表达式
In [29]: projection = {
....: "a_renamed": ds.field("a"),
....: "b_as_float32": ds.field("b").cast("float32"),
....: "c_1": ds.field("c") == 1,
....: }
....:
In [30]: dataset.to_table(columns=projection).to_pandas().head()
Out[30]:
a_renamed b_as_float32 c_1
0 0 1.939432 True
1 1 0.796284 False
2 2 1.615199 True
3 3 0.802532 False
4 4 1.769968 True
该字典还确定了列的选择(只有字典中的键才会作为列出现在结果表中)。如果要在现有列的基础上包含派生列,则可以从数据集模式构建字典
In [31]: projection = {col: ds.field(col) for col in dataset.schema.names}
In [32]: projection.update({"b_large": ds.field("b") > 1})
In [33]: dataset.to_table(columns=projection).to_pandas().head()
Out[33]:
a b c b_large
0 0 1.939432 1 True
1 1 0.796284 2 False
2 2 1.615199 1 True
3 3 0.802532 2 False
4 4 1.769968 1 True
读取分区数据#
上面显示了一个由包含文件的扁平目录组成的数据集。但是,数据集可以利用嵌套的目录结构来定义分区数据集,其中子目录名称包含有关存储在该目录中的数据子集的信息。
例如,按年和月分区的数据集在磁盘上可能如下所示
dataset_name/
year=2007/
month=01/
data0.parquet
data1.parquet
...
month=02/
data0.parquet
data1.parquet
...
month=03/
...
year=2008/
month=01/
...
...
上述分区方案使用 “/key=value/” 目录名,如 Apache Hive 中所见。
让我们创建一个小的分区数据集。write_to_dataset()
函数可以写入这种类似 hive 的分区数据集。
In [34]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
....: 'part': ['a'] * 5 + ['b'] * 5})
....:
In [35]: pq.write_to_dataset(table, "parquet_dataset_partitioned",
....: partition_cols=['part'])
....:
上面创建了一个包含两个子目录(“part=a” 和 “part=b”)的目录,并且写入这些目录的 Parquet 文件不再包含 “part” 列。
使用 dataset()
读取此数据集时,我们现在指定数据集应使用带有 partitioning
关键字的类似 hive 的分区方案
In [36]: dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
....: partitioning="hive")
....:
In [37]: dataset.files
Out[37]:
['parquet_dataset_partitioned/part=a/964f3af907e5429eb3d2f8f32403c0cc-0.parquet',
'parquet_dataset_partitioned/part=b/964f3af907e5429eb3d2f8f32403c0cc-0.parquet']
虽然分区字段未包含在实际的 Parquet 文件中,但在扫描此数据集时,它们将被添加回结果表
In [38]: dataset.to_table().to_pandas().head(3)
Out[38]:
a b c part
0 0 -0.115643 1 a
1 1 -0.082892 2 a
2 2 1.628449 1 a
我们现在可以根据分区键进行过滤,如果它们不匹配过滤器,则可以完全避免加载文件
In [39]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[39]:
a b c part
0 5 -0.535057 2 b
1 6 2.260900 1 b
2 7 -1.976659 2 b
3 8 -2.247073 1 b
4 9 -0.563857 2 b
不同的分区方案#
上面的示例使用类似 hive 的目录方案,例如 “/year=2009/month=11/day=15”。我们通过传递 partitioning="hive"
关键字来指定这一点。在这种情况下,分区键的类型是从文件路径推断出来的。
也可以使用 partitioning()
函数显式定义分区键的模式。例如
part = ds.partitioning(
pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)
还支持“目录分区”,其中文件路径中的段表示分区键的值,但不包括名称(字段名称隐式包含在段的索引中)。例如,给定字段名称“year”、“month”和“day”,一个路径可能是“/2019/11/15”。
由于名称未包含在文件路径中,因此在构造目录分区时必须指定这些名称
part = ds.partitioning(field_names=["year", "month", "day"])
目录分区还支持提供完整的 schema,而不是从文件路径推断类型。
从云存储读取#
除了本地文件,pyarrow 还支持从云存储读取。目前,支持HDFS
和 Amazon S3-compatible storage
。
传递文件 URI 时,将推断文件系统。例如,指定一个 S3 路径
dataset = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/")
通常,您需要自定义连接参数,然后可以创建一个文件系统对象并将其传递给 filesystem
关键字
from pyarrow import fs
s3 = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=s3)
当前可用的类是 S3FileSystem
和 HadoopFileSystem
。有关更多详细信息,请参见 文件系统接口 文档。
从 Minio 读取#
除了云存储,pyarrow 还支持从模拟 S3 API 的 MinIO 对象存储实例读取。与 toxiproxy 搭配使用,这对于测试或基准测试非常有用。
from pyarrow import fs
# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=minio)
使用 Parquet 数据集#
虽然 Datasets API 为不同的文件格式提供了一个统一的接口,但对于 Parquet 数据集,还存在一些特定的方法。
一些处理框架,例如 Dask(可选),使用包含有关完整数据集的 schema 和行组元数据信息的 _metadata
文件来处理分区数据集。使用这样的文件可以更有效地创建 parquet Dataset,因为它不需要推断 schema 并爬取目录以查找所有 Parquet 文件(对于访问文件成本很高的文件系统来说尤其如此)。parquet_dataset()
函数允许我们从带有 _metadata
文件的分区数据集创建一个 Dataset
dataset = ds.parquet_dataset("/path/to/dir/_metadata")
默认情况下,为 Parquet 数据集构造的 Dataset
对象将每个 fragment 映射到一个 Parquet 文件。如果希望 fragments 映射到 Parquet 文件的每个行组,则可以使用 fragments 的 split_by_row_group()
方法
fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()
此方法返回一个新 Fragment 列表,该列表映射到原始 Fragment(Parquet 文件)的每个行组。get_fragments()
和 split_by_row_group()
都接受可选的过滤表达式,以获取过滤后的 fragment 列表。
手动指定数据集#
dataset()
函数允许轻松创建 Dataset,查看目录,爬取所有子目录以查找文件和分区信息。但是,有时不需要发现,并且数据集的文件和分区已经是已知的(例如,当此信息存储在元数据中时)。在这种情况下,可以显式地创建 Dataset,而无需任何自动发现或推断。
对于此处的示例,我们将使用一个文件名包含其他分区信息的数据集
# creating a dummy dataset: directory with two files
In [40]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})
In [41]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)
In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")
In [43]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")
要从文件列表创建 Dataset,我们需要手动指定路径、schema、格式、文件系统和分区表达式
In [44]: from pyarrow import fs
In [45]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])
In [46]: dataset = ds.FileSystemDataset.from_paths(
....: ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
....: filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
....: partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
....:
由于我们为文件指定了“分区表达式”,因此在读取数据时,此信息将具体化为列,并可用于过滤
In [47]: dataset.to_table().to_pandas()
Out[47]:
year col1 col2
0 2018 0 0.321222
1 2018 1 -0.910920
2 2018 2 -0.173688
3 2019 0 0.321222
4 2019 1 -0.910920
5 2019 2 -0.173688
In [48]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[48]:
year col1 col2
0 2019 0 0.321222
1 2019 1 -0.910920
2 2019 2 -0.173688
手动列出文件的另一个好处是,文件的顺序控制数据的顺序。当执行有序读取(或读取到表)时,返回的行将与给定的文件顺序匹配。这仅适用于使用文件列表构造数据集时。如果通过扫描目录来发现文件,则不保证任何顺序。
迭代(核外或流式)读取#
前面的示例演示了如何使用 to_table()
将数据读取到表中。如果数据集很小或者只需要读取少量数据,这将非常有用。dataset API 包含其他方法,以流式方式读取和处理大量数据。
最简单的方法是使用方法 Dataset.to_batches()
。此方法返回记录批次的迭代器。例如,我们可以使用此方法来计算列的平均值,而无需将整个列加载到内存中
In [49]: import pyarrow.compute as pc
In [50]: col2_sum = 0
In [51]: count = 0
In [52]: for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
....: col2_sum += pc.sum(batch.column("col2")).as_py()
....: count += batch.num_rows
....:
In [53]: mean_a = col2_sum/count
自定义批次大小#
数据集的迭代读取通常称为数据集的“扫描”,pyarrow 使用一个名为 Scanner
的对象来执行此操作。Dataset 的 to_table()
和 to_batches()
方法会自动为您创建一个 Scanner。您传递给这些方法的任何参数都将传递给 Scanner 构造函数。
其中一个参数是 batch_size
。这控制扫描程序返回的批次的最大大小。如果数据集由小文件组成,或者这些文件本身由小行组组成,则批次仍然可以小于 batch_size
。例如,每个行组有 10,000 行的 parquet 文件将生成最多包含 10,000 行的批次,除非将 batch_size
设置为较小的值。
默认批次大小为一百万行,这通常是一个很好的默认值,但如果您要读取大量列,则可能需要自定义它。
关于事务和 ACID 保证的说明#
dataset API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取是可以的。与读取同时进行的并发写入或写入可能会产生意外的行为。可以使用各种方法来避免操作相同的文件,例如为每个写入器使用唯一的 basename 模板,为新文件使用临时目录,或者单独存储文件列表,而不是依赖目录发现。
在写入过程中意外终止进程可能会使系统处于不一致的状态。写入调用通常会在要写入的字节已完全传递到 OS 页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然断电,仍可能丢失部分文件。
大多数文件格式都有在末尾写入的幻数。这意味着可以安全地检测和丢弃部分文件写入。CSV 文件格式没有任何此类概念,并且部分写入的 CSV 文件可能会被检测为有效。
写入数据集#
dataset API 还简化了使用 write_dataset()
将数据写入数据集的过程。当您想要对数据进行分区或需要写入大量数据时,这非常有用。基本的数据集写入类似于写入表,只是您指定一个目录而不是文件名。
In [54]: table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})
In [55]: ds.write_dataset(table, "sample_dataset", format="parquet")
上面的示例将在我们的 sample_dataset 目录中创建一个名为 part-0.parquet 的文件。
警告
如果您再次运行该示例,它将替换现有的 part-0.parquet 文件。将文件附加到现有数据集需要为每次调用 ds.write_dataset
指定一个新的 basename_template
,以避免覆盖。
写入分区数据#
可以使用分区对象来指定应如何对输出数据进行分区。这使用与用于读取数据集的分区对象相同的类型。要将上面的数据写入分区目录,我们只需要指定希望如何对数据集进行分区即可。例如
In [56]: part = ds.partitioning(
....: pa.schema([("c", pa.int16())]), flavor="hive"
....: )
....:
In [57]: ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)
这将创建两个文件。一半的数据将位于 dataset_root/c=1 目录中,另一半将位于 dataset_root/c=2 目录中。
分区性能注意事项#
分区数据集有两个方面会影响性能:它增加了文件数量,并围绕文件创建了一个目录结构。这两者都有好处和成本。根据配置和数据集的大小,成本可能会超过收益。
由于分区将数据集拆分为多个文件,因此可以使用并行性读取和写入分区数据集。但是,每个附加文件都会增加文件系统交互处理中的一些开销。它还会增加整体数据集大小,因为每个文件都有一些共享的元数据。例如,每个 parquet 文件都包含 schema 和组级统计信息。分区数是文件数的下限。如果您按日期对数据集进行分区,并包含一年的数据,您将至少拥有 365 个文件。如果您进一步按另一个维度(包含 1,000 个唯一值)进行分区,您将最多拥有 365,000 个文件。这种细粒度的分区通常会导致小文件,这些文件主要由元数据组成。
分区数据集会创建嵌套的文件夹结构,这允许我们裁剪扫描中加载的文件。然而,这会增加发现数据集中的文件的开销,因为我们需要递归地“列出目录”才能找到数据文件。过于细的分区可能会导致问题:按日期对一年的数据进行分区将需要 365 次列表调用才能找到所有文件;如果再添加一个基数为 1,000 的列,则会产生 365,365 次调用。
最理想的分区布局将取决于您的数据、访问模式以及将读取数据的系统。大多数系统(包括 Arrow)应该可以处理各种文件大小和分区布局,但您应避免某些极端情况。以下准则可以帮助您避免一些已知的最坏情况。
避免小于 20MB 和大于 2GB 的文件。
避免分区布局超过 10,000 个不同的分区。
对于文件格式中具有组的概念(例如 Parquet)的文件,也适用类似的准则。行组可以在读取时提供并行性,并允许基于统计信息进行数据跳过,但是非常小的组可能会导致元数据成为文件大小的重要组成部分。在大多数情况下,Arrow 的文件写入器为组大小提供了合理的默认值。
配置写入期间打开的文件#
将数据写入磁盘时,有一些参数对于优化写入非常重要,例如每个文件的行数和写入期间允许的最大打开文件数。
使用 write_dataset()
的 max_open_files
参数设置最大打开文件数。
如果 max_open_files
设置为大于 0 的值,则会限制可以保持打开状态的最大文件数。这仅适用于写入分区数据集,其中行根据其分区值分派到相应的文件。如果尝试打开的文件过多,则将关闭最近最少使用的文件。如果此设置设置得太低,您最终可能会将数据碎片化为许多小文件。
如果您的进程同时使用其他文件句柄(无论是使用数据集扫描器还是其他方式),您可能会达到系统文件句柄限制。例如,如果您正在扫描具有 300 个文件的数据集并写入到 900 个文件,则总共 1200 个文件可能超过系统限制。(在 Linux 上,这可能是“打开文件过多”错误。)您可以降低此 max_open_files
设置,也可以增加系统上的文件句柄限制。默认值为 900,这允许扫描器打开一些文件,然后达到 Linux 的默认限制 1024。
在 write_dataset()
中使用的另一个重要配置是 max_rows_per_file
。
使用 write_dataset()
的 max_rows_per_files
参数设置每个文件中写入的最大行数。
如果 max_rows_per_file
设置为大于 0 的值,则会限制任何单个文件中放置的行数。否则,将没有限制,并且将在每个输出目录中创建一个文件,除非需要关闭文件以遵守 max_open_files
。此设置是控制文件大小的主要方法。对于写入大量数据的工作负载,如果不设置行数上限,文件可能会变得非常大,从而导致下游读取器出现内存不足错误。行数与文件大小之间的关系取决于数据集模式以及数据的压缩程度(如果有)。
配置写入期间的每组行数#
可以配置每个组写入磁盘的数据量。此配置包括下限和上限。形成行组所需的最小行数由 write_dataset()
的 min_rows_per_group
参数定义。
注意
如果 min_rows_per_group
设置为大于 0 的值,这将导致数据集写入器批量处理传入的数据,并且仅当积累了足够的行时才将行组写入磁盘。如果其他选项(例如 max_open_files
或 max_rows_per_file
)强制使用较小的行组大小,则最终的行组大小可能会小于此值。
每个组允许的最大行数由 write_dataset()
的 max_rows_per_group
参数定义。
如果 max_rows_per_group
设置为大于 0 的值,则数据集写入器可能会将较大的传入批次拆分为多个行组。如果设置了此值,则还应设置 min_rows_per_group
,否则您可能会得到非常小的行组(例如,如果传入的行组大小仅略大于此值)。
行组内置于 Parquet 和 IPC/Feather 格式中,但不影响 JSON 或 CSV。在 Arrow 中读回 Parquet 和 IPC 格式时,行组边界将成为记录批处理边界,从而确定下游读取器的默认批处理大小。此外,Parquet 文件中的行组具有列统计信息,可以帮助读者跳过不相关的数据,但会增加文件大小。作为一个极端的例子,如果在 Parquet 中设置 max_rows_per_group=1,它们将具有大文件,因为大多数文件将是行组统计信息。
写入大量数据#
以上示例从表中写入数据。如果要写入大量数据,您可能无法将所有内容加载到单个内存表。幸运的是,write_dataset()
方法也接受记录批次的迭代。这使得重新分区大型数据集变得非常简单,而无需将整个数据集加载到内存中。
In [58]: old_part = ds.partitioning(
....: pa.schema([("c", pa.int16())]), flavor="hive"
....: )
....:
In [59]: new_part = ds.partitioning(
....: pa.schema([("c", pa.int16())]), flavor=None
....: )
....:
In [60]: input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)
# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches. In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
In [61]: scanner = input_dataset.scanner()
In [62]: ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)
在上面的示例运行之后,我们的数据将位于 dataset_root/1 和 dataset_root/2 目录中。在这个简单的例子中,我们没有改变数据的结构(只是改变了目录命名模式),但您也可以使用这种机制来改变哪些列用于分区数据集。当您希望以特定的方式查询您的数据,并且可以利用分区来减少您需要读取的数据量时,这很有用。
自定义和检查写入的文件#
默认情况下,数据集 API 将创建名为 “part-i.format” 的文件,其中 “i” 是在写入期间生成的整数,而 “format” 是在 write_dataset 调用中指定的文件格式。对于简单的数据集,可能可以知道将创建哪些文件,但对于较大或分区的数据集,则不太容易。file_visitor
关键字可用于提供一个访问器,该访问器将在创建每个文件时被调用。
In [63]: def file_visitor(written_file):
....: print(f"path={written_file.path}")
....: print(f"size={written_file.size} bytes")
....: print(f"metadata={written_file.metadata}")
....:
In [64]: ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
....: file_visitor=file_visitor)
....:
path=dataset_visited/c=1/part-0.parquet
size=807 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7f9db41d9210>
created_by: parquet-cpp-arrow version 20.0.0
num_columns: 2
num_rows: 5
num_row_groups: 1
format_version: 2.6
serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=809 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7f9db44e3c40>
created_by: parquet-cpp-arrow version 20.0.0
num_columns: 2
num_rows: 5
num_row_groups: 1
format_version: 2.6
serialized_size: 0
这将允许您收集属于数据集的文件名并将其存储在其他地方,这在您下次需要读取数据时希望避免扫描目录时非常有用。它还可以用于生成 Dask 或 Spark 等其他工具使用的 _metadata 索引文件,以创建数据集的索引。
配置写入期间的格式特定参数#
除了所有格式共享的常见选项之外,还有特定于格式的选项,这些选项对于特定格式是唯一的。例如,允许在写入 Parquet 文件时截断时间戳。
In [65]: parquet_format = ds.ParquetFileFormat()
In [66]: write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)
In [67]: ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
....: file_options=write_options)
....: