数据类型和内存数据模型#

Apache Arrow通过将类型元数据与内存缓冲区组合在一起来定义列式数组数据结构,例如内存和 IO文档中解释的那些缓冲区。 这些数据结构通过一系列相互关联的类在Python中公开

  • 类型元数据pyarrow.DataType的实例,它描述了数组的类型并管理其值的解释方式

  • 模式pyarrow.Schema的实例,它描述了类型的命名集合。 这些可以被认为是类似表对象中的列类型。

  • 数组pyarrow.Array的实例,它是原子、连续的列式数据结构,由Arrow Buffer对象组成

  • 记录批次pyarrow.RecordBatch的实例,它是具有特定模式的Array对象的集合

  • pyarrow.Table的实例,这是一种逻辑表数据结构,其中每一列由一个或多个相同类型的pyarrow.Array对象组成。

我们将在下面的章节中通过一系列示例来检查这些。

类型元数据#

Apache Arrow定义了与语言无关的列式数据结构,用于数组数据。 这些包括

  • 固定长度的原始类型:数字、布尔值、日期和时间、固定大小的二进制、十进制和其他适合给定数字的值

  • 可变长度的原始类型:二进制、字符串

  • 嵌套类型:列表、映射、结构和联合

  • 字典类型:一种编码的分类类型(稍后会详细介绍)

Arrow中的每种数据类型都有一个相应的工厂函数,用于在Python中创建该类型对象的实例

In [1]: import pyarrow as pa

In [2]: t1 = pa.int32()

In [3]: t2 = pa.string()

In [4]: t3 = pa.binary()

In [5]: t4 = pa.binary(10)

In [6]: t5 = pa.timestamp('ms')

In [7]: t1
Out[7]: DataType(int32)

In [8]: print(t1)
int32

In [9]: print(t4)
fixed_size_binary[10]

In [10]: print(t5)
timestamp[ms]

注意

不同的数据类型可能使用给定的物理存储。 例如,int64float64timestamp[ms]每个值都占用64位。

这些对象是metadata;它们用于描述数组、模式和记录批次中的数据。 在Python中,它们可以在输入数据(例如,Python对象)可能被强制转换为多个Arrow类型的函数中使用。

Field类型是一种类型加上名称和可选的用户定义的元数据

In [11]: f0 = pa.field('int32_field', t1)

In [12]: f0
Out[12]: pyarrow.Field<int32_field: int32>

In [13]: f0.name
Out[13]: 'int32_field'

In [14]: f0.type
Out[14]: DataType(int32)

Arrow支持嵌套值类型,如列表、映射、结构和联合。 创建这些类型时,必须传递类型或字段以指示类型的子类型的数据类型。 例如,我们可以定义一个int32值的列表,如下所示

In [15]: t6 = pa.list_(t1)

In [16]: t6
Out[16]: ListType(list<item: int32>)

struct是命名字段的集合

In [17]: fields = [
   ....:     pa.field('s0', t1),
   ....:     pa.field('s1', t2),
   ....:     pa.field('s2', t4),
   ....:     pa.field('s3', t6),
   ....: ]
   ....: 

In [18]: t7 = pa.struct(fields)

In [19]: print(t7)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>

为了方便起见,您可以直接传递(name, type)元组,而不是Field实例

In [20]: t8 = pa.struct([('s0', t1), ('s1', t2), ('s2', t4), ('s3', t6)])

In [21]: print(t8)
struct<s0: int32, s1: string, s2: fixed_size_binary[10], s3: list<item: int32>>

In [22]: t8 == t7
Out[22]: True

有关数据类型函数的完整列表,请参见数据类型API

模式#

Schema类型类似于struct数组类型; 它定义了记录批处理或表数据结构中的列名和类型。 pyarrow.schema()工厂函数在Python中创建新的Schema对象

In [23]: my_schema = pa.schema([('field0', t1),
   ....:                        ('field1', t2),
   ....:                        ('field2', t4),
   ....:                        ('field3', t6)])
   ....: 

In [24]: my_schema
Out[24]: 
field0: int32
field1: string
field2: fixed_size_binary[10]
field3: list<item: int32>
  child 0, item: int32

在某些应用程序中,您可能不会直接创建模式,而只会使用嵌入在IPC消息中的模式。

数组#

对于每种数据类型,都有一个伴随的数组数据结构,用于保存定义单个连续列式数组数据块的内存缓冲区。 当您使用PyArrow时,此数据可能来自IPC工具,但也可以从各种类型的Python序列(列表、NumPy数组、pandas数据)创建。

创建数组的一个简单方法是使用pyarrow.array,它类似于numpy.array函数。 默认情况下,PyArrow会为您推断数据类型

In [25]: arr = pa.array([1, 2, None, 3])

In [26]: arr
Out[26]: 
<pyarrow.lib.Int64Array object at 0x7f9db4254580>
[
  1,
  2,
  null,
  3
]

但是您也可以传递特定的数据类型来覆盖类型推断

In [27]: pa.array([1, 2], type=pa.uint16())
Out[27]: 
<pyarrow.lib.UInt16Array object at 0x7f9db4254be0>
[
  1,
  2
]

数组的type属性是相应的类型元数据

In [28]: arr.type
Out[28]: DataType(int64)

每个内存中的数组都有已知的长度和空值计数(如果没有空值,则为0)

In [29]: len(arr)
Out[29]: 4

In [30]: arr.null_count
Out[30]: 1

可以使用普通索引选择标量值。 pyarrow.arrayNone值转换为Arrow空值; 我们为null返回特殊的pyarrow.NA

In [31]: arr[0]
Out[31]: <pyarrow.Int64Scalar: 1>

In [32]: arr[2]
Out[32]: <pyarrow.Int64Scalar: None>

Arrow数据是不可变的,因此可以选择值但不能分配。

可以对数组进行切片,而无需复制

In [33]: arr[1:3]
Out[33]: 
<pyarrow.lib.Int64Array object at 0x7f9db4255240>
[
  2,
  null
]

None值和NAN处理#

如上节所述,Python对象None在转换为pyarrow.Array时始终转换为Arrow空元素。 对于浮点NaN值,它由Python对象float('nan')numpy.nan表示,我们通常在转换期间将其转换为有效浮点值。 如果提供给pyarrow.array的整数输入包含np.nan,则会引发ValueError

为了更好地兼容Pandas,我们支持将NaN值解释为空元素。 这在所有from_pandas函数上自动启用,并且可以通过将from_pandas=True作为函数参数传递在其他转换函数上启用。

列表数组#

pyarrow.array能够推断简单嵌套数据结构(如列表)的类型

In [34]: nested_arr = pa.array([[], None, [1, 2], [None, 1]])

In [35]: print(nested_arr.type)
list<item: int64>

ListView数组#

pyarrow.array可以创建一种称为ListView的备用列表类型

In [36]: nested_arr = pa.array([[], None, [1, 2], [None, 1]], type=pa.list_view(pa.int64()))

In [37]: print(nested_arr.type)
list_view<item: int64>

ListView数组具有与List数组不同的一组缓冲区。 ListView数组同时具有offsets和sizes缓冲区,而List数组只有offsets缓冲区。 这允许ListView数组指定无序偏移量

In [38]: values = [1, 2, 3, 4, 5, 6]

In [39]: offsets = [4, 2, 0]

In [40]: sizes = [2, 2, 2]

In [41]: arr = pa.ListViewArray.from_arrays(offsets, sizes, values)

In [42]: arr
Out[42]: 
<pyarrow.lib.ListViewArray object at 0x7f9db4256140>
[
  [
    5,
    6
  ],
  [
    3,
    4
  ],
  [
    1,
    2
  ]
]

有关更多详细信息,请参见ListView布局的格式规范。

结构数组#

pyarrow.array能够从字典数组中推断结构类型的模式

In [43]: pa.array([{'x': 1, 'y': True}, {'z': 3.4, 'x': 4}])
Out[43]: 
<pyarrow.lib.StructArray object at 0x7f9db4256560>
-- is_valid: all not null
-- child 0 type: int64
  [
    1,
    4
  ]
-- child 1 type: bool
  [
    true,
    null
  ]
-- child 2 type: double
  [
    null,
    3.4
  ]

可以从Python字典或元组序列初始化结构数组。 对于元组,您必须显式传递类型

In [44]: ty = pa.struct([('x', pa.int8()),
   ....:                 ('y', pa.bool_())])
   ....: 

In [45]: pa.array([{'x': 1, 'y': True}, {'x': 2, 'y': False}], type=ty)
Out[45]: 
<pyarrow.lib.StructArray object at 0x7f9db4256c20>
-- is_valid: all not null
-- child 0 type: int8
  [
    1,
    2
  ]
-- child 1 type: bool
  [
    true,
    false
  ]

In [46]: pa.array([(3, True), (4, False)], type=ty)
Out[46]: 
<pyarrow.lib.StructArray object at 0x7f9db4256ce0>
-- is_valid: all not null
-- child 0 type: int8
  [
    3,
    4
  ]
-- child 1 type: bool
  [
    true,
    false
  ]

初始化结构数组时,允许在结构级别和单个字段级别都使用空值。 如果从Python字典序列初始化,则缺少字典键将被视为空值

In [47]: pa.array([{'x': 1}, None, {'y': None}], type=ty)
Out[47]: 
<pyarrow.lib.StructArray object at 0x7f9db4256da0>
-- is_valid:
  [
    true,
    false,
    true
  ]
-- child 0 type: int8
  [
    1,
    0,
    null
  ]
-- child 1 type: bool
  [
    null,
    false,
    null
  ]

您还可以从每个结构的组件的现有数组构造结构数组。 在这种情况下,数据存储将与单个数组共享,并且不涉及复制

In [48]: xs = pa.array([5, 6, 7], type=pa.int16())

In [49]: ys = pa.array([False, True, True])

In [50]: arr = pa.StructArray.from_arrays((xs, ys), names=('x', 'y'))

In [51]: arr.type
Out[51]: StructType(struct<x: int16, y: bool>)

In [52]: arr
Out[52]: 
<pyarrow.lib.StructArray object at 0x7f9db43fb100>
-- is_valid: all not null
-- child 0 type: int16
  [
    5,
    6,
    7
  ]
-- child 1 type: bool
  [
    false,
    true,
    true
  ]

映射数组#

可以从元组(键值对)的列表的列表中构造映射数组,但前提是类型已显式传递到array()

In [53]: data = [[('x', 1), ('y', 0)], [('a', 2), ('b', 45)]]

In [54]: ty = pa.map_(pa.string(), pa.int64())

In [55]: pa.array(data, type=ty)
Out[55]: 
<pyarrow.lib.MapArray object at 0x7f9db4257940>
[
  keys:
  [
    "x",
    "y"
  ]
  values:
  [
    1,
    0
  ],
  keys:
  [
    "a",
    "b"
  ]
  values:
  [
    2,
    45
  ]
]

MapArrays也可以从偏移量、键和项数组构造。偏移量表示每个映射的起始位置。请注意,MapArray.keysMapArray.items 属性给出扁平化的键和项。为了保持键和项与其行相关联,请使用 ListArray.from_arrays() 构造函数,并结合 MapArray.offsets 属性。

In [56]: arr = pa.MapArray.from_arrays([0, 2, 3], ['x', 'y', 'z'], [4, 5, 6])

In [57]: arr.keys
Out[57]: 
<pyarrow.lib.StringArray object at 0x7f9df8fb9360>
[
  "x",
  "y",
  "z"
]

In [58]: arr.items
Out[58]: 
<pyarrow.lib.Int64Array object at 0x7f9db42578e0>
[
  4,
  5,
  6
]

In [59]: pa.ListArray.from_arrays(arr.offsets, arr.keys)
Out[59]: 
<pyarrow.lib.ListArray object at 0x7f9db4257d00>
[
  [
    "x",
    "y"
  ],
  [
    "z"
  ]
]

In [60]: pa.ListArray.from_arrays(arr.offsets, arr.items)
Out[60]: 
<pyarrow.lib.ListArray object at 0x7f9db4257d60>
[
  [
    4,
    5
  ],
  [
    6
  ]
]

联合数组#

联合类型表示一种嵌套数组类型,其中每个值可以是(且仅可以是)一组可能类型中的一种。联合数组有两种可能的存储类型:稀疏和密集。

在稀疏联合数组中,每个子数组的长度与生成的联合数组相同。 它们通过一个 int8 “类型”数组进行连接,该数组告诉每个值必须从哪个子数组中选择。

In [61]: xs = pa.array([5, 6, 7])

In [62]: ys = pa.array([False, False, True])

In [63]: types = pa.array([0, 1, 1], type=pa.int8())

In [64]: union_arr = pa.UnionArray.from_sparse(types, [xs, ys])

In [65]: union_arr.type
Out[65]: SparseUnionType(sparse_union<0: int64=0, 1: bool=1>)

In [66]: union_arr
Out[66]: 
<pyarrow.lib.UnionArray object at 0x7f9db4257880>
-- is_valid: all not null
-- type_ids:   [
    0,
    1,
    1
  ]
-- child 0 type: int64
  [
    5,
    6,
    7
  ]
-- child 1 type: bool
  [
    false,
    false,
    true
  ]

在密集联合数组中,除了 int8 “类型”数组之外,您还需要传递一个 int32 “偏移量”数组,该数组告诉每个值在所选子数组中的哪个偏移量可以找到它。

In [67]: xs = pa.array([5, 6, 7])

In [68]: ys = pa.array([False, True])

In [69]: types = pa.array([0, 1, 1, 0, 0], type=pa.int8())

In [70]: offsets = pa.array([0, 0, 1, 1, 2], type=pa.int32())

In [71]: union_arr = pa.UnionArray.from_dense(types, offsets, [xs, ys])

In [72]: union_arr.type
Out[72]: DenseUnionType(dense_union<0: int64=0, 1: bool=1>)

In [73]: union_arr
Out[73]: 
<pyarrow.lib.UnionArray object at 0x7f9db4298b20>
-- is_valid: all not null
-- type_ids:   [
    0,
    1,
    1,
    0,
    0
  ]
-- value_offsets:   [
    0,
    0,
    1,
    1,
    2
  ]
-- child 0 type: int64
  [
    5,
    6,
    7
  ]
-- child 1 type: bool
  [
    false,
    true
  ]

字典数组#

PyArrow 中的 字典 类型是一种特殊的数组类型,类似于 R 中的因子或 pandas.Categorical。它使文件或流中的一个或多个记录批次能够传输引用共享 字典 的整数索引,该字典包含逻辑数组中的不同值。这尤其经常用于字符串,以节省内存并提高性能。

Apache Arrow 格式处理字典的方式以及它们在 C++ 和 Python 中出现的方式略有不同。我们定义了一个特殊的 DictionaryArray 类型,以及相应的字典类型。让我们考虑一个例子

In [74]: indices = pa.array([0, 1, 0, 1, 2, 0, None, 2])

In [75]: dictionary = pa.array(['foo', 'bar', 'baz'])

In [76]: dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)

In [77]: dict_array
Out[77]: 
<pyarrow.lib.DictionaryArray object at 0x7f9db428d850>

-- dictionary:
  [
    "foo",
    "bar",
    "baz"
  ]
-- indices:
  [
    0,
    1,
    0,
    1,
    2,
    0,
    null,
    2
  ]

这里我们有

In [78]: print(dict_array.type)
dictionary<values=string, indices=int64, ordered=0>

In [79]: dict_array.indices
Out[79]: 
<pyarrow.lib.Int64Array object at 0x7f9db4257c40>
[
  0,
  1,
  0,
  1,
  2,
  0,
  null,
  2
]

In [80]: dict_array.dictionary
Out[80]: 
<pyarrow.lib.StringArray object at 0x7f9db4255720>
[
  "foo",
  "bar",
  "baz"
]

当使用 DictionaryArray 与 pandas 时,类似物是 pandas.Categorical(稍后详细介绍)

In [81]: dict_array.to_pandas()
Out[81]: 
0    foo
1    bar
2    foo
3    bar
4    baz
5    foo
6    NaN
7    baz
dtype: category
Categories (3, object): ['foo', 'bar', 'baz']

记录批次#

Apache Arrow 中的 记录批次 是等长数组实例的集合。让我们考虑一个数组集合

In [82]: data = [
   ....:     pa.array([1, 2, 3, 4]),
   ....:     pa.array(['foo', 'bar', 'baz', None]),
   ....:     pa.array([True, None, False, True])
   ....: ]
   ....: 

可以使用 RecordBatch.from_arrays 从此数组列表创建记录批次

In [83]: batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])

In [84]: batch.num_columns
Out[84]: 3

In [85]: batch.num_rows
Out[85]: 4

In [86]: batch.schema
Out[86]: 
f0: int64
f1: string
f2: bool

In [87]: batch[1]
Out[87]: 
<pyarrow.lib.StringArray object at 0x7f9db429a1a0>
[
  "foo",
  "bar",
  "baz",
  null
]

可以像数组一样对记录批次进行切片,而无需复制内存

In [88]: batch2 = batch.slice(1, 3)

In [89]: batch2[1]
Out[89]: 
<pyarrow.lib.StringArray object at 0x7f9db4254640>
[
  "bar",
  "baz",
  null
]

#

PyArrow Table 类型不是 Apache Arrow 规范的一部分,而是一个帮助管理多个记录批次和数组片段作为单个逻辑数据集的工具。作为一个相关的例子,我们可能会在套接字流中收到多个小记录批次,然后需要将它们连接到连续的内存中,以便在 NumPy 或 pandas 中使用。Table 对象可以高效地完成此操作,而无需额外的内存复制。

考虑到我们上面创建的记录批次,我们可以使用 Table.from_batches 创建一个包含批次的一个或多个副本的表

In [90]: batches = [batch] * 5

In [91]: table = pa.Table.from_batches(batches)

In [92]: table
Out[92]: 
pyarrow.Table
f0: int64
f1: string
f2: bool
----
f0: [[1,2,3,4],[1,2,3,4],...,[1,2,3,4],[1,2,3,4]]
f1: [["foo","bar","baz",null],["foo","bar","baz",null],...,["foo","bar","baz",null],["foo","bar","baz",null]]
f2: [[true,null,false,true],[true,null,false,true],...,[true,null,false,true],[true,null,false,true]]

In [93]: table.num_rows
Out[93]: 20

表的列是 ChunkedArray 的实例,它是相同类型的一个或多个数组的容器。

In [94]: c = table[0]

In [95]: c
Out[95]: 
<pyarrow.lib.ChunkedArray object at 0x7f9db429a5c0>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ],
...,
  [
    1,
    2,
    3,
    4
  ],
  [
    1,
    2,
    3,
    4
  ]
]

In [96]: c.num_chunks
Out[96]: 5

In [97]: c.chunk(0)
Out[97]: 
<pyarrow.lib.Int64Array object at 0x7f9db429aa40>
[
  1,
  2,
  3,
  4
]

正如您将在 pandas 部分中看到的那样,我们可以将这些对象转换为连续的 NumPy 数组,以便在 pandas 中使用

In [98]: c.to_pandas()
Out[98]: 
0     1
1     2
2     3
3     4
4     1
5     2
6     3
7     4
8     1
9     2
10    3
11    4
12    1
13    2
14    3
15    4
16    1
17    2
18    3
19    4
Name: f0, dtype: int64

如果模式相等,也可以使用 pyarrow.concat_tables 将多个表连接在一起以形成单个表

In [99]: tables = [table] * 2

In [100]: table_all = pa.concat_tables(tables)

In [101]: table_all.num_rows
Out[101]: 40

In [102]: c = table_all[0]

In [103]: c.num_chunks
Out[103]: 10

这类似于 Table.from_batches,但使用表作为输入而不是记录批次。记录批次可以转换为表,但反之则不然,因此如果您的数据已经采用表的形式,请使用 pyarrow.concat_tables

自定义模式和字段元数据#

Arrow 支持模式级别和字段级别的自定义键值元数据,允许系统插入自己的应用程序定义的元数据以自定义行为。

可以在模式级别的 Schema.metadata 和字段级别的 Field.metadata 访问自定义元数据。

请注意,此元数据在 流式传输、序列化和 IPC 过程中保留。

要自定义现有表的模式元数据,您可以使用 Table.replace_schema_metadata()

In [104]: table.schema.metadata # empty

In [105]: table = table.replace_schema_metadata({"f0": "First dose"})

In [106]: table.schema.metadata
Out[106]: {b'f0': b'First dose'}

要自定义表模式中字段的元数据,您可以使用 Field.with_metadata()

In [107]: field_f1 = table.schema.field("f1")

In [108]: field_f1.metadata # empty

In [109]: field_f1 = field_f1.with_metadata({"f1": "Second dose"})

In [110]: field_f1.metadata
Out[110]: {b'f1': b'Second dose'}

这两种选项都会创建数据的浅拷贝,并且实际上不会更改不可变的模式。要更改表模式中的元数据,我们在调用 Table.replace_schema_metadata() 时创建了一个新对象。

要更改模式中字段的元数据,我们需要定义一个新模式并将数据强制转换为此模式

In [111]: my_schema2 = pa.schema([
   .....:    pa.field('f0', pa.int64(), metadata={"name": "First dose"}),
   .....:    pa.field('f1', pa.string(), metadata={"name": "Second dose"}),
   .....:    pa.field('f2', pa.bool_())],
   .....:    metadata={"f2": "booster"})
   .....: 

In [112]: t2 = table.cast(my_schema2)

In [113]: t2.schema.field("f0").metadata
Out[113]: {b'name': b'First dose'}

In [114]: t2.schema.field("f1").metadata
Out[114]: {b'name': b'Second dose'}

In [115]: t2.schema.metadata
Out[115]: {b'f2': b'booster'}

元数据键值对是 C++ 实现中的 std::string 对象,因此它们是 Python 中的字节对象 (b'...')。

记录批次读取器#

PyArrow 中的许多函数要么返回一个 RecordBatchReader,要么将其作为参数。它可以像任何记录批次的可迭代对象一样使用,但也提供了它们的公共模式,而无需获取任何批次。

>>> schema = pa.schema([('x', pa.int64())])
>>> def iter_record_batches():
...    for i in range(2):
...       yield pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], schema=schema)
>>> reader = pa.RecordBatchReader.from_batches(schema, iter_record_batches())
>>> print(reader.schema)
pyarrow.Schema
x: int64
>>> for batch in reader:
...    print(batch)
pyarrow.RecordBatch
x: int64
pyarrow.RecordBatch
x: int64

它也可以使用 C 流接口 在语言之间发送。

将 RecordBatch 转换为张量#

RecordBatch 的每个数组都有自己的连续内存,该内存不一定与其他数组相邻。机器学习库中使用的一种不同的内存结构是二维数组(也称为二维张量或矩阵),它只需要一个连续的内存块。

因此,有一个函数 pyarrow.RecordBatch.to_tensor() 可用于有效地将表格列数据转换为张量。

此转换中支持的数据类型是无符号、有符号整数和浮点类型。目前仅支持列优先转换。

>>>  import pyarrow as pa
>>>  arr1 = [1, 2, 3, 4, 5]
>>>  arr2 = [10, 20, 30, 40, 50]
>>>  batch = pa.RecordBatch.from_arrays(
...      [
...          pa.array(arr1, type=pa.uint16()),
...          pa.array(arr2, type=pa.int16()),
...      ], ["a", "b"]
...  )
>>>  batch.to_tensor()
<pyarrow.Tensor>
type: int32
shape: (9, 2)
strides: (4, 36)
>>>  batch.to_tensor().to_numpy()
array([[ 1, 10],
      [ 2, 20],
      [ 3, 30],
      [ 4, 40],
      [ 5, 50]], dtype=int32)

如果将 null_to_nan 设置为 True,也可以转换带有空值的数据。它们将被转换为 NaN

>>> import pyarrow as pa
>>> batch = pa.record_batch(
...     [
...         pa.array([1, 2, 3, 4, None], type=pa.int32()),
...         pa.array([10, 20, 30, 40, None], type=pa.float32()),
...     ], names = ["a", "b"]
... )
>>> batch.to_tensor(null_to_nan=True).to_numpy()
array([[ 1., 10.],
      [ 2., 20.],
      [ 3., 30.],
      [ 4., 40.],
      [nan, nan]])