扩展 pyarrow#

使用 PyCapsule 接口控制转换为 (Py)Arrow#

Arrow C 数据接口 允许在 Arrow 的不同实现之间移动 Arrow 数据。这是一个通用的、跨语言的接口,并非专门针对 Python,但对于 Python 库,此接口通过一个特定于 Python 的层进行了扩展: Arrow PyCapsule 接口

此 Python 接口确保支持 C 数据接口的不同库能够以标准方式导出 Arrow 数据结构并识别彼此的对象。

如果您的 Python 库提供了底层保存 Arrow 兼容数据的数据结构,则可以在这些对象上实现以下方法

  • __arrow_c_schema__ 用于模式或类型对象。

  • __arrow_c_array__ 用于数组和记录批次(连续表格)。

  • __arrow_c_stream__ 用于分块数组、表格和数据流。

这些方法返回 PyCapsule 对象,有关确切语义的更多详细信息,请参见 规范

当您的数据结构定义了这些方法时,PyArrow 构造函数(请参见下文)会将这些对象识别为支持此协议,并将其零拷贝转换为 PyArrow 数据结构。对于支持此协议以摄取数据的任何其他库,情况也是如此。

同样,如果您的库具有接受用户提供的数据的函数,则可以通过检查这些方法是否存在来添加对此协议的支持,从而接受任何 Arrow 数据(而不是硬编码对特定 Arrow 生成器(例如 PyArrow)的支持)。

对于通过此协议使用 PyArrow 消费数据,可以使用以下构造函数创建各种 PyArrow 对象

可以使用 pyarrow.field() 消耗与模式兼容的对象来创建 DataType,然后访问结果 Field 的 .type

使用 __arrow_array__ 协议控制转换为 pyarrow.Array#

pyarrow.array() 函数内置支持将 Python 序列、numpy 数组和 pandas 1D 对象(Series、Index、Categorical 等)转换为 Arrow 数组。通过实现 __arrow_array__ 方法(类似于 numpy 的 __array__ 协议),可以将其扩展到其他类数组对象。

例如,要支持将您的 duck 数组类转换为 Arrow 数组,请定义 __arrow_array__ 方法以返回 Arrow 数组

class MyDuckArray:

    ...

    def __arrow_array__(self, type=None):
        # convert the underlying array values to a pyarrow Array
        import pyarrow
        return pyarrow.array(..., type=type)

__arrow_array__ 方法采用可选的 type 关键字,该关键字从 pyarrow.array() 传递。该方法允许返回 ArrayChunkedArray

注意

有关控制 Python 对象转换为 Arrow 数据的更通用方法,请考虑 Arrow PyCapsule 接口。它并非特定于 PyArrow,并且支持转换其他对象,例如表格和模式。

定义扩展类型(“用户定义的类型”)#

Arrow 提供了一种扩展类型的概念,允许用户使用附加语义注释数据类型。这使开发人员既可以指定自定义序列化和反序列化例程(例如,针对 Python 标量pandas),又可以更轻松地解释数据。

在 Arrow 中,扩展类型 通过使用自定义类型名称和可选的字节串来注释任何内置的 Arrow 数据类型(“存储类型”)来指定,该字节串可用于提供额外的元数据(在本文档中称为“参数”)。它们在 Field 的 custom_metadata 中显示为 ARROW:extension:nameARROW:extension:metadata 键。

请注意,由于这些注释是 Arrow 规范的一部分,因此它们可能会被其他(非 Python)Arrow 使用者(例如 PySpark)识别。

PyArrow 允许您通过继承 ExtensionType 并为派生类提供自己的扩展名称和(反)序列化任何参数的机制,从 Python 定义扩展类型。例如,我们可以为可以表示为整数对的分数定义自定义有理类型

class RationalType(pa.ExtensionType):

    def __init__(self, data_type: pa.DataType):
        if not pa.types.is_integer(data_type):
            raise TypeError(f"data_type must be an integer type not {data_type}")

        super().__init__(
            pa.struct(
                [
                    ("numer", data_type),
                    ("denom", data_type),
                ],
            ),
            "my_package.rational",
        )

    def __arrow_ext_serialize__(self) -> bytes:
        # No parameters are necessary
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        # Sanity checks, not required but illustrate the method signature.
        assert pa.types.is_struct(storage_type)
        assert pa.types.is_integer(storage_type[0].type)
        assert storage_type[0].type == storage_type[1].type
        assert serialized == b""

        # return an instance of this subclass
        return RationalType(storage_type[0].type)

特殊方法 __arrow_ext_serialize____arrow_ext_deserialize__ 定义扩展类型实例的序列化和反序列化。

现在可以使用此方法创建保存扩展类型的数组和表格

>>> rational_type = RationalType(pa.int32())
>>> rational_type.extension_name
'my_package.rational'
>>> rational_type.storage_type
StructType(struct<numer: int32, denom: int32>)

>>> storage_array = pa.array(
...     [
...         {"numer": 10, "denom": 17},
...         {"numer": 20, "denom": 13},
...     ],
...     type=rational_type.storage_type,
... )
>>> arr = rational_type.wrap_array(storage_array)
>>> # or equivalently
>>> arr = pa.ExtensionArray.from_storage(rational_type, storage_array)
>>> arr
<pyarrow.lib.ExtensionArray object at 0x1067f5420>
-- is_valid: all not null
-- child 0 type: int32
  [
    10,
    20
  ]
-- child 1 type: int32
  [
    17,
    13
  ]

此数组可以包含在 RecordBatch 中,通过 IPC 发送,并在另一个 Python 进程中接收。接收进程必须显式注册扩展类型以进行反序列化,否则它将回退到存储类型

>>> pa.register_extension_type(RationalType(pa.int32()))

例如,创建 RecordBatch 并使用 IPC 协议将其写入流

>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
...    writer.write_batch(batch)
>>> buf = sink.getvalue()

然后将其读回会产生正确的类型

>>> with pa.ipc.open_stream(buf) as reader:
...    result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int32, denom: int32>))

此外,请注意,虽然我们注册了具体类型 RationalType(pa.int32()),但是对于 所有 Arrow 整数类型,RationalType(integer_type) 使用相同的扩展名称 ("my_package.rational")。因此,上面的代码还允许用户(反)序列化这些数据类型

>>> big_rational_type = RationalType(pa.int64())
>>> storage_array = pa.array(
...     [
...         {"numer": 10, "denom": 17},
...         {"numer": 20, "denom": 13},
...     ],
...     type=big_rational_type.storage_type,
... )
>>> arr = big_rational_type.wrap_array(storage_array)
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
...    writer.write_batch(batch)
>>> buf = sink.getvalue()
>>> with pa.ipc.open_stream(buf) as reader:
...    result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int64, denom: int64>))

接收应用程序不需要是 Python,但如果它已经实现了自己的扩展类型来接收它,它仍然可以将扩展类型识别为“my_package.rational”类型。如果该类型未在接收应用程序中注册,它将回退到存储类型。

参数化扩展类型#

上面的示例说明了如何构造不需要超出其存储类型的额外元数据的扩展类型。但是 Arrow 还提供了更灵活的参数化扩展类型。

此处给出的示例实现了 pandas“期间”数据类型的扩展类型,表示时间跨度(例如,一天、一个月、一个季度等的频率)。它存储为 int64 数组,该数组解释为自 1970 年以来给定频率的时间跨度数。

class PeriodType(pa.ExtensionType):

    def __init__(self, freq):
        # attributes need to be set first before calling
        # super init (as that calls serialize)
        self._freq = freq
        super().__init__(pa.int64(), "my_package.period")

    @property
    def freq(self):
        return self._freq

    def __arrow_ext_serialize__(self):
        return "freq={}".format(self.freq).encode()

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        # Return an instance of this subclass given the serialized
        # metadata.
        serialized = serialized.decode()
        assert serialized.startswith("freq=")
        freq = serialized.split("=")[1]
        return PeriodType(freq)

在此,我们确保将重构实例所需的所有信息存储在序列化元数据中(在 __arrow_ext_deserialize__ 类方法中),在本例中为频率字符串。

请注意,一旦创建,数据类型实例就被认为是不可变的。因此,在上面的示例中,freq 参数存储在具有公共只读属性以访问它的私有属性中。

自定义扩展数组类#

默认情况下,所有带有扩展类型的数组都会被构造或反序列化为内置的 ExtensionArray 对象。然而,有时我们可能希望子类化 ExtensionArray,以便为扩展类型添加一些自定义逻辑。Arrow 允许通过在扩展类型的定义中添加一个特殊方法 __arrow_ext_class__ 来实现这一点。

例如,让我们考虑来自 Numpy 快速入门 的 3D 空间中点的例子。我们可以将这些点存储为固定大小的列表,我们希望能够将数据提取为 2-D Numpy 数组 (N, 3),而无需任何复制。

class Point3DArray(pa.ExtensionArray):
    def to_numpy_array(self):
        return self.storage.flatten().to_numpy().reshape((-1, 3))


class Point3DType(pa.ExtensionType):
    def __init__(self):
        super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return Point3DType()

    def __arrow_ext_class__(self):
        return Point3DArray

现在使用此扩展类型构建的数组具有预期的自定义数组类。

>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr
<__main__.Point3DArray object at 0x7f40dea80670>
[
    [
        1,
        2,
        3
    ],
    [
        4,
        5,
        6
    ]
]

然后,扩展类中的其他方法可供用户使用。

>>> arr.to_numpy_array()
array([[1., 2., 3.],
   [4., 5., 6.]], dtype=float32)

此数组可以通过 IPC 发送,在另一个 Python 进程中接收,并且自定义扩展数组类将被保留(只要接收进程在使用 register_extension_type() 读取 IPC 数据之前注册了扩展类型)。

自定义标量转换#

如果您希望自定义扩展类型的标量在调用 ExtensionScalar.as_py() 时转换为自定义类型,则可以通过子类化 ExtensionScalar 来重写 ExtensionScalar.as_py() 方法。例如,如果我们希望上面的 3D 点类型返回一个自定义的 3D 点类而不是一个列表,我们将实现:

from collections import namedtuple

Point3D = namedtuple("Point3D", ["x", "y", "z"])

class Point3DScalar(pa.ExtensionScalar):
    def as_py(self) -> Point3D:
        return Point3D(*self.value.as_py())

class Point3DType(pa.ExtensionType):
    def __init__(self):
        super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return Point3DType()

    def __arrow_ext_scalar_class__(self):
        return Point3DScalar

现在,使用此扩展类型构建的数组提供的标量可以转换为我们的 Point3D 类。

>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr[0].as_py()
Point3D(x=1.0, y=2.0, z=3.0)

>>> arr.to_pylist()
[Point3D(x=1.0, y=2.0, z=3.0), Point3D(x=4.0, y=5.0, z=6.0)]

转换为 pandas#

如果您的扩展类型有相应的 pandas 扩展数组,则可以控制具有扩展类型的列转换为 pandas(在 Table.to_pandas() 中)。

为此,需要实现 ExtensionType.to_pandas_dtype() 方法,并且应返回 pandas.api.extensions.ExtensionDtype 子类实例。

以上面的 pandas period 类型为例,这将如下所示:

class PeriodType(pa.ExtensionType):
    ...

    def to_pandas_dtype(self):
        import pandas as pd
        return pd.PeriodDtype(freq=self.freq)

其次,pandas ExtensionDtype 本身需要实现 __from_arrow__ 方法:该方法给定扩展类型的 pyarrow Array 或 ChunkedArray,可以构造相应的 pandas ExtensionArray。此方法应具有以下签名:

class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
    ...

    def __from_arrow__(self, array: pyarrow.Array/ChunkedArray) -> pandas.ExtensionArray:
        ...

这样,您可以控制 pyarrow 扩展类型的 pyarrow Array 到可以存储在 DataFrame 中的 pandas ExtensionArray 的转换。

规范扩展类型#

您可以在 规范扩展类型 部分中找到规范扩展类型的官方列表。在这里,我们添加了如何在 pyarrow 中使用它们的示例。

固定大小张量#

要创建具有相同形状的张量数组(固定形状张量数组),我们首先需要定义一个具有值类型和形状的固定形状张量扩展类型。

>>> tensor_type = pa.fixed_shape_tensor(pa.int32(), (2, 2))

然后,我们需要使用 pyarrow.list_() 类型的存储数组,其中 value_type` 是固定形状张量的值类型,列表大小是 tensor_type 形状元素的乘积。然后,我们可以使用 pa.ExtensionArray.from_storage() 方法创建张量数组。

>>> arr = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
>>> storage = pa.array(arr, pa.list_(pa.int32(), 4))
>>> tensor_array = pa.ExtensionArray.from_storage(tensor_type, storage)

我们还可以创建另一个具有不同值类型的张量数组。

>>> tensor_type_2 = pa.fixed_shape_tensor(pa.float32(), (2, 2))
>>> storage_2 = pa.array(arr, pa.list_(pa.float32(), 4))
>>> tensor_array_2 = pa.ExtensionArray.from_storage(tensor_type_2, storage_2)

扩展数组可以用作 pyarrow.Tablepyarrow.RecordBatch 中的列。

>>> data = [
...     pa.array([1, 2, 3]),
...     pa.array(["foo", "bar", None]),
...     pa.array([True, None, True]),
...     tensor_array,
...     tensor_array_2
... ]
>>> my_schema = pa.schema([("f0", pa.int8()),
...                        ("f1", pa.string()),
...                        ("f2", pa.bool_()),
...                        ("tensors_int", tensor_type),
...                        ("tensors_float", tensor_type_2)])
>>> table = pa.Table.from_arrays(data, schema=my_schema)
>>> table
pyarrow.Table
f0: int8
f1: string
f2: bool
tensors_int: extension<arrow.fixed_shape_tensor[value_type=int32, shape=[2,2]]>
tensors_float: extension<arrow.fixed_shape_tensor[value_type=float, shape=[2,2]]>
----
f0: [[1,2,3]]
f1: [["foo","bar",null]]
f2: [[true,null,true]]
tensors_int: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
tensors_float: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]

我们还可以将张量数组转换为单个多维 numpy ndarray。通过转换,箭头数组的长度成为 numpy ndarray 中的第一个维度。

>>> numpy_tensor = tensor_array_2.to_numpy_ndarray()
>>> numpy_tensor
array([[[  1.,   2.],
        [  3.,   4.]],
       [[ 10.,  20.],
        [ 30.,  40.]],
       [[100., 200.],
        [300., 400.]]])
 >>> numpy_tensor.shape
(3, 2, 2)

注意

两个可选参数 permutationdim_names 旨在为用户提供有关与物理布局相比,数据的逻辑布局的信息。

仅对于微不足道的排列(None[0, 1, ... N-1],其中 N 是张量维度的数量)才可能转换为 numpy ndarray。

反之亦然,我们也可以将 numpy ndarray 转换为固定形状的张量数组。

>>> pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
<pyarrow.lib.FixedShapeTensorArray object at ...>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    10,
    20,
    30,
    40
  ],
  [
    100,
    200,
    300,
    400
  ]
]

通过转换,ndarray 的第一个维度成为 pyarrow 扩展数组的长度。我们可以在示例中看到,形状为 (3, 2, 2) 的 ndarray 变为长度为 3 的箭头数组,其张量元素的形状为 (2, 2)

# ndarray of shape (3, 2, 2)
>>> numpy_tensor.shape
(3, 2, 2)

# arrow array of length 3 with tensor elements of shape (2, 2)
>>> pyarrow_tensor_array = pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
>>> len(pyarrow_tensor_array)
3
>>> pyarrow_tensor_array.type.shape
[2, 2]

扩展类型还可以定义 permutationdim_names。例如:

>>> tensor_type = pa.fixed_shape_tensor(pa.float64(), [2, 2, 3], permutation=[0, 2, 1])

>>> tensor_type = pa.fixed_shape_tensor(pa.bool_(), [2, 2, 3], dim_names=["C", "H", "W"])

对于 NCHW 格式,其中

  • N:图像数量,在我们的例子中是数组的长度,并且始终位于第一个维度上。

  • C:图像的通道数

  • H:图像的高度

  • W:图像的宽度