流式传输、序列化和 IPC#
写入和读取流#
Arrow 定义了两种用于序列化记录批次的二进制格式
流式格式:用于发送任意长度的记录批次序列。该格式必须从头到尾处理,不支持随机访问
文件或随机访问格式:用于序列化固定数量的记录批次。支持随机访问,因此在与内存映射一起使用时非常有用
要学习本节,请确保先阅读有关 内存和 IO 的部分。
使用流#
首先,让我们创建一个小的记录批次
In [1]: import pyarrow as pa
In [2]: data = [
...: pa.array([1, 2, 3, 4]),
...: pa.array(['foo', 'bar', 'baz', None]),
...: pa.array([True, None, False, True])
...: ]
...:
In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
In [4]: batch.num_rows
Out[4]: 4
In [5]: batch.num_columns
Out[5]: 3
现在,我们可以开始编写一个包含一些批次的流。为此,我们使用 RecordBatchStreamWriter
,它可以写入可写的 NativeFile
对象或可写的 Python 对象。为方便起见,可以使用 new_stream()
创建此对象
In [6]: sink = pa.BufferOutputStream()
In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
...: for i in range(5):
...: writer.write_batch(batch)
...:
这里我们使用了一个内存中的 Arrow 缓冲流 (sink
),但这也可以是一个套接字或其他 IO sink。
创建 StreamWriter
时,我们传递 schema,因为 schema(列名和类型)对于此特定流中发送的所有批次必须相同。现在我们可以执行
In [8]: buf = sink.getvalue()
In [9]: buf.size
Out[9]: 1984
现在 buf
包含完整的流作为内存字节缓冲区。我们可以使用 RecordBatchStreamReader
或便捷函数 pyarrow.ipc.open_stream
读取此类流
In [10]: with pa.ipc.open_stream(buf) as reader:
....: schema = reader.schema
....: batches = [b for b in reader]
....:
In [11]: schema
Out[11]:
f0: int64
f1: string
f2: bool
In [12]: len(batches)
Out[12]: 5
我们可以检查返回的批次是否与原始输入相同
In [13]: batches[0].equals(batch)
Out[13]: True
重要的一点是,如果输入源支持零拷贝读取(例如像内存映射或 pyarrow.BufferReader
),则返回的批次也是零拷贝的,并且在读取时不会分配任何新内存。
写入和读取随机访问文件#
RecordBatchFileWriter
具有与 RecordBatchStreamWriter
相同的 API。您可以使用 new_file()
创建一个
In [14]: sink = pa.BufferOutputStream()
In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
....: for i in range(10):
....: writer.write_batch(batch)
....:
In [16]: buf = sink.getvalue()
In [17]: buf.size
Out[17]: 4226
RecordBatchFileReader
和 RecordBatchStreamReader
之间的区别在于,输入源必须具有用于随机访问的 seek
方法。流读取器仅需要读取操作。我们还可以使用 open_file()
方法打开文件
In [18]: with pa.ipc.open_file(buf) as reader:
....: num_record_batches = reader.num_record_batches
....:
In [19]: b = reader.get_batch(3)
因为我们可以访问整个负载,所以我们知道文件中记录批次的数量,并且可以随机读取任何一个。
In [20]: num_record_batches
Out[20]: 10
In [21]: b.equals(batch)
Out[21]: True
为 pandas 读取流和文件格式#
流和文件读取器类有一个特殊的 read_pandas
方法,可以简化读取多个记录批次并将其转换为单个 DataFrame 输出
In [22]: with pa.ipc.open_file(buf) as reader:
....: df = reader.read_pandas()
....:
In [23]: df[:5]
Out[23]:
f0 f1 f2
0 1 foo True
1 2 bar None
2 3 baz False
3 4 None True
4 1 foo True
高效写入和读取 Arrow 数据#
由于针对零拷贝和内存映射数据进行了优化,Arrow 允许轻松读取和写入数组,从而消耗最少的驻留内存。
在写入和读取原始 Arrow 数据时,我们可以使用 Arrow 文件格式或 Arrow 流式传输格式。
要将数组转储到文件,可以使用 new_file()
,它将提供一个新的 RecordBatchFileWriter
实例,可用于将数据批次写入该文件。
例如,要写入一个包含 1000 万个整数的数组,我们可以将其写入 1000 个 10000 个条目的块中
In [24]: BATCH_SIZE = 10000
In [25]: NUM_BATCHES = 1000
In [26]: schema = pa.schema([pa.field('nums', pa.int32())])
In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
....: with pa.ipc.new_file(sink, schema) as writer:
....: for row in range(NUM_BATCHES):
....: batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
....: writer.write(batch)
....:
记录批次支持多个列,因此在实践中,我们始终写入等效的 Table
。
批量写入是有效的,因为理论上我们只需要将当前正在写入的批次保存在内存中。但是在读回时,我们可以通过直接映射磁盘中的数据并避免在读取时分配任何新内存来更加高效。
在正常情况下,读回我们的文件将消耗几百兆字节的内存
In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
....: loaded_array = pa.ipc.open_file(source).read_all()
....:
In [29]: print("LEN:", len(loaded_array))
LEN: 10000000
In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB
为了更有效地从磁盘读取大数据,我们可以将文件进行内存映射,以便 Arrow 可以直接引用从磁盘映射的数据,而无需分配其自己的内存。在这种情况下,操作系统将能够延迟分页映射的内存,并在压力下将其分页出去,而无需任何写回成本,从而可以更轻松地读取大于总内存的数组。
In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
....: loaded_array = pa.ipc.open_file(source).read_all()
....:
In [32]: print("LEN:", len(loaded_array))
LEN: 10000000
In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB
注意
其他高级 API(如 read_table()
)也提供 memory_map
选项。但在这些情况下,内存映射无法帮助减少驻留内存消耗。有关详细信息,请参阅 读取 Parquet 和内存映射。