读取和写入数据¶
使用 Apache Arrow 从磁盘读取和写入数据的相关示例。
写入 Parquet 文件¶
给定一个包含 100 个数字的数组,从 0 到 99
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
要将其写入 Parquet 文件,由于 Parquet 是一种包含多个命名列的格式,我们必须创建一个 pyarrow.Table
,以便我们得到一个单列表,然后可以将其写入 Parquet 文件。
table = pa.Table.from_arrays([arr], names=["col1"])
一旦我们有了表,就可以使用 pyarrow.parquet
模块提供的函数将其写入 Parquet 文件。
import pyarrow.parquet as pq
pq.write_table(table, "example.parquet", compression=None)
读取 Parquet 文件¶
给定一个 Parquet 文件,可以使用 pyarrow.parquet.read_table()
函数将其读回 pyarrow.Table
。
import pyarrow.parquet as pq
table = pq.read_table("example.parquet")
生成的表将包含与 parquet 文件中存在的相同列,作为 ChunkedArray
。
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
读取 Parquet 数据子集¶
使用 pyarrow.parquet.read_table()
读取 Parquet 文件时,可以使用 filters
和 columns
参数限制哪些列和行将被读入内存。
import pyarrow.parquet as pq
table = pq.read_table("example.parquet",
columns=["col1"],
filters=[
("col1", ">", 5),
("col1", "<", 10),
])
生成的表将只包含投影的列和过滤的行。有关过滤器语法的详细信息,请参阅 pyarrow.parquet.read_table()
文档。
print(table)
pyarrow.Table
col1: int64
----
col1: [[6,7,8,9]]
将 Arrow 数组保存到磁盘¶
除了使用 arrow 读取和保存 Parquet 等常见文件格式外,还可以将数据以原始 arrow 格式转储,这允许直接从磁盘内存映射数据。这种格式称为 Arrow IPC 格式。
给定一个包含 100 个数字的数组,从 0 到 99
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
我们可以通过创建一个 pyarrow.RecordBatch
并将其写入磁盘来保存数组。
schema = pa.schema([
pa.field('nums', arr.type)
])
with pa.OSFile('arraydata.arrow', 'wb') as sink:
with pa.ipc.new_file(sink, schema=schema) as writer:
batch = pa.record_batch([arr], schema=schema)
writer.write(batch)
如果我们要将多个数组保存到同一个文件中,我们只需要相应地调整 schema
并将它们全部添加到 record_batch
调用中。
从磁盘内存映射 Arrow 数组¶
以 Arrow IPC 格式写入磁盘的 Arrow 数组可以直接从磁盘内存映射回来。
with pa.memory_map('arraydata.arrow', 'r') as source:
loaded_arrays = pa.ipc.open_file(source).read_all()
arr = loaded_arrays[0]
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
写入 CSV 文件¶
可以使用 pyarrow.csv.write_csv()
函数将 Arrow pyarrow.Table
写入 CSV 文件。
arr = pa.array(range(100))
table = pa.Table.from_arrays([arr], names=["col1"])
import pyarrow.csv
pa.csv.write_csv(table, "table.csv",
write_options=pa.csv.WriteOptions(include_header=True))
增量写入 CSV 文件¶
如果您需要在生成或检索数据时增量地将数据写入 CSV 文件,并且不想将整个表保存在内存中以一次性写入,可以使用 pyarrow.csv.CSVWriter
增量写入数据。
schema = pa.schema([("col1", pa.int32())])
with pa.csv.CSVWriter("table.csv", schema=schema) as writer:
for chunk in range(10):
datachunk = range(chunk*10, (chunk+1)*10)
table = pa.Table.from_arrays([pa.array(datachunk)], schema=schema)
writer.write(table)
同样可以将 pyarrow.RecordBatch
写入,就像写入表一样传递它们。
读取 CSV 文件¶
Arrow 可以使用优化的代码路径从 CSV 读取 pyarrow.Table
实体,该代码路径可以利用多个线程。
import pyarrow.csv
table = pa.csv.read_csv("table.csv")
Arrow 将尽力推断数据类型。可以向 pyarrow.csv.read_csv()
提供更多选项来驱动 pyarrow.csv.ConvertOptions
。
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
写入分区数据集¶
当您的数据集很大时,通常将数据集拆分为多个独立文件是有意义的。您可以手动执行此操作,也可以使用 pyarrow.dataset.write_dataset()
让 Arrow 为您完成将数据拆分为块的工作。
partitioning
参数允许告诉 pyarrow.dataset.write_dataset()
应该为哪些列拆分数据。
例如,给定 100 个生日,在 2000 年和 2009 年之间
import numpy.random
data = pa.table({"day": numpy.random.randint(1, 31, size=100),
"month": numpy.random.randint(1, 12, size=100),
"year": [2000 + x // 10 for x in range(100)]})
然后我们可以按年份列对数据进行分区,以便将其保存在 10 个不同的文件中。
import pyarrow as pa
import pyarrow.dataset as ds
ds.write_dataset(data, "./partitioned", format="parquet",
partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))
Arrow 默认情况下将在子目录中对数据集进行分区,这将导致 10 个不同的目录,每个目录都用分区列的值命名,每个目录都包含一个文件,其中包含该分区的子集数据。
from pyarrow import fs
localfs = fs.LocalFileSystem()
partitioned_dir_content = localfs.get_file_info(fs.FileSelector("./partitioned", recursive=True))
files = sorted((f.path for f in partitioned_dir_content if f.type == fs.FileType.File))
for file in files:
print(file)
./partitioned/2000/part-0.parquet
./partitioned/2001/part-0.parquet
./partitioned/2002/part-0.parquet
./partitioned/2003/part-0.parquet
./partitioned/2004/part-0.parquet
./partitioned/2005/part-0.parquet
./partitioned/2006/part-0.parquet
./partitioned/2007/part-0.parquet
./partitioned/2008/part-0.parquet
./partitioned/2009/part-0.parquet
读取分区数据¶
在某些情况下,您的数据集可能由多个独立文件组成,每个文件包含一部分数据。
在这种情况下,pyarrow.dataset.dataset()
函数提供了一个接口来发现和读取所有这些文件,作为一个大型数据集。
例如,如果我们有以下结构
examples/
├── dataset1.parquet
├── dataset2.parquet
└── dataset3.parquet
然后,将 pyarrow.dataset.dataset()
函数指向 examples
目录将发现这些 parquet 文件,并将它们全部公开为一个 pyarrow.dataset.Dataset
。
import pyarrow.dataset as ds
dataset = ds.dataset("./examples", format="parquet")
print(dataset.files)
['./examples/dataset1.parquet', './examples/dataset2.parquet', './examples/dataset3.parquet']
可以使用 pyarrow.dataset.Dataset.to_table()
将整个数据集视为一个大型表。虽然每个 parquet 文件只包含 10 行,但将数据集转换为表将将其公开为一个表。
table = dataset.to_table()
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,5,6,7,8,9],[10,11,12,13,14,15,16,17,18,19],[20,21,22,23,24,25,26,27,28,29]]
请注意,转换为表将强制所有数据加载到内存中。对于大型数据集来说,这通常不是您想要的。
因此,最好依赖 pyarrow.dataset.Dataset.to_batches()
方法,该方法将迭代地一次加载一个数据块,为每个数据块返回一个 pyarrow.RecordBatch
。
for record_batch in dataset.to_batches():
col1 = record_batch.column("col1")
print(f"{col1._name} = {col1[0]} .. {col1[-1]}")
col1 = 0 .. 9
col1 = 10 .. 19
col1 = 20 .. 29
从 S3 读取分区数据¶
pyarrow.dataset.Dataset
还可以抽象来自 S3 或 HDFS 等远程源的分区数据。
from pyarrow import fs
# List content of s3://ursa-labs-taxi-data/2011
s3 = fs.SubTreeFileSystem(
"ursa-labs-taxi-data",
fs.S3FileSystem(region="us-east-2", anonymous=True)
)
for entry in s3.get_file_info(fs.FileSelector("2011", recursive=True)):
if entry.type == fs.FileType.File:
print(entry.path)
2011/01/data.parquet
2011/02/data.parquet
2011/03/data.parquet
2011/04/data.parquet
2011/05/data.parquet
2011/06/data.parquet
2011/07/data.parquet
2011/08/data.parquet
2011/09/data.parquet
2011/10/data.parquet
2011/11/data.parquet
2011/12/data.parquet
可以使用以下方法将存储桶中的数据加载为按 month
分区的一个大型数据集
dataset = ds.dataset("s3://ursa-labs-taxi-data/2011",
partitioning=["month"])
for f in dataset.files[:10]:
print(f)
print("...")
ursa-labs-taxi-data/2011/01/data.parquet
ursa-labs-taxi-data/2011/02/data.parquet
ursa-labs-taxi-data/2011/03/data.parquet
ursa-labs-taxi-data/2011/04/data.parquet
ursa-labs-taxi-data/2011/05/data.parquet
ursa-labs-taxi-data/2011/06/data.parquet
ursa-labs-taxi-data/2011/07/data.parquet
ursa-labs-taxi-data/2011/08/data.parquet
ursa-labs-taxi-data/2011/09/data.parquet
ursa-labs-taxi-data/2011/10/data.parquet
...
然后,可以使用 pyarrow.dataset.Dataset.to_table()
或 pyarrow.dataset.Dataset.to_batches()
来使用数据集,就像使用本地数据集一样。
注意
也可以以 ipc arrow 格式或 feather 格式加载分区数据。
警告
如果以上代码抛出错误,最可能的原因是您的 AWS 凭据未设置。请按照以下说明获取 AWS Access Key Id
和 AWS Secret Access Key
:AWS 凭据。
凭据通常存储在 ~/.aws/credentials
(在 Mac 或 Linux 上)或 C:\Users\<USERNAME>\.aws\credentials
(在 Windows 上)文件中。您需要在适当的位置创建或更新此文件。
文件内容应如下所示
[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>
写入 Feather 文件¶
给定一个包含 100 个数字的数组,从 0 到 99
import numpy as np
import pyarrow as pa
arr = pa.array(np.arange(100))
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99
要将其写入 Feather 文件,由于 Feather 存储多个列,因此我们必须创建一个 pyarrow.Table
,以便我们获得一个单列表,然后可以将其写入 Feather 文件。
table = pa.Table.from_arrays([arr], names=["col1"])
一旦我们有了表格,就可以使用 pyarrow.feather
模块提供的函数将其写入 Feather 文件
import pyarrow.feather as ft
ft.write_feather(table, 'example.feather')
读取 Feather 文件¶
给定一个 Feather 文件,可以使用 pyarrow.feather.read_table()
函数将其读回 pyarrow.Table
。
import pyarrow.feather as ft
table = ft.read_table("example.feather")
生成的表将包含与 parquet 文件中存在的相同列,作为 ChunkedArray
。
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]
读取行分隔 JSON¶
Arrow 内置支持行分隔 JSON。每行代表一行数据,作为 JSON 对象。
给定一些数据,这些数据存储在一个文件中,其中每行都是一个 JSON 对象,包含一行数据
import tempfile
with tempfile.NamedTemporaryFile(delete=False, mode="w+") as f:
f.write('{"a": 1, "b": 2.0, "c": 1}\n')
f.write('{"a": 3, "b": 3.0, "c": 2}\n')
f.write('{"a": 5, "b": 4.0, "c": 3}\n')
f.write('{"a": 7, "b": 5.0, "c": 4}\n')
可以使用 pyarrow.json.read_json()
将文件内容读回 pyarrow.Table
。
import pyarrow as pa
import pyarrow.json
table = pa.json.read_json(f.name)
print(table.to_pydict())
{'a': [1, 3, 5, 7], 'b': [2.0, 3.0, 4.0, 5.0], 'c': [1, 2, 3, 4]}
写入压缩数据¶
Arrow 支持以压缩格式写入文件,包括对像 Parquet 或 Feather 这样提供原生压缩格式的文件,以及对像 CSV 这样不支持开箱即用压缩格式的文件。
给定一个表
table = pa.table([
pa.array([1, 2, 3, 4, 5])
], names=["numbers"])
写入压缩的 Parquet 或 Feather 数据由 pyarrow.feather.write_feather()
和 pyarrow.parquet.write_table()
函数的 compression
参数驱动。
pa.feather.write_feather(table, "compressed.feather",
compression="lz4")
pa.parquet.write_table(table, "compressed.parquet",
compression="lz4")
您可以参考每个函数的文档以获取支持的完整压缩格式列表。
注意
实际上,Arrow 在写入 Parquet 或 Feather 文件时默认使用压缩。Feather 默认使用 lz4
压缩,而 Parquet 默认使用 snappy
压缩。
对于像 CSV 这样不支持原生压缩的格式,可以使用 pyarrow.CompressedOutputStream
保存压缩数据。
with pa.CompressedOutputStream("compressed.csv.gz", "gzip") as out:
pa.csv.write_csv(table, out)
这需要在读取时解压缩文件,可以使用 pyarrow.CompressedInputStream
完成,如下一节所述。
读取压缩数据¶
Arrow 支持读取压缩文件,包括对像 Parquet 或 Feather 这样提供原生压缩格式的文件,以及对像 CSV 这样不支持原生压缩格式,但已由应用程序压缩的文件。
读取具有原生压缩支持的压缩格式不需要任何特殊处理。例如,我们可以通过简单地调用 pyarrow.feather.read_table()
和 pyarrow.parquet.read_table()
来读取我们在上一节中写入的 Parquet 和 Feather 文件。
table_feather = pa.feather.read_table("compressed.feather")
print(table_feather)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
table_parquet = pa.parquet.read_table("compressed.parquet")
print(table_parquet)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
从没有原生压缩支持的格式读取数据需要在解码之前解压缩它们。这可以使用 pyarrow.CompressedInputStream
类完成,该类在将结果提供给实际读取函数之前,使用解压缩操作包装文件。
例如,要读取压缩的 CSV 文件
with pa.CompressedInputStream(pa.OSFile("compressed.csv.gz"), "gzip") as input:
table_csv = pa.csv.read_csv(input)
print(table_csv)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
注意
在 CSV 的情况下,Arrow 实际上足够聪明,可以尝试使用文件扩展名检测压缩文件。因此,如果您的文件名为 *.gz
或 *.bz2
,则 pyarrow.csv.read_csv()
函数将尝试相应地解压缩它。
table_csv2 = pa.csv.read_csv("compressed.csv.gz")
print(table_csv2)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]