扩展 pyarrow#
使用 PyCapsule 接口控制转换为 (Py)Arrow#
Arrow C 数据接口 允许在 Arrow 的不同实现之间移动 Arrow 数据。这是一个通用的、跨语言的接口,并非特定于 Python,但对于 Python 库,此接口通过一个 Python 特定层进行了扩展:Arrow PyCapsule 接口。
此 Python 接口确保支持 C 数据接口的不同库能够以标准方式导出 Arrow 数据结构,并相互识别彼此的对象。
如果您有一个 Python 库提供了在底层保存 Arrow 兼容数据的结构,则可以在这些对象上实现以下方法
__arrow_c_schema__
用于模式或类型对象。__arrow_c_array__
用于数组和记录批(连续表)。__arrow_c_stream__
用于分块数组、表和数据流。
这些方法返回 PyCapsule 对象,有关确切语义的更多详细信息,请参阅 规范。
当您的数据结构定义了这些方法后,PyArrow 构造函数(见下文)会将这些对象识别为支持此协议,并以零拷贝方式将其转换为 PyArrow 数据结构。对于支持此协议的任何其他库在摄取数据时也是如此。
类似地,如果您的库具有接受用户提供数据的函数,则可以通过检查这些方法的存在来添加对该协议的支持,因此可以接受任何 Arrow 数据(而不是硬编码对特定 Arrow 生产者(如 PyArrow)的支持)。
要使用 PyArrow 通过此协议使用数据,可以使用以下构造函数创建各种 PyArrow 对象
结果类 |
PyArrow 构造函数 |
支持的协议 |
---|---|---|
数组 |
||
数组,流 |
||
数组 |
||
数组,流 |
||
流 |
||
模式 |
||
模式 |
可以使用 DataType
通过使用 pyarrow.field()
使用与模式兼容的对象创建,然后访问结果字段的 .type
。
使用 __arrow_array__
协议控制转换为 pyarrow.Array#
pyarrow.array()
函数内置支持 Python 序列、NumPy 数组和 Pandas 1D 对象(Series、Index、Categorical 等)将其转换为 Arrow 数组。可以通过实现 __arrow_array__
方法(类似于 NumPy 的 __array__
协议)来扩展其他类似数组的对象。
例如,要支持将您的鸭子数组类转换为 Arrow 数组,请定义 __arrow_array__
方法以返回 Arrow 数组
class MyDuckArray:
...
def __arrow_array__(self, type=None):
# convert the underlying array values to a pyarrow Array
import pyarrow
return pyarrow.array(..., type=type)
__arrow_array__
方法接受一个可选的 type
关键字,该关键字从 pyarrow.array()
传递过来。该方法允许返回 Array
或 ChunkedArray
。
注意
有关控制 Python 对象转换为 Arrow 数据的更通用方法,请参阅 Arrow PyCapsule 接口。它不特定于 PyArrow,并且支持转换其他对象,例如表和模式。
定义扩展类型(“用户定义类型”)#
Arrow 提供了扩展类型的概念,允许用户用额外的语义注释数据类型。这允许开发人员指定自定义序列化和反序列化例程(例如,到 Python 标量 和 Pandas)并更轻松地解释数据。
在 Arrow 中,扩展类型 通过使用自定义类型名称和可选的字节字符串注释任何内置 Arrow 数据类型(“存储类型”)来指定,该字节字符串可用于提供其他元数据(在本文档中称为“参数”)。这些显示为字段的 custom_metadata
中的 ARROW:extension:name
和 ARROW:extension:metadata
键。
请注意,由于这些注释是 Arrow 规范的一部分,因此其他(非 Python)Arrow 使用者(如 PySpark)可能会识别它们。
PyArrow 允许您通过子类化 ExtensionType
并为派生类提供自己的扩展名称和机制来(反)序列化任何参数来从 Python 定义扩展类型。例如,我们可以为分数定义一个自定义有理数类型,可以将其表示为一对整数
class RationalType(pa.ExtensionType):
def __init__(self, data_type: pa.DataType):
if not pa.types.is_integer(data_type):
raise TypeError(f"data_type must be an integer type not {data_type}")
super().__init__(
pa.struct(
[
("numer", data_type),
("denom", data_type),
],
),
"my_package.rational",
)
def __arrow_ext_serialize__(self) -> bytes:
# No parameters are necessary
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Sanity checks, not required but illustrate the method signature.
assert pa.types.is_struct(storage_type)
assert pa.types.is_integer(storage_type[0].type)
assert storage_type[0].type == storage_type[1].type
assert serialized == b""
# return an instance of this subclass
return RationalType(storage_type[0].type)
特殊方法 __arrow_ext_serialize__
和 __arrow_ext_deserialize__
定义扩展类型实例的序列化和反序列化。
现在可以使用它来创建包含扩展类型的数组和表
>>> rational_type = RationalType(pa.int32())
>>> rational_type.extension_name
'my_package.rational'
>>> rational_type.storage_type
StructType(struct<numer: int32, denom: int32>)
>>> storage_array = pa.array(
... [
... {"numer": 10, "denom": 17},
... {"numer": 20, "denom": 13},
... ],
... type=rational_type.storage_type,
... )
>>> arr = rational_type.wrap_array(storage_array)
>>> # or equivalently
>>> arr = pa.ExtensionArray.from_storage(rational_type, storage_array)
>>> arr
<pyarrow.lib.ExtensionArray object at 0x1067f5420>
-- is_valid: all not null
-- child 0 type: int32
[
10,
20
]
-- child 1 type: int32
[
17,
13
]
此数组可以包含在 RecordBatch 中,通过 IPC 发送并在另一个 Python 进程中接收。接收进程必须显式注册扩展类型以进行反序列化,否则它将回退到存储类型
>>> pa.register_extension_type(RationalType(pa.int32()))
例如,创建一个 RecordBatch 并使用 IPC 协议将其写入流
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
... writer.write_batch(batch)
>>> buf = sink.getvalue()
然后读回它会产生正确的类型
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int32, denom: int32>))
此外,请注意,虽然我们注册了具体类型 RationalType(pa.int32())
,但 RationalType(integer_type)
对所有 Arrow 整数类型都使用相同的扩展名称("my_package.rational"
)。因此,以上代码还允许用户(反)序列化这些数据类型
>>> big_rational_type = RationalType(pa.int64())
>>> storage_array = pa.array(
... [
... {"numer": 10, "denom": 17},
... {"numer": 20, "denom": 13},
... ],
... type=big_rational_type.storage_type,
... )
>>> arr = big_rational_type.wrap_array(storage_array)
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
... writer.write_batch(batch)
>>> buf = sink.getvalue()
>>> with pa.ipc.open_stream(buf) as reader:
... result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int64, denom: int64>))
接收应用程序不需要是 Python,但如果它实现了自己的扩展类型来接收它,则仍然可以将扩展类型识别为“my_package.rational”类型。如果接收应用程序中未注册该类型,则它将回退到存储类型。
参数化扩展类型#
以上示例说明了如何构造一个除了存储类型之外不需要其他元数据的扩展类型。但 Arrow 还提供了更灵活的参数化扩展类型。
此处给出的示例实现了 Pandas“时期”数据类型 的扩展类型,表示时间跨度(例如,一天、一个月、一个季度等的频率)。它存储为一个 int64 数组,该数组被解释为自 1970 年以来给定频率的时间跨度的数量。
class PeriodType(pa.ExtensionType):
def __init__(self, freq):
# attributes need to be set first before calling
# super init (as that calls serialize)
self._freq = freq
super().__init__(pa.int64(), "my_package.period")
@property
def freq(self):
return self._freq
def __arrow_ext_serialize__(self):
return "freq={}".format(self.freq).encode()
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
# Return an instance of this subclass given the serialized
# metadata.
serialized = serialized.decode()
assert serialized.startswith("freq=")
freq = serialized.split("=")[1]
return PeriodType(freq)
在这里,我们确保将重建实例所需的所有信息存储在序列化元数据中(在 __arrow_ext_deserialize__
类方法中),在本例中为频率字符串。
请注意,一旦创建,数据类型实例就被认为是不可变的。在上面的示例中,freq
参数因此存储在一个私有属性中,并使用一个公共只读属性来访问它。
自定义扩展数组类#
默认情况下,所有具有扩展类型的数组都构建或反序列化为内置的 ExtensionArray
对象。然而,人们可能希望子类化 ExtensionArray
以添加一些特定于扩展类型的自定义逻辑。Arrow 允许通过在扩展类型的定义中添加一个特殊方法 __arrow_ext_class__
来做到这一点。
例如,让我们考虑来自 NumPy 快速入门 的 3D 空间中点的示例。我们可以将其存储为固定大小的列表,我们希望能够将数据提取为一个 2D NumPy 数组 (N, 3)
而不进行任何复制
class Point3DArray(pa.ExtensionArray):
def to_numpy_array(self):
return self.storage.flatten().to_numpy().reshape((-1, 3))
class Point3DType(pa.ExtensionType):
def __init__(self):
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
def __arrow_ext_serialize__(self):
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()
def __arrow_ext_class__(self):
return Point3DArray
使用此扩展类型构建的数组现在具有预期的自定义数组类
>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr
<__main__.Point3DArray object at 0x7f40dea80670>
[
[
1,
2,
3
],
[
4,
5,
6
]
]
然后,用户可以使用扩展类中的其他方法
>>> arr.to_numpy_array()
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
此数组可以通过 IPC 发送,在另一个 Python 进程中接收,并且自定义扩展数组类将被保留(只要接收进程在读取 IPC 数据之前使用 register_extension_type()
注册了扩展类型)。
自定义标量转换#
如果希望在调用 ExtensionScalar.as_py()
时,自定义扩展类型的标量转换为自定义类型,可以通过继承 ExtensionScalar
并重写 ExtensionScalar.as_py()
方法来实现。例如,如果我们希望上面示例中的 3D 点类型返回自定义的 3D 点类而不是列表,我们将实现
from collections import namedtuple
Point3D = namedtuple("Point3D", ["x", "y", "z"])
class Point3DScalar(pa.ExtensionScalar):
def as_py(self) -> Point3D:
return Point3D(*self.value.as_py())
class Point3DType(pa.ExtensionType):
def __init__(self):
super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
def __arrow_ext_serialize__(self):
return b""
@classmethod
def __arrow_ext_deserialize__(cls, storage_type, serialized):
return Point3DType()
def __arrow_ext_scalar_class__(self):
return Point3DScalar
使用此扩展类型构建的数组现在提供转换为我们 Point3D
类的标量。
>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr[0].as_py()
Point3D(x=1.0, y=2.0, z=3.0)
>>> arr.to_pylist()
[Point3D(x=1.0, y=2.0, z=3.0), Point3D(x=4.0, y=5.0, z=6.0)]
转换为 Pandas#
如果存在与您的扩展类型对应的 pandas 扩展数组,则可以控制具有扩展类型的列转换为 Pandas(在 Table.to_pandas()
中)的方式。
为此,需要实现 ExtensionType.to_pandas_dtype()
方法,并且应该返回 pandas.api.extensions.ExtensionDtype
子类实例。
以上面使用的 Pandas 周期类型为例,这将如下所示:
class PeriodType(pa.ExtensionType):
...
def to_pandas_dtype(self):
import pandas as pd
return pd.PeriodDtype(freq=self.freq)
其次,Pandas ExtensionDtype
需要实现 __from_arrow__
方法:此方法在给定扩展类型的 Pyarrow Array 或 ChunkedArray 时,可以构造相应的 Pandas ExtensionArray
。此方法应具有以下签名:
class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
...
def __from_arrow__(self, array: pyarrow.Array/ChunkedArray) -> pandas.ExtensionArray:
...
这样,您可以控制将 Pyarrow 扩展类型的 Pyarrow Array
转换为可以存储在 DataFrame 中的 Pandas ExtensionArray
。
规范扩展类型#
您可以在 规范扩展类型 部分找到规范扩展类型的官方列表。在这里,我们添加了关于如何在 Pyarrow 中使用它们的示例。
固定大小张量#
要创建具有相同形状的张量数组(固定形状张量数组),我们首先需要使用值类型和形状定义一个固定形状张量扩展类型。
>>> tensor_type = pa.fixed_shape_tensor(pa.int32(), (2, 2))
然后我们需要使用 pyarrow.list_()
类型的存储数组,其中 value_type`
是固定形状张量的值类型,列表大小是 tensor_type
形状元素的乘积。然后,我们可以使用 pa.ExtensionArray.from_storage()
方法创建张量数组。
>>> arr = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
>>> storage = pa.array(arr, pa.list_(pa.int32(), 4))
>>> tensor_array = pa.ExtensionArray.from_storage(tensor_type, storage)
我们还可以创建另一个具有不同值类型的张量数组。
>>> tensor_type_2 = pa.fixed_shape_tensor(pa.float32(), (2, 2))
>>> storage_2 = pa.array(arr, pa.list_(pa.float32(), 4))
>>> tensor_array_2 = pa.ExtensionArray.from_storage(tensor_type_2, storage_2)
扩展数组可以用作 pyarrow.Table
或 pyarrow.RecordBatch
中的列。
>>> data = [
... pa.array([1, 2, 3]),
... pa.array(["foo", "bar", None]),
... pa.array([True, None, True]),
... tensor_array,
... tensor_array_2
... ]
>>> my_schema = pa.schema([("f0", pa.int8()),
... ("f1", pa.string()),
... ("f2", pa.bool_()),
... ("tensors_int", tensor_type),
... ("tensors_float", tensor_type_2)])
>>> table = pa.Table.from_arrays(data, schema=my_schema)
>>> table
pyarrow.Table
f0: int8
f1: string
f2: bool
tensors_int: extension<arrow.fixed_shape_tensor[value_type=int32, shape=[2,2]]>
tensors_float: extension<arrow.fixed_shape_tensor[value_type=float, shape=[2,2]]>
----
f0: [[1,2,3]]
f1: [["foo","bar",null]]
f2: [[true,null,true]]
tensors_int: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
tensors_float: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
我们还可以将张量数组转换为单个多维 NumPy ndarray。转换后,Arrow 数组的长度将成为 NumPy ndarray 中的第一维。
>>> numpy_tensor = tensor_array_2.to_numpy_ndarray()
>>> numpy_tensor
array([[[ 1., 2.],
[ 3., 4.]],
[[ 10., 20.],
[ 30., 40.]],
[[100., 200.],
[300., 400.]]])
>>> numpy_tensor.shape
(3, 2, 2)
注意
两个可选参数 permutation
和 dim_names
旨在为用户提供有关数据逻辑布局与物理布局的信息。
只有对于简单的排列(None
或 [0, 1, ... N-1]
,其中 N
是张量维数)才可能转换为 NumPy ndarray。
反之亦然,我们可以将 NumPy ndarray 转换为固定形状张量数组。
>>> pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
<pyarrow.lib.FixedShapeTensorArray object at ...>
[
[
1,
2,
3,
4
],
[
10,
20,
30,
40
],
[
100,
200,
300,
400
]
]
转换后,ndarray 的第一维将成为 Pyarrow 扩展数组的长度。在示例中可以看到,形状为 (3, 2, 2)
的 ndarray 将变为长度为 3 的 Arrow 数组,其张量元素的形状为 (2, 2)
。
# ndarray of shape (3, 2, 2)
>>> numpy_tensor.shape
(3, 2, 2)
# arrow array of length 3 with tensor elements of shape (2, 2)
>>> pyarrow_tensor_array = pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
>>> len(pyarrow_tensor_array)
3
>>> pyarrow_tensor_array.type.shape
[2, 2]
扩展类型还可以定义 permutation
和 dim_names
。例如:
>>> tensor_type = pa.fixed_shape_tensor(pa.float64(), [2, 2, 3], permutation=[0, 2, 1])
或
>>> tensor_type = pa.fixed_shape_tensor(pa.bool_(), [2, 2, 3], dim_names=["C", "H", "W"])
对于 NCHW
格式,其中:
N:图像数量,在本例中为数组的长度,始终位于第一维。
C:图像的通道数。
H:图像的高度。
W:图像的宽度。