计算函数#
Arrow支持对可能具有不同类型的输入执行逻辑计算操作。
标准的计算操作由 pyarrow.compute
模块提供,可以直接使用。
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.sum(a)
<pyarrow.Int64Scalar: 7>
分组聚合函数会引发异常,需要通过 pyarrow.Table.group_by()
功能来使用。 更多细节请参考 分组聚合。
标准计算函数#
许多计算函数同时支持数组(分块或非分块)和标量输入,但有些函数会强制要求其中一种。 例如,sort_indices
要求其第一个也是唯一一个输入必须是数组。
以下是一些简单的示例
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> b = pa.array([4, 1, 2, 8])
>>> pc.equal(a, b)
<pyarrow.lib.BooleanArray object at 0x7f686e4eef30>
[
false,
true,
true,
false
]
>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>
这些函数不仅可以进行逐元素操作。 以下是一个对表进行排序的示例
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> t = pa.table({'x':[1,2,3],'y':[3,2,1]})
>>> i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
>>> i
<pyarrow.lib.UInt64Array object at 0x7fcee5df75e8>
[
2,
1,
0
]
有关 PyArrow 提供的完整计算函数列表,您可以参考 计算函数 参考。
另请参阅
分组聚合#
PyArrow 通过 pyarrow.Table.group_by()
方法支持对 pyarrow.Table
进行分组聚合。 该方法将返回一个分组声明,可以将哈希聚合函数应用于该声明。
>>> import pyarrow as pa
>>> t = pa.table([
... pa.array(["a", "a", "b", "b", "c"]),
... pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
values_sum: int64
keys: string
----
values_sum: [[3,7,5]]
keys: [["a","b","c"]]
传递给上一个示例中 aggregate
方法的 "sum"
聚合是 hash_sum
计算函数。
通过将多个聚合提供给 aggregate
方法,可以同时执行多个聚合
>>> import pyarrow as pa
>>> t = pa.table([
... pa.array(["a", "a", "b", "b", "c"]),
... pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([
... ("values", "sum"),
... ("keys", "count")
... ])
pyarrow.Table
values_sum: int64
keys_count: int64
keys: string
----
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]
keys: [["a","b","c"]]
还可以为每个聚合函数提供聚合选项,例如,我们可以使用 CountOptions
来更改我们计算空值的方式
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table_with_nulls = pa.table([
... pa.array(["a", "a", "a"]),
... pa.array([1, None, None])
... ], names=["keys", "values"])
>>> table_with_nulls.group_by(["keys"]).aggregate([
... ("values", "count", pc.CountOptions(mode="all"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[3]]
keys: [["a"]]
>>> table_with_nulls.group_by(["keys"]).aggregate([
... ("values", "count", pc.CountOptions(mode="only_valid"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[1]]
keys: [["a"]]
以下是所有支持的分组聚合函数的列表。 您可以使用它们,无论是否带有 "hash_"
前缀。
hash_all |
每个组中的所有元素是否都评估为 true |
|
hash_any |
每个组中是否有任何元素评估为 true |
|
hash_approximate_median |
计算每个组中值的近似中位数 |
|
hash_count |
计算每个组中空值/非空值的数量 |
|
hash_count_all |
计算每个组中的行数 |
|
hash_count_distinct |
计算每个组中不同的值的数量 |
|
hash_distinct |
保留每个组中不同的值 |
|
hash_first |
计算每个组中的第一个值 |
|
hash_first_last |
计算每个组中值的第一个和最后一个 |
|
hash_kurtosis |
计算每个组中值的峰度 |
|
hash_last |
计算每个组中的第一个值 |
|
hash_list |
列出每个组中的所有值 |
|
hash_max |
计算每个组中值的最小值或最大值 |
|
hash_mean |
计算每个组中值的平均值 |
|
hash_min |
计算每个组中值的最小值或最大值 |
|
hash_min_max |
计算每个组中值的最小值和最大值 |
|
hash_one |
从每个组获取一个值 |
|
hash_pivot_wider |
根据透视键列透视值 |
|
hash_product |
计算每个组中值的乘积 |
|
hash_skew |
计算每个组中值的偏度 |
|
hash_stddev |
计算每个组中值的标准差 |
|
hash_sum |
对每个组中的值求和 |
|
hash_tdigest |
计算每个组中值的近似分位数 |
|
hash_variance |
计算每个组中值的方差 |
表和数据集连接#
Table
和 Dataset
都支持通过 Table.join()
和 Dataset.join()
方法进行连接操作。
这些方法接受一个右表或数据集,该表或数据集将连接到初始表或数据集,以及一个或多个键,这些键应从两个实体中使用以执行连接。
默认情况下,执行 left outer join
,但可以请求任何支持的连接类型
left semi
right semi
left anti
right anti
inner
left outer
right outer
full outer
只需提供一个表和一个应该执行连接的键,即可执行基本连接
import pyarrow as pa
table1 = pa.table({'id': [1, 2, 3],
'year': [2020, 2022, 2019]})
table2 = pa.table({'id': [3, 4],
'n_legs': [5, 100],
'animal': ["Brittle stars", "Centipede"]})
joined_table = table1.join(table2, keys="id")
结果将是一个新表,该表通过在 id
键上使用 left outer join
将 table1
与 table2
连接而创建
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]
我们可以执行其他类型的连接,例如,通过将它们传递给 join_type
参数来执行 full outer join
table1.join(table2, keys='id', join_type="full outer")
在这种情况下,结果将是
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2,4]]
year: [[2019,2020,2022,null]]
n_legs: [[5,null,null,100]]
animal: [["Brittle stars",null,null,"Centipede"]]
还可以提供其他连接键,以便连接发生在两个键而不是一个键上。 例如,我们可以向 table2
添加一个 year
列,以便我们可以连接 ('id', 'year')
table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
table1.join(table2_withyear, keys=["id", "year"])
结果将是一个表,其中只有 id=3
和 year=2019
的条目具有数据,其余条目将为 null
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]
Dataset.join()
也提供相同的功能,因此您可以获取两个数据集并将它们连接起来
import pyarrow.dataset as ds
ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)
joined_ds = ds1.join(ds2, keys="id")
结果数据集将是一个 InMemoryDataset
,其中包含连接的数据
>>> joined_ds.head(5)
pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]
按表达式过滤#
可以使用布尔 Expression
过滤 Table
和 Dataset
。
可以从 pyarrow.compute.field()
开始构建表达式。 然后可以将比较和转换应用于一个或多个字段,以构建您关心的过滤表达式。
大多数 计算函数 可用于对 field
执行转换。
例如,我们可以构建一个过滤器,以查找 "nums"
列中所有为偶数的行
import pyarrow.compute as pc
even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))
注意
该过滤器通过在数字和 1
之间执行按位与运算来查找偶数。 由于 1
在二进制形式中为 00000001
,因此只有最后一位设置为 1
的数字才会从 bit_wise_and
运算中返回非零结果。 这样,我们就可以识别所有奇数。 鉴于我们对偶数感兴趣,因此我们检查 bit_wise_and
运算返回的数字是否等于 0
。 只有最后一位为 0
的数字才会返回 0
作为 num & 1
的结果,并且由于最后一位为 0
的所有数字都是 2
的倍数,因此我们将仅过滤偶数。
获得过滤器后,我们可以将其提供给 Table.filter()
方法,以便仅过滤与该过滤器匹配的行
>>> table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
... 'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
>>> table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]
可以使用 &
、|
、~
连接多个过滤器,以执行 and
、or
和 not
运算。 例如,使用 ~even_filter
实际上最终会过滤所有奇数
>>> table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]
我们可以通过将我们的 even_filter
与 pc.field("nums") > 5
过滤器组合,来构建一个查找所有大于 5 的偶数的过滤器
>>> table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]
可以使用 Dataset.filter()
方法类似地过滤 Dataset
。 该方法将返回 Dataset
的一个实例,该实例将在访问数据集的实际数据后立即延迟应用过滤器
>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]
用户自定义函数#
警告
此API是实验性的。
PyArrow允许定义和注册自定义计算函数。然后,可以使用它们注册的函数名从Python以及C++(以及可能任何其他包装Arrow C++的实现,例如R的 arrow
包)调用这些函数。
UDF支持仅限于标量函数。标量函数是对数组或标量执行逐元素操作的函数。一般来说,标量函数的输出不依赖于参数中值的顺序。请注意,此类函数与SQL表达式中使用的函数或NumPy 通用函数大致对应。
要注册 UDF,需要定义函数名、函数文档、输入类型和输出类型。 使用 pyarrow.compute.register_scalar_function()
,
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
function_name = "numpy_gcd"
function_docs = {
"summary": "Calculates the greatest common divisor",
"description":
"Given 'x' and 'y' find the greatest number that divides\n"
"evenly into both x and y."
}
input_types = {
"x" : pa.int64(),
"y" : pa.int64()
}
output_type = pa.int64()
def to_np(val):
if isinstance(val, pa.Scalar):
return val.as_py()
else:
return np.array(val)
def gcd_numpy(ctx, x, y):
np_x = to_np(x)
np_y = to_np(y)
return pa.array(np.gcd(np_x, np_y))
pc.register_scalar_function(gcd_numpy,
function_name,
function_docs,
input_types,
output_type)
用户自定义函数的实现始终采用第一个 *context* 参数(在上面的示例中命名为 ctx
),它是 pyarrow.compute.UdfContext
的实例。此上下文公开了多个有用的属性,特别是 memory_pool
,用于在用户自定义函数的上下文中进行分配。
您可以使用 pyarrow.compute.call_function()
直接调用用户自定义函数
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at 0x7fcfa0e7b100>
[
27,
3,
1
]
使用数据集#
更一般地,用户定义的函数在任何可以通过其名称引用计算函数的地方都可用。 例如,可以使用 Expression._call()
在数据集的列上调用它们。
考虑一个实例,其中数据在一个表中,我们想计算一列与标量值 30 的 GCD。我们将重新使用上面创建的“numpy_gcd”用户定义的函数
>>> import pyarrow.dataset as ds
>>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'], 'value': [90, 630, 1827, 2709]})
>>> dataset = ds.dataset(data_table)
>>> func_args = [pc.scalar(30), ds.field("value")]
>>> dataset.to_table(
... columns={
... 'gcd_value': ds.field('')._call("numpy_gcd", func_args),
... 'value': ds.field('value'),
... 'category': ds.field('category')
... })
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]
请注意,ds.field('')._call(...)
返回一个 pyarrow.compute.Expression()
。 传递给此函数调用的参数是表达式,而不是标量值(注意 pyarrow.scalar()
和 pyarrow.compute.scalar()
之间的区别,后者产生一个表达式)。 当投影运算符执行它时,此表达式被评估。
投影表达式#
在上面的示例中,我们使用表达式向我们的表添加了一个新列 (gcd_value
)。 向表中添加新的动态计算的列称为“投影”,并且对可以在投影表达式中使用哪些类型的函数有限制。 投影函数必须为每个输入行发出一个输出值。 该输出值应完全从输入行计算,不应依赖于任何其他行。 例如,我们一直在使用的“numpy_gcd”函数是一个有效的投影函数。 “累积总和”函数将不是有效的函数,因为每个输入行的结果取决于之前的行。 “删除空值”函数也将无效,因为它不会为某些行发出值。