扩展 PyArrow#
使用 PyCapsule 接口控制向 (Py)Arrow 的转换#
Arrow C 数据接口 允许在 Arrow 的不同实现之间移动 Arrow 数据。这是一个通用的、跨语言的接口,不特定于 Python,但对于 Python 库,此接口通过 Python 特定层进行了扩展:Arrow PyCapsule 接口。
此 Python 接口确保支持 C 数据接口的不同库可以以标准方式导出 Arrow 数据结构并相互识别对象。
如果您有一个 Python 库提供底层包含 Arrow 兼容数据的数据结构,您可以在这些对象上实现以下方法
__arrow_c_schema__用于模式或类型对象。__arrow_c_array__用于数组和记录批次(连续表)。__arrow_c_stream__用于分块数组、表和数据流。
这些方法返回 PyCapsule 对象,有关确切语义的更多详细信息可以在规范中找到。
当您的数据结构定义了这些方法时,PyArrow 构造函数(见下文)将识别这些对象支持此协议,并将它们零拷贝转换为 PyArrow 数据结构。对于任何其他支持此协议的库在摄取数据时,也是如此。
类似地,如果您的库具有接受用户提供数据的函数,您可以通过检查这些方法的存在来添加对此协议的支持,从而接受任何 Arrow 数据(而不是硬编码支持特定的 Arrow 生产者,如 PyArrow)。
要使用 PyArrow 通过此协议消费数据,可以使用以下构造函数来创建各种 PyArrow 对象
结果类 |
PyArrow 构造函数 |
支持的协议 |
|---|---|---|
数组 |
||
数组, 流 |
||
数组 |
||
数组, 流 |
||
流 |
||
模式 |
||
模式 |
可以通过使用 pyarrow.field() 消费与模式兼容的对象,然后访问结果 Field 的 .type 来创建 DataType。
使用 __arrow_array__ 协议控制向 pyarrow.Array 的转换#
pyarrow.array() 函数内置支持 Python 序列、numpy 数组和 pandas 一维对象(Series、Index、Categorical 等)将它们转换为 Arrow 数组。可以通过实现 __arrow_array__ 方法(类似于 numpy 的 __array__ 协议)来扩展对其他类数组对象的支持。
例如,要支持将您的 duck 数组类转换为 Arrow 数组,请定义 __arrow_array__ 方法以返回 Arrow 数组
class MyDuckArray:
...
def __arrow_array__(self, type=None):
# convert the underlying array values to a PyArrow Array
import pyarrow
return pyarrow.array(..., type=type)
__arrow_array__ 方法接受一个可选的 type 关键字,该关键字通过 pyarrow.array() 传递。该方法允许返回 Array 或 ChunkedArray。
注意
对于控制 Python 对象到 Arrow 数据转换的更通用方法,请考虑 Arrow PyCapsule 接口。它不特定于 PyArrow,并支持转换其他对象,如表和模式。
定义扩展类型(“用户定义类型”)#
Arrow 提供了一种扩展类型的概念,允许用户用附加语义注释数据类型。这使得开发人员可以指定自定义序列化和反序列化例程(例如,到Python 标量和pandas),并更容易地解释数据。
在 Arrow 中,扩展类型通过使用自定义类型名称注释任何内置的 Arrow 数据类型(“存储类型”)来指定,并且可选地,一个字节字符串可以用于提供附加元数据(在本文档中称为“参数”)。这些出现在 Field 的 custom_metadata 中的 ARROW:extension:name 和 ARROW:extension:metadata 键。
请注意,由于这些注释是 Arrow 规范的一部分,它们可能被其他(非 Python)Arrow 消费者(如 PySpark)识别。
PyArrow 允许您通过子类化 ExtensionType 并为派生类提供自己的扩展名称和(反)序列化任何参数的机制来从 Python 定义扩展类型。例如,我们可以为分数定义一个自定义的有理数类型,该类型可以表示为一对整数
class RationalType(pa.ExtensionType):
def __init__(self, data_type: pa.DataType):
if not pa.types.is_integer(data_type):
raise TypeError(f"data_type must be an integer type not {data_type}")
super().__init__(
pa.struct(
[
("numer", data_type),
("denom", data_type),
],
),
"my_package.rational",
)
def __arrow_ext_serialize__(self) -> bytes:
# No parameters are necessary
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Sanity checks, not required but illustrate the method signature.
assert pa.types.is_struct(storage_type)
assert pa.types.is_integer(storage_type[0].type)
assert storage_type[0].type == storage_type[1].type
assert serialized == b""
# return an instance of this subclass
return RationalType(storage_type[0].type)
特殊方法 __arrow_ext_serialize__ 和 __arrow_ext_deserialize__ 定义了扩展类型实例的序列化和反序列化。
现在可以使用它来创建包含扩展类型的数组和表
>>> rational_type = RationalType(pa.int32())
>>> rational_type.extension_name
'my_package.rational'
>>> rational_type.storage_type
StructType(struct<numer: int32, denom: int32>)
>>> storage_array = pa.array(
... [
... {"numer": 10, "denom": 17},
... {"numer": 20, "denom": 13},
... ],
... type=rational_type.storage_type,
... )
>>> arr = rational_type.wrap_array(storage_array)
>>> # or equivalently
>>> arr = pa.ExtensionArray.from_storage(rational_type, storage_array)
>>> arr
<pyarrow.lib.ExtensionArray object at 0x1067f5420>
-- is_valid: all not null
-- child 0 type: int32
[
10,
20
]
-- child 1 type: int32
[
17,
13
]
此数组可以包含在 RecordBatches 中,通过 IPC 发送并在另一个 Python 进程中接收。接收进程必须明确注册扩展类型以进行反序列化,否则它将回退到存储类型
>>> pa.register_extension_type(RationalType(pa.int32()))
例如,创建 RecordBatch 并使用 IPC 协议将其写入流
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
... writer.write_batch(batch)
>>> buf = sink.getvalue()
然后读回会产生正确的类型
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int32, denom: int32>))
此外,请注意,虽然我们注册了具体类型 RationalType(pa.int32()),但 RationalType(integer_type) 对所有 Arrow 整数类型都使用相同的扩展名("my_package.rational")。因此,上述代码也允许用户(反)序列化这些数据类型
>>> big_rational_type = RationalType(pa.int64())
>>> storage_array = pa.array(
... [
... {"numer": 10, "denom": 17},
... {"numer": 20, "denom": 13},
... ],
... type=big_rational_type.storage_type,
... )
>>> arr = big_rational_type.wrap_array(storage_array)
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
... writer.write_batch(batch)
>>> buf = sink.getvalue()
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int64, denom: int64>))
接收应用程序不必是 Python,但如果它已实现自己的扩展类型来接收它,它仍然可以将扩展类型识别为“my_package.rational”类型。如果该类型未在接收应用程序中注册,它将回退到存储类型。
参数化扩展类型#
上面的示例说明了如何构造一个不需要存储类型之外的额外元数据的扩展类型。但 Arrow 还提供了更灵活的参数化扩展类型。
这里给出的示例实现了 pandas “period” 数据类型的扩展类型,表示时间跨度(例如,一天、一个月、一个季度等频率)。它存储为 int64 数组,解释为自 1970 年以来给定频率的时间跨度数。
class PeriodType(pa.ExtensionType):
def __init__(self, freq):
# attributes need to be set first before calling
# super init (as that calls serialize)
self._freq = freq
super().__init__(pa.int64(), "my_package.period")
@property
def freq(self):
return self._freq
def __arrow_ext_serialize__(self):
return "freq={}".format(self.freq).encode()
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Return an instance of this subclass given the serialized
# metadata.
serialized = serialized.decode()
assert serialized.startswith("freq=")
freq = serialized.split("=")[1]
return PeriodType(freq)
在这里,我们确保将重建实例所需的所有信息(在 __arrow_ext_deserialize__ 类方法中)存储在序列化元数据中,在本例中是频率字符串。
请注意,一旦创建,数据类型实例被认为是不可变的。在上面的示例中,freq 参数因此存储在一个私有属性中,并带有一个公共只读属性来访问它。
自定义扩展数组类#
默认情况下,所有具有扩展类型的数组都被构造或反序列化为内置的 ExtensionArray 对象。然而,人们可能希望子类化 ExtensionArray 以添加特定于扩展类型的一些自定义逻辑。Arrow 允许通过在扩展类型的定义中添加一个特殊方法 __arrow_ext_class__ 来实现这一点。
例如,让我们考虑 Numpy 快速入门中关于 3D 空间点的示例。我们可以将这些点存储为固定大小的列表,我们希望能够将数据提取为 2D Numpy 数组 (N, 3) 而无需任何复制
class Point3DArray(pa.ExtensionArray):
def to_numpy_array(self):
return self.storage.flatten().to_numpy().reshape((-1, 3))
class Point3DType(pa.ExtensionType):
def __init__(self):
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
def __arrow_ext_serialize__(self):
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()
def __arrow_ext_class__(self):
return Point3DArray
使用此扩展类型构建的数组现在具有预期的自定义数组类
>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr
<__main__.Point3DArray object at 0x7f40dea80670>
[
[
1,
2,
3
],
[
4,
5,
6
]
]
然后用户可以使用扩展类中的附加方法
>>> arr.to_numpy_array()
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
此数组可以通过 IPC 发送,并在另一个 Python 进程中接收,自定义扩展数组类将被保留(只要接收进程在读取 IPC 数据之前使用 register_extension_type() 注册扩展类型)。
自定义标量转换#
如果您希望您的自定义扩展类型的标量在调用 ExtensionScalar.as_py() 时转换为自定义类型,您可以通过子类化 ExtensionScalar 来重写 ExtensionScalar.as_py() 方法。例如,如果我们希望上面示例中的 3D 点类型返回自定义 3D 点类而不是列表,我们将实现
from collections import namedtuple
Point3D = namedtuple("Point3D", ["x", "y", "z"])
class Point3DScalar(pa.ExtensionScalar):
def as_py(self) -> Point3D:
return Point3D(*self.value.as_py())
class Point3DType(pa.ExtensionType):
def __init__(self):
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
def __arrow_ext_serialize__(self):
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()
def __arrow_ext_scalar_class__(self):
return Point3DScalar
使用此扩展类型构建的数组现在提供可转换为我们的 Point3D 类的标量
>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr[0].as_py()
Point3D(x=1.0, y=2.0, z=3.0)
>>> arr.to_pylist()
[Point3D(x=1.0, y=2.0, z=3.0), Point3D(x=4.0, y=5.0, z=6.0)]
转换为 pandas#
如果您的扩展类型存在相应的 pandas 扩展数组,则可以控制具有扩展类型的列到 pandas(在 Table.to_pandas() 中)的转换。
为此,需要实现 ExtensionType.to_pandas_dtype() 方法,并且应该返回一个 pandas.api.extensions.ExtensionDtype 子类实例。
以上面的 pandas period 类型为例,这将如下所示
class PeriodType(pa.ExtensionType):
...
def to_pandas_dtype(self):
import pandas as pd
return pd.PeriodDtype(freq=self.freq)
其次,pandas ExtensionDtype 反过来需要实现 __from_arrow__ 方法:一个给定 PyArrow 扩展类型的 Array 或 ChunkedArray 可以构造相应的 pandas ExtensionArray 的方法。此方法应具有以下签名
class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
...
def __from_arrow__(self, array: pyarrow.Array/ChunkedArray) -> pandas.ExtensionArray:
...
通过这种方式,您可以控制 PyArrow 扩展类型的 PyArrow Array 到可以存储在 DataFrame 中的 pandas ExtensionArray 的转换。
规范扩展类型#
您可以在 规范扩展类型 部分找到规范扩展类型的官方列表。在这里,我们添加了如何在 PyArrow 中使用它们的示例。
固定大小张量#
要创建具有相同形状(固定形状张量数组)的张量数组,我们首先需要定义一个具有值类型和形状的固定形状张量扩展类型
>>> tensor_type = pa.fixed_shape_tensor(pa.int32(), (2, 2))
然后我们需要具有 pyarrow.list_() 类型的存储数组,其中 value_type` 是固定形状张量值类型,列表大小是 tensor_type 形状元素的乘积。然后我们可以使用 pa.ExtensionArray.from_storage() 方法创建张量数组
>>> arr = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
>>> storage = pa.array(arr, pa.list_(pa.int32(), 4))
>>> tensor_array = pa.ExtensionArray.from_storage(tensor_type, storage)
我们还可以创建另一个具有不同值类型的张量数组
>>> tensor_type_2 = pa.fixed_shape_tensor(pa.float32(), (2, 2))
>>> storage_2 = pa.array(arr, pa.list_(pa.float32(), 4))
>>> tensor_array_2 = pa.ExtensionArray.from_storage(tensor_type_2, storage_2)
扩展数组可以用作 pyarrow.Table 或 pyarrow.RecordBatch 中的列
>>> data = [
... pa.array([1, 2, 3]),
... pa.array(["foo", "bar", None]),
... pa.array([True, None, True]),
... tensor_array,
... tensor_array_2
... ]
>>> my_schema = pa.schema([("f0", pa.int8()),
... ("f1", pa.string()),
... ("f2", pa.bool_()),
... ("tensors_int", tensor_type),
... ("tensors_float", tensor_type_2)])
>>> table = pa.Table.from_arrays(data, schema=my_schema)
>>> table
pyarrow.Table
f0: int8
f1: string
f2: bool
tensors_int: extension<arrow.fixed_shape_tensor[value_type=int32, shape=[2,2]]>
tensors_float: extension<arrow.fixed_shape_tensor[value_type=float, shape=[2,2]]>
----
f0: [[1,2,3]]
f1: [["foo","bar",null]]
f2: [[true,null,true]]
tensors_int: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
tensors_float: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
我们还可以将张量数组转换为单个多维 numpy ndarray。转换后,Arrow 数组的长度成为 numpy ndarray 的第一个维度
>>> numpy_tensor = tensor_array_2.to_numpy_ndarray()
>>> numpy_tensor
array([[[ 1., 2.],
[ 3., 4.]],
[[ 10., 20.],
[ 30., 40.]],
[[100., 200.],
[300., 400.]]])
>>> numpy_tensor.shape
(3, 2, 2)
注意
两个可选参数 permutation 和 dim_names 旨在向用户提供有关数据逻辑布局与物理布局的信息。
转换为 numpy ndarray 仅适用于平凡的排列(None 或 [0, 1, ... N-1],其中 N 是张量维度数)。
反过来,我们也可以将 numpy ndarray 转换为固定形状张量数组
>>> pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
<pyarrow.lib.FixedShapeTensorArray object at ...>
[
[
1,
2,
3,
4
],
[
10,
20,
30,
40
],
[
100,
200,
300,
400
]
]
转换后,ndarray 的第一个维度成为 PyArrow 扩展数组的长度。我们可以在示例中看到,形状为 (3, 2, 2) 的 ndarray 变为长度为 3 的 Arrow 数组,其张量元素的形状为 (2, 2)。
# ndarray of shape (3, 2, 2)
>>> numpy_tensor.shape
(3, 2, 2)
# arrow array of length 3 with tensor elements of shape (2, 2)
>>> pyarrow_tensor_array = pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
>>> len(pyarrow_tensor_array)
3
>>> pyarrow_tensor_array.type.shape
[2, 2]
扩展类型也可以定义 permutation 和 dim_names。例如
>>> tensor_type = pa.fixed_shape_tensor(pa.float64(), [2, 2, 3], permutation=[0, 2, 1])
或
>>> tensor_type = pa.fixed_shape_tensor(pa.bool_(), [2, 2, 3], dim_names=["C", "H", "W"])
对于 NCHW 格式,其中
N:图像数量,在本例中是数组的长度,并且始终位于第一个维度
C:图像通道数
H:图像高度
W:图像宽度