扩展 PyArrow#

使用 PyCapsule 接口控制向 (Py)Arrow 的转换#

Arrow C 数据接口 允许在不同的 Arrow 实现之间传输 Arrow 数据。这是一个通用的、跨语言的接口,并非 Python 专用。但对于 Python 库,该接口通过一个 Python 特有的层进行了扩展:Arrow PyCapsule 接口

此 Python 接口确保支持 C 数据接口的不同库能够以标准方式导出 Arrow 数据结构,并识别彼此的对象。

如果您拥有一个提供底层持有 Arrow 兼容数据的 Python 库,则可以在这些对象上实现以下方法:

  • __arrow_c_schema__,用于 Schema(模式)或类类型对象。

  • __arrow_c_array__,用于数组和记录批次(连续表)。

  • __arrow_c_stream__,用于分块数组、表和数据流。

这些方法返回 PyCapsule 对象,关于确切语义的更多详细信息,请参见 规范

当您的数据结构定义了这些方法时,PyArrow 的构造函数(见下文)将识别这些对象支持此协议,并将它们零拷贝地转换为 PyArrow 数据结构。对于任何其他支持此摄取数据协议的库,情况也是如此。

同样,如果您的库具有接受用户提供数据的函数,您可以通过检查这些方法是否存在来添加对该协议的支持,从而接受任何 Arrow 数据(而不是硬编码对特定 Arrow 生产者(如 PyArrow)的支持)。

如需通过此协议使用 PyArrow 消费数据,可以使用以下构造函数来创建各种 PyArrow 对象:

结果类

PyArrow 构造函数

支持的协议

数组

pyarrow.array()

数组

ChunkedArray

pyarrow.chunked_array()

array(数组), stream(流)

RecordBatch

pyarrow.record_batch()

数组

pyarrow.table()

array(数组), stream(流)

RecordBatchReader

pyarrow.RecordBatchReader.from_stream()

stream(流)

Field

pyarrow.field()

模式

Schema

pyarrow.schema()

模式

DataType 可以通过使用 pyarrow.field() 消费 Schema 兼容对象,然后访问结果 Field 的 .type 来创建。

使用 __arrow_array__ 协议控制向 pyarrow.Array 的转换#

pyarrow.array() 函数内置了对 Python 序列、numpy 数组和 pandas 一维对象(Series、Index、Categorical 等)的支持,可将它们转换为 Arrow 数组。通过实现 __arrow_array__ 方法(类似于 numpy 的 __array__ 协议),可以将其扩展到其他类数组对象。

例如,要支持将您的 duck 数组类转换为 Arrow 数组,请定义 __arrow_array__ 方法以返回一个 Arrow 数组。

>>> class MyDuckArray:
...
...     def __arrow_array__(self, type=None):
...         # convert the underlying array values to a PyArrow Array
...         import pyarrow
...         return pyarrow.array(..., type=type)

__arrow_array__ 方法采用一个可选的 type 关键字,该关键字由 pyarrow.array() 传入。该方法允许返回 ArrayChunkedArray

注意

若要以更通用的方式控制 Python 对象到 Arrow 数据的转换,请考虑 Arrow PyCapsule 接口。它不特定于 PyArrow,并且支持转换其他对象,如表(tables)和 Schema(模式)。

定义扩展类型(“用户定义类型”)#

Arrow 提供了一种扩展类型概念,允许用户使用额外的语义来注释数据类型。这使得开发者既可以指定自定义的序列化和反序列化程序(例如,用于 Python 标量pandas),也可以更轻松地解释数据。

在 Arrow 中,扩展类型 是通过使用自定义类型名称(以及可选的字节串,该字节串可用于提供额外元数据,在本规范中称为“参数”)来注释任何内置 Arrow 数据类型(即“存储类型”)来指定的。这些出现在 Field 的 custom_metadata 中的 ARROW:extension:nameARROW:extension:metadata 键中。

请注意,由于这些注释是 Arrow 规范的一部分,它们可能被其他(非 Python)Arrow 消费者(如 PySpark)识别。

PyArrow 允许您通过继承 ExtensionType 并赋予派生类其自己的扩展名及 (反)序列化任何参数的机制,来从 Python 定义扩展类型。例如,我们可以定义一个自定义的有理数类型,表示为一对整数。

>>> import pyarrow as pa
>>> class RationalType(pa.ExtensionType):
...
...     def __init__(self, data_type: pa.DataType):
...         if not pa.types.is_integer(data_type):
...             raise TypeError(f"data_type must be an integer type not {data_type}")
...
...         super().__init__(
...             pa.struct(
...                 [
...                     ("numer", data_type),
...                     ("denom", data_type),
...                 ],
...             ),
...             "my_package.rational",
...         )
...
...     def __arrow_ext_serialize__(self) -> bytes:
...         # No parameters are necessary
...         return b""
...
...     @classmethod
...     def __arrow_ext_deserialize__(cls, storage_type, serialized):
...         # Sanity checks, not required but illustrate the method signature.
...         assert pa.types.is_struct(storage_type)
...         assert pa.types.is_integer(storage_type[0].type)
...         assert storage_type[0].type == storage_type[1].type
...         assert serialized == b""
...
...         # return an instance of this subclass
...         return RationalType(storage_type[0].type)

特殊方法 __arrow_ext_serialize____arrow_ext_deserialize__ 定义了扩展类型实例的序列化和反序列化。

这现在可以用于创建持有该扩展类型的数组和表。

>>> rational_type = RationalType(pa.int32())
>>> rational_type.extension_name
'my_package.rational'
>>> rational_type.storage_type
StructType(struct<numer: int32, denom: int32>)
>>> storage_array = pa.array(
...     [
...         {"numer": 10, "denom": 17},
...         {"numer": 20, "denom": 13},
...     ],
...     type=rational_type.storage_type,
... )
>>> arr = rational_type.wrap_array(storage_array)
>>> # or equivalently
>>> arr = pa.ExtensionArray.from_storage(rational_type, storage_array)
>>> arr
<pyarrow.lib.ExtensionArray object at ...>
-- is_valid: all not null
-- child 0 type: int32
  [
    10,
    20
  ]
-- child 1 type: int32
  [
    17,
    13
  ]

此数组可以包含在 RecordBatches 中,通过 IPC 发送并在另一个 Python 进程中接收。接收进程必须显式注册该扩展类型以进行反序列化,否则它将回退到存储类型。

>>> pa.register_extension_type(RationalType(pa.int32()))

例如,创建一个 RecordBatch 并使用 IPC 协议将其写入流:

>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
...     writer.write_batch(batch)
>>> buf = sink.getvalue()

然后读回它会得到正确的类型。

>>> with pa.ipc.open_stream(buf) as reader:
...     result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int32, denom: int32>))

此外,请注意,虽然我们注册了具体类型 RationalType(pa.int32()),但 RationalType(integer_type) 对 *所有* Arrow 整数类型都使用相同的扩展名("my_package.rational")。因此,上述代码也允许用户 (反)序列化这些数据类型。

>>> big_rational_type = RationalType(pa.int64())
>>> storage_array = pa.array(
...     [
...         {"numer": 10, "denom": 17},
...         {"numer": 20, "denom": 13},
...     ],
...     type=big_rational_type.storage_type,
... )
>>> arr = big_rational_type.wrap_array(storage_array)
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
...     writer.write_batch(batch)
>>> buf = sink.getvalue()
>>> with pa.ipc.open_stream(buf) as reader:
...     result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int64, denom: int64>))

接收应用程序不必是 Python,但如果它已实现自己的扩展类型来接收它,它仍然可以识别该扩展类型为“my_package.rational”类型。如果该类型未在接收应用程序中注册,它将回退到存储类型。

参数化扩展类型#

上面的例子展示了如何构建一个除了存储类型外不需要额外元数据的扩展类型。但 Arrow 也提供了更灵活的参数化扩展类型。

此处给出的示例实现了 pandas “period” 数据类型 的扩展类型,表示时间跨度(例如,频率为天、月、季度等)。它存储为 int64 数组,被解释为自 1970 年以来的给定频率的时间跨度数。

>>> class PeriodType(pa.ExtensionType):
...
...     def __init__(self, freq):
...         # attributes need to be set first before calling
...         # super init (as that calls serialize)
...         self._freq = freq
...         super().__init__(pa.int64(), "my_package.period")
...
...     @property
...     def freq(self):
...         return self._freq
...
...     def __arrow_ext_serialize__(self):
...         return "freq={}".format(self.freq).encode()
...
...     @classmethod
...     def __arrow_ext_deserialize__(cls, storage_type, serialized):
...         # Return an instance of this subclass given the serialized
...         # metadata.
...         serialized = serialized.decode()
...         assert serialized.startswith("freq=")
...         freq = serialized.split("=")[1]
...         return PeriodType(freq)

在这里,我们确保将重建实例(在 __arrow_ext_deserialize__ 类方法中)所需的所有信息存储在序列化元数据中,本例中即频率字符串。

请注意,一旦创建,数据类型实例就被视为不可变的。因此,在上面的例子中,freq 参数被存储在一个私有属性中,并带有一个用于访问它的公共只读属性。

自定义扩展数组类#

默认情况下,所有具有扩展类型的数组都被构建或反序列化为内置的 ExtensionArray 对象。然而,人们可能希望继承 ExtensionArray 以添加特定于扩展类型的自定义逻辑。Arrow 允许通过向扩展类型的定义添加特殊方法 __arrow_ext_class__ 来实现这一点。

例如,让我们考虑 Numpy 快速入门 中 3D 空间点的示例。我们可以将它们存储为定长列表,我们希望能够以 (N, 3) 的二维 Numpy 数组形式提取数据,且无需任何拷贝。

>>> class Point3DArray(pa.ExtensionArray):
...     def to_numpy_array(self):
...         return self.storage.flatten().to_numpy().reshape((-1, 3))
>>> class Point3DType(pa.ExtensionType):
...     def __init__(self):
...         super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
...
...     def __arrow_ext_serialize__(self):
...         return b""
...
...     @classmethod
...     def __arrow_ext_deserialize__(cls, storage_type, serialized):
...         return Point3DType()
...
...     def __arrow_ext_class__(self):
...         return Point3DArray

使用此扩展类型构建的数组现在拥有预期的自定义数组类。

>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr
<__main__.Point3DArray object at ...>
[
  [
    1,
    2,
    3
  ],
  [
    4,
    5,
    6
  ]
]

扩展类中的附加方法随后可供用户使用。

>>> arr.to_numpy_array()
array([[1., 2., 3.],
       [4., 5., 6.]], dtype=float32)

此数组可以通过 IPC 发送,在另一个 Python 进程中接收,并且自定义扩展数组类将被保留(只要接收进程在读取 IPC 数据之前使用 register_extension_type() 注册了该扩展类型)。

自定义标量转换#

如果您希望当调用 ExtensionScalar.as_py() 时,您的自定义扩展类型的标量能转换为自定义类型,您可以通过继承 ExtensionScalar 来重写 ExtensionScalar.as_py() 方法。例如,如果我们希望上面的 3D 点类型示例返回一个自定义的 3D 点类而不是列表,我们将实现:

>>> from collections import namedtuple
>>> Point3D = namedtuple("Point3D", ["x", "y", "z"])
>>> class Point3DScalar(pa.ExtensionScalar):
...     def as_py(self, **kwargs) -> Point3D:
...         return Point3D(*self.value.as_py(**kwargs))
>>> class Point3DType(pa.ExtensionType):
...     def __init__(self):
...         super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")
...
...     def __arrow_ext_serialize__(self):
...         return b""
...
...     @classmethod
...     def __arrow_ext_deserialize__(cls, storage_type, serialized):
...         return Point3DType()
...
...     def __arrow_ext_scalar_class__(self):
...         return Point3DScalar

使用此扩展类型构建的数组现在提供的标量会转换为我们的 Point3D 类。

>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr[0].as_py()
Point3D(x=1.0, y=2.0, z=3.0)
>>> arr.to_pylist()
[Point3D(x=1.0, y=2.0, z=3.0), Point3D(x=4.0, y=5.0, z=6.0)]

向 pandas 的转换#

如果您的扩展类型有对应的 pandas 扩展数组,则带有扩展类型的列向 pandas 的转换(在 Table.to_pandas() 中)是可以控制的。

为此,需要实现 ExtensionType.to_pandas_dtype() 方法,该方法应返回一个 pandas.api.extensions.ExtensionDtype 子类实例。

以使用上述 pandas period 类型为例,代码如下:

>>> class PeriodType(pa.ExtensionType):
...
...     def to_pandas_dtype(self):
...         import pandas as pd
...         return pd.PeriodDtype(freq=self.freq)

其次,pandas 的 ExtensionDtype 反过来也需要实现 __from_arrow__ 方法:该方法给定扩展类型的 PyArrow Array 或 ChunkedArray,可以构建对应的 pandas ExtensionArray。此方法应具有以下签名:

>>> import pandas as pd
>>> class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
...
...     def __from_arrow__(self, array):  # pyarrow.Array/ChunkedArray -> pandas.ExtensionArray
...         pass

通过这种方式,您可以控制 PyArrow 扩展类型的 PyArrow Array 到可以存储在 DataFrame 中的 pandas ExtensionArray 的转换。

规范扩展类型#

您可以在 规范扩展类型 部分找到规范扩展类型的官方列表。在这里,我们添加了关于如何在 PyArrow 中使用它们的示例。

定形张量#

要创建具有相同形状的张量数组(定形张量数组),我们首先需要定义一个具有值类型和形状的定形张量扩展类型。

>>> tensor_type = pa.fixed_shape_tensor(pa.int32(), (2, 2))

然后我们需要 pyarrow.list_() 类型的存储数组,其中 value_type 是定形张量的值类型,列表大小是 tensor_type 形状元素的乘积。然后我们可以使用 pa.ExtensionArray.from_storage() 方法创建张量数组。

>>> arr = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
>>> storage = pa.array(arr, pa.list_(pa.int32(), 4))
>>> tensor_array = pa.ExtensionArray.from_storage(tensor_type, storage)

我们也可以创建另一个具有不同值类型的张量数组。

>>> tensor_type_2 = pa.fixed_shape_tensor(pa.float32(), (2, 2))
>>> storage_2 = pa.array(arr, pa.list_(pa.float32(), 4))
>>> tensor_array_2 = pa.ExtensionArray.from_storage(tensor_type_2, storage_2)

扩展数组可用作 pyarrow.Tablepyarrow.RecordBatch 中的列。

>>> data = [
...     pa.array([1, 2, 3]),
...     pa.array(["foo", "bar", None]),
...     pa.array([True, None, True]),
...     tensor_array,
...     tensor_array_2
... ]
>>> my_schema = pa.schema([("f0", pa.int8()),
...                        ("f1", pa.string()),
...                        ("f2", pa.bool_()),
...                        ("tensors_int", tensor_type),
...                        ("tensors_float", tensor_type_2)])
>>> table = pa.Table.from_arrays(data, schema=my_schema)
>>> table
pyarrow.Table
f0: int8
f1: string
f2: bool
tensors_int: extension<arrow.fixed_shape_tensor[value_type=int32, shape=[2,2]]>
tensors_float: extension<arrow.fixed_shape_tensor[value_type=float, shape=[2,2]]>
----
f0: [[1,2,3]]
f1: [["foo","bar",null]]
f2: [[true,null,true]]
tensors_int: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
tensors_float: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]

我们还可以将张量数组转换为单个多维 numpy ndarray。通过这种转换,Arrow 数组的长度成为 numpy ndarray 的第一个维度。

>>> numpy_tensor = tensor_array_2.to_numpy_ndarray()
>>> numpy_tensor
array([[[  1.,   2.],
        [  3.,   4.]],

       [[ 10.,  20.],
        [ 30.,  40.]],

       [[100., 200.],
        [300., 400.]]], dtype=float32)
 >>> numpy_tensor.shape
 (3, 2, 2)

注意

两个可选参数 permutationdim_names 旨在为用户提供关于数据逻辑布局与物理布局相比的信息。

向 numpy ndarray 的转换仅对平凡置换(None[0, 1, ... N-1],其中 N 是张量维数)是可能的。

反之亦然,我们可以将 numpy ndarray 转换为定形张量数组。

>>> pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
<pyarrow.lib.FixedShapeTensorArray object at ...>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    10,
    20,
    30,
    40
  ],
  [
    100,
    200,
    300,
    400
  ]
]

通过转换,ndarray 的第一个维度成为 PyArrow 扩展数组的长度。我们在示例中可以看到,形状为 (3, 2, 2) 的 ndarray 变成了长度为 3 的 Arrow 数组,其张量元素形状为 (2, 2)

# ndarray of shape (3, 2, 2)
>>> numpy_tensor.shape
(3, 2, 2)

# arrow array of length 3 with tensor elements of shape (2, 2)
>>> pyarrow_tensor_array = pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
>>> len(pyarrow_tensor_array)
3
>>> pyarrow_tensor_array.type.shape
[2, 2]

扩展类型还可以定义 permutationdim_names。例如:

>>> tensor_type = pa.fixed_shape_tensor(pa.float64(), [2, 2, 3], permutation=[0, 2, 1])

>>> tensor_type = pa.fixed_shape_tensor(pa.bool_(), [2, 2, 3], dim_names=["C", "H", "W"])

对于 NCHW 格式,其中:

  • N:图像数量,在我们的例子中是数组的长度,总是位于第一个维度

  • C:图像的通道数

  • H:图像的高度

  • W:图像的宽度

UUID#

UUID 扩展类型(arrow.uuid)将通用唯一标识符表示为 16 字节的定长二进制值。PyArrow 提供了与 Python 内置 uuid 模块的集成,包括自动类型推断。

创建 UUID 标量和数组#

PyArrow 从 Python 的 uuid.UUID 对象中推断 UUID 类型,因此您可以直接将它们传递给 pyarrow.scalar()pyarrow.array()

>>> import uuid
>>> import pyarrow as pa

>>> pa.scalar(uuid.uuid4())
<pyarrow.UuidScalar: UUID('...')>

>>> uuids = [uuid.uuid4() for _ in range(3)]
>>> arr = pa.array(uuids)
>>> arr.type
UuidType(extension<arrow.uuid>)

您也可以使用 pyarrow.uuid() 显式指定 UUID 类型。

>>> pa.array([uuid.uuid4(), uuid.uuid4()], type=pa.uuid())
<pyarrow.lib.UuidArray object at ...>
[
  ...,
  ...
]