数据类型和内存中数据模型#

Apache Arrow 通过将类型元数据与内存缓冲区相结合来定义列式数组数据结构,正如 内存与 IO 文档中所解释的那样。这些数据结构在 Python 中通过一系列相互关联的类来展示:

  • 类型元数据 (Type Metadata): pyarrow.DataType 的实例,用于描述数组的类型并管理其值的解释方式。

  • 模式 (Schemas): pyarrow.Schema 的实例,用于描述命名的类型集合。它们可以被视为类表对象中的列类型。

  • 数组 (Arrays): pyarrow.Array 的实例,是由 Arrow 缓冲区 (Buffer) 对象组成的原子性、连续的列式数据结构。

  • 记录批次 (Record Batches): pyarrow.RecordBatch 的实例,是具有特定模式 (Schema) 的 Array 对象集合。

  • 表 (Tables): pyarrow.Table 的实例,一种逻辑表数据结构,其中每一列由一个或多个相同类型的 pyarrow.Array 对象组成。

我们将在以下章节通过一系列示例来详细考察这些内容。

类型元数据#

Apache Arrow 为数组数据定义了与语言无关的列式数据结构。其中包括:

  • 定长原始类型: 数字、布尔值、日期和时间、固定大小二进制、小数以及其他适合给定字节长度的值。

  • 变长原始类型: 二进制、字符串。

  • 嵌套类型: 列表 (list)、映射 (map)、结构体 (struct) 和联合体 (union)。

  • 字典类型: 一种编码后的分类类型(稍后详述)。

Arrow 中的每种数据类型都有一个对应的工厂函数,用于在 Python 中创建该类型对象的实例。

>>> import pyarrow as pa
>>> t1 = pa.int32()
>>> t2 = pa.string()
>>> t3 = pa.binary()
>>> t4 = pa.binary(10)
>>> t5 = pa.timestamp('ms')
>>> t1
DataType(int32)
>>> print(t1)
int32
>>> print(t4)
fixed_size_binary[10]
>>> print(t5)
timestamp[ms]

注意

不同的数据类型可能使用相同的物理存储。例如,int64float64timestamp[ms] 每个值都占用 64 位。

这些对象即为“元数据”;它们用于描述数组、模式和记录批次中的数据。在 Python 中,当输入数据(例如 Python 对象)可能被强制转换为多种 Arrow 类型时,可以使用它们。

Field 类型包含类型、名称以及可选的用户定义元数据。

>>> f0 = pa.field('int32_field', t1)
>>> f0
pyarrow.Field<int32_field: int32>
>>> f0.name
'int32_field'
>>> f0.type
DataType(int32)

Arrow 支持嵌套值类型,如列表、映射、结构体和联合体。创建这些类型时,必须传递类型或字段以指示该类型的子项的数据类型。例如,我们可以定义一个 int32 值的列表:

>>> t6 = pa.list_(t1)
>>> t6
ListType(list<item: int32>)

struct 是一组命名字段的集合。

>>> fields = [
...     pa.field('s0', t1),
...     pa.field('s1', t2),
...     pa.field('s2', t4),
...     pa.field('s3', t6),
... ]
>>> t7 = pa.struct(fields)
>>> print(t7)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>

为方便起见,可以直接传递 (name, type) 元组,而不是 Field 实例。

>>> t8 = pa.struct([('s0', t1), ('s1', t2), ('s2', t4), ('s3', t6)])
>>> print(t8)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>
>>> t8 == t7
True

有关数据类型函数的完整列表,请参阅 数据类型 API

模式#

Schema 类型类似于 struct 数组类型;它定义了记录批次或表数据结构中的列名和类型。pyarrow.schema() 工厂函数可在 Python 中创建新的 Schema 对象。

>>> my_schema = pa.schema([('field0', t1),
...                        ('field1', t2),
...                        ('field2', t4),
...                        ('field3', t6)])
>>> my_schema
field0: int32
field1: string
field2: fixed_size_binary[10]
field3: list<item: int32>
  child 0, item: int32

在某些应用中,您可能不会直接创建模式,而是仅使用嵌入在 IPC 消息 中的模式。

模式是不可变的,这意味着您无法更新现有的模式,但可以使用 Schema.set() 创建一个包含更新值的新模式。

>>> updated_field = pa.field('field0_new', pa.int64())
>>> my_schema2 = my_schema.set(0, updated_field)
>>> my_schema2
field0_new: int64
field1: string
field2: fixed_size_binary[10]
field3: list<item: int32>
  child 0, item: int32

数组#

对于每种数据类型,都有相应的数组数据结构,用于保存定义单个连续列式数组数据块的内存缓冲区。当您使用 PyArrow 时,这些数据可能来自 IPC 工具,也可以从各种类型的 Python 序列(列表、NumPy 数组、pandas 数据)创建。

创建数组的一种简单方法是使用 pyarrow.array,它类似于 numpy.array 函数。默认情况下,PyArrow 会为您推断数据类型。

>>> arr = pa.array([1, 2, None, 3])
>>> arr
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  null,
  3
]

但您也可以传递特定的数据类型来覆盖类型推断。

>>> pa.array([1, 2], type=pa.uint16())
<pyarrow.lib.UInt16Array object at ...>
[
  1,
  2
]

数组的 type 属性即为对应的类型元数据片段。

>>> arr.type
DataType(int64)

每个内存中的数组都有已知的长度和空值计数(如果没有空值,则为 0)。

>>> len(arr)
4
>>> arr.null_count
1

标量值可以通过普通索引进行选择。pyarrow.arrayNone 值转换为 Arrow 空值;我们返回特殊的 pyarrow.NA 值来表示空值。

>>> arr[0]
<pyarrow.Int64Scalar: 1>
>>> arr[2]
<pyarrow.Int64Scalar: None>

Arrow 数据是不可变的,因此只能选择值,不能进行赋值。

数组可以在不进行拷贝的情况下进行切片。

>>> arr[1:3]
<pyarrow.lib.Int64Array object at ...>
[
  2,
  null
]

None 值和 NaN 处理#

如上节所述,Python 对象 None 在转换为 pyarrow.Array 时总是被转换为 Arrow 的空元素。对于由 Python 对象 float('nan')numpy.nan 表示的浮点 NaN 值,我们通常在转换过程中将其转换为有效的浮点值。如果向 pyarrow.array 提供包含 np.nan 的整数输入,则会引发 ValueError

为了更好地兼容 Pandas,我们支持将 NaN 值解释为空元素。这在所有 from_pandas 函数上都是自动启用的,也可以通过在其他转换函数中传递 from_pandas=True 参数来启用。

列表数组 (List arrays)#

pyarrow.array 能够推断简单嵌套数据结构(如列表)的类型。

>>> nested_arr = pa.array([[], None, [1, 2], [None, 1]])
>>> print(nested_arr.type)
list<item: int64>

ListView 数组#

pyarrow.array 可以创建一种名为 ListView 的替代列表类型。

>>> nested_arr = pa.array([[], None, [1, 2], [None, 1]], type=pa.list_view(pa.int64()))
>>> print(nested_arr.type)
list_view<item: int64>

ListView 数组拥有与 List 数组不同的一组缓冲区。ListView 数组同时具有偏移量 (offsets) 和大小 (sizes) 缓冲区,而 List 数组仅具有偏移量缓冲区。这使得 ListView 数组能够指定无序的偏移量。

>>> values = [1, 2, 3, 4, 5, 6]
>>> offsets = [4, 2, 0]
>>> sizes = [2, 2, 2]
>>> arr = pa.ListViewArray.from_arrays(offsets, sizes, values)
>>> arr
<pyarrow.lib.ListViewArray object at ...>
[
  [
    5,
    6
  ],
  [
    3,
    4
  ],
  [
    1,
    2
  ]
]

有关 ListView 布局 的更多详细信息,请参阅格式规范。

结构体数组 (Struct arrays)#

pyarrow.array 能够从字典数组中推断结构体类型的模式。

>>> pa.array([{'x': 1, 'y': True}, {'z': 3.4, 'x': 4}])
<pyarrow.lib.StructArray object at ...>
-- is_valid: all not null
-- child 0 type: int64
  [
    1,
    4
  ]
-- child 1 type: bool
  [
    true,
    null
  ]
-- child 2 type: double
  [
    null,
    3.4
  ]

结构体数组可以从 Python 字典或元组序列中初始化。对于元组,必须显式传递类型。

>>> ty = pa.struct([('x', pa.int8()),
...                 ('y', pa.bool_())])
>>> pa.array([{'x': 1, 'y': True}, {'x': 2, 'y': False}], type=ty)
<pyarrow.lib.StructArray object at ...>
-- is_valid: all not null
-- child 0 type: int8
  [
    1,
    2
  ]
-- child 1 type: bool
  [
    true,
    false
  ]
>>> pa.array([(3, True), (4, False)], type=ty)
<pyarrow.lib.StructArray object at ...>
-- is_valid: all not null
-- child 0 type: int8
  [
    3,
    4
  ]
-- child 1 type: bool
  [
    true,
    false
  ]

初始化结构体数组时,允许在结构体级别和单个字段级别存在空值。如果从 Python 字典序列初始化,缺失的字典键将被视为 null 值。

>>> pa.array([{'x': 1}, None, {'y': None}], type=ty)
<pyarrow.lib.StructArray object at ...>
-- is_valid:
  [
    true,
    false,
    true
  ]
-- child 0 type: int8
  [
    1,
    0,
    null
  ]
-- child 1 type: bool
  [
    null,
    false,
    null
  ]

您还可以从现有的数组为结构体的每个组件构造结构体数组。在这种情况下,数据存储将与单个数组共享,并且不涉及任何拷贝。

>>> xs = pa.array([5, 6, 7], type=pa.int16())
>>> ys = pa.array([False, True, True])
>>> arr = pa.StructArray.from_arrays((xs, ys), names=('x', 'y'))
>>> arr.type
StructType(struct<x: int16, y: bool>)
>>> arr
<pyarrow.lib.StructArray object at ...>
-- is_valid: all not null
-- child 0 type: int16
  [
    5,
    6,
    7
  ]
-- child 1 type: bool
  [
    false,
    true,
    true
  ]

映射数组 (Map arrays)#

映射数组可以从元组列表(键值对)的列表中构造,但前提是必须将类型显式传递给 array()

>>> data = [[('x', 1), ('y', 0)], [('a', 2), ('b', 45)]]
>>> ty = pa.map_(pa.string(), pa.int64())
>>> pa.array(data, type=ty)
<pyarrow.lib.MapArray object at ...>
[
  keys:
  [
    "x",
    "y"
  ]
  values:
  [
    1,
    0
  ],
  keys:
  [
    "a",
    "b"
  ]
  values:
  [
    2,
    45
  ]
]

MapArray 也可以从偏移量、键和值数组中构造。偏移量表示每个映射的起始位置。注意,MapArray.keysMapArray.items 属性给出了展平后的键和值。要保持键和值与其对应的行相关联,请使用带有 MapArray.offsets 属性的 ListArray.from_arrays() 构造函数。

>>> arr = pa.MapArray.from_arrays([0, 2, 3], ['x', 'y', 'z'], [4, 5, 6])
>>> arr.keys
<pyarrow.lib.StringArray object at ...>
[
  "x",
  "y",
  "z"
]
>>> arr.items
<pyarrow.lib.Int64Array object at ...>
[
  4,
  5,
  6
]
>>> pa.ListArray.from_arrays(arr.offsets, arr.keys)
<pyarrow.lib.ListArray object at ...>
[
  [
    "x",
    "y"
  ],
  [
    "z"
  ]
]
>>> pa.ListArray.from_arrays(arr.offsets, arr.items)
<pyarrow.lib.ListArray object at ...>
[
  [
    4,
    5
  ],
  [
    6
  ]
]

联合体数组 (Union arrays)#

联合体类型表示一种嵌套数组类型,其中每个值可以是集合中可能类型中的一种(且只能是一种)。联合体数组有两种可能的存储类型:稀疏 (sparse) 和密集 (dense)。

在稀疏联合体数组中,每个子数组的长度与结果联合体数组的长度相同。它们配有一个 int8 类型的“类型”数组,该数组告知对于每个值,它必须从哪个子数组中选择。

>>> xs = pa.array([5, 6, 7])
>>> ys = pa.array([False, False, True])
>>> types = pa.array([0, 1, 1], type=pa.int8())
>>> union_arr = pa.UnionArray.from_sparse(types, [xs, ys])
>>> union_arr.type
SparseUnionType(sparse_union<0: int64=0, 1: bool=1>)
>>> union_arr
<pyarrow.lib.UnionArray object at ...>
-- is_valid: all not null
-- type_ids:   [
    0,
    1,
    1
  ]
-- child 0 type: int64
  [
    5,
    6,
    7
  ]
-- child 1 type: bool
  [
    false,
    false,
    true
  ]

在密集联合体数组中,除了 int8 “类型”数组外,还需要传递一个 int32 “偏移量”数组,该数组告知对于每个值,它可以在选定子数组的哪个偏移量处找到。

>>> xs = pa.array([5, 6, 7])
>>> ys = pa.array([False, True])
>>> types = pa.array([0, 1, 1, 0, 0], type=pa.int8())
>>> offsets = pa.array([0, 0, 1, 1, 2], type=pa.int32())
>>> union_arr = pa.UnionArray.from_dense(types, offsets, [xs, ys])
>>> union_arr.type
DenseUnionType(dense_union<0: int64=0, 1: bool=1>)
>>> union_arr
<pyarrow.lib.UnionArray object at ...>
-- is_valid: all not null
-- type_ids:   [
    0,
    1,
    1,
    0,
    0
  ]
-- value_offsets:   [
    0,
    0,
    1,
    1,
    2
  ]
-- child 0 type: int64
  [
    5,
    6,
    7
  ]
-- child 1 type: bool
  [
    false,
    true
  ]

字典数组 (Dictionary Arrays)#

PyArrow 中的 Dictionary 类型是一种特殊的数组类型,类似于 R 中的 factor 或 pandas.Categorical。它使文件或流中的一个或多个记录批次能够传输引用共享字典的整数索引,该字典包含逻辑数组中的不同值。这通常与字符串一起使用,以节省内存并提高性能。

Apache Arrow 格式处理字典的方式以及它们在 C++ 和 Python 中的呈现方式略有不同。我们定义了一个特殊的 DictionaryArray 类型以及相应的字典类型。让我们考虑一个例子。

>>> indices = pa.array([0, 1, 0, 1, 2, 0, None, 2])
>>> dictionary = pa.array(['foo', 'bar', 'baz'])
>>>
>>> dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)
>>> dict_array
<pyarrow.lib.DictionaryArray object at ...>
...
-- dictionary:
  [
    "foo",
    "bar",
    "baz"
  ]
-- indices:
  [
    0,
    1,
    0,
    1,
    2,
    0,
    null,
    2
  ]

这里我们有:

>>> print(dict_array.type)
dictionary<values=string, indices=int64, ordered=0>
>>> dict_array.indices
<pyarrow.lib.Int64Array object at ...>
[
  0,
  1,
  0,
  1,
  2,
  0,
  null,
  2
]
>>> dict_array.dictionary
<pyarrow.lib.StringArray object at ...>
[
  "foo",
  "bar",
  "baz"
]

当将 DictionaryArray 与 pandas 一起使用时,其对应物是 pandas.Categorical(稍后详述)。

>>> dict_array.to_pandas()
0    foo
1    bar
2    foo
3    bar
4    baz
5    foo
6    NaN
7    baz
dtype: category
Categories (3, str): ['foo', 'bar', 'baz']

记录批次(Record Batches)#

Apache Arrow 中的 记录批次 (Record Batch) 是长度相等的数组实例的集合。让我们考虑一组数组。

>>> data = [
...     pa.array([1, 2, 3, 4]),
...     pa.array(['foo', 'bar', 'baz', None]),
...     pa.array([True, None, False, True])
... ]

可以使用 RecordBatch.from_arrays 从此数组列表中创建一个记录批次。

>>> batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])
>>> batch.num_columns
3
>>> batch.num_rows
4
>>> batch.schema
f0: int64
f1: string
f2: bool
>>>
>>> batch[1]
<pyarrow.lib.StringArray object at ...>
[
  "foo",
  "bar",
  "baz",
  null
]

记录批次可以像数组一样进行切片,而无需拷贝内存。

>>> batch2 = batch.slice(1, 3)
>>> batch2[1]
<pyarrow.lib.StringArray object at ...>
[
  "bar",
  "baz",
  null
]

表 (Tables)#

PyArrow 的 Table 类型不是 Apache Arrow 规范的一部分,而是一个旨在帮助将多个记录批次和数组片段处理为单个逻辑数据集的工具。作为一个相关的例子,我们可能在套接字流中收到多个小的记录批次,然后需要将它们连接成连续的内存以便在 NumPy 或 pandas 中使用。Table 对象使其变得高效,而无需额外的内存拷贝。

考虑到我们上面创建的记录批次,可以使用 Table.from_batches 创建一个包含该批次一个或多个副本的 Table。

>>> batches = [batch] * 5
>>> table = pa.Table.from_batches(batches)
>>> table
pyarrow.Table
f0: int64
f1: string
f2: bool
----
f0: [[1,2,3,4],[1,2,3,4],...,[1,2,3,4],[1,2,3,4]]
f1: [["foo","bar","baz",null],...,["foo","bar","baz",null]]
f2: [[true,null,false,true],...,[true,null,false,true]]
>>> table.num_rows
20

表的列是 ChunkedArray 的实例,它是包含一个或多个相同类型数组的容器。

>>> c = table[0]
>>> c
<pyarrow.lib.ChunkedArray object at ...>
[
  [
    1,
    2,
    3,
    4
  ],
  ...
  [
    1,
    2,
    3,
    4
  ]
]
>>> c.num_chunks
5
>>> c.chunk(0)
<pyarrow.lib.Int64Array object at ...>
[
  1,
  2,
  3,
  4
]

正如您在 pandas 章节 中看到的那样,我们可以将这些对象转换为连续的 NumPy 数组以在 pandas 中使用。

>>> c.to_pandas()
0     1
1     2
2     3
3     4
4     1
5     2
6     3
7     4
8     1
9     2
10    3
11    4
12    1
13    2
14    3
15    4
16    1
17    2
18    3
19    4
Name: f0, dtype: int64

如果模式相同,多个表也可以使用 pyarrow.concat_tables 连接在一起形成单个表。

>>> tables = [table] * 2
>>> table_all = pa.concat_tables(tables)
>>> table_all.num_rows
40
>>> c = table_all[0]
>>> c.num_chunks
10

这类似于 Table.from_batches,但使用表作为输入而不是记录批次。记录批次可以制成表,但反之则不然,因此如果您的数据已经是表的形式,请使用 pyarrow.concat_tables

自定义模式和字段元数据#

Arrow 支持模式级和字段级的自定义键值元数据,允许系统插入自己的应用程序定义元数据以自定义行为。

自定义元数据可以在模式级通过 Schema.metadata 访问,在字段级通过 Field.metadata 访问。

请注意,此元数据在 流式处理、序列化和 IPC 过程中会被保留。

要自定义现有表的模式元数据,可以使用 Table.replace_schema_metadata()

>>> table.schema.metadata
>>> table = table.replace_schema_metadata({"f0": "First dose"})
>>> table.schema.metadata
{b'f0': b'First dose'}

要自定义表模式中字段的元数据,可以使用 Field.with_metadata()

>>> field_f1 = table.schema.field("f1")
>>> field_f1.metadata
>>> field_f1 = field_f1.with_metadata({"f1": "Second dose"})
>>> field_f1.metadata
{b'f1': b'Second dose'}

这两个选项都会创建数据的浅拷贝,实际上并不会更改不可变的 Schema。为了更改我们创建的表的模式中的元数据,在调用 Table.replace_schema_metadata() 时,我们创建了一个新对象。

要更改模式中字段的元数据,我们需要定义一个新模式并将数据转换 (cast) 为此模式。

>>> my_schema2 = pa.schema([
...    pa.field('f0', pa.int64(), metadata={"name": "First dose"}),
...    pa.field('f1', pa.string(), metadata={"name": "Second dose"}),
...    pa.field('f2', pa.bool_())],
...    metadata={"f2": "booster"})
>>> t2 = table.cast(my_schema2)
>>> t2.schema.field("f0").metadata
{b'name': b'First dose'}
>>> t2.schema.field("f1").metadata
{b'name': b'Second dose'}
>>> t2.schema.metadata
{b'f2': b'booster'}

元数据键和值对在 C++ 实现中是 std::string 对象,因此在 Python 中它们是字节对象 (b'...')。

记录批次读取器 (Record Batch Readers)#

PyArrow 中的许多函数都会返回或接收 RecordBatchReader 作为参数。它可以像任何记录批次的可迭代对象一样使用,但也提供了它们的通用模式,而无需获取任何批次。

>>> schema = pa.schema([('x', pa.int64())])
>>>
>>> def iter_record_batches():
...    for i in range(2):
...       yield pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=schema)
>>>
>>> reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
>>> print(reader.schema)
x: int64
>>> for batch in reader:
...    print(batch)
pyarrow.RecordBatch
x: int64
----
x: [1,2,3]
pyarrow.RecordBatch
x: int64
----
x: [1,2,3]

它也可以通过 C 流接口 在语言之间发送。

记录批次到张量的转换#

RecordBatch 的每个数组都有其自己的连续内存,不一定与其他数组相邻。机器学习库中使用的一种不同的内存结构是二维数组(也称为 2 维张量或矩阵),它仅占用一个连续的内存块。

因此,可以使用函数 pyarrow.RecordBatch.to_tensor() 来有效地将表格列式数据转换为张量。

此转换支持的数据类型为无符号整数、有符号整数和浮点类型。目前仅支持列主序 (column-major) 转换。

>>> arr1 = [1, 2, 3, 4, 5]
>>> arr2 = [10, 20, 30, 40, 50]
>>> batch = pa.RecordBatch.from_arrays(
...      [
...          pa.array(arr1, type=pa.uint16()),
...          pa.array(arr2, type=pa.int16()),
...      ], ["a", "b"]
...  )
>>> batch.to_tensor()
<pyarrow.Tensor>
type: int32
shape: (5, 2)
strides: (8, 4)
>>> batch.to_tensor().to_numpy()
array([[ 1, 10],
       [ 2, 20],
       [ 3, 30],
       [ 4, 40],
       [ 5, 50]], dtype=int32)

null_to_nan 设置为 True,也可以转换带有空值的数据。它们将被转换为 NaN

>>> batch = pa.record_batch(
...     [
...         pa.array([1, 2, 3, 4, None], type=pa.int32()),
...         pa.array([10, 20, 30, 40, None], type=pa.float32()),
...     ], names = ["a", "b"]
... )
>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
       [ 2., 20.],
       [ 3., 30.],
       [ 4., 40.],
       [nan, nan]])