数据类型和内存中的数据模型#
Apache Arrow 通过将类型元数据与内存缓冲区(如内存和 IO 文档中解释的那些)组合起来定义列式数组数据结构。这些数据结构在 Python 中通过一系列相互关联的类公开:
类型元数据:
pyarrow.DataType的实例,描述数组的类型并管理其值的解释方式。Schema:
pyarrow.Schema的实例,描述命名类型集合。可以将其视为表状对象中的列类型。数组:
pyarrow.Array的实例,它们是由 Arrow 缓冲区对象组成的原子、连续的列式数据结构。记录批次:
pyarrow.RecordBatch的实例,它们是具有特定 Schema 的 Array 对象集合。表:
pyarrow.Table的实例,一个逻辑表数据结构,其中每列包含一个或多个相同类型的pyarrow.Array对象。
我们将在下面的各节中通过一系列示例来研究这些内容。
类型元数据#
Apache Arrow 为数组数据定义了与语言无关的面向列的数据结构。其中包括:
固定长度原始类型:数字、布尔值、日期和时间、固定大小二进制、小数以及其他适合给定数字的值。
可变长度原始类型:二进制、字符串。
嵌套类型:列表、映射、结构和联合。
字典类型:一种编码的分类类型(稍后详述)。
Arrow 中的每个数据类型都有一个相应的工厂函数,用于在 Python 中创建该类型对象的实例。
In [1]: import pyarrow as pa
In [2]: t1 = pa.int32()
In [3]: t2 = pa.string()
In [4]: t3 = pa.binary()
In [5]: t4 = pa.binary(10)
In [6]: t5 = pa.timestamp('ms')
In [7]: t1
Out[7]: DataType(int32)
In [8]: print(t1)
int32
In [9]: print(t4)
fixed_size_binary[10]
In [10]: print(t5)
timestamp[ms]
注意
不同的数据类型可能使用给定的物理存储。例如,int64、float64 和 timestamp[ms] 都占用每值 64 位。
这些对象是 metadata;它们用于描述数组、schema 和记录批次中的数据。在 Python 中,它们可以用于输入数据(例如 Python 对象)可以强制转换为多个 Arrow 类型的函数。
Field 类型是类型加上名称和可选的用户定义元数据。
In [11]: f0 = pa.field('int32_field', t1)
In [12]: f0
Out[12]: pyarrow.Field<int32_field: int32>
In [13]: f0.name
Out[13]: 'int32_field'
In [14]: f0.type
Out[14]: DataType(int32)
Arrow 支持嵌套值类型,如列表、映射、结构和联合。创建这些类型时,您必须传递类型或字段以指示这些类型的子类型的数据类型。例如,我们可以使用以下方式定义一个 int32 值列表:
In [15]: t6 = pa.list_(t1)
In [16]: t6
Out[16]: ListType(list<item: int32>)
struct 是命名字段的集合。
In [17]: fields = [
....: pa.field('s0', t1),
....: pa.field('s1', t2),
....: pa.field('s2', t4),
....: pa.field('s3', t6),
....: ]
....:
In [18]: t7 = pa.struct(fields)
In [19]: print(t7)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>
为方便起见,您可以直接传递 (name, type) 元组,而不是 Field 实例。
In [20]: t8 = pa.struct([('s0', t1), ('s1', t2), ('s2', t4), ('s3', t6)])
In [21]: print(t8)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>
In [22]: t8 == t7
Out[22]: True
有关数据类型函数的完整列表,请参阅数据类型 API。
模式#
Schema 类型类似于 struct 数组类型;它定义了记录批次或表数据结构中的列名和类型。pyarrow.schema() 工厂函数在 Python 中创建新的 Schema 对象。
In [23]: my_schema = pa.schema([('field0', t1),
....: ('field1', t2),
....: ('field2', t4),
....: ('field3', t6)])
....:
In [24]: my_schema
Out[24]:
field0: int32
field1: string
field2: fixed_size_binary[10]
field3: list<item: int32>
child 0, item: int32
在某些应用程序中,您可能不会直接创建 schema,而只使用嵌入在 IPC 消息 中的 schema。
Schema 是不可变的,这意味着您不能更新现有 schema,但可以使用 Schema.set() 创建一个具有更新值的新 schema。
In [25]: updated_field = pa.field('field0_new', pa.int64())
In [26]: my_schema2 = my_schema.set(0, updated_field)
In [27]: my_schema2
Out[27]:
field0_new: int64
field1: string
field2: fixed_size_binary[10]
field3: list<item: int32>
child 0, item: int32
数组#
对于每种数据类型,都有一个伴随的数组数据结构,用于存储定义单个连续列式数组数据块的内存缓冲区。当您使用 PyArrow 时,这些数据可能来自 IPC 工具,但也可以从各种类型的 Python 序列(列表、NumPy 数组、pandas 数据)创建。
创建数组的一个简单方法是使用 pyarrow.array,它类似于 numpy.array 函数。默认情况下,PyArrow 会为您推断数据类型。
In [28]: arr = pa.array([1, 2, None, 3])
In [29]: arr
Out[29]:
<pyarrow.lib.Int64Array object at 0x7fe0310679a0>
[
1,
2,
null,
3
]
但您也可以传递特定的数据类型来覆盖类型推断。
In [30]: pa.array([1, 2], type=pa.uint16())
Out[30]:
<pyarrow.lib.UInt16Array object at 0x7fe031067fa0>
[
1,
2
]
数组的 type 属性是相应的类型元数据。
In [31]: arr.type
Out[31]: DataType(int64)
每个内存中的数组都有已知的长度和空值计数(如果没有空值,则为 0)。
In [32]: len(arr)
Out[32]: 4
In [33]: arr.null_count
Out[33]: 1
可以使用正常索引选择标量值。pyarrow.array 将 None 值转换为 Arrow 空值;我们为 null 返回特殊的 pyarrow.NA 值。
In [34]: arr[0]
Out[34]: <pyarrow.Int64Scalar: 1>
In [35]: arr[2]
Out[35]: <pyarrow.Int64Scalar: None>
Arrow 数据是不可变的,因此值可以被选择但不能被赋值。
数组可以被切片而无需复制。
In [36]: arr[1:3]
Out[36]:
<pyarrow.lib.Int64Array object at 0x7fe030fc88e0>
[
2,
null
]
None 值和 NaN 处理#
如上节所述,Python 对象 None 在转换为 pyarrow.Array 时总是转换为 Arrow 空元素。对于浮点 NaN 值,它由 Python 对象 float('nan') 或 numpy.nan 表示,我们通常在转换过程中将其转换为一个有效的浮点值。如果提供给 pyarrow.array 的整数输入包含 np.nan,则会引发 ValueError。
为了更好地与 Pandas 兼容,我们支持将 NaN 值解释为空元素。这在所有 from_pandas 函数上自动启用,并且可以通过将 from_pandas=True 作为函数参数传递来在其他转换函数上启用。
列表数组#
pyarrow.array 能够推断简单嵌套数据结构(如列表)的类型。
In [37]: nested_arr = pa.array([[], None, [1, 2], [None, 1]])
In [38]: print(nested_arr.type)
list<item: int64>
ListView 数组#
pyarrow.array 可以创建另一种列表类型,称为 ListView。
In [39]: nested_arr = pa.array([[], None, [1, 2], [None, 1]], type=pa.list_view(pa.int64()))
In [40]: print(nested_arr.type)
list_view<item: int64>
ListView 数组具有与 List 数组不同的缓冲区集。ListView 数组同时具有偏移量和大小缓冲区,而 List 数组只有偏移量缓冲区。这允许 ListView 数组指定无序偏移量。
In [41]: values = [1, 2, 3, 4, 5, 6]
In [42]: offsets = [4, 2, 0]
In [43]: sizes = [2, 2, 2]
In [44]: arr = pa.ListViewArray.from_arrays(offsets, sizes, values)
In [45]: arr
Out[45]:
<pyarrow.lib.ListViewArray object at 0x7fe030fc96c0>
[
[
5,
6
],
[
3,
4
],
[
1,
2
]
]
有关 ListView 布局 的更多详细信息,请参阅格式规范。
结构体数组#
pyarrow.array 能够从字典数组推断结构体类型的 schema。
In [46]: pa.array([{'x': 1, 'y': True}, {'z': 3.4, 'x': 4}])
Out[46]:
<pyarrow.lib.StructArray object at 0x7fe030fc9900>
-- is_valid: all not null
-- child 0 type: int64
[
1,
4
]
-- child 1 type: bool
[
true,
null
]
-- child 2 type: double
[
null,
3.4
]
结构体数组可以通过 Python 字典或元组序列进行初始化。对于元组,您必须显式传递类型。
In [47]: ty = pa.struct([('x', pa.int8()),
....: ('y', pa.bool_())])
....:
In [48]: pa.array([{'x': 1, 'y': True}, {'x': 2, 'y': False}], type=ty)
Out[48]:
<pyarrow.lib.StructArray object at 0x7fe030fc9f60>
-- is_valid: all not null
-- child 0 type: int8
[
1,
2
]
-- child 1 type: bool
[
true,
false
]
In [49]: pa.array([(3, True), (4, False)], type=ty)
Out[49]:
<pyarrow.lib.StructArray object at 0x7fe030fc9ea0>
-- is_valid: all not null
-- child 0 type: int8
[
3,
4
]
-- child 1 type: bool
[
true,
false
]
初始化结构体数组时,允许在结构体级别和单个字段级别都存在空值。如果从 Python 字典序列初始化,缺少字典键将作为空值处理。
In [50]: pa.array([{'x': 1}, None, {'y': None}], type=ty)
Out[50]:
<pyarrow.lib.StructArray object at 0x7fe030fca140>
-- is_valid:
[
true,
false,
true
]
-- child 0 type: int8
[
1,
0,
null
]
-- child 1 type: bool
[
null,
false,
null
]
您还可以从每个结构体组件的现有数组构建结构体数组。在这种情况下,数据存储将与单个数组共享,并且不涉及复制。
In [51]: xs = pa.array([5, 6, 7], type=pa.int16())
In [52]: ys = pa.array([False, True, True])
In [53]: arr = pa.StructArray.from_arrays((xs, ys), names=('x', 'y'))
In [54]: arr.type
Out[54]: StructType(struct<x: int16, y: bool>)
In [55]: arr
Out[55]:
<pyarrow.lib.StructArray object at 0x7fe030fcab60>
-- is_valid: all not null
-- child 0 type: int16
[
5,
6,
7
]
-- child 1 type: bool
[
false,
true,
true
]
映射数组#
映射数组可以从列表的列表(键值对)构建,但前提是必须将类型显式传递给 array()。
In [56]: data = [[('x', 1), ('y', 0)], [('a', 2), ('b', 45)]]
In [57]: ty = pa.map_(pa.string(), pa.int64())
In [58]: pa.array(data, type=ty)
Out[58]:
<pyarrow.lib.MapArray object at 0x7fe030fcb040>
[
keys:
[
"x",
"y"
]
values:
[
1,
0
],
keys:
[
"a",
"b"
]
values:
[
2,
45
]
]
MapArrays 还可以从偏移量、键和项目数组构建。偏移量表示每个映射的起始位置。请注意,MapArray.keys 和 MapArray.items 属性给出的是扁平化的键和项目。要保持键和项目与其行关联,请使用 ListArray.from_arrays() 构造函数和 MapArray.offsets 属性。
In [59]: arr = pa.MapArray.from_arrays([0, 2, 3], ['x', 'y', 'z'], [4, 5, 6])
In [60]: arr.keys
Out[60]:
<pyarrow.lib.StringArray object at 0x7fe030fcb3a0>
[
"x",
"y",
"z"
]
In [61]: arr.items
Out[61]:
<pyarrow.lib.Int64Array object at 0x7fe030fcb4c0>
[
4,
5,
6
]
In [62]: pa.ListArray.from_arrays(arr.offsets, arr.keys)
Out[62]:
<pyarrow.lib.ListArray object at 0x7fe030fcb700>
[
[
"x",
"y"
],
[
"z"
]
]
In [63]: pa.ListArray.from_arrays(arr.offsets, arr.items)
Out[63]:
<pyarrow.lib.ListArray object at 0x7fe030fcb640>
[
[
4,
5
],
[
6
]
]
联合数组#
联合类型表示一种嵌套数组类型,其中每个值可以是(且仅是)一组可能类型中的一种。联合数组有两种可能的存储类型:稀疏和密集。
在稀疏联合数组中,每个子数组的长度与生成的联合数组相同。它们通过一个 int8 “类型”数组进行附加,该数组告诉每个值应从哪个子数组中选择。
In [64]: xs = pa.array([5, 6, 7])
In [65]: ys = pa.array([False, False, True])
In [66]: types = pa.array([0, 1, 1], type=pa.int8())
In [67]: union_arr = pa.UnionArray.from_sparse(types, [xs, ys])
In [68]: union_arr.type
Out[68]: SparseUnionType(sparse_union<0: int64=0, 1: bool=1>)
In [69]: union_arr
Out[69]:
<pyarrow.lib.UnionArray object at 0x7fe030fcbb20>
-- is_valid: all not null
-- type_ids: [
0,
1,
1
]
-- child 0 type: int64
[
5,
6,
7
]
-- child 1 type: bool
[
false,
false,
true
]
在密集联合数组中,除了 int8 “类型”数组外,您还需要传递一个 int32 “偏移量”数组,该数组告诉每个值在所选子数组中的哪个偏移量处可以找到。
In [70]: xs = pa.array([5, 6, 7])
In [71]: ys = pa.array([False, True])
In [72]: types = pa.array([0, 1, 1, 0, 0], type=pa.int8())
In [73]: offsets = pa.array([0, 0, 1, 1, 2], type=pa.int32())
In [74]: union_arr = pa.UnionArray.from_dense(types, offsets, [xs, ys])
In [75]: union_arr.type
Out[75]: DenseUnionType(dense_union<0: int64=0, 1: bool=1>)
In [76]: union_arr
Out[76]:
<pyarrow.lib.UnionArray object at 0x7fe030e24340>
-- is_valid: all not null
-- type_ids: [
0,
1,
1,
0,
0
]
-- value_offsets: [
0,
0,
1,
1,
2
]
-- child 0 type: int64
[
5,
6,
7
]
-- child 1 type: bool
[
false,
true
]
字典数组#
PyArrow 中的 Dictionary 类型是一种特殊的数组类型,类似于 R 中的因子或 pandas.Categorical。它允许文件或流中的一个或多个记录批次传输整数索引,这些索引引用共享的字典,其中包含逻辑数组中的不同值。这通常用于字符串以节省内存和提高性能。
Apache Arrow 格式中字典的处理方式以及它们在 C++ 和 Python 中显示的方式略有不同。我们定义了一个特殊的 DictionaryArray 类型,以及相应的字典类型。让我们考虑一个例子:
In [77]: indices = pa.array([0, 1, 0, 1, 2, 0, None, 2])
In [78]: dictionary = pa.array(['foo', 'bar', 'baz'])
In [79]: dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)
In [80]: dict_array
Out[80]:
<pyarrow.lib.DictionaryArray object at 0x7fe030f97bc0>
-- dictionary:
[
"foo",
"bar",
"baz"
]
-- indices:
[
0,
1,
0,
1,
2,
0,
null,
2
]
在这里我们有:
In [81]: print(dict_array.type)
dictionary<values=string, indices=int64, ordered=0>
In [82]: dict_array.indices
Out[82]:
<pyarrow.lib.Int64Array object at 0x7fe030e24a60>
[
0,
1,
0,
1,
2,
0,
null,
2
]
In [83]: dict_array.dictionary
Out[83]:
<pyarrow.lib.StringArray object at 0x7fe030e24700>
[
"foo",
"bar",
"baz"
]
当将 DictionaryArray 与 pandas 一起使用时,它的对应物是 pandas.Categorical(稍后详述)。
In [84]: dict_array.to_pandas()
Out[84]:
0 foo
1 bar
2 foo
3 bar
4 baz
5 foo
6 NaN
7 baz
dtype: category
Categories (3, object): ['foo', 'bar', 'baz']
记录批次#
Apache Arrow 中的 Record Batch 是等长数组实例的集合。让我们考虑一个数组集合:
In [85]: data = [
....: pa.array([1, 2, 3, 4]),
....: pa.array(['foo', 'bar', 'baz', None]),
....: pa.array([True, None, False, True])
....: ]
....:
可以使用 RecordBatch.from_arrays 从这个数组列表创建记录批次。
In [86]: batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])
In [87]: batch.num_columns
Out[87]: 3
In [88]: batch.num_rows
Out[88]: 4
In [89]: batch.schema
Out[89]:
f0: int64
f1: string
f2: bool
In [90]: batch[1]
Out[90]:
<pyarrow.lib.StringArray object at 0x7fe030e25ba0>
[
"foo",
"bar",
"baz",
null
]
记录批次可以像数组一样进行切片,而无需复制内存。
In [91]: batch2 = batch.slice(1, 3)
In [92]: batch2[1]
Out[92]:
<pyarrow.lib.StringArray object at 0x7fe030e25d80>
[
"bar",
"baz",
null
]
表#
PyArrow Table 类型不是 Apache Arrow 规范的一部分,而是一个工具,用于帮助将多个记录批次和数组片段作为单个逻辑数据集进行处理。一个相关的例子是,我们可能会在套接字流中接收到多个小的记录批次,然后需要将它们连接到连续内存中以在 NumPy 或 pandas 中使用。Table 对象可以高效地完成此操作,而无需额外的内存复制。
考虑到我们上面创建的记录批次,我们可以使用 Table.from_batches 创建一个包含一个或多个批次副本的 Table。
In [93]: batches = [batch] * 5
In [94]: table = pa.Table.from_batches(batches)
In [95]: table
Out[95]:
pyarrow.Table
f0: int64
f1: string
f2: bool
----
f0: [[1,2,3,4],[1,2,3,4],...,[1,2,3,4],[1,2,3,4]]
f1: [["foo","bar","baz",null],["foo","bar","baz",null],...,["foo","bar","baz",null],["foo","bar","baz",null]]
f2: [[true,null,false,true],[true,null,false,true],...,[true,null,false,true],[true,null,false,true]]
In [96]: table.num_rows
Out[96]: 20
表的列是 ChunkedArray 的实例,它是一个包含一个或多个相同类型数组的容器。
In [97]: c = table[0]
In [98]: c
Out[98]:
<pyarrow.lib.ChunkedArray object at 0x7fe030e260e0>
[
[
1,
2,
3,
4
],
[
1,
2,
3,
4
],
...,
[
1,
2,
3,
4
],
[
1,
2,
3,
4
]
]
In [99]: c.num_chunks
Out[99]: 5
In [100]: c.chunk(0)
Out[100]:
<pyarrow.lib.Int64Array object at 0x7fe030e26440>
[
1,
2,
3,
4
]
正如您在 pandas 部分 中看到的,我们可以将这些对象转换为连续的 NumPy 数组以在 pandas 中使用。
In [101]: c.to_pandas()
Out[101]:
0 1
1 2
2 3
3 4
4 1
5 2
6 3
7 4
8 1
9 2
10 3
11 4
12 1
13 2
14 3
15 4
16 1
17 2
18 3
19 4
Name: f0, dtype: int64
如果 schema 相同,多个表也可以使用 pyarrow.concat_tables 连接成一个表。
In [102]: tables = [table] * 2
In [103]: table_all = pa.concat_tables(tables)
In [104]: table_all.num_rows
Out[104]: 40
In [105]: c = table_all[0]
In [106]: c.num_chunks
Out[106]: 10
这类似于 Table.from_batches,但使用表作为输入而不是记录批次。记录批次可以转换为表,反之则不行,因此如果您的数据已经处于表形式,则使用 pyarrow.concat_tables。
自定义 Schema 和字段元数据#
Arrow 支持 schema 级别和字段级别的自定义键值元数据,允许系统插入自己的应用程序定义的元数据以自定义行为。
自定义元数据可以在 schema 级别通过 Schema.metadata 访问,在字段级别通过 Field.metadata 访问。
请注意,此元数据在 流式传输、序列化和 IPC 过程中会保留。
要自定义现有表的 schema 元数据,您可以使用 Table.replace_schema_metadata()。
In [107]: table.schema.metadata # empty
In [108]: table = table.replace_schema_metadata({"f0": "First dose"})
In [109]: table.schema.metadata
Out[109]: {b'f0': b'First dose'}
要自定义表 schema 中字段的元数据,您可以使用 Field.with_metadata()。
In [110]: field_f1 = table.schema.field("f1")
In [111]: field_f1.metadata # empty
In [112]: field_f1 = field_f1.with_metadata({"f1": "Second dose"})
In [113]: field_f1.metadata
Out[113]: {b'f1': b'Second dose'}
这两个选项都会创建数据的浅拷贝,实际上并没有更改不可变的 Schema。为了更改表中 schema 的元数据,我们在调用 Table.replace_schema_metadata() 时创建了一个新对象。
要更改 schema 中字段的元数据,我们需要定义一个新 schema 并将数据转换为此 schema。
In [114]: my_schema2 = pa.schema([
.....: pa.field('f0', pa.int64(), metadata={"name": "First dose"}),
.....: pa.field('f1', pa.string(), metadata={"name": "Second dose"}),
.....: pa.field('f2', pa.bool_())],
.....: metadata={"f2": "booster"})
.....:
In [115]: t2 = table.cast(my_schema2)
In [116]: t2.schema.field("f0").metadata
Out[116]: {b'name': b'First dose'}
In [117]: t2.schema.field("f1").metadata
Out[117]: {b'name': b'Second dose'}
In [118]: t2.schema.metadata
Out[118]: {b'f2': b'booster'}
元数据键值对在 C++ 实现中是 std::string 对象,因此在 Python 中是字节对象 (b'...')。
记录批次读取器#
PyArrow 中的许多函数要么返回,要么将 RecordBatchReader 作为参数。它可以用作记录批次的任何可迭代对象,但也提供它们的公共 schema,而无需获取任何批次。
>>> schema = pa.schema([('x', pa.int64())])
>>> def iter_record_batches():
... for i in range(2):
... yield pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=schema)
>>> reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
>>> print(reader.schema)
pyarrow.Schema
x: int64
>>> for batch in reader:
... print(batch)
pyarrow.RecordBatch
x: int64
pyarrow.RecordBatch
x: int64
它还可以使用 C 流接口 在语言之间发送。
RecordBatch 到 Tensor 的转换#
RecordBatch 的每个数组都有自己的连续内存,不一定与其他数组相邻。机器学习库中使用的不同内存结构是二维数组(也称为二维张量或矩阵),它只占用一个连续的内存块。
因此,有一个函数 pyarrow.RecordBatch.to_tensor() 可用于有效地将表格列式数据转换为张量。
此转换支持的数据类型是无符号整数、有符号整数和浮点类型。目前仅支持列主序转换。
>>> import pyarrow as pa
>>> arr1 = [1, 2, 3, 4, 5]
>>> arr2 = [10, 20, 30, 40, 50]
>>> batch = pa.RecordBatch.from_arrays(
... [
... pa.array(arr1, type=pa.uint16()),
... pa.array(arr2, type=pa.int16()),
... ], ["a", "b"]
... )
>>> batch.to_tensor()
<pyarrow.Tensor>
type: int32
shape: (9, 2)
strides: (4, 36)
>>> batch.to_tensor().to_numpy()
array([[ 1, 10],
[ 2, 20],
[ 3, 30],
[ 4, 40],
[ 5, 50]], dtype=int32)
将 null_to_nan 设置为 True 也可以转换带空值的数据。它们将被转换为 NaN。
>>> import pyarrow as pa
>>> batch = pa.record_batch(
... [
... pa.array([1, 2, 3, 4, None], type=pa.int32()),
... pa.array([10, 20, 30, 40, None], type=pa.float32()),
... ], names = ["a", "b"]
... )
>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
[ 2., 20.],
[ 3., 30.],
[ 4., 40.],
[nan, nan]])