扩展 pyarrow#
使用 PyCapsule 接口控制转换为 (Py)Arrow#
Arrow C 数据接口 允许在 Arrow 的不同实现之间移动 Arrow 数据。这是一个通用的、跨语言的接口,不特定于 Python,但对于 Python 库,此接口通过 Python 特定的层进行扩展:Arrow PyCapsule 接口。
此 Python 接口确保支持 C 数据接口的不同库可以以标准方式导出 Arrow 数据结构,并识别彼此的对象。
如果您有一个 Python 库,它提供在底层保存 Arrow 兼容数据的数据结构,您可以在这些对象上实现以下方法
__arrow_c_schema__
用于模式或类型对象。__arrow_c_array__
用于数组和记录批次(连续表)。__arrow_c_stream__
用于分块数组、表和数据流。
这些方法返回 PyCapsule 对象,有关确切语义的更多详细信息可以在 规范 中找到。
当您的数据结构定义了这些方法时,PyArrow 构造函数(见下文)将识别这些对象支持此协议,并将它们零拷贝地转换为 PyArrow 数据结构。对于任何其他支持此协议以摄取数据的库来说,情况也是如此。
类似地,如果您的库有接受用户提供数据的功能,您可以通过检查这些方法的存在来添加对此协议的支持,从而接受任何 Arrow 数据(而不是硬编码对特定 Arrow 生成器(例如 PyArrow)的支持)。
对于通过此协议使用 PyArrow 消费数据,可以使用以下构造函数来创建各种 PyArrow 对象
结果类 |
PyArrow 构造函数 |
支持的协议 |
---|---|---|
数组 |
||
数组, 流 |
||
数组 |
||
数组, 流 |
||
流 |
||
模式 |
||
模式 |
可以通过使用 pyarrow.field()
消费模式兼容对象,然后访问结果字段的 .type
来创建 DataType
。
使用 __arrow_array__
协议控制转换为 pyarrow.Array#
pyarrow.array()
函数内置支持 Python 序列、numpy 数组和 pandas 1D 对象(Series、Index、Categorical, ..)将它们转换为 Arrow 数组。可以通过实现 __arrow_array__
方法(类似于 numpy 的 __array__
协议)将其扩展到其他类数组对象。
例如,为了支持将您的 duck 数组类转换为 Arrow 数组,请定义 __arrow_array__
方法以返回 Arrow 数组
class MyDuckArray:
...
def __arrow_array__(self, type=None):
# convert the underlying array values to a pyarrow Array
import pyarrow
return pyarrow.array(..., type=type)
__arrow_array__
方法采用可选的 type
关键字,该关键字从 pyarrow.array()
传递。该方法允许返回 Array
或 ChunkedArray
。
注意
对于更一般地控制将 Python 对象转换为 Arrow 数据的方法,请考虑 Arrow PyCapsule 接口。它不特定于 PyArrow,并且支持转换其他对象,例如表格和模式。
定义扩展类型(“用户定义的类型”)#
Arrow 提供了一种扩展类型的概念,允许用户使用附加语义注释数据类型。这允许开发人员指定自定义序列化和反序列化例程(例如,到 Python 标量 和 pandas),并更轻松地解释数据。
在 Arrow 中,扩展类型 是通过使用自定义类型名称以及可选的字节串(可用于提供附加元数据,在本文档中称为“参数”)来注释任何内置 Arrow 数据类型(“存储类型”)来指定的。这些显示为字段的 custom_metadata
中的 ARROW:extension:name
和 ARROW:extension:metadata
键。
请注意,由于这些注释是 Arrow 规范的一部分,因此它们可能被其他(非 Python)Arrow 消费者(例如 PySpark)识别。
PyArrow 允许您通过子类化 ExtensionType
并为派生类提供自己的扩展名称和机制来序列化/反序列化任何参数,从而从 Python 定义扩展类型。例如,我们可以为可以表示为整数对的分数定义一个自定义有理类型
class RationalType(pa.ExtensionType):
def __init__(self, data_type: pa.DataType):
if not pa.types.is_integer(data_type):
raise TypeError(f"data_type must be an integer type not {data_type}")
super().__init__(
pa.struct(
[
("numer", data_type),
("denom", data_type),
],
),
"my_package.rational",
)
def __arrow_ext_serialize__(self) -> bytes:
# No parameters are necessary
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Sanity checks, not required but illustrate the method signature.
assert pa.types.is_struct(storage_type)
assert pa.types.is_integer(storage_type[0].type)
assert storage_type[0].type == storage_type[1].type
assert serialized == b""
# return an instance of this subclass
return RationalType(storage_type[0].type)
特殊方法 __arrow_ext_serialize__
和 __arrow_ext_deserialize__
定义扩展类型实例的序列化和反序列化。
现在可以使用它来创建保存扩展类型的数组和表
>>> rational_type = RationalType(pa.int32())
>>> rational_type.extension_name
'my_package.rational'
>>> rational_type.storage_type
StructType(struct<numer: int32, denom: int32>)
>>> storage_array = pa.array(
... [
... {"numer": 10, "denom": 17},
... {"numer": 20, "denom": 13},
... ],
... type=rational_type.storage_type,
... )
>>> arr = rational_type.wrap_array(storage_array)
>>> # or equivalently
>>> arr = pa.ExtensionArray.from_storage(rational_type, storage_array)
>>> arr
<pyarrow.lib.ExtensionArray object at 0x1067f5420>
-- is_valid: all not null
-- child 0 type: int32
[
10,
20
]
-- child 1 type: int32
[
17,
13
]
此数组可以包含在 RecordBatches 中,通过 IPC 发送并在另一个 Python 进程中接收。接收进程必须显式注册扩展类型以进行反序列化,否则它将回退到存储类型
>>> pa.register_extension_type(RationalType(pa.int32()))
例如,创建一个 RecordBatch 并使用 IPC 协议将其写入流
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
... writer.write_batch(batch)
>>> buf = sink.getvalue()
然后将其读回会产生正确的类型
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int32, denom: int32>))
此外,请注意,虽然我们注册了具体类型 RationalType(pa.int32())
,但相同的扩展名 ("my_package.rational"
) 由 RationalType(integer_type)
用于所有 Arrow 整数类型。因此,上面的代码还允许用户(反)序列化这些数据类型
>>> big_rational_type = RationalType(pa.int64())
>>> storage_array = pa.array(
... [
... {"numer": 10, "denom": 17},
... {"numer": 20, "denom": 13},
... ],
... type=big_rational_type.storage_type,
... )
>>> arr = big_rational_type.wrap_array(storage_array)
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
... writer.write_batch(batch)
>>> buf = sink.getvalue()
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int64, denom: int64>))
接收应用程序不需要是 Python,但如果它已实现自己的扩展类型来接收它,则仍然可以将扩展类型识别为“my_package.rational”类型。如果在接收应用程序中未注册该类型,它将回退到存储类型。
参数化扩展类型#
上面的示例说明了如何构造不需要除其存储类型之外的任何其他元数据的扩展类型。但 Arrow 还提供了更灵活的、参数化的扩展类型。
此处给出的示例实现了 pandas “period” 数据类型的扩展类型,表示时间跨度(例如,一天、一个月、一个季度等的频率)。它存储为一个 int64 数组,该数组被解释为自 1970 年以来给定频率的时间跨度的数量。
class PeriodType(pa.ExtensionType):
def __init__(self, freq):
# attributes need to be set first before calling
# super init (as that calls serialize)
self._freq = freq
super().__init__(pa.int64(), "my_package.period")
@property
def freq(self):
return self._freq
def __arrow_ext_serialize__(self):
return "freq={}".format(self.freq).encode()
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Return an instance of this subclass given the serialized
# metadata.
serialized = serialized.decode()
assert serialized.startswith("freq=")
freq = serialized.split("=")[1]
return PeriodType(freq)
在这里,我们确保将所有信息存储在序列化的元数据中,这些元数据是重建实例(在 __arrow_ext_deserialize__
类方法中)所需的,在本例中是频率字符串。
请注意,一旦创建,数据类型实例被认为是不可变的。因此,在上面的示例中,freq
参数存储在具有公共只读属性的私有属性中以访问它。
自定义扩展数组类#
默认情况下,具有扩展类型的所有数组都构造或反序列化为内置的 ExtensionArray
对象。然而,人们可能希望子类化 ExtensionArray
以便添加一些特定于扩展类型的自定义逻辑。Arrow 允许通过向扩展类型的定义添加特殊方法 __arrow_ext_class__
来实现这一点。
例如,让我们考虑来自 Numpy Quickstart 的 3D 空间中的点的示例。我们可以将这些存储为固定大小的列表,我们希望能够将数据提取为二维 Numpy 数组 (N, 3)
,而无需任何复制
class Point3DArray(pa.ExtensionArray):
def to_numpy_array(self):
return self.storage.flatten().to_numpy().reshape((-1, 3))
class Point3DType(pa.ExtensionType):
def __init__(self):
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
def __arrow_ext_serialize__(self):
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()
def __arrow_ext_class__(self):
return Point3DArray
使用此扩展类型构建的数组现在具有预期的自定义数组类
>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr
<__main__.Point3DArray object at 0x7f40dea80670>
[
[
1,
2,
3
],
[
4,
5,
6
]
]
扩展类中的附加方法随后可供用户使用
>>> arr.to_numpy_array()
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
此数组可以通过 IPC 发送,在另一个 Python 进程中接收,并且自定义扩展数组类将被保留(只要接收进程在使用 register_extension_type()
读取 IPC 数据之前注册扩展类型)。
自定义标量转换#
如果您希望自定义扩展类型的标量在调用 ExtensionScalar.as_py()
时转换为自定义类型,可以通过子类化 ExtensionScalar
来重写 ExtensionScalar.as_py()
方法。例如,如果我们希望上面的示例 3D 点类型返回一个自定义的 3D 点类而不是列表,我们将实现
from collections import namedtuple
Point3D = namedtuple("Point3D", ["x", "y", "z"])
class Point3DScalar(pa.ExtensionScalar):
def as_py(self) -> Point3D:
return Point3D(*self.value.as_py())
class Point3DType(pa.ExtensionType):
def __init__(self):
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
def __arrow_ext_serialize__(self):
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()
def __arrow_ext_scalar_class__(self):
return Point3DScalar
使用此扩展类型构建的数组现在提供转换为我们的 Point3D
类的标量
>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr[0].as_py()
Point3D(x=1.0, y=2.0, z=3.0)
>>> arr.to_pylist()
[Point3D(x=1.0, y=2.0, z=3.0), Point3D(x=4.0, y=5.0, z=6.0)]
转换为 pandas#
对于具有扩展类型的列,转换为 pandas (在 Table.to_pandas()
中) 可以控制,以防您的扩展类型存在相应的 pandas 扩展数组。
为此,需要实现 ExtensionType.to_pandas_dtype()
方法,并且应返回一个 pandas.api.extensions.ExtensionDtype
子类实例。
以上面的 pandas 周期类型为例,这将如下所示
class PeriodType(pa.ExtensionType):
...
def to_pandas_dtype(self):
import pandas as pd
return pd.PeriodDtype(freq=self.freq)
其次,pandas ExtensionDtype
本身需要实现 __from_arrow__
方法:一个给定扩展类型的 pyarrow Array 或 ChunkedArray 可以构造相应的 pandas ExtensionArray
的方法。此方法应具有以下签名
class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
...
def __from_arrow__(self, array: pyarrow.Array/ChunkedArray) -> pandas.ExtensionArray:
...
这样,您可以控制将 pyarrow Array
的 pyarrow 扩展类型转换为可以存储在 DataFrame 中的 pandas ExtensionArray
。
规范扩展类型#
您可以在规范扩展类型部分找到规范扩展类型的官方列表。 在这里,我们添加了如何在 pyarrow 中使用它们的示例。
固定大小张量#
要创建具有相等形状的张量数组(固定形状张量数组),我们首先需要定义一个具有值类型和形状的固定形状张量扩展类型
>>> tensor_type = pa.fixed_shape_tensor(pa.int32(), (2, 2))
然后我们需要具有 pyarrow.list_()
类型的存储数组,其中 value_type`
是固定形状张量值类型,列表大小是 tensor_type
形状元素的乘积。 然后,我们可以使用 pa.ExtensionArray.from_storage()
方法创建一个张量数组
>>> arr = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
>>> storage = pa.array(arr, pa.list_(pa.int32(), 4))
>>> tensor_array = pa.ExtensionArray.from_storage(tensor_type, storage)
我们还可以创建另一个具有不同值类型的张量数组
>>> tensor_type_2 = pa.fixed_shape_tensor(pa.float32(), (2, 2))
>>> storage_2 = pa.array(arr, pa.list_(pa.float32(), 4))
>>> tensor_array_2 = pa.ExtensionArray.from_storage(tensor_type_2, storage_2)
扩展数组可以用作 pyarrow.Table
或 pyarrow.RecordBatch
中的列
>>> data = [
... pa.array([1, 2, 3]),
... pa.array(["foo", "bar", None]),
... pa.array([True, None, True]),
... tensor_array,
... tensor_array_2
... ]
>>> my_schema = pa.schema([("f0", pa.int8()),
... ("f1", pa.string()),
... ("f2", pa.bool_()),
... ("tensors_int", tensor_type),
... ("tensors_float", tensor_type_2)])
>>> table = pa.Table.from_arrays(data, schema=my_schema)
>>> table
pyarrow.Table
f0: int8
f1: string
f2: bool
tensors_int: extension<arrow.fixed_shape_tensor[value_type=int32, shape=[2,2]]>
tensors_float: extension<arrow.fixed_shape_tensor[value_type=float, shape=[2,2]]>
----
f0: [[1,2,3]]
f1: [["foo","bar",null]]
f2: [[true,null,true]]
tensors_int: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
tensors_float: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
我们还可以将张量数组转换为单个多维 numpy ndarray。通过转换,箭头数组的长度成为 numpy ndarray 中的第一个维度
>>> numpy_tensor = tensor_array_2.to_numpy_ndarray()
>>> numpy_tensor
array([[[ 1., 2.],
[ 3., 4.]],
[[ 10., 20.],
[ 30., 40.]],
[[100., 200.],
[300., 400.]]])
>>> numpy_tensor.shape
(3, 2, 2)
注意
可选参数 permutation
和 dim_names
都旨在向用户提供有关数据逻辑布局与物理布局相比的信息。
转换为 numpy ndarray 仅适用于微不足道的排列(None
或 [0, 1, ... N-1]
,其中 N
是张量维度的数量)。
反之亦然,我们可以将 numpy ndarray 转换为固定形状张量数组
>>> pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
<pyarrow.lib.FixedShapeTensorArray object at ...>
[
[
1,
2,
3,
4
],
[
10,
20,
30,
40
],
[
100,
200,
300,
400
]
]
通过转换,ndarray 的第一个维度成为 pyarrow 扩展数组的长度。 我们可以在示例中看到,形状为 (3, 2, 2)
的 ndarray 变成长度为 3 的箭头数组,张量元素的形状为 (2, 2)
。
# ndarray of shape (3, 2, 2)
>>> numpy_tensor.shape
(3, 2, 2)
# arrow array of length 3 with tensor elements of shape (2, 2)
>>> pyarrow_tensor_array = pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
>>> len(pyarrow_tensor_array)
3
>>> pyarrow_tensor_array.type.shape
[2, 2]
扩展类型也可以定义 permutation
和 dim_names
。 例如
>>> tensor_type = pa.fixed_shape_tensor(pa.float64(), [2, 2, 3], permutation=[0, 2, 1])
或
>>> tensor_type = pa.fixed_shape_tensor(pa.bool_(), [2, 2, 3], dim_names=["C", "H", "W"])
对于 NCHW
格式,其中
N:图像的数量,在我们的例子中是数组的长度,并且始终位于第一个维度
C:图像的通道数
H:图像的高度
W:图像的宽度