High-level ArrayStream Implementation
- class ArrayStream(obj, schema=None)
High-level ArrayStream representation
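The ArrayStream is nanoarrow's high-level representation of zero or more contiguous arrays that have not necessarily been materialized. This is in contrast to the nanoarrow Array, which consists of zero or more contiguous arrays but is always fully materialized.
The ArrayStream is similar to pyarrow's RecordBatchReader, except that it can also represent streams of non-struct arrays. Its scope maps to that of an ``ArrowArrayStream`` as represented by the Arrow C Stream interface.
Parameters
- obj (array or array stream-like): An array-like or array stream-like object as sanitized by c_array_stream().
- schema (schema-like, optional): An optional schema, passed to c_array_stream().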
Examples
>>> import nanoarrow as na
>>> na.ArrayStream([1, 2, 3], na.int32())
nanoarrow.ArrayStream<int32>
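- close() -> None
Release resources associated with this stream
Note that it is usually preferable to use the context manager to ensure prompt release of resources (e.g., open files) associated with this stream.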
Examples
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> with stream:
...     pass
>>> stream.read_all()
Traceback (most recent call last):
...
RuntimeError: array stream is released

>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.close()
>>> stream.read_all()
Traceback (most recent call last):
...
RuntimeError: array stream is released
- static from_path(obj, *args, **kwargs)
Create an ArrayStream from an IPC stream at a local file path
Examples
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
...     path = os.path.join(td, "test.arrows")
...     with open(path, "wb") as f:
...         nbytes = f.write(InputStream.example_bytes())
...
...     with na.ArrayStream.from_path(path) as stream:
...         stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
- static from_readable(obj)
Create an ArrayStream from an IPC stream in a readable file or buffer
Examples
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with na.ArrayStream.from_readable(InputStream.example_bytes()) as stream:
...     stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
- static from_url(obj, *args, **kwargs)
Create an ArrayStream from an IPC stream at a URL
Examples
>>> import pathlib
>>> import tempfile
>>> import os
>>> import nanoarrow as na
>>> from nanoarrow.ipc import InputStream
>>> with tempfile.TemporaryDirectory() as td:
...     path = os.path.join(td, "test.arrows")
...     with open(path, "wb") as f:
...         nbytes = f.write(InputStream.example_bytes())
...
...     uri = pathlib.Path(path).as_uri()
...     with na.ArrayStream.from_url(uri) as stream:
...         stream.read_all()
nanoarrow.Array<non-nullable struct<some_col: int32>>[3]
{'some_col': 1}
{'some_col': 2}
{'some_col': 3}
- iter_chunks() -> Iterable[Array]
Iterate over contiguous Arrays in this stream
For the ArrayStream, this is the same as iterating over the stream itself.
Examples
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> for chunk in stream:
...     print(chunk)
nanoarrow.Array<int32>[3]
1
2
3
- iter_py() -> Iterable
Iterate over the default Python representation of each element.
Examples
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> for item in stream.iter_py():
...     print(item)
1
2
3
- iter_tuples() -> Iterable[Tuple]
Iterate over rows of a struct stream as tuples
Examples
>>> import nanoarrow as na
>>> import pyarrow as pa
>>> batch = pa.record_batch(
...     [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
...     names=["col1", "col2"]
... )
>>> stream = na.ArrayStream(batch)
>>> for item in stream.iter_tuples():
...     print(item)
(1, 'a')
(2, 'b')
(3, 'c')
- read_all() -> Array
Materialize the entire stream into an Array
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.read_all()
nanoarrow.Array<int32>[3]
1
2
3
- read_next() -> Array
Materialize the next contiguous Array in this stream
This method raises StopIteration if there are no more arrays in this stream.
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.read_next()
nanoarrow.Array<int32>[3]
1
2
3
- property schema
The Schema associated with this stream
>>> import nanoarrow as na
>>> stream = na.ArrayStream([1, 2, 3], na.int32())
>>> stream.schema
<Schema> int32