计算函数#
Arrow 支持对可能具有不同类型的输入进行逻辑计算操作。
pyarrow.compute
模块提供了标准计算操作,可以直接使用。
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.sum(a)
<pyarrow.Int64Scalar: 7>
分组聚合函数会引发异常,需要通过 pyarrow.Table.group_by()
功能使用。有关更多详细信息,请参阅 分组聚合。
标准计算函数#
许多计算函数都支持数组(分块或不分块)和标量输入,但有些函数会强制要求其中之一。例如,sort_indices
要求其第一个也是唯一的输入为数组。
以下是一些简单的示例
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> b = pa.array([4, 1, 2, 8])
>>> pc.equal(a, b)
<pyarrow.lib.BooleanArray object at 0x7f686e4eef30>
[
false,
true,
true,
false
]
>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>
这些函数不仅可以执行逐元素操作。以下是如何对表格进行排序的示例
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> t = pa.table({'x':[1,2,3],'y':[3,2,1]})
>>> i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
>>> i
<pyarrow.lib.UInt64Array object at 0x7fcee5df75e8>
[
2,
1,
0
]
有关 PyArrow 提供的计算函数的完整列表,您可以参考 计算函数 参考。
另请参阅
分组聚合#
PyArrow 通过 pyarrow.Table
的 pyarrow.Table.group_by()
方法支持对分组进行聚合操作。该方法将返回一个分组声明,哈希聚合函数可以应用于该声明。
>>> import pyarrow as pa
>>> t = pa.table([
... pa.array(["a", "a", "b", "b", "c"]),
... pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
values_sum: int64
keys: string
----
values_sum: [[3,7,5]]
keys: [["a","b","c"]]
在前面示例中传递给 aggregate
方法的 "sum"
聚合操作是 hash_sum
计算函数。
可以通过将多个聚合操作提供给 aggregate
方法来同时执行它们。
>>> import pyarrow as pa
>>> t = pa.table([
... pa.array(["a", "a", "b", "b", "c"]),
... pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([
... ("values", "sum"),
... ("keys", "count")
... ])
pyarrow.Table
values_sum: int64
keys_count: int64
keys: string
----
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]
keys: [["a","b","c"]]
还可以为每个聚合函数提供聚合选项,例如,我们可以使用 CountOptions
来更改我们计算空值的方式。
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table_with_nulls = pa.table([
... pa.array(["a", "a", "a"]),
... pa.array([1, None, None])
... ], names=["keys", "values"])
>>> table_with_nulls.group_by(["keys"]).aggregate([
... ("values", "count", pc.CountOptions(mode="all"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[3]]
keys: [["a"]]
>>> table_with_nulls.group_by(["keys"]).aggregate([
... ("values", "count", pc.CountOptions(mode="only_valid"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[1]]
keys: [["a"]]
以下是所有支持的分组聚合函数列表。您可以使用或不使用 "hash_"
前缀来使用它们。
hash_all |
每个组中的所有元素是否都计算为真 |
|
hash_any |
每个组中的任何元素是否都计算为真 |
|
hash_approximate_median |
计算每个组中值的近似中位数 |
|
hash_count |
计算每个组中空值/非空值的个数 |
|
hash_count_all |
计算每个组中的行数 |
|
hash_count_distinct |
计算每个组中不同值的个数 |
|
hash_distinct |
保留每个组中不同的值 |
|
hash_first |
计算每个组中的第一个值 |
|
hash_first_last |
计算每个组中值的第一个和最后一个 |
|
hash_last |
计算每个组中的第一个值 |
|
hash_list |
列出每个组中的所有值 |
|
hash_max |
计算每个组中值的最小值或最大值 |
|
hash_mean |
计算每个组中值的平均值 |
|
hash_min |
计算每个组中值的最小值或最大值 |
|
hash_min_max |
计算每个组中值的最小值和最大值 |
|
hash_one |
从每个组中获取一个值 |
|
hash_product |
计算每个组中值的乘积 |
|
hash_stddev |
计算每个组中值的标准差 |
|
hash_sum |
对每个组中的值求和 |
|
hash_tdigest |
计算每个组中值的近似分位数 |
|
hash_variance |
计算每个组中值的方差 |
表和数据集连接#
Table
和 Dataset
都通过 Table.join()
和 Dataset.join()
方法支持连接操作。
这些方法接受一个将要连接到初始表的右侧表或数据集,以及一个或多个应该从这两个实体中使用的键来执行连接。
默认情况下,执行 left outer join
,但可以请求任何支持的连接类型。
左半连接 (left semi)
右半连接 (right semi)
左反连接 (left anti)
右反连接 (right anti)
内连接 (inner)
左外连接 (left outer)
右外连接 (right outer)
全外连接 (full outer)
只需提供一个表和一个应在其上执行连接的键,就可以执行基本的连接。
import pyarrow as pa
table1 = pa.table({'id': [1, 2, 3],
'year': [2020, 2022, 2019]})
table2 = pa.table({'id': [3, 4],
'n_legs': [5, 100],
'animal': ["Brittle stars", "Centipede"]})
joined_table = table1.join(table2, keys="id")
结果将是一个新的表,该表通过 id
键将 table1
与 table2
进行 left outer join
连接创建。
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]
我们可以执行其他类型的连接,例如 full outer join
,方法是将它们传递给 join_type
参数。
table1.join(table2, keys='id', join_type="full outer")
在这种情况下,结果将是
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2,4]]
year: [[2019,2020,2022,null]]
n_legs: [[5,null,null,100]]
animal: [["Brittle stars",null,null,"Centipede"]]
还可以提供额外的连接键,以便连接在两个键而不是一个键上进行。例如,我们可以向 table2
添加一个 year
列,以便我们可以根据 ('id', 'year')
进行连接。
table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
table1.join(table2_withyear, keys=["id", "year"])
结果将是一个表,其中只有 id=3
和 year=2019
的条目有数据,其余的将为 null
。
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]
Dataset.join()
也具有相同的功能,因此您可以获取两个数据集并将其连接起来。
import pyarrow.dataset as ds
ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)
joined_ds = ds1.join(ds2, keys="id")
结果数据集将是一个包含连接数据的 InMemoryDataset
。
>>> joined_ds.head(5)
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]
按表达式过滤#
Table
和 Dataset
都可以使用布尔 Expression
进行过滤。
表达式可以从 pyarrow.compute.field()
开始构建。然后可以将比较和转换应用于一个或多个字段,以构建您关心的过滤表达式。
大多数 Compute Functions 可用于对 field
执行转换。
例如,我们可以构建一个过滤器来查找列 "nums"
中所有偶数行。
import pyarrow.compute as pc
even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))
注意
过滤器通过在数字和 1
之间执行按位与运算来查找偶数。由于 1
在二进制形式中为 00000001
,因此只有最后一位设置为 1
的数字才会从 bit_wise_and
运算中返回非零结果。这样,我们就可以识别所有奇数。鉴于我们对偶数感兴趣,然后我们检查 bit_wise_and
运算返回的数字是否等于 0
。只有最后一位为 0
的数字才会将 num & 1
的结果返回为 0
,并且由于最后一位为 0
的所有数字都是 2
的倍数,因此我们将仅过滤偶数。
获得过滤器后,我们可以将其提供给 Table.filter()
方法,以仅过滤匹配行的表。
>>> table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
... 'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
>>> table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]
可以使用 &
、|
、~
将多个过滤器连接起来,以执行 and
、or
和 not
操作。例如,使用 ~even_filter
实际上最终会过滤所有奇数。
>>> table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]
我们可以构建一个过滤器,通过将我们的 even_filter
与 pc.field("nums") > 5
过滤器组合起来,找到所有大于 5 的偶数。
>>> table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]
Dataset
可以类似地使用 Dataset.filter()
方法进行过滤。该方法将返回 Dataset
的一个实例,该实例将在访问数据集的实际数据时延迟应用过滤器。
>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]
用户定义函数#
警告
此 API 为**实验性**。
PyArrow 允许定义和注册自定义计算函数。然后,可以使用其注册的函数名称从 Python 以及 C++(以及可能包装 Arrow C++ 的任何其他实现,例如 R 的 arrow
包)调用这些函数。
UDF 支持仅限于标量函数。标量函数是在数组或标量上执行逐元素操作的函数。通常,标量函数的输出不依赖于参数中值的顺序。请注意,此类函数与 SQL 表达式中使用的函数或 NumPy 的 通用函数 存在粗略对应关系。
要注册 UDF,需要定义函数名称、函数文档、输入类型和输出类型。使用 pyarrow.compute.register_scalar_function()
,
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
function_name = "numpy_gcd"
function_docs = {
"summary": "Calculates the greatest common divisor",
"description":
"Given 'x' and 'y' find the greatest number that divides\n"
"evenly into both x and y."
}
input_types = {
"x" : pa.int64(),
"y" : pa.int64()
}
output_type = pa.int64()
def to_np(val):
if isinstance(val, pa.Scalar):
return val.as_py()
else:
return np.array(val)
def gcd_numpy(ctx, x, y):
np_x = to_np(x)
np_y = to_np(y)
return pa.array(np.gcd(np_x, np_y))
pc.register_scalar_function(gcd_numpy,
function_name,
function_docs,
input_types,
output_type)
用户定义函数的实现始终采用第一个上下文参数(在上面的示例中命名为 ctx
),它是 pyarrow.compute.UdfContext
的实例。此上下文公开了几个有用的属性,特别是 memory_pool
,用于在用户定义函数的上下文中进行分配。
您可以使用 pyarrow.compute.call_function()
直接调用用户定义函数。
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at 0x7fcfa0e7b100>
[
27,
3,
1
]
使用数据集#
更一般地,用户定义函数可以在任何可以使用其名称引用计算函数的地方使用。例如,可以使用 Expression._call()
在数据集的列上调用它们。
考虑一个数据在表中并且我们想要计算一列与标量值 30 的最大公约数的实例。我们将重用上面创建的“numpy_gcd”用户定义函数。
>>> import pyarrow.dataset as ds
>>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'], 'value': [90, 630, 1827, 2709]})
>>> dataset = ds.dataset(data_table)
>>> func_args = [pc.scalar(30), ds.field("value")]
>>> dataset.to_table(
... columns={
... 'gcd_value': ds.field('')._call("numpy_gcd", func_args),
... 'value': ds.field('value'),
... 'category': ds.field('category')
... })
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]
请注意,ds.field('')._call(...)
返回一个 pyarrow.compute.Expression()
。传递给此函数调用的参数是表达式,而不是标量值(注意 pyarrow.scalar()
和 pyarrow.compute.scalar()
之间的区别,后者生成一个表达式)。当投影运算符执行它时,将评估此表达式。
投影表达式#
在上面的示例中,我们使用表达式向我们的表添加了一个新列(gcd_value
)。向表添加新的、动态计算的列称为“投影”,并且对投影表达式中可以使用哪些类型的函数存在限制。投影函数必须为每个输入行发出一个输出值。该输出值应完全根据输入行计算,并且不应依赖于任何其他行。例如,我们一直在用作示例的“numpy_gcd”函数是投影中可以使用的一个有效函数。“累积和”函数将不是一个有效函数,因为每个输入行的结果都取决于之前出现的行。“删除空值”函数也将无效,因为它不会为某些行发出值。