读取和写入数据

使用 Apache Arrow 从磁盘读取和写入数据的相关示例。

写入 Parquet 文件

给定一个包含 100 个数字的数组,从 0 到 99

import numpy as np
import pyarrow as pa

arr = pa.array(np.arange(100))

print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

要将其写入 Parquet 文件,由于 Parquet 是一种包含多个命名列的格式,我们必须创建一个 pyarrow.Table,以便我们得到一个单列表,然后可以将其写入 Parquet 文件。

table = pa.Table.from_arrays([arr], names=["col1"])

一旦我们有了表,就可以使用 pyarrow.parquet 模块提供的函数将其写入 Parquet 文件。

import pyarrow.parquet as pq

pq.write_table(table, "example.parquet", compression=None)

读取 Parquet 文件

给定一个 Parquet 文件,可以使用 pyarrow.parquet.read_table() 函数将其读回 pyarrow.Table

import pyarrow.parquet as pq

table = pq.read_table("example.parquet")

生成的表将包含与 parquet 文件中存在的相同列,作为 ChunkedArray

print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]

读取 Parquet 数据子集

使用 pyarrow.parquet.read_table() 读取 Parquet 文件时,可以使用 filterscolumns 参数限制哪些列和行将被读入内存。

import pyarrow.parquet as pq

table = pq.read_table("example.parquet",
                      columns=["col1"],
                      filters=[
                          ("col1", ">", 5),
                          ("col1", "<", 10),
                      ])

生成的表将只包含投影的列和过滤的行。有关过滤器语法的详细信息,请参阅 pyarrow.parquet.read_table() 文档。

print(table)
pyarrow.Table
col1: int64
----
col1: [[6,7,8,9]]

将 Arrow 数组保存到磁盘

除了使用 arrow 读取和保存 Parquet 等常见文件格式外,还可以将数据以原始 arrow 格式转储,这允许直接从磁盘内存映射数据。这种格式称为 Arrow IPC 格式。

给定一个包含 100 个数字的数组,从 0 到 99

import numpy as np
import pyarrow as pa

arr = pa.array(np.arange(100))

print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

我们可以通过创建一个 pyarrow.RecordBatch 并将其写入磁盘来保存数组。

schema = pa.schema([
    pa.field('nums', arr.type)
])

with pa.OSFile('arraydata.arrow', 'wb') as sink:
    with pa.ipc.new_file(sink, schema=schema) as writer:
        batch = pa.record_batch([arr], schema=schema)
        writer.write(batch)

如果我们要将多个数组保存到同一个文件中,我们只需要相应地调整 schema 并将它们全部添加到 record_batch 调用中。

从磁盘内存映射 Arrow 数组

以 Arrow IPC 格式写入磁盘的 Arrow 数组可以直接从磁盘内存映射回来。

with pa.memory_map('arraydata.arrow', 'r') as source:
    loaded_arrays = pa.ipc.open_file(source).read_all()
arr = loaded_arrays[0]
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

写入 CSV 文件

可以使用 pyarrow.csv.write_csv() 函数将 Arrow pyarrow.Table 写入 CSV 文件。

arr = pa.array(range(100))
table = pa.Table.from_arrays([arr], names=["col1"])

import pyarrow.csv
pa.csv.write_csv(table, "table.csv",
                 write_options=pa.csv.WriteOptions(include_header=True))

增量写入 CSV 文件

如果您需要在生成或检索数据时增量地将数据写入 CSV 文件,并且不想将整个表保存在内存中以一次性写入,可以使用 pyarrow.csv.CSVWriter 增量写入数据。

schema = pa.schema([("col1", pa.int32())])
with pa.csv.CSVWriter("table.csv", schema=schema) as writer:
    for chunk in range(10):
        datachunk = range(chunk*10, (chunk+1)*10)
        table = pa.Table.from_arrays([pa.array(datachunk)], schema=schema)
        writer.write(table)

同样可以将 pyarrow.RecordBatch 写入,就像写入表一样传递它们。

读取 CSV 文件

Arrow 可以使用优化的代码路径从 CSV 读取 pyarrow.Table 实体,该代码路径可以利用多个线程。

import pyarrow.csv

table = pa.csv.read_csv("table.csv")

Arrow 将尽力推断数据类型。可以向 pyarrow.csv.read_csv() 提供更多选项来驱动 pyarrow.csv.ConvertOptions

print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]

写入分区数据集

当您的数据集很大时,通常将数据集拆分为多个独立文件是有意义的。您可以手动执行此操作,也可以使用 pyarrow.dataset.write_dataset() 让 Arrow 为您完成将数据拆分为块的工作。

partitioning 参数允许告诉 pyarrow.dataset.write_dataset() 应该为哪些列拆分数据。

例如,给定 100 个生日,在 2000 年和 2009 年之间

import numpy.random
data = pa.table({"day": numpy.random.randint(1, 31, size=100),
                 "month": numpy.random.randint(1, 12, size=100),
                 "year": [2000 + x // 10 for x in range(100)]})

然后我们可以按年份列对数据进行分区,以便将其保存在 10 个不同的文件中。

import pyarrow as pa
import pyarrow.dataset as ds

ds.write_dataset(data, "./partitioned", format="parquet",
                 partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))

Arrow 默认情况下将在子目录中对数据集进行分区,这将导致 10 个不同的目录,每个目录都用分区列的值命名,每个目录都包含一个文件,其中包含该分区的子集数据。

from pyarrow import fs

localfs = fs.LocalFileSystem()
partitioned_dir_content = localfs.get_file_info(fs.FileSelector("./partitioned", recursive=True))
files = sorted((f.path for f in partitioned_dir_content if f.type == fs.FileType.File))

for file in files:
    print(file)
./partitioned/2000/part-0.parquet
./partitioned/2001/part-0.parquet
./partitioned/2002/part-0.parquet
./partitioned/2003/part-0.parquet
./partitioned/2004/part-0.parquet
./partitioned/2005/part-0.parquet
./partitioned/2006/part-0.parquet
./partitioned/2007/part-0.parquet
./partitioned/2008/part-0.parquet
./partitioned/2009/part-0.parquet

读取分区数据

在某些情况下,您的数据集可能由多个独立文件组成,每个文件包含一部分数据。

在这种情况下,pyarrow.dataset.dataset() 函数提供了一个接口来发现和读取所有这些文件,作为一个大型数据集。

例如,如果我们有以下结构

examples/
├── dataset1.parquet
├── dataset2.parquet
└── dataset3.parquet

然后,将 pyarrow.dataset.dataset() 函数指向 examples 目录将发现这些 parquet 文件,并将它们全部公开为一个 pyarrow.dataset.Dataset

import pyarrow.dataset as ds

dataset = ds.dataset("./examples", format="parquet")
print(dataset.files)
['./examples/dataset1.parquet', './examples/dataset2.parquet', './examples/dataset3.parquet']

可以使用 pyarrow.dataset.Dataset.to_table() 将整个数据集视为一个大型表。虽然每个 parquet 文件只包含 10 行,但将数据集转换为表将将其公开为一个表。

table = dataset.to_table()
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,5,6,7,8,9],[10,11,12,13,14,15,16,17,18,19],[20,21,22,23,24,25,26,27,28,29]]

请注意,转换为表将强制所有数据加载到内存中。对于大型数据集来说,这通常不是您想要的。

因此,最好依赖 pyarrow.dataset.Dataset.to_batches() 方法,该方法将迭代地一次加载一个数据块,为每个数据块返回一个 pyarrow.RecordBatch

for record_batch in dataset.to_batches():
    col1 = record_batch.column("col1")
    print(f"{col1._name} = {col1[0]} .. {col1[-1]}")
col1 = 0 .. 9
col1 = 10 .. 19
col1 = 20 .. 29

从 S3 读取分区数据

pyarrow.dataset.Dataset 还可以抽象来自 S3 或 HDFS 等远程源的分区数据。

from pyarrow import fs

# List content of s3://ursa-labs-taxi-data/2011
s3 = fs.SubTreeFileSystem(
    "ursa-labs-taxi-data",
    fs.S3FileSystem(region="us-east-2", anonymous=True)
)
for entry in s3.get_file_info(fs.FileSelector("2011", recursive=True)):
    if entry.type == fs.FileType.File:
        print(entry.path)
2011/01/data.parquet
2011/02/data.parquet
2011/03/data.parquet
2011/04/data.parquet
2011/05/data.parquet
2011/06/data.parquet
2011/07/data.parquet
2011/08/data.parquet
2011/09/data.parquet
2011/10/data.parquet
2011/11/data.parquet
2011/12/data.parquet

可以使用以下方法将存储桶中的数据加载为按 month 分区的一个大型数据集

dataset = ds.dataset("s3://ursa-labs-taxi-data/2011",
                     partitioning=["month"])
for f in dataset.files[:10]:
    print(f)
print("...")
ursa-labs-taxi-data/2011/01/data.parquet
ursa-labs-taxi-data/2011/02/data.parquet
ursa-labs-taxi-data/2011/03/data.parquet
ursa-labs-taxi-data/2011/04/data.parquet
ursa-labs-taxi-data/2011/05/data.parquet
ursa-labs-taxi-data/2011/06/data.parquet
ursa-labs-taxi-data/2011/07/data.parquet
ursa-labs-taxi-data/2011/08/data.parquet
ursa-labs-taxi-data/2011/09/data.parquet
ursa-labs-taxi-data/2011/10/data.parquet
...

然后,可以使用 pyarrow.dataset.Dataset.to_table()pyarrow.dataset.Dataset.to_batches() 来使用数据集,就像使用本地数据集一样。

注意

也可以以 ipc arrow 格式或 feather 格式加载分区数据。

警告

如果以上代码抛出错误,最可能的原因是您的 AWS 凭据未设置。请按照以下说明获取 AWS Access Key IdAWS Secret Access KeyAWS 凭据

凭据通常存储在 ~/.aws/credentials(在 Mac 或 Linux 上)或 C:\Users\<USERNAME>\.aws\credentials(在 Windows 上)文件中。您需要在适当的位置创建或更新此文件。

文件内容应如下所示

[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>

写入 Feather 文件

给定一个包含 100 个数字的数组,从 0 到 99

import numpy as np
import pyarrow as pa

arr = pa.array(np.arange(100))

print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

要将其写入 Feather 文件,由于 Feather 存储多个列,因此我们必须创建一个 pyarrow.Table,以便我们获得一个单列表,然后可以将其写入 Feather 文件。

table = pa.Table.from_arrays([arr], names=["col1"])

一旦我们有了表格,就可以使用 pyarrow.feather 模块提供的函数将其写入 Feather 文件

import pyarrow.feather as ft

ft.write_feather(table, 'example.feather')

读取 Feather 文件

给定一个 Feather 文件,可以使用 pyarrow.feather.read_table() 函数将其读回 pyarrow.Table

import pyarrow.feather as ft

table = ft.read_table("example.feather")

生成的表将包含与 parquet 文件中存在的相同列,作为 ChunkedArray

print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]

读取行分隔 JSON

Arrow 内置支持行分隔 JSON。每行代表一行数据,作为 JSON 对象。

给定一些数据,这些数据存储在一个文件中,其中每行都是一个 JSON 对象,包含一行数据

import tempfile

with tempfile.NamedTemporaryFile(delete=False, mode="w+") as f:
    f.write('{"a": 1, "b": 2.0, "c": 1}\n')
    f.write('{"a": 3, "b": 3.0, "c": 2}\n')
    f.write('{"a": 5, "b": 4.0, "c": 3}\n')
    f.write('{"a": 7, "b": 5.0, "c": 4}\n')

可以使用 pyarrow.json.read_json() 将文件内容读回 pyarrow.Table

import pyarrow as pa
import pyarrow.json

table = pa.json.read_json(f.name)
print(table.to_pydict())
{'a': [1, 3, 5, 7], 'b': [2.0, 3.0, 4.0, 5.0], 'c': [1, 2, 3, 4]}

写入压缩数据

Arrow 支持以压缩格式写入文件,包括对像 Parquet 或 Feather 这样提供原生压缩格式的文件,以及对像 CSV 这样不支持开箱即用压缩格式的文件。

给定一个表

table = pa.table([
    pa.array([1, 2, 3, 4, 5])
], names=["numbers"])

写入压缩的 Parquet 或 Feather 数据由 pyarrow.feather.write_feather()pyarrow.parquet.write_table() 函数的 compression 参数驱动。

pa.feather.write_feather(table, "compressed.feather",
                         compression="lz4")
pa.parquet.write_table(table, "compressed.parquet",
                       compression="lz4")

您可以参考每个函数的文档以获取支持的完整压缩格式列表。

注意

实际上,Arrow 在写入 Parquet 或 Feather 文件时默认使用压缩。Feather 默认使用 lz4 压缩,而 Parquet 默认使用 snappy 压缩。

对于像 CSV 这样不支持原生压缩的格式,可以使用 pyarrow.CompressedOutputStream 保存压缩数据。

with pa.CompressedOutputStream("compressed.csv.gz", "gzip") as out:
    pa.csv.write_csv(table, out)

这需要在读取时解压缩文件,可以使用 pyarrow.CompressedInputStream 完成,如下一节所述。

读取压缩数据

Arrow 支持读取压缩文件,包括对像 Parquet 或 Feather 这样提供原生压缩格式的文件,以及对像 CSV 这样不支持原生压缩格式,但已由应用程序压缩的文件。

读取具有原生压缩支持的压缩格式不需要任何特殊处理。例如,我们可以通过简单地调用 pyarrow.feather.read_table()pyarrow.parquet.read_table() 来读取我们在上一节中写入的 Parquet 和 Feather 文件。

table_feather = pa.feather.read_table("compressed.feather")
print(table_feather)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
table_parquet = pa.parquet.read_table("compressed.parquet")
print(table_parquet)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]

从没有原生压缩支持的格式读取数据需要在解码之前解压缩它们。这可以使用 pyarrow.CompressedInputStream 类完成,该类在将结果提供给实际读取函数之前,使用解压缩操作包装文件。

例如,要读取压缩的 CSV 文件

with pa.CompressedInputStream(pa.OSFile("compressed.csv.gz"), "gzip") as input:
    table_csv = pa.csv.read_csv(input)
    print(table_csv)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]

注意

在 CSV 的情况下,Arrow 实际上足够聪明,可以尝试使用文件扩展名检测压缩文件。因此,如果您的文件名为 *.gz*.bz2,则 pyarrow.csv.read_csv() 函数将尝试相应地解压缩它。

table_csv2 = pa.csv.read_csv("compressed.csv.gz")
print(table_csv2)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]