Substrait

arrow-substrait 模块提供了对 Substrait 格式的支持，可以在 Substrait 与 Arrow 对象之间相互转换。

arrow-dataset 模块可以通过 Acero 查询引擎执行 Substrait 计划。

处理模式 (Schemas)

Arrow 模式可以使用 pyarrow.substrait.serialize_schema() 和 pyarrow.substrait.deserialize_schema() 函数进行编码和解码。

import pyarrow as pa
import pyarrow.substrait as pa_substrait

# Describe the columns first, then wrap them in an Arrow schema.
schema_fields = [
    pa.field("x", pa.int32()),
    pa.field("y", pa.string()),
]
arrow_schema = pa.schema(schema_fields)

# Encode the Arrow schema as Substrait. The returned object exposes the
# NamedStruct bytes via `.schema` and an extended-expression form via
# `.expression`.
substrait_schema = pa_substrait.serialize_schema(arrow_schema)

以 Substrait NamedStruct 形式编组的模式可以直接通过 substrait_schema.schema 访问。

>>> print(substrait_schema.schema)
b'\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01'

如果使用了 Arrow 自定义类型，则需要这些类型对应的扩展定义才能真正使用该模式。因此，该模式也以扩展表达式（Extended Expression）的形式提供，其中包含了所有扩展类型。

>>> print(substrait_schema.expression)
b'"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'

如果安装了 Substrait Python,该模式也可以转换为一个 substrait-python 对象。

>>> print(substrait_schema.to_pysubstrait())
version {
    minor_number: 44
    producer: "Acero 17.0.0"
}
base_schema {
    names: "x"
    names: "y"
    struct {
        types {
            i32 {
                nullability: NULLABILITY_NULLABLE
            }
        }
        types {
            string {
                nullability: NULLABILITY_NULLABLE
            }
        }
    }
}

处理表达式 (Expressions)

Arrow 计算表达式可以使用 pyarrow.substrait.serialize_expressions() 和 pyarrow.substrait.deserialize_expressions() 函数进行编码和解码。

import pyarrow as pa
# pyarrow.compute must be bound to `pc`: binding it to `pa` would shadow the
# top-level pyarrow module (breaking `pa.schema` below) and leave `pc` undefined.
import pyarrow.compute as pc
import pyarrow.substrait as pa_substrait

arrow_schema = pa.schema([
    pa.field("x", pa.int32()),
    pa.field("y", pa.int32())
])

# Serialize the single expression `x + y`, exposed under the output name
# "total", against `arrow_schema` (the field names are looked up in it).
substrait_expr = pa_substrait.serialize_expressions(
    exprs=[pc.field("x") + pc.field("y")],
    names=["total"],
    schema=arrow_schema
)

将表达式编码为 Substrait 的结果将是 protobuf ExtendedExpression 消息数据本身。

>>> print(bytes(substrait_expr))
b'\nZ\x12Xhttps://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml\x12\x07\x1a\x05\x1a\x03add\x1a>\n5\x1a3\x1a\x04*\x02\x10\x01"\n\x1a\x08\x12\x06\n\x02\x12\x00"\x00"\x0c\x1a\n\x12\x08\n\x04\x12\x02\x08\x01"\x00*\x11\n\x08overflow\x12\x05ERROR\x1a\x05total"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04*\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'

因此,如果需要 Substrait Python 对象,则必须从 substrait-python 本身解码表达式。

>>> import substrait
>>> pysubstrait_expr = substrait.proto.ExtendedExpression.FromString(substrait_expr)
>>> print(pysubstrait_expr)
version {
  minor_number: 44
  producer: "Acero 17.0.0"
}
extension_uris {
  uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
}
extensions {
  extension_function {
    name: "add"
  }
}
referred_expr {
  expression {
    scalar_function {
      arguments {
        value {
          selection {
            direct_reference {
              struct_field {
              }
            }
            root_reference {
            }
          }
        }
      }
      arguments {
        value {
          selection {
            direct_reference {
              struct_field {
                field: 1
              }
            }
            root_reference {
            }
          }
        }
      }
      options {
        name: "overflow"
        preference: "ERROR"
      }
      output_type {
        i32 {
          nullability: NULLABILITY_NULLABLE
        }
      }
    }
  }
  output_names: "total"
}
base_schema {
  names: "x"
  names: "y"
  struct {
    types {
      i32 {
        nullability: NULLABILITY_NULLABLE
      }
    }
    types {
      i32 {
        nullability: NULLABILITY_NULLABLE
      }
    }
  }
}

使用 Substrait 扩展表达式执行查询

数据集支持使用 Substrait 扩展表达式（Extended Expressions）执行查询。这些表达式可以以 pyarrow.substrait.BoundExpressions 的形式传递给数据集扫描器。

import pyarrow.dataset as ds
import pyarrow.substrait as pa_substrait

# Use substrait-python to create the queries
from substrait import proto

dataset = ds.dataset("./data/index-0.parquet")
# The dataset schema in substrait-python form; it is merged into each
# ExtendedExpression below so the field references have a base schema.
substrait_schema = pa_substrait.serialize_schema(dataset.schema).to_pysubstrait()

# SELECT project_name FROM dataset WHERE project_name = 'pyarrow'

# Projection: select field 0 and name the output column "project_name".
# NOTE: assumes field 0 of the file is the project_name column -- adjust the
# index if the dataset's column order differs.
projection = proto.ExtendedExpression(referred_expr=[
    {"expression": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}},
     "output_names": ["project_name"]}
])
projection.MergeFrom(substrait_schema)

# Filter: equal(field 0, 'pyarrow'). The anchors 99 and 199 only need to match
# their references: the function declaration points at URI anchor 99, and the
# scalar function call points at function anchor 199.
filtering = proto.ExtendedExpression(
    extension_uris=[{"extension_uri_anchor": 99, "uri": "/functions_comparison.yaml"}],
    extensions=[{"extension_function": {"extension_uri_reference": 99, "function_anchor": 199, "name": "equal:any1_any1"}}],
    referred_expr=[
        {"expression": {"scalar_function": {"function_reference": 199, "arguments": [
            {"value": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}}},
            {"value": {"literal": {"string": "pyarrow"}}}
        ], "output_type": {"bool": {"nullability": False}}}}}
    ]
)
filtering.MergeFrom(substrait_schema)

# BoundExpressions lives in pyarrow.substrait, imported above as `pa_substrait`
# (`pa` is not imported in this example).
results = dataset.scanner(
    columns=pa_substrait.BoundExpressions.from_substrait(projection),
    filter=pa_substrait.BoundExpressions.from_substrait(filtering)
).head(5)
project_name
0      pyarrow
1      pyarrow
2      pyarrow
3      pyarrow
4      pyarrow