流、序列化和 IPC#

写入和读取流#

Arrow 定义了两种用于序列化记录批次的二进制格式

  • 流格式:用于发送任意长度的记录批次序列。该格式必须从头到尾处理,不支持随机访问

  • 文件或随机访问格式:用于序列化固定数量的记录批次。支持随机访问,因此与内存映射结合使用时非常有用

要理解本节,请务必先阅读有关内存和 IO 的部分。

使用流#

首先,让我们创建一个小的记录批次

In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...: 

In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3

现在,我们可以开始写入包含多个批次的流。为此,我们使用RecordBatchStreamWriter,它可以写入可写的NativeFile对象或可写的 Python 对象。为方便起见,可以使用new_stream()创建此对象

In [6]: sink = pa.BufferOutputStream()

In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
   ...:    for i in range(5):
   ...:       writer.write_batch(batch)
   ...: 

这里我们使用了内存中的 Arrow 缓冲区流(sink),但这也可以是套接字或其他 IO 接收器。

创建StreamWriter时,我们传递了模式,因为该特定流中发送的所有批次的模式(列名和类型)必须相同。现在我们可以

In [8]: buf = sink.getvalue()

In [9]: buf.size
Out[9]: 1984

现在buf包含完整的流作为内存中的字节缓冲区。我们可以使用RecordBatchStreamReader或便利函数pyarrow.ipc.open_stream读取此类流

In [10]: with pa.ipc.open_stream(buf) as reader:
   ....:       schema = reader.schema
   ....:       batches = [b for b in reader]
   ....: 

In [11]: schema
Out[11]: 
f0: int64
f1: string
f2: bool

In [12]: len(batches)
Out[12]: 5

我们可以检查返回的批次是否与原始输入相同

In [13]: batches[0].equals(batch)
Out[13]: True

重要的一点是,如果输入源支持零拷贝读取(例如内存映射或pyarrow.BufferReader),那么返回的批次也是零拷贝的,并且在读取时不会分配任何新内存。

写入和读取随机访问文件#

RecordBatchFileWriter具有与RecordBatchStreamWriter相同的 API。您可以使用new_file()创建一个

In [14]: sink = pa.BufferOutputStream()

In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
   ....:    for i in range(10):
   ....:       writer.write_batch(batch)
   ....: 

In [16]: buf = sink.getvalue()

In [17]: buf.size
Out[17]: 4226

RecordBatchFileReaderRecordBatchStreamReader之间的区别在于,输入源必须具有用于随机访问的seek方法。流读取器只需要读取操作。我们还可以使用open_file()方法打开文件

In [18]: with pa.ipc.open_file(buf) as reader:
   ....:    num_record_batches = reader.num_record_batches
   ....: 

In [19]: b = reader.get_batch(3)

因为我们可以访问整个有效载荷,所以我们知道文件中记录批次的数量,并且可以随机读取任何批次。

In [20]: num_record_batches
Out[20]: 10

In [21]: b.equals(batch)
Out[21]: True

从流和文件格式读取 pandas#

流和文件读取器类有一个特殊的read_pandas方法,可以简化读取多个记录批次并将其转换为单个 DataFrame 输出。

In [22]: with pa.ipc.open_file(buf) as reader:
   ....:    df = reader.read_pandas()
   ....: 

In [23]: df[:5]
Out[23]: 
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True

高效写入和读取 Arrow 数据#

Arrow 经过零拷贝和内存映射数据优化,可以轻松读写数组,消耗最少的驻留内存。

在写入和读取原始 Arrow 数据时,我们可以使用 Arrow 文件格式或 Arrow 流格式。

要将数组转储到文件,您可以使用new_file(),它将提供一个新的RecordBatchFileWriter实例,可用于将数据批次写入该文件。

例如,要写入一个包含 10M 整数的数组,我们可以将其写入 1000 个包含 10000 个条目的块。

In [24]: BATCH_SIZE = 10000

In [25]: NUM_BATCHES = 1000

In [26]: schema = pa.schema([pa.field('nums', pa.int32())])

In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
   ....:    with pa.ipc.new_file(sink, schema) as writer:
   ....:       for row in range(NUM_BATCHES):
   ....:             batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
   ....:             writer.write(batch)
   ....: 

记录批次支持多列,因此在实践中,我们总是写入相当于一个Table

分批写入是有效的,因为理论上我们只需要将当前正在写入的批次保留在内存中。但在读回时,我们可以通过直接从磁盘映射数据来更有效,并避免在读取时分配任何新内存。

在正常情况下,读回我们的文件将消耗几百兆字节的内存。

In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
   ....:    loaded_array = pa.ipc.open_file(source).read_all()
   ....: 

In [29]: print("LEN:", len(loaded_array))
LEN: 10000000

In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB

为了更有效地从磁盘读取大数据,我们可以内存映射文件,这样 Arrow 就可以直接引用从磁盘映射的数据,而无需分配自己的内存。在这种情况下,操作系统将能够懒惰地分页映射内存,并在压力下将其分页出去而无需任何回写成本,从而更容易读取比总内存更大的数组。

In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
   ....:    loaded_array = pa.ipc.open_file(source).read_all()
   ....: 

In [32]: print("LEN:", len(loaded_array))
LEN: 10000000

In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB

注意

其他高级 API,如read_table()也提供了一个memory_map选项。但在这些情况下,内存映射无法帮助减少驻留内存消耗。有关详细信息,请参阅读取 Parquet 和内存映射