nanoarrow for Python#

nanoarrow Python 包提供了与 nanoarrow C 库的绑定。与 nanoarrow C 库一样,它提供了工具来促进使用 Arrow C 数据Arrow C 流 接口。

安装#

nanoarrow Python 绑定可从 PyPIconda-forge 获取

pip install nanoarrow
conda install nanoarrow -c conda-forge

开发版本(基于 main 分支)也可使用

pip install --extra-index-url https://pypi.fury.io/arrow-nightlies/ \
    --prefer-binary --pre nanoarrow

如果你可以导入命名空间,那么你就可以开始了!

import nanoarrow as na

数据类型、数组和数组流#

Arrow C 数据和 Arrow C 流接口包含三个结构:ArrowSchema 代表数组的数据类型,ArrowArray 代表数组的值,以及 ArrowArrayStream,它代表具有共同 ArrowSchema 的零个或多个 ArrowArray。这些概念对应于 Python 包中的 nanoarrow.Schemananoarrow.Arraynanoarrow.ArrayStream

na.int32()
<Schema> int32
na.Array([1, 2, 3], na.int32())
nanoarrow.Array<int32>[3]
1
2
3

The nanoarrow.Array 可以容纳包含任意数量块的数组,这反映了现实情况,即许多数组容器(例如 pyarrow.ChunkedArraypolars.Series)都支持这种方式。

chunked = na.Array.from_chunks([[1, 2, 3], [4, 5, 6]], na.int32())
chunked
nanoarrow.Array<int32>[6]
1
2
3
4
5
6

当对象被构造时,Array 的块始终被完全物化,而 ArrayStream 的块不一定已经被解析。

stream = na.ArrayStream(chunked)
stream
nanoarrow.ArrayStream<int32>
with stream:
    for chunk in stream:
        print(chunk)
nanoarrow.Array<int32>[3]
1
2
3
nanoarrow.Array<int32>[3]
4
5
6

The nanoarrow.ArrayStream 还提供了一个接口,用于访问 nanoarrow 的 Arrow IPC 阅读器

url = "https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.arrows"
na.ArrayStream.from_url(url)
nanoarrow.ArrayStream<non-nullable struct<commit: string, time: timestamp('us', 'UTC'), files: int3...>

这些对象实现了 Arrow PyCapsule 接口,用于生产和消费,并且在许多情况下可以与 pyarrow 对象互换。

import pyarrow as pa

pa.field(na.int32())
pyarrow.Field<: int32>
pa.chunked_array(chunked)
<pyarrow.lib.ChunkedArray object at 0x12a49a250>
[
  [
    1,
    2,
    3
  ],
  [
    4,
    5,
    6
  ]
]
pa.array(chunked.chunk(1))
<pyarrow.lib.Int32Array object at 0x11b552500>
[
  4,
  5,
  6
]
na.Array(pa.array([10, 11, 12]))
nanoarrow.Array<int64>[3]
10
11
12
na.Schema(pa.string())
<Schema> string

低级 C 库绑定#

nanoarrow Python 包还提供了围绕 Arrow C 接口结构的更低级包装器。你可以使用 nanoarrow.c_schema()nanoarrow.c_array()nanoarrow.c_array_stream() 来创建它们。

模式#

使用 nanoarrow.c_schema() 将对象转换为 ArrowSchema 并将其包装为 Python 对象。这适用于任何实现 Arrow PyCapsule 接口 的对象(例如 pyarrow.Schemapyarrow.DataTypepyarrow.Field)。

na.c_schema(pa.decimal128(10, 3))
<nanoarrow.c_schema.CSchema decimal128(10, 3)>
- format: 'd:10,3'
- name: ''
- flags: 2
- metadata: NULL
- dictionary: NULL
- children[0]:

使用 c_schema() 非常适合测试和用于从一个库传递到另一个库的短暂模式对象。为了以更方便的形式提取模式的字段,请使用 Schema()

schema = na.Schema(pa.decimal128(10, 3))
schema.precision, schema.scale
(10, 3)

The CSchema object cleans up after itself: when the object is deleted, the underlying ArrowSchema is released.

数组#

你可以使用 nanoarrow.c_array() 将数组类对象转换为 ArrowArray,将其包装为 Python 对象,并附加一个可用于解释其内容的模式。这适用于任何实现 Arrow PyCapsule 接口 的对象(例如 pyarrow.Arraypyarrow.RecordBatch)。

na.c_array(["one", "two", "three", None], na.string())
<nanoarrow.c_array.CArray string>
- length: 4
- offset: 0
- null_count: 1
- buffers: (4754305168, 4754307808, 4754310464)
- dictionary: NULL
- children[0]:

使用 c_array() 非常适合测试和用于从一个库传递到另一个库的短暂数组对象。对于更高级的接口,请使用 Array()

array = na.Array(["one", "two", "three", None], na.string())
array.to_pylist()
['one', 'two', 'three', None]
array.buffers
(nanoarrow.c_lib.CBufferView(bool[1 b] 11100000),
 nanoarrow.c_lib.CBufferView(int32[20 b] 0 3 6 11 11),
 nanoarrow.c_lib.CBufferView(string[11 b] b'onetwothree'))

高级用户可以使用 c_array_from_buffers() 直接从缓冲区创建数组

na.c_array_from_buffers(
    na.string(),
    2,
    [None, na.c_buffer([0, 3, 6], na.int32()), b"abcdef"]
)
<nanoarrow.c_array.CArray string>
- length: 2
- offset: 0
- null_count: 0
- buffers: (0, 5002908320, 4999694624)
- dictionary: NULL
- children[0]:

数组流#

你可以使用 nanoarrow.c_array_stream() 将表示具有共同 CSchemaCArray 序列的对象包装为 ArrowArrayStream,并将其包装为 Python 对象。这适用于任何实现 Arrow PyCapsule 接口 的对象(例如 pyarrow.RecordBatchReaderpyarrow.ChunkedArray)。

pa_batch = pa.record_batch({"col1": [1, 2, 3]})
reader = pa.RecordBatchReader.from_batches(pa_batch.schema, [pa_batch])
array_stream = na.c_array_stream(reader)
array_stream
<nanoarrow.c_array_stream.CArrayStream>
- get_schema(): struct<col1: int64>

你可以使用 .get_next() 从流中拉取下一个数组,或者像迭代器一样使用它。当流中没有更多数组时,.get_next() 方法将引发 StopIteration

for array in array_stream:
    print(array)
<nanoarrow.c_array.CArray struct<col1: int64>>
- length: 3
- offset: 0
- null_count: 0
- buffers: (0,)
- dictionary: NULL
- children[1]:
  'col1': <nanoarrow.c_array.CArray int64>
    - length: 3
    - offset: 0
    - null_count: 0
    - buffers: (0, 2642948588352)
    - dictionary: NULL
    - children[0]:

对于更高级的接口,请使用 ArrayStream()

reader = pa.RecordBatchReader.from_batches(pa_batch.schema, [pa_batch])
na.ArrayStream(reader).read_all()
nanoarrow.Array<non-nullable struct<col1: int64>>[3]
{'col1': 1}
{'col1': 2}
{'col1': 3}

开发#

nanoarrow 的 Python 绑定使用 setuptools 进行管理。这意味着你可以使用以下命令构建项目

git clone https://github.com/apache/arrow-nanoarrow.git
cd arrow-nanoarrow/python
pip install -e .

测试使用 pytest

# Install dependencies
pip install -e ".[test]"

# Run tests
pytest -vvx

目前需要 CMake 来确保 Python 包中 vendored 的 nanoarrow 副本与工作树中的 nanoarrow 源代码保持同步。