High-level ArrayStream Implementation

class ArrayStream(obj, schema=None)

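High-level ArrayStream representation

The ArrayStream is nanoarrow's high-level representation of zero or more contiguous arrays that have not necessarily been materialized. This is in contrast to the nanoarrow Array, which consists of zero or more contiguous arrays but is always fully materialized.

The ArrayStream is similar to pyarrow's RecordBatchReader, except that it can also represent streams of non-struct arrays. Its scope maps to that of an ``ArrowArrayStream`` as represented by the Arrow C Stream interface.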
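To make the difference between a stream and a materialized Array concrete, here is a pure-Python analogy, not nanoarrow code. Plain lists stand in for Arrow arrays, and the hypothetical `lazy_chunks` generator stands in for a stream whose chunks are only built when a consumer asks for them.

```python
# Pure-Python analogy only: plain lists stand in for Arrow arrays.
# `lazy_chunks` is a hypothetical helper, not part of nanoarrow.
produced = []  # records which chunks have actually been built


def lazy_chunks(n_chunks, chunk_size):
    """Yield chunks one at a time, like a stream that is not yet materialized."""
    for i in range(n_chunks):
        chunk = list(range(i * chunk_size, (i + 1) * chunk_size))
        produced.append(i)
        yield chunk


stream = lazy_chunks(n_chunks=3, chunk_size=2)
print(produced)  # [] : nothing has been built yet

first = next(stream)  # pull a single chunk from the "stream"
print(first, produced)  # [0, 1] [0]

rest = list(stream)  # "materialize" everything that is left
print(rest, produced)  # [[2, 3], [4, 5]] [0, 1, 2]
```

Until something pulls from it, the stream holds no data. Collecting it into a list corresponds to materializing it.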

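Parameters

obj : array or array stream-like

An array-like or array stream-like object as sanitized by c_array_stream().

schema : schema-like, optional

An optional schema, passed to c_array_stream().

Examples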

>>> import nanoarrow as na
>>> na.ArrayStream([1, 2, 3], na.int32())
nanoarrow.ArrayStream<int32>
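close() → None

Release resources associated with this stream

Note that it is usually preferable to use the context manager, which ensures that resources associated with this stream (for example, open files) are released promptly.

Examples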

>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> with stream:
...     pass
>>> stream.read_all()
Traceback (most recent call last):
...
RuntimeError: array stream is released
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.close()
>>> stream.read_all()
Traceback (most recent call last):
...
RuntimeError: array stream is released
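The context-manager contract above can be sketched in plain Python. `ToyStream` is a hypothetical stand-in, not nanoarrow's implementation. It releases its data in `__exit__`, so the data is freed even if the body raises, and every later read fails the same way the doctest shows.

```python
# Pure-Python analogy of release-on-exit. `ToyStream` is hypothetical;
# it is not nanoarrow's implementation.
class ToyStream:
    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._released = False

    def close(self):
        # Release resources (here: just drop the chunks) and mark as released.
        self._chunks = None
        self._released = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs even if the body raised, so resources are released promptly.
        self.close()

    def read_all(self):
        if self._released:
            raise RuntimeError("array stream is released")
        return [item for chunk in self._chunks for item in chunk]


with ToyStream([[1, 2], [3]]) as stream:
    data = stream.read_all()
print(data)  # [1, 2, 3]

message = None
try:
    stream.read_all()
except RuntimeError as err:
    message = str(err)
print(message)  # array stream is released
```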
static from_path(obj, *args, **kwargs)

Create an ArrayStream from an IPC stream at a local file path

Examples

>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
...     path = os.path.join(td, "test.arrows")
...     with open(path, "wb") as f:
...         nbytes = f.write(InputStream.example_bytes())
...
...     with na.ArrayStream.from_path(path) as stream:
...         stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
static from_readable(obj)

Create an ArrayStream from an IPC stream in a readable file or buffer

Examples

>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with na.ArrayStream.from_readable(InputStream.example_bytes()) as stream:
...     stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
static from_url(obj, *args, **kwargs)

Create an ArrayStream from an IPC stream at a URL

Examples

>>> import pathlib
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
...     path = os.path.join(td, "test.arrows")
...     with open(path, "wb") as f:
...         nbytes = f.write(InputStream.example_bytes())
...
...     uri = pathlib.Path(path).as_uri()
...     with na.ArrayStream.from_url(uri) as stream:
...         stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
iter_chunks() → Iterable[Array]

Iterate over the contiguous Arrays in this stream

For an ArrayStream, this is the same as iterating over the stream itself.

Examples

>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> for chunk in stream:
...     print(chunk)
nanoarrow.Array<int32>[3]
1
2
3
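The statement that iterating the stream equals calling `iter_chunks()` can be modelled with a class whose `__iter__` simply delegates. `ChunkedToyStream` is hypothetical, not nanoarrow code. Like a C stream, it is single-pass, so a second iteration finds nothing left.

```python
# Pure-Python analogy: iterating the stream delegates to iter_chunks().
# `ChunkedToyStream` is hypothetical, not part of nanoarrow.
class ChunkedToyStream:
    def __init__(self, chunks):
        self._chunks = iter(chunks)  # consumed once, like a stream

    def iter_chunks(self):
        # Yield each contiguous chunk in order until the stream is exhausted.
        yield from self._chunks

    def __iter__(self):
        # Iterating the stream itself is the same as calling iter_chunks().
        return self.iter_chunks()


stream = ChunkedToyStream([[1, 2], [3]])
chunks = [chunk for chunk in stream]
print(chunks)  # [[1, 2], [3]]
leftover = list(stream.iter_chunks())
print(leftover)  # [] : the stream has already been consumed
```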
iter_py() → Iterable

Iterate over the default Python representation of each element.

Examples

>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> for item in stream.iter_py():
...     print(item)
1
2
3
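Element-wise iteration hides chunk boundaries: the caller sees one flat sequence no matter how the data is split. The plain-Python sketch below shows that idea. `toy_iter_py` is a hypothetical helper that works on lists standing in for arrays, and it is not how nanoarrow converts values.

```python
from itertools import chain


# Pure-Python analogy: element-wise iteration walks every chunk in order.
# `toy_iter_py` is hypothetical and operates on lists standing in for arrays.
def toy_iter_py(chunks):
    # Flatten chunk boundaries away so callers see one sequence of elements.
    return chain.from_iterable(chunks)


items = list(toy_iter_py([[1, 2], [], [3]]))
print(items)  # [1, 2, 3] : the empty middle chunk contributes nothing
```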
iter_tuples() → Iterable[Tuple]

Iterate over the rows of a struct stream as tuples

Examples

>>> import nanoarrow as na
>>> import pyarrow as pa
>>> batch = pa.record_batch(
...     [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
...     names=["col1", "col2"]
... )
>>> stream = na.ArrayStream(batch)
>>> for item in stream.iter_tuples():
...     print(item)
(1, 'a')
(2, 'b')
(3, 'c')
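Row-wise iteration over a struct stream amounts to zipping the columns of each chunk, in field order, one chunk at a time. The sketch assumes a hypothetical layout in which each chunk is a dict of equal-length Python lists. That is an illustration only and not nanoarrow's internal representation.

```python
# Pure-Python analogy: each struct chunk is modelled as a dict of equal-length
# columns (hypothetical layout, not nanoarrow's). Rows come from zipping the
# columns of each chunk in field (insertion) order, one chunk at a time.
def toy_iter_tuples(struct_chunks):
    for chunk in struct_chunks:
        yield from zip(*chunk.values())


chunks = [
    {"col1": [1, 2], "col2": ["a", "b"]},
    {"col1": [3], "col2": ["c"]},
]
rows = list(toy_iter_tuples(chunks))
print(rows)  # [(1, 'a'), (2, 'b'), (3, 'c')]
```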
read_all() → Array

Materialize the entire stream into an Array

>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.read_all()
nanoarrow.Array<int32>[3]
1
2
3
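Because a nanoarrow Array may itself consist of several contiguous arrays, materializing a stream means pulling every remaining chunk into memory. It does not necessarily mean concatenating them. The plain-Python sketch below reflects that. `toy_read_all` is a hypothetical helper over lists standing in for arrays.

```python
# Pure-Python analogy: read_all() drains every remaining chunk at once.
# The result keeps chunk boundaries, but all of the data is now in memory.
# `toy_read_all` is hypothetical, not part of nanoarrow.
def toy_read_all(chunk_iter):
    chunks = list(chunk_iter)  # materialize everything still in the stream
    length = sum(len(chunk) for chunk in chunks)
    return chunks, length


chunks, length = toy_read_all(iter([[1, 2], [3], [4, 5]]))
print(chunks)  # [[1, 2], [3], [4, 5]]
print(length)  # 5
```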
read_next() → Array

Materialize the next contiguous Array in this stream

This method raises StopIteration if there are no more arrays in this stream.

>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.read_next()
nanoarrow.Array<int32>[3]
1
2
3
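The StopIteration contract suggests a simple draining loop: call `read_next()` until it raises. `ToyReader` below is a hypothetical stand-in that follows the same contract, and it is not nanoarrow code. Raising StopIteration from an ordinary method like this is fine. PEP 479 only converts it inside generators.

```python
# Pure-Python analogy of the read_next() contract: return the next chunk, or
# raise StopIteration once the stream is exhausted. `ToyReader` is hypothetical.
class ToyReader:
    def __init__(self, chunks):
        self._chunks = iter(chunks)

    def read_next(self):
        # next() raises StopIteration by itself when no chunks remain.
        return next(self._chunks)


reader = ToyReader([[1, 2, 3], [4]])
seen = []
while True:
    try:
        seen.append(reader.read_next())
    except StopIteration:
        break  # no more arrays in the stream
print(seen)  # [[1, 2, 3], [4]]
```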
property schema

The Schema associated with this stream

>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.schema
<Schema> int32