计算函数#

Arrow 支持对可能具有不同类型的输入进行逻辑计算操作。

标准的计算操作由 pyarrow.compute 模块提供,可以直接使用。

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.sum(a)
<pyarrow.Int64Scalar: 7>

分组聚合函数则会抛出异常,必须通过 pyarrow.Table.group_by() 功能来使用。更多详情请参阅 分组聚合 (Grouped Aggregations)

标准计算函数#

许多计算函数支持数组(无论是否分块)和标量输入,但有些函数会强制要求特定输入。例如,sort_indices 要求其第一个且唯一的输入必须是数组。

以下是一些简单的示例

>>> a = pa.array([1, 1, 2, 3])
>>> b = pa.array([4, 1, 2, 8])
>>> pc.equal(a, b)
<pyarrow.lib.BooleanArray object at ...>
[
  false,
  true,
  true,
  false
]
>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>

如果您使用的计算函数返回多个值,结果将以 StructScalar 的形式返回。您可以通过调用 pyarrow.StructScalar.values() 方法来提取各个值。

>>> a = pa.array([1, 1, 2, 3])
>>> pc.min_max(a)
<pyarrow.StructScalar: [('min', 1), ('max', 3)]>
>>> a, b = pc.min_max(a).values()
>>> a
<pyarrow.Int64Scalar: 1>
>>> b
<pyarrow.Int64Scalar: 3>

这些函数不仅可以执行逐元素操作。以下是一个对表进行排序的示例

>>> t = pa.table({'x':[1,2,3],'y':[3,2,1]})
>>> i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
>>> i
<pyarrow.lib.UInt64Array object at ...>
[
  2,
  1,
  0
]

有关 PyArrow 提供的计算函数的完整列表,您可以参考 计算函数 (Compute Functions) 参考文档。

分组聚合#

PyArrow 通过 pyarrow.Table.group_by() 方法支持对 pyarrow.Table 进行分组聚合。该方法将返回一个分组声明,可以对其应用哈希聚合函数。

>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
keys: string
values_sum: int64
----
keys: [["a","b","c"]]
values_sum: [[3,7,5]]

前一个示例中传递给 aggregate 方法的 "sum" 聚合即为 hash_sum 计算函数。

可以通过将多个聚合函数提供给 aggregate 方法来同时执行多个聚合。

>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([
...    ("values", "sum"),
...    ("keys", "count")
... ])
pyarrow.Table
keys: string
values_sum: int64
keys_count: int64
----
keys: [["a","b","c"]]
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]

也可以为每个聚合函数提供聚合选项,例如,我们可以使用 CountOptions 来改变我们计算空值的方式。

>>> table_with_nulls = pa.table([
...    pa.array(["a", "a", "a"]),
...    pa.array([1, None, None])
... ], names=["keys", "values"])
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="all"))
... ])
pyarrow.Table
keys: string
values_count: int64
----
keys: [["a"]]
values_count: [[3]]
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="only_valid"))
... ])
pyarrow.Table
keys: string
values_count: int64
----
keys: [["a"]]
values_count: [[1]]

以下是所有支持的分组聚合函数列表。您可以根据需要使用或不使用 "hash_" 前缀。

hash_all

每个组中的所有元素是否都计算为 true

ScalarAggregateOptions

hash_any

每个组中是否有任何元素计算为 true

ScalarAggregateOptions

hash_approximate_median

计算每个组中值的近似中位数

ScalarAggregateOptions

hash_count

计算每个组中空值/非空值的数量

CountOptions

hash_count_all

计算每个组中的行数

hash_count_distinct

计算每个组中不同值的数量

CountOptions

hash_distinct

保留每个组中的不同值

CountOptions

hash_first

计算每个组中的第一个值

ScalarAggregateOptions

hash_first_last

计算每个组中值的第一个和最后一个

ScalarAggregateOptions

hash_kurtosis

计算每个组中值的峰度

hash_last

计算每个组中的第一个值

ScalarAggregateOptions

hash_list

列出每个组中的所有值

hash_max

计算每个组中值的最小值或最大值

ScalarAggregateOptions

hash_mean

计算每个组中值的平均值

ScalarAggregateOptions

hash_min

计算每个组中值的最小值或最大值

ScalarAggregateOptions

hash_min_max

计算每个组中值的最小值和最大值

ScalarAggregateOptions

hash_one

从每个组中获取一个值

hash_pivot_wider

根据透视键列对值进行透视

PivotWiderOptions

hash_product

计算每个组中值的乘积

ScalarAggregateOptions

hash_skew

计算每个组中值的偏度

hash_stddev

计算每个组中值的标准差

hash_sum

计算每个组中值的总和

ScalarAggregateOptions

hash_tdigest

计算每个组中值的近似分位数

TDigestOptions

hash_variance

计算每个组中值的方差

表和数据集连接#

TableDataset 都支持通过 Table.join()Dataset.join() 方法进行连接操作。

这些方法接受一个将要与初始对象连接的右侧表或数据集,以及一个或多个用于执行连接的键。

默认执行 左外连接 (left outer join),但也可以请求任何其他支持的连接类型:

  • 左半连接 (left semi)

  • 右半连接 (right semi)

  • 左反连接 (left anti)

  • 右反连接 (right anti)

  • 内连接 (inner)

  • 左外连接 (left outer)

  • 右外连接 (right outer)

  • 全外连接 (full outer)

只需提供一个表和一个执行连接的键,即可执行基本连接。

>>> table1 = pa.table({'id': [1, 2, 3],
...                    'year': [2020, 2022, 2019]})
>>> table2 = pa.table({'id': [3, 4],
...                    'n_legs': [5, 100],
...                    'animal': ["Brittle stars", "Centipede"]})
>>> joined_table = table1.join(table2, keys="id")

结果将是一个新表,它是通过 table1table2id 键上进行 左外连接 创建的。

>>> joined_table
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]

我们可以通过将其他连接类型传递给 join_type 参数来执行其他类型的连接,例如 全外连接 (full outer join)

>>> table1.join(table2, keys='id', join_type="full outer").combine_chunks().sort_by('id')
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[1,2,3,4]]
year: [[2020,2022,2019,null]]
n_legs: [[null,null,5,100]]
animal: [[null,null,"Brittle stars","Centipede"]]

也可以提供额外的连接键,以便在两个键而不是一个键上执行连接。例如,我们可以向 table2 添加一个 year 列,以便我们可以在 ('id', 'year') 上进行连接。

>>> table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
>>> table1.join(table2_withyear, keys=["id", "year"])
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]

同样的功能也适用于 Dataset.join(),因此您可以对两个数据集进行连接。

>>> import pyarrow.dataset as ds
>>> ds1 = ds.dataset(table1)
>>> ds2 = ds.dataset(table2)
>>> joined_ds = ds1.join(ds2, keys="id")
>>> joined_ds.head(5)
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]

通过表达式过滤#

TableDataset 都可以使用布尔值 Expression 进行过滤。

表达式可以从 pyarrow.compute.field() 开始构建。然后可以将比较和转换应用于一个或多个字段,以构建您关心的过滤表达式。

大多数 计算函数 (Compute Functions) 可用于对 field 执行转换。

例如,我们可以构建一个过滤器来查找列 "nums" 中的所有偶数行。

>>> even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))

注意

该过滤器通过在数字和 1 之间执行按位与运算来查找偶数。由于 1 的二进制形式为 00000001,只有最后一位为 1 的数字才会从 bit_wise_and 操作中返回非零结果。通过这种方式,我们可以识别出所有奇数。鉴于我们对偶数感兴趣,我们随后检查 bit_wise_and 操作返回的数字是否等于 0。只有最后一位为 0 的数字在 num & 1 操作后才会返回 0,而所有最后一位为 0 的数字都是 2 的倍数,因此我们将仅筛选出偶数。

一旦有了过滤器,就可以将其提供给 Table.filter() 方法,以仅针对匹配的行过滤我们的表。

>>> table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...                   'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
>>> table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]

多个过滤器可以使用 &, |, ~ 进行连接,以执行 and, ornot 操作。例如,使用 ~even_filter 最终将过滤出所有奇数。

>>> table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]

通过将我们的 even_filterpc.field("nums") > 5 过滤器相结合,我们可以构建一个查找所有大于 5 的偶数的过滤器。

>>> table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]

Dataset 也可以类似地使用 Dataset.filter() 方法进行过滤。该方法将返回一个 Dataset 实例,当访问数据集的实际数据时,它会延迟应用该过滤器。

>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]

用户自定义函数#

警告

此 API 是实验性的。

PyArrow 允许定义和注册自定义计算函数。这些函数随后可以使用其注册的函数名从 Python 以及 C++(以及任何其他包装 Arrow C++ 的实现,如 R arrow 包)中调用。

UDF 支持仅限于标量函数。标量函数是对数组或标量执行逐元素操作的函数。通常,标量函数的输出不依赖于参数中值的顺序。注意,这类函数与 SQL 表达式中使用的函数或 NumPy 通用函数 (universal functions) 大致对应。

要注册 UDF,需要定义函数名、函数文档、输入类型和输出类型。使用 pyarrow.compute.register_scalar_function()

>>> import numpy as np
>>> function_name = "numpy_gcd"
>>> function_docs = {
...       "summary": "Calculates the greatest common divisor",
...       "description":
...          "Given 'x' and 'y' find the greatest number that divides\n"
...          "evenly into both x and y."
... }
>>> input_types = {
...    "x" : pa.int64(),
...    "y" : pa.int64()
... }
>>> output_type = pa.int64()
>>>
>>> def to_np(val):
...     if isinstance(val, pa.Scalar):
...        return val.as_py()
...     else:
...        return np.array(val)
>>>
>>> def gcd_numpy(ctx, x, y):
...     np_x = to_np(x)
...     np_y = to_np(y)
...     return pa.array(np.gcd(np_x, np_y))
>>>
>>> pc.register_scalar_function(gcd_numpy,
...                            function_name,
...                            function_docs,
...                            input_types,
...                            output_type)

用户定义函数的实现总是接受第一个 context 参数(在上面的示例中命名为 ctx),它是 pyarrow.compute.UdfContext 的一个实例。此上下文公开了几个有用的属性,特别是用于在用户定义函数中进行分配的 memory_pool

您可以直接使用 pyarrow.compute.call_function() 调用用户定义的函数。

>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at ...>
[
  27,
  3,
  1
]

使用数据集#

更广泛地说,用户定义函数可以在任何可以通过名称引用计算函数的地方使用。例如,它们可以使用 Expression._call() 在数据集的列上调用。

考虑一个数据在表中且我们想要计算一列与标量值 30 的 GCD(最大公约数)的情况。我们将重用上面创建的 “numpy_gcd” 用户定义函数。

>>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'], 'value': [90, 630, 1827, 2709]})
>>> dataset = ds.dataset(data_table)
>>> func_args = [pc.scalar(30), ds.field("value")]
>>> dataset.to_table(
...             columns={
...                 'gcd_value': ds.field('')._call("numpy_gcd", func_args),
...                 'value': ds.field('value'),
...                 'category': ds.field('category')
...             })
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]

注意 ds.field('')._call(...) 返回一个 pyarrow.compute.Expression()。传递给此函数调用的参数是表达式,而不是标量值(注意 pyarrow.scalar()pyarrow.compute.scalar() 之间的区别,后者生成一个表达式)。此表达式在投影运算符执行时求值。

投影表达式#

在上面的示例中,我们使用一个表达式向表中添加了一个新列(gcd_value)。向表添加新的、动态计算的列称为“投影”,且在投影表达式中可以使用的函数类型有限制。投影函数必须为每个输入行发出单个输出值。该输出值应完全根据输入行计算,且不应依赖于任何其他行。例如,我们上面用作示例的 “numpy_gcd” 函数是投影中可以使用的有效函数。“累积求和”函数将不是有效函数,因为每个输入行的结果取决于之前的行。“删除空值”函数也将无效,因为它不会为某些行发出值。

标准 Python 运算符#

PyArrow 支持标准的 Python 运算符,用于数组和标量的逐元素操作。目前,该支持仅限于某些标准计算函数,即算术运算(+, -, /, %, **)、按位运算(&, |, ^, >>, <<)等。

上述运算符尽可能使用底层内核的检查版本,并具有相同的各自约束,例如,您不能将两个字符串数组相加。

您可以按照以下方式使用运算符

>>> import pyarrow as pa
>>> arr = pa.array([-1, 2, -3])
>>> val = pa.scalar(42.7)
>>> arr + val
<pyarrow.lib.DoubleArray object at ...>
[
  41.7,
  44.7,
  39.7
]

>>> val ** arr
<pyarrow.lib.DoubleArray object at ...>
[
  0.023419203747072598,
  1823.2900000000002,
  0.000012844475506953143
]

>>> arr << 2
<pyarrow.lib.Int64Array object at ...>
[
  -4,
  8,
  -12
]