Gandiva 表达式、投影和过滤器#
构建表达式#
Gandiva 提供了一种通用的表达式表示形式,其中表达式由节点树表示。表达式树是使用 TreeExprBuilder
构建的。表达式树的叶子通常是字段引用(由 TreeExprBuilder::MakeField()
创建)和字面值(由 TreeExprBuilder::MakeLiteral()
创建)。可以使用以下方法将节点组合成更复杂的表达式树:
使用
TreeExprBuilder::MakeFunction()
创建函数节点。(您可以调用GetRegisteredFunctionSignatures()
获取有效函数签名的列表。)使用
TreeExprBuilder::MakeIf()
创建 if-else 逻辑。使用
TreeExprBuilder::MakeAnd()
和TreeExprBuilder::MakeOr()
创建布尔表达式。(对于“非”,请在MakeFunction
中使用not(bool)
函数。)使用
TreeExprBuilder::MakeInExpressionInt32()
和其他“in 表达式”函数创建集合成员测试。
这些函数中的每一个都创建新的复合节点,其中包含叶节点(字面量和字段引用)或其他复合节点作为子节点。通过组合这些节点,您可以创建任意复杂的表达式树。
构建表达式树后,它们将包装在 Expression
或 Condition
中,具体取决于它们将如何使用。Expression
用于投影,而 Condition
用于过滤器。
例如,以下是如何创建表示 x + 3
的表达式和表示 x < 3
的条件
std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());
std::shared_ptr<Node> add_node =
TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
TreeExprBuilder::MakeExpression(add_node, field_result);
std::shared_ptr<Node> less_than_node =
TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);
投影和过滤器#
Gandiva 的两个执行内核是 Projector
和 Filter
。Projector
使用一个记录批次并投影到一个新的记录批次中。Filter
使用一个记录批次并生成一个包含与条件匹配的索引的 SelectionVector
。
对于 Projector
和 Filter
,表达式 IR 的优化发生在创建实例时。它们是根据静态模式编译的,因此此时必须知道记录批次的模式。
继续上一节中创建的 expression
和 condition
,以下是如何创建投影器和过滤器的示例
std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);
创建投影器或过滤器后,就可以在 Arrow 记录批次上对其进行评估。这些执行内核本身是单线程的,但设计为可重用以并行处理不同的记录批次。
评估投影#
使用 Projector::Evaluate()
执行。这将输出一个数组向量,可以将其与输出模式一起传递给 arrow::RecordBatch::Make()
。
auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});
arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);
评估过滤器#
Filter::Evaluate()
生成 SelectionVector
,这是一个包含与过滤器条件匹配的行索引的向量。选择向量是围绕箭头整数数组的包装器,通过位宽进行参数化。创建选择向量时(必须在传递给 Evaluate()
*之前* 初始化它),必须选择位宽,这决定了它可以容纳的最大索引值,以及最大槽数,这决定了它可以包含多少个索引。通常,最大槽数应设置为您的批大小,位宽应设置为可以表示所有小于批大小的整数的最小整数大小。例如,如果您的批大小为 100k,则将最大槽数设置为 100k,将位宽设置为 32(因为 2^16 = 64k,这太小了)。
运行 Evaluate()
并填充 SelectionVector
后,使用 SelectionVector::ToArray()
方法获取基础数组,然后使用 ::arrow::compute::Take()
实现输出记录批次。
std::shared_ptr<gandiva::SelectionVector> result_indices;
// Use 16-bit integers for indices. Result can be no longer than input size,
// so use batch num_rows as max_slots.
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
&result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
arrow::compute::Take(Datum(in_batch), Datum(take_indices),
TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();
评估投影和过滤器#
最后,您还可以使用 Projector::Evaluate()
在应用选择向量的同时进行投影。为此,首先确保使用 SelectionVector::GetMode()
初始化 Projector
,以便投影器使用正确的位宽进行编译。然后,您可以将 SelectionVector
传递到 Projector::Evaluate()
方法中。
// Make sure the projector is compiled for the appropriate selection vector mode
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);
arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);
result =
arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs_filtered);