Gandiva表达式、投影器和过滤器#

构建表达式#

Gandiva提供了一种通用的表达式表示形式,其中表达式由节点树表示。 表达式树是使用TreeExprBuilder构建的。 表达式树的叶子通常是字段引用(由TreeExprBuilder::MakeField()创建)和文字值(由TreeExprBuilder::MakeLiteral()创建)。 可以使用以下方式将节点组合成更复杂的表达式树

  • TreeExprBuilder::MakeFunction()来创建函数节点。(您可以调用GetRegisteredFunctionSignatures()来获取有效函数签名的列表。)

  • TreeExprBuilder::MakeIf()来创建 if-else 逻辑。

  • TreeExprBuilder::MakeAnd()TreeExprBuilder::MakeOr()来创建布尔表达式。(对于“not”,请在MakeFunction中使用not(bool)函数。)

  • TreeExprBuilder::MakeInExpressionInt32()和其他“in expression”函数来创建集合成员资格测试。

这些函数中的每一个都会创建新的复合节点,其中包含叶节点(文字和字段引用)或其他作为子级的复合节点。 通过组合这些,您可以创建任意复杂的表达式树。

一旦构建了表达式树,它们就会被包装在ExpressionCondition中,具体取决于它们的使用方式。 Expression用于投影,而Condition用于过滤器。

例如,以下是如何创建一个表示x + 3的Expression和一个表示x < 3的Condition的例子

std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());

std::shared_ptr<Node> add_node =
    TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
    TreeExprBuilder::MakeExpression(add_node, field_result);

std::shared_ptr<Node> less_than_node =
    TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);

投影器和过滤器#

Gandiva的两个执行内核是ProjectorFilterProjector使用记录批处理并投影到新的记录批处理中。 Filter使用记录批处理并生成一个SelectionVector,其中包含与条件匹配的索引。

对于ProjectorFilter,表达式IR的优化发生在创建实例时。 它们是针对静态模式编译的,因此此时必须知道记录批处理的模式。

继续上一节中创建的expressioncondition,以下是创建 Projector 和 Filter 的示例

std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);

std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);

创建 Projector 或 Filter 后,可以在 Arrow 记录批处理上对其进行评估。 这些执行内核本身是单线程的,但设计为可重用以并行处理不同的记录批处理。

评估投影#

执行使用Projector::Evaluate()执行。 这会输出一个数组向量,可以将其与输出模式一起传递到arrow::RecordBatch::Make()

auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});

arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);

评估过滤器#

Filter::Evaluate()产生SelectionVector,这是一个与过滤器条件匹配的行索引向量。 选择向量是箭头的整数数组周围的包装器,由位宽参数化。 创建选择向量时(您必须在传递给Evaluate()之前对其进行初始化),您必须选择位宽,该位宽确定它可以容纳的最大索引值,以及最大槽数,该槽数确定它可以包含多少索引。 通常,最大槽数应设置为批处理大小,位宽应设置为可以表示小于批处理大小的所有整数的最小整数大小。 例如,如果您的批处理大小为 100k,请将最大槽数设置为 100k,并将位宽设置为 32(因为 2^16 = 64k 太小)。

一旦Evaluate()已经运行并且填充了SelectionVector,请使用SelectionVector::ToArray()方法获取底层数组,然后使用::arrow::compute::Take()来物化输出记录批处理。

std::shared_ptr<gandiva::SelectionVector> result_indices;
// Use 16-bit integers for indices. Result can be no longer than input size,
// so use batch num_rows as max_slots.
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
                                             &result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
                      arrow::compute::Take(Datum(in_batch), Datum(take_indices),
                                           TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();

评估投影和过滤器#

最后,您还可以在应用选择向量的同时进行投影,使用Projector::Evaluate()。 为此,首先确保使用SelectionVector::GetMode()初始化Projector,以便投影仪以正确的位宽编译。 然后,您可以将SelectionVector传递到Projector::Evaluate()方法中。

// Make sure the projector is compiled for the appropriate selection vector mode
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
                         ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);

arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);

result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs_filtered);