Substrait#
arrow-substrait
模块实现了对 Substrait 格式的支持,从而可以实现与 Arrow 对象之间的转换。
arrow-dataset
模块可以通过 Acero 查询引擎执行 Substrait 计划。
处理模式 (Schemas)#
Arrow 模式可以使用 pyarrow.substrait.serialize_schema()
和 pyarrow.substrait.deserialize_schema()
函数进行编码和解码。
import pyarrow as pa
import pyarrow.substrait as pa_substrait

# Two columns: a 32-bit integer "x" and a string "y".
x_field = pa.field("x", pa.int32())
y_field = pa.field("y", pa.string())
arrow_schema = pa.schema([x_field, y_field])

# Encode the Arrow schema into its Substrait representation.
substrait_schema = pa_substrait.serialize_schema(arrow_schema)
以 Substrait NamedStruct
形式编组的模式可以直接通过 substrait_schema.schema
访问。
>>> print(substrait_schema.schema)
b'\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01'
如果使用了 Arrow 自定义类型,则该模式将需要这些类型的扩展才能实际使用。因此,该模式也以扩展表达式的形式提供,包括所有扩展类型。
>>> print(substrait_schema.expression)
b'"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'
如果安装了 Substrait Python
,该模式也可以转换为一个 substrait-python
对象。
>>> print(substrait_schema.to_pysubstrait())
version {
minor_number: 44
producer: "Acero 17.0.0"
}
base_schema {
names: "x"
names: "y"
struct {
types {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
types {
string {
nullability: NULLABILITY_NULLABLE
}
}
}
}
处理表达式 (Expressions)#
Arrow 计算表达式可以使用 pyarrow.substrait.serialize_expressions()
和 pyarrow.substrait.deserialize_expressions()
函数进行编码和解码。
import pyarrow as pa
# The compute module must be bound to ``pc``: aliasing it to ``pa`` would
# shadow the top-level package and leave ``pc`` undefined below.
import pyarrow.compute as pc
import pyarrow.substrait as pa_substrait

arrow_schema = pa.schema([
    pa.field("x", pa.int32()),
    pa.field("y", pa.int32())
])

# Serialize the expression "x + y" under the output name "total",
# bound to the fields of ``arrow_schema``.
substrait_expr = pa_substrait.serialize_expressions(
    exprs=[pc.field("x") + pc.field("y")],
    names=["total"],
    schema=arrow_schema
)
将表达式编码为 Substrait 的结果将是 protobuf ExtendedExpression
消息数据本身。
>>> print(bytes(substrait_expr))
b'\nZ\x12Xhttps://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml\x12\x07\x1a\x05\x1a\x03add\x1a>\n5\x1a3\x1a\x04*\x02\x10\x01"\n\x1a\x08\x12\x06\n\x02\x12\x00"\x00"\x0c\x1a\n\x12\x08\n\x04\x12\x02\x08\x01"\x00*\x11\n\x08overflow\x12\x05ERROR\x1a\x05total"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04*\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'
因此,如果需要 Substrait Python
对象,则必须从 substrait-python
本身解码表达式。
>>> import substrait
>>> pysubstrait_expr = substrait.proto.ExtendedExpression.FromString(substrait_expr)
>>> print(pysubstrait_expr)
version {
minor_number: 44
producer: "Acero 17.0.0"
}
extension_uris {
uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
}
extensions {
extension_function {
name: "add"
}
}
referred_expr {
expression {
scalar_function {
arguments {
value {
selection {
direct_reference {
struct_field {
}
}
root_reference {
}
}
}
}
arguments {
value {
selection {
direct_reference {
struct_field {
field: 1
}
}
root_reference {
}
}
}
}
options {
name: "overflow"
preference: "ERROR"
}
output_type {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
}
}
output_names: "total"
}
base_schema {
names: "x"
names: "y"
struct {
types {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
types {
i32 {
nullability: NULLABILITY_NULLABLE
}
}
}
}
使用 Substrait 扩展表达式执行查询#
数据集支持使用 Substrait 的扩展表达式执行查询。这些表达式可以以 pyarrow.substrait.BoundExpressions
的形式传递给数据集扫描器。
import pyarrow.dataset as ds
import pyarrow.substrait as pa_substrait

# Use substrait-python to create the queries
from substrait import proto

dataset = ds.dataset("./data/index-0.parquet")
# The dataset schema as a substrait-python message; it is merged into each
# extended expression below.
substrait_schema = pa_substrait.serialize_schema(dataset.schema).to_pysubstrait()

# SELECT project_name FROM dataset WHERE project_name = 'pyarrow'

# Projection: select field 0 and name the output column "project_name".
projection = proto.ExtendedExpression(referred_expr=[
    {"expression": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}},
     "output_names": ["project_name"]}
])
projection.MergeFrom(substrait_schema)

# Filter: call the "equal" extension function on field 0 and the string
# literal "pyarrow". The anchors (99 / 199) tie the function reference to
# its extension URI declaration.
filtering = proto.ExtendedExpression(
    extension_uris=[{"extension_uri_anchor": 99, "uri": "/functions_comparison.yaml"}],
    extensions=[{"extension_function": {"extension_uri_reference": 99, "function_anchor": 199, "name": "equal:any1_any1"}}],
    referred_expr=[
        {"expression": {"scalar_function": {"function_reference": 199, "arguments": [
            {"value": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}}},
            {"value": {"literal": {"string": "pyarrow"}}}
        ], "output_type": {"bool": {"nullability": False}}}}}
    ]
)
filtering.MergeFrom(substrait_schema)

# Only ``pa_substrait`` is imported in this example, so BoundExpressions is
# referenced through that alias.
results = dataset.scanner(
    columns=pa_substrait.BoundExpressions.from_substrait(projection),
    filter=pa_substrait.BoundExpressions.from_substrait(filtering)
).head(5)
project_name
0 pyarrow
1 pyarrow
2 pyarrow
3 pyarrow
4 pyarrow