High-level ArrayStream Implementation
- class ArrayStream(obj, schema=None)
High-level ArrayStream representation
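The ArrayStream is nanoarrow's high-level representation of zero or more contiguous arrays that have not necessarily been materialized. This is in contrast to the nanoarrow Array, which consists of zero or more contiguous arrays but is always fully materialized. The ArrayStream is similar to pyarrow's RecordBatchReader, except that it can also represent streams of non-struct arrays. Its scope maps to that of an ``ArrowArrayStream`` as represented by the Arrow C Stream interface.
Parameters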
- obj : array or array stream-like
  An array-like or array stream-like object as sanitized by c_array_stream().
- schema : schema-like, optional
  An optional schema, passed to c_array_stream().
Examples
>>> import nanoarrow as na
>>> na.ArrayStream([1, 2, 3], na.int32())
nanoarrow.ArrayStream<int32>
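The difference between a stream whose arrays "have not necessarily been materialized" and a fully materialized Array can be sketched in plain Python. The `ToyArrayStream` class and `produce_chunks` generator below are hypothetical and only model the idea: chunks are produced on demand, so nothing exists until a consumer asks for it. This is not nanoarrow's implementation.

```python
from typing import Iterator, List


class ToyArrayStream:
    """Hypothetical model of a lazily produced stream of chunks.

    Illustration only; this is not nanoarrow's ArrayStream.
    """

    def __init__(self, chunk_source: Iterator[List[int]]):
        self._source = chunk_source
        self.materialized = 0  # how many chunks have been produced so far

    def __iter__(self) -> "ToyArrayStream":
        return self

    def __next__(self) -> List[int]:
        chunk = next(self._source)  # raises StopIteration when exhausted
        self.materialized += 1
        return chunk


def produce_chunks() -> Iterator[List[int]]:
    # A generator: each chunk is only built when it is requested.
    yield [1, 2]
    yield [3]


array = [1, 2, 3]  # a "materialized array": every value exists up front
stream = ToyArrayStream(produce_chunks())
print(stream.materialized)  # 0
first = next(stream)
print(first, stream.materialized)  # [1, 2] 1
```

Only the first chunk has been produced after the `next()` call; the second is built when the consumer continues iterating.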
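- close() → None
Release resources associated with this stream
Note that it is usually preferable to use the context manager to ensure prompt release of resources (e.g., open files) associated with this stream.
Examples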
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> with stream:
...     pass
>>> stream.read_all()
Traceback (most recent call last):
...
RuntimeError: array stream is released

>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.close()
>>> stream.read_all()
Traceback (most recent call last):
...
RuntimeError: array stream is released
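The release semantics shown above follow the usual Python context-manager pattern. The sketch below models them with a hypothetical `ToyReleasableStream` class (not part of nanoarrow): `__exit__` always calls `close()`, and any read after release raises `RuntimeError` with the same message as the examples above.

```python
class ToyReleasableStream:
    """Hypothetical model of close()/context-manager semantics.

    Illustration only; this is not nanoarrow's ArrayStream.
    """

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._released = False

    def close(self):
        # Calling close() more than once is harmless in this model.
        self._released = True
        self._chunks = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()  # release even if the with-block raised
        return False  # do not swallow exceptions

    def read_all(self):
        if self._released:
            raise RuntimeError("array stream is released")
        out = [value for chunk in self._chunks for value in chunk]
        self._chunks = []
        return out


stream = ToyReleasableStream([[1, 2], [3]])
with stream:
    values = stream.read_all()
print(values)  # [1, 2, 3]

try:
    stream.read_all()
except RuntimeError as err:
    message = str(err)
print(message)  # array stream is released
```

Using `with` guarantees the release happens when the block exits, which is why the documentation prefers it over calling `close()` by hand.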
- static from_path(obj, *args, **kwargs)
Create an ArrayStream from an IPC stream at a local file path
Examples
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
...     path = os.path.join(td, "test.arrows")
...     with open(path, "wb") as f:
...         nbytes = f.write(InputStream.example_bytes())
...
...     with na.ArrayStream.from_path(path) as stream:
...         stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
- static from_readable(obj)
Create an ArrayStream from an IPC stream in a readable file or buffer
Examples
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with na.ArrayStream.from_readable(InputStream.example_bytes()) as stream:
...     stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
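The example above passes raw bytes, while the description also mentions readable files. One way to accept both is to normalize the input to an object with a `.read()` method. The `as_readable` helper below is hypothetical and is an assumption about the general idea, not a description of how nanoarrow sanitizes its input.

```python
import io
import os
import tempfile


def as_readable(obj):
    """Hypothetical helper: return an object that has a .read() method.

    Objects that already have .read() (e.g. open binary files) pass
    through unchanged; bytes-like buffers are wrapped in io.BytesIO.
    """
    if hasattr(obj, "read"):
        return obj
    return io.BytesIO(memoryview(obj).tobytes())


payload = b"\x01\x02\x03"
print(as_readable(payload).read())  # b'\x01\x02\x03'

with tempfile.TemporaryDirectory() as td:
    path = os.path.join(td, "data.bin")
    with open(path, "wb") as f:
        f.write(payload)
    with open(path, "rb") as f:
        from_file = as_readable(f).read()  # the file object passes through
print(from_file)  # b'\x01\x02\x03'
```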
- static from_url(obj, *args, **kwargs)
Create an ArrayStream from an IPC stream at a URL
Examples
>>> import pathlib
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
...     path = os.path.join(td, "test.arrows")
...     with open(path, "wb") as f:
...         nbytes = f.write(InputStream.example_bytes())
...
...     uri = pathlib.Path(path).as_uri()
...     with na.ArrayStream.from_url(uri) as stream:
...         stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
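The example builds a `file://` URI with `pathlib.Path.as_uri()`. The standard library can open such a URI directly, which the sketch below uses to show the byte-fetching half of the process. It assumes the URL is opened with `urllib.request.urlopen`; that is an illustration of what "a URL" can mean here, not a statement about nanoarrow's internals, and no IPC decoding happens in this snippet.

```python
import os
import pathlib
import tempfile
import urllib.request

with tempfile.TemporaryDirectory() as td:
    path = os.path.join(td, "test.arrows")
    with open(path, "wb") as f:
        f.write(b"example payload")

    # as_uri() requires an absolute path; TemporaryDirectory provides one.
    uri = pathlib.Path(path).as_uri()  # e.g. file:///tmp/.../test.arrows

    # urllib's built-in file: handler reads local files given as URLs.
    with urllib.request.urlopen(uri) as response:
        data = response.read()

print(uri.startswith("file://"), data)  # True b'example payload'
```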
- iter_chunks() → Iterable[Array]
Iterate over contiguous Arrays in this stream
For the ArrayStream, this is the same as iterating over the stream itself.
Examples
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> for chunk in stream:
...     print(chunk)
nanoarrow.Array<int32>[3]
1
2
3
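The example above has only one chunk, so it does not show chunk boundaries. The hypothetical `ToyChunkedStream` below (not nanoarrow's class) holds two chunks and is single-pass, as an ArrowArrayStream is: each iteration step hands out a whole chunk, `__iter__` simply delegates to `iter_chunks()`, and a second pass yields nothing.

```python
from typing import Iterator, List


class ToyChunkedStream:
    """Hypothetical single-pass stream of chunks; illustration only."""

    def __init__(self, chunks: List[List[int]]):
        self._chunks: Iterator[List[int]] = iter(chunks)

    def iter_chunks(self) -> Iterator[List[int]]:
        # Whole chunks are handed out; their values are not flattened.
        return self._chunks

    def __iter__(self) -> Iterator[List[int]]:
        # Iterating the stream itself is the same as iter_chunks().
        return self.iter_chunks()


stream = ToyChunkedStream([[1, 2], [3]])
seen = []
for chunk in stream:
    seen.append(chunk)
    print(chunk)  # [1, 2], then [3]

print(list(stream.iter_chunks()))  # [] because the stream is already consumed
```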
- iter_py() → Iterable
Iterate over the default Python representation of each element.
Examples
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> for item in stream.iter_py():
...     print(item)
1
2
3
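Unlike `iter_chunks()`, element-wise iteration hides chunk boundaries. A minimal sketch of that flattening, using a hypothetical `toy_iter_py` generator over plain lists standing in for chunks: one chunk is pulled at a time and its elements are yielded individually, so only one chunk needs to be held in memory.

```python
from typing import Iterable, Iterator, List


def toy_iter_py(chunks: Iterable[List[int]]) -> Iterator[int]:
    """Hypothetical sketch: yield single elements across chunk boundaries."""
    for chunk in chunks:  # pull one chunk at a time
        yield from chunk  # then hand out its elements one by one


print(list(toy_iter_py([[1, 2], [3]])))  # [1, 2, 3]
```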
- iter_tuples() → Iterable[Tuple]
Iterate over rows of a struct stream as tuples
Examples
>>> import nanoarrow as na
>>> import pyarrow as pa
>>> batch = pa.record_batch(
...     [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
...     names=["col1", "col2"]
... )
>>> stream = na.ArrayStream(batch)
>>> for item in stream.iter_tuples():
...     print(item)
(1, 'a')
(2, 'b')
(3, 'c')
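A struct stream stores its data column by column, so producing row tuples is a transposition. The sketch below models each batch as a hypothetical dict of equal-length column lists (an assumption for illustration; nanoarrow does not use dicts internally) and transposes it with `zip`.

```python
from typing import Dict, Iterator, List, Tuple


def toy_iter_tuples(batches: List[Dict[str, list]]) -> Iterator[Tuple]:
    """Hypothetical sketch: each batch maps column names to equal-length columns."""
    for batch in batches:
        # zip(*columns) turns column-major storage into row tuples.
        yield from zip(*batch.values())


batches = [
    {"col1": [1, 2], "col2": ["a", "b"]},
    {"col1": [3], "col2": ["c"]},
]
for row in toy_iter_tuples(batches):
    print(row)  # (1, 'a'), then (2, 'b'), then (3, 'c')
```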
- read_all() → Array
Materialize the entire stream into an Array
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.read_all()
nanoarrow.Array<int32>[3]
1
2
3
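Materializing a stream means draining every chunk and concatenating the values, so memory use grows with the size of the whole stream. A minimal sketch with a hypothetical `toy_read_all` over a single-pass iterator of lists (not nanoarrow's implementation):

```python
from typing import Iterable, List


def toy_read_all(chunks: Iterable[List[int]]) -> List[int]:
    """Hypothetical sketch: drain every chunk and concatenate the values."""
    out: List[int] = []
    for chunk in chunks:
        out.extend(chunk)  # everything ends up in one materialized list
    return out


source = iter([[1, 2], [3]])  # single-pass, like a stream
print(toy_read_all(source))  # [1, 2, 3]
print(toy_read_all(source))  # [] because the source is now exhausted
```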
- read_next() → Array
Materialize the next contiguous Array in this stream
This method raises StopIteration if there are no more arrays in this stream.
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.read_next()
nanoarrow.Array<int32>[3]
1
2
3
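Because `read_next()` signals the end of the stream with `StopIteration`, a manual read loop catches that exception to stop. The sketch below shows the loop shape against a hypothetical `ToyStream` class (not nanoarrow's) whose `read_next()` follows the same documented contract; each chunk is processed and can be discarded before the next one is read.

```python
from typing import Iterator, List


class ToyStream:
    """Hypothetical model of read_next(); not nanoarrow's implementation."""

    def __init__(self, chunks: List[List[int]]):
        self._chunks: Iterator[List[int]] = iter(chunks)

    def read_next(self) -> List[int]:
        # next() raises StopIteration once no chunks remain.
        return next(self._chunks)


stream = ToyStream([[1, 2], [3]])
total = 0
while True:
    try:
        chunk = stream.read_next()
    except StopIteration:
        break  # no more arrays in the stream
    total += sum(chunk)  # process one chunk at a time

print(total)  # 6
```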
- property schema
The Schema associated with this stream
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.schema
<Schema> int32
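In the Arrow C Stream interface the schema is obtained separately from the arrays, and every array the stream produces has that same schema. The hypothetical `ToyTypedStream` below (a plain string stands in for a Schema; this is not nanoarrow's class) models that: the schema is readable before any data is consumed and does not change as chunks are read.

```python
from typing import Iterator, List


class ToyTypedStream:
    """Hypothetical model: one schema, known up front, shared by all chunks."""

    def __init__(self, type_name: str, chunks: List[List[int]]):
        self.schema = type_name  # available before any chunk is read
        self._chunks: Iterator[List[int]] = iter(chunks)

    def read_next(self) -> List[int]:
        return next(self._chunks)


stream = ToyTypedStream("int32", [[1, 2], [3]])
print(stream.schema)  # int32, without consuming any data
print(stream.read_next())  # [1, 2]
print(stream.schema)  # still int32: every chunk shares one schema
```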