Acero User's Guide#
This page describes how to use Acero. It is recommended that you read the overview first and familiarize yourself with the basic concepts.
Using Acero#
The basic workflow for Acero is as follows:
First, create a graph of Declaration objects describing the plan.
Call one of the DeclarationToXyz methods to execute the Declaration.
A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one ExecNode in the plan. In addition, a sink node will be added, depending on which DeclarationToXyz method was used.
The ExecPlan is executed. Typically this happens as part of the DeclarationToXyz call, but with DeclarationToReader the reader is returned before the plan has finished executing.
Once the plan is finished it is destroyed.
Creating a Plan#
Using Substrait#
Substrait is the preferred mechanism for creating a plan (a graph of Declaration objects), for several reasons:
Substrait producers spend a lot of time and effort creating user-friendly APIs that produce complex execution plans in a simple way. For example, a pivot_wider operation might be implemented with a complex series of aggregate nodes. Rather than creating all of those aggregate nodes by hand, a producer gives you a much simpler API.
If you are using Substrait and at some point find another Substrait-consuming engine that meets your needs better than Acero, you can easily switch to that engine.
We hope that tools will eventually emerge for Substrait-based optimizers and planners. By using Substrait you will be better positioned to take advantage of those tools in the future.
You could create a Substrait plan yourself, but you will probably have an easier time finding an existing Substrait producer. For example, you can use ibis-substrait to easily create Substrait plans from Python expressions. There are a number of different tools that can create Substrait plans from SQL. Eventually, we expect C++-based Substrait producers to emerge; however, we are not aware of any at this time.
Detailed instructions for creating an execution plan from Substrait can be found on the Substrait page.
Creating a Plan Programmatically#
Creating an execution plan programmatically is simpler than creating a plan from Substrait, though you lose some flexibility and future-proofing guarantees. The simplest way to create a Declaration is to instantiate one. You will need the name of the declaration, a vector of inputs, and an options object. For example:
/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}
The code above creates a scan declaration (which has no inputs) and a project declaration (which uses the scan as its input). This is simple enough, but we can make it slightly easier: if you are creating a linear sequence of declarations (as in the example above) you can also use the Declaration::Sequence() function.
  // Inputs do not have to be passed to the project node when using Sequence
  ac::Declaration plan =
      ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
                                 {"project", ac::ProjectNodeOptions({a_times_2})}});
There are many more examples of programmatic plan creation later in this document.
Executing a Plan#
There are a number of different methods that can be used to execute a declaration. Each one provides the data in a slightly different form. Since all of these methods start with DeclarationTo..., this guide will often refer to them as the DeclarationToXyz methods.
DeclarationToTable#
The DeclarationToTable() method will accumulate all of the results into a single arrow::Table. This is perhaps the simplest way to collect results from Acero. The main disadvantage of this approach is that it requires accumulating all of the results in memory.
Note
Acero processes large datasets in small chunks. This is described in more detail in the developer's guide. As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked differently than your input. For example, your input might be a large table with a single chunk of 2 million rows, while your output table might have 64 chunks of 32Ki rows each. There is a current request to specify the chunk size of the output in GH-15155.
DeclarationToReader#
The DeclarationToReader() method allows you to consume the results iteratively. It creates an arrow::RecordBatchReader which you can read from at your leisure. If you do not read from the reader quickly enough, backpressure will be applied and the execution plan will pause. Closing the reader cancels the running execution plan, and the reader's destructor will wait for the execution plan to finish whatever it is doing, so it may block.
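The backpressure behavior described above can be pictured with a small standard-library sketch (an illustration of the concept only, not Acero's implementation; all names below are invented): a producer blocks as soon as a bounded FIFO queue fills up and resumes only when the consumer drains it, just as a plan pauses when a slow reader stops pulling batches.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// A bounded FIFO queue: Push blocks when the queue is full, which is the
// essence of the backpressure a slow RecordBatchReader applies to a plan.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

  void Push(T value) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push(std::move(value));
    not_empty_.notify_one();
  }

  // Returns std::nullopt once the producer has finished and the queue drained.
  std::optional<T> Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !queue_.empty() || done_; });
    if (queue_.empty()) return std::nullopt;
    T value = std::move(queue_.front());
    queue_.pop();
    not_full_.notify_one();
    return value;
  }

  void MarkDone() {
    std::lock_guard<std::mutex> lock(mutex_);
    done_ = true;
    not_empty_.notify_all();
  }

 private:
  std::size_t capacity_;
  std::queue<T> queue_;
  bool done_ = false;
  std::mutex mutex_;
  std::condition_variable not_empty_, not_full_;
};

// Produce `count` "batches" (plain ints here) through a queue of size
// `capacity` and collect everything the consumer sees, in order.
inline std::vector<int> RunPipeline(int count, std::size_t capacity) {
  BoundedQueue<int> queue(capacity);
  std::thread producer([&] {
    for (int i = 0; i < count; ++i) queue.Push(i);
    queue.MarkDone();
  });
  std::vector<int> seen;
  while (auto batch = queue.Pop()) seen.push_back(*batch);
  producer.join();
  return seen;
}
```

Even with a capacity of 2 and 10 batches, nothing is lost: the producer simply stalls until the consumer catches up.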
DeclarationToStatus#
The DeclarationToStatus() method is useful if you want to run the plan but do not actually want to consume the results. For example, this is useful when benchmarking, or when the plan has side effects such as a dataset write node. If the plan generates any results they are immediately discarded.
Running a Plan Directly#
If one of the DeclarationToXyz methods is not sufficient for some reason, it is possible to run a plan directly. This should only be needed if you are doing something unique, for example if you have created a custom sink node or need a plan with multiple outputs.
Note
In academic literature and many existing systems it is generally assumed that an execution plan has at most one output. Some things in Acero, such as the DeclarationToXyz methods, expect this. However, nothing in the design strictly prevents having multiple sink nodes.
Detailed instructions on how to do this are out of scope for this guide, but the rough steps are:
Create a new ExecPlan object.
Add sink nodes to your graph of Declaration objects (this is the only case in which you need to create a declaration for a sink node).
Use Declaration::AddToPlan() to add your declaration to your plan (if you have more than one output you will not be able to use this method and will need to add your nodes one at a time).
Validate the plan with ExecPlan::Validate().
Start the plan with ExecPlan::StartProducing().
Wait for the future returned by ExecPlan::finished() to complete.
Providing Input#
Input data for an execution plan can come from a variety of sources. It is often read from files stored on some kind of filesystem. It is also common for input to come from in-memory data, which is typical, for example, in a pandas-like frontend. Input can also come from network streams, such as a Flight request. Acero can support all of these cases, and can even support unique and custom situations not mentioned here.
There are pre-defined source nodes that cover the most common input scenarios; they are listed below. However, if your source data is unique you will need to use the generic source node. This node expects you to provide an asynchronous stream of batches, and is covered in more detail here.
Available ExecNode Implementations#
The following tables quickly summarize the available operators.
Sources#
These nodes can be used as sources of data.
| Factory Name | Options | Brief Description |
|---|---|---|
| source | SourceNodeOptions | A generic source node that wraps an asynchronous stream of data (example) |
| table_source | TableSourceNodeOptions | Generates data from an arrow::Table (example) |
| record_batch_source | RecordBatchSourceNodeOptions | Generates data from an iterator of arrow::RecordBatch |
| record_batch_reader_source | RecordBatchReaderSourceNodeOptions | Generates data from an arrow::RecordBatchReader |
| exec_batch_source | ExecBatchSourceNodeOptions | Generates data from an iterator of arrow::ExecBatch |
| array_vector_source | ArrayVectorSourceNodeOptions | Generates data from an iterator of vectors of arrow::Array |
Compute Nodes#
These nodes apply a computation to the data and may transform or reshape it.
Arrangement Nodes#
These nodes reorder, combine, or slice streams of data.
Sink Nodes#
These nodes terminate a plan. Users do not typically create sink nodes, since they are selected based on which DeclarationToXyz method is used to consume the plan. However, this list may be useful to those developing new sink nodes or using Acero in advanced ways.
| Factory Name | Options | Brief Description |
|---|---|---|
| sink | SinkNodeOptions | Collects batches into a FIFO queue with optional backpressure |
| write | arrow::dataset::WriteNodeOptions | Writes batches to a filesystem (example) |
| consuming_sink | ConsumingSinkNodeOptions | Consumes batches using a user-supplied callback function |
| table_sink | TableSinkNodeOptions | Collects batches into an arrow::Table |
| order_by_sink | OrderBySinkNodeOptions | Deprecated |
| select_k_sink | SelectKSinkNodeOptions | Deprecated |
Examples#
The remainder of this document contains example execution plans. Each example highlights the behavior of a specific execution node.
source#
A source operation can be considered as an entry point to create a streaming execution plan. SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. You should review the other source node types first to make sure there isn't a simpler choice.
The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<arrow::ExecBatch>>. The function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator, and there are a number of utilities for working with them. For this example we use a vector of record batches that have already been stored in memory. In addition, the schema of the data must be known up front: Acero must know the schema of the data at each stage of the execution graph before any processing begins, which means we must supply the schema for a source node separately from the data itself.
Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as the data generator:
struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};
Generating sample batches for computation:
arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}
An example of using source (the usage of sink is explained in detail in the sink section below):
/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
table_source
#
In the previous example a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. Note that the table batches will not be merged into larger batches when the source table's batch size is small.
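The effect of max_batch_size can be sketched with a small helper (a hypothetical stand-in for what table_source does internally, not an Arrow API): rows are split into chunks of at most max_batch_size, and chunks that are already small are left alone rather than merged.

```cpp
#include <algorithm>
#include <vector>

// Split `num_rows` rows into batch sizes of at most `max_batch_size`.
// Mirrors how a table_source breaks up large batches; note that it never
// merges small batches into bigger ones.
inline std::vector<int> SliceIntoBatchSizes(int num_rows, int max_batch_size) {
  std::vector<int> sizes;
  for (int start = 0; start < num_rows; start += max_batch_size) {
    sizes.push_back(std::min(max_batch_size, num_rows - start));
  }
  return sizes;
}
```

For example, 10 rows with max_batch_size of 4 yield batches of 4, 4, and 2 rows, while a 3-row input stays a single 3-row batch.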
Example of using table_source:
/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table. This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
arrow::Status TableSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

  ac::Declaration source{"table_source", std::move(table_source_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
filter
#
As the name implies, the filter operation provides an option to define data-filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression, and the expression must have a boolean return type. For example, if we wish to keep rows where the value of column a is greater than 3, we can use the following expression.
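Stripped of the expression machinery, the filter's semantics are a row-wise predicate. The following standard-library sketch (invented names, not an Arrow API) keeps the rows of a column whose value exceeds a threshold:

```cpp
#include <vector>

// Keep the rows whose value in column "a" is greater than `threshold` --
// a plain-C++ analogue of the boolean filter expression above.
inline std::vector<int> FilterGreaterThan(const std::vector<int>& column_a,
                                          int threshold) {
  std::vector<int> kept;
  for (int value : column_a) {
    if (value > threshold) kept.push_back(value);
  }
  return kept;
}
```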
Filter example:
/// \brief An example showing a filter node
///
/// Source-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
arrow::Status ScanFilterSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps all rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_expr;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  ac::Declaration filter{
      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

  return ExecutePlanAndCollectAsTable(std::move(filter));
}
project
#
The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. These must be scalar expressions (expressions consisting of scalar literals, field references, and scalar functions, i.e. elementwise functions that return one value for each input row, independent of the values of all other rows). This is exposed via ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if no names are provided, the string representations of the expressions are used).
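In plain C++ terms, a scalar projection such as "a * 2" is an elementwise map over a column: each output row depends only on the corresponding input row. A minimal sketch with invented names:

```cpp
#include <vector>

// Compute the derived column "a * 2" element-wise. This is a scalar
// expression in the project sense: each output value is a function of its
// own input row only.
inline std::vector<int> ProjectATimesTwo(const std::vector<int>& column_a) {
  std::vector<int> out;
  out.reserve(column_a.size());
  for (int value : column_a) out.push_back(value * 2);
  return out;
}
```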
Project example:
/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}
aggregate
#
The aggregate node computes various types of aggregates over data.
Arrow supports two types of aggregates: "scalar" aggregates and "hash" aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL: they first partition the data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation and can compute any number of aggregations at once.
AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, for a hash aggregation, it takes a list of columns that are used to partition the data. The aggregation functions can be selected from this list of aggregation functions.
Note
This node is a "pipeline breaker" and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added, which should alleviate this constraint.
The aggregation can provide results as a group or as a scalar. For instance, an operation like hash_count provides the count of each unique record as a grouped result, while an operation like sum provides a single record.
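The difference between the two aggregate kinds can be sketched in plain C++ (invented names; Acero's implementation is considerably more involved): a scalar aggregate folds a whole column into one value, while a hash aggregate partitions rows by key first and reduces each partition.

```cpp
#include <map>
#include <vector>

// Scalar aggregate: reduce the whole column to one value (like "sum").
inline int ScalarSum(const std::vector<int>& column_a) {
  int total = 0;
  for (int v : column_a) total += v;
  return total;
}

// Hash aggregate: partition by a key column first, then reduce per group
// (like "hash_count" keyed on a boolean column "b").
inline std::map<bool, int> HashCountByKey(const std::vector<bool>& key_b) {
  std::map<bool, int> counts;
  for (bool key : key_b) counts[key] += 1;
  return counts;
}
```

The scalar version yields a single row; the hash version yields one row per distinct key.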
Scalar aggregation example:
/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (counting unique types in column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
arrow::Status SourceScalarAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
Group aggregation example:
/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting unique types in column 'a') is
/// applied on this data. The output is collected into a table that will contain
/// one row for each unique combination of group keys.
arrow::Status SourceGroupAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
sink
#
The sink operation provides output and is the final node of a streaming execution plan. The SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. The caller is expected to call this function repeatedly until the generator is exhausted (returns std::optional::nullopt). If this function is not called often enough, record batches will accumulate in memory. An execution plan should have only one "terminal" node (one sink node). An ExecPlan can terminate early, before the output has been fully consumed, due to cancellation or an error. However, the plan can be safely destroyed independently of the sink, which will hold onto the unconsumed batches via exec_plan->finished().
As a part of the source example, the sink operation is also included:
/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed. In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
consuming_sink
#
The consuming_sink operator is a sink operation that contains the consuming operation within the execution plan (i.e. the execution plan should not complete until the consumption has finished). Unlike the sink node, this node accepts a callback function that is expected to consume the batches. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough, many concurrent executions could pile up, blocking the CPU thread pool. The execution plan is not marked finished until all consuming-function callbacks have completed. Once all batches have been delivered, the execution plan waits for the finish future to complete before marking itself finished. This allows for workflows in which the consuming function converts batches into async tasks (this is currently done internally for the dataset write node).
Example:
// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public cp::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}
  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }
  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};
std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

arrow::acero::ExecNode* consuming_sink;
ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      MakeExecNode("consuming_sink", plan.get(), {source},
                                   cp::ConsumingSinkNodeOptions(consumer)));
Consuming-Sink example:
/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
      // output file handles and flushing remaining work
      return arrow::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return arrow::Status::OK();
}
order_by_sink
#
The order_by_sink operation is an extension of the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing OrderBySinkNodeOptions. Here, arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort in ascending or descending order.
Note
This node is a "pipeline breaker" and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added, which should alleviate this constraint.
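The core of the node's behavior, minus the streaming machinery, is an in-memory sort of the fully materialized input. A minimal sketch with invented names:

```cpp
#include <algorithm>
#include <functional>
#include <vector>

// Sort a materialized column in descending order, as an order_by_sink with
// SortOrder::Descending on column "a" would. Like the node, this needs the
// whole input in memory before it can emit anything (a "pipeline breaker").
inline std::vector<int> SortDescending(std::vector<int> column_a) {
  std::sort(column_a.begin(), column_a.end(), std::greater<int>());
  return column_a;
}
```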
Order-By-Sink example:
arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode* source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}
select_k_sink
#
The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. SelectKOptions is defined using the OrderBySinkNode definition. This option returns a sink node that receives the input and then computes top_k/bottom_k.
Note
This node is a "pipeline breaker" and will fully materialize the input in memory. In the future, spillover mechanisms will be added, which should alleviate this constraint.
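Conceptually, the node computes a partial sort: only the k largest (or smallest) rows survive. A standard-library sketch with invented names:

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Select the top k values of a column, as select_k_sink with TopKDefault
// would. std::partial_sort orders only the first k positions, so the k
// largest values come out already sorted and the rest are discarded.
inline std::vector<int> TopK(std::vector<int> column, std::size_t k) {
  k = std::min(k, column.size());
  std::partial_sort(column.begin(), column.begin() + k, column.end(),
                    std::greater<int>());
  column.resize(k);
  return column;
}
```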
SelectK example:
/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode* source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}
table_sink
#
The table_sink node provides the ability to receive the output as an in-memory table. It is simpler to use than the other sink nodes provided by the streaming execution engine, but it only makes sense when the output fits comfortably in memory. The node is created using TableSinkNodeOptions.
Example of using table_sink:
/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode* source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}
scan
#
The scan operation is used for loading and processing datasets. It should be preferred over the more generic source node when the input is a dataset. Its behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.
This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you supply to the FilterNode, because the filtering is done in two different places.
Scan example:
/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
arrow::Status ScanSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(scan));
}
write
#
The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc., using the tabular dataset functionality in Arrow. The write options are provided via arrow::dataset::WriteNodeOptions, which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options such as the output directory and file naming scheme.
Write example:
/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}
union
#
The union operation merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.
The following example demonstrates how this can be achieved using two data sources.
Union example:
/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}
hash_join
#
The hash_join operation provides the relational-algebra join using a hash-based algorithm. HashJoinNodeOptions contains the options required to define a join. hash_join supports inner, left/right/full outer, and left/right semi/anti joins. The join keys (i.e. the columns to join on) and suffixes (i.e. suffix terms such as "_x" that can be appended to column names duplicated in both the left and right relations) can also be set via the join options. Read more on hash joins.
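The underlying algorithm is the classic two-phase hash join: build a hash table over one input's key column, then probe it with each row of the other input. A simplified inner-join sketch with invented names and an illustrative (key, payload) row layout:

```cpp
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Build phase: hash the right input on its key column. Probe phase: for each
// left row, emit one output row per matching right row. Inner join semantics:
// keys present on only one side produce no output.
inline std::vector<std::pair<int, int>> HashJoinInner(
    const std::vector<std::pair<std::string, int>>& left,
    const std::vector<std::pair<std::string, int>>& right) {
  std::unordered_multimap<std::string, int> build;
  for (const auto& row : right) build.emplace(row.first, row.second);
  std::vector<std::pair<int, int>> joined;  // (left payload, right payload)
  for (const auto& row : left) {
    auto range = build.equal_range(row.first);
    for (auto it = range.first; it != range.second; ++it) {
      joined.emplace_back(row.second, it->second);
    }
  }
  return joined;
}
```

Building on the smaller input keeps the hash table small; probing then costs roughly one lookup per row of the larger input.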
Hash-Join example:
/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}
Summary#
The examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.
Complete example:
19#include <arrow/array.h>
20#include <arrow/builder.h>
21
22#include <arrow/acero/exec_plan.h>
23#include <arrow/compute/api.h>
24#include <arrow/compute/api_vector.h>
25#include <arrow/compute/cast.h>
26
27#include <arrow/csv/api.h>
28
29#include <arrow/dataset/dataset.h>
30#include <arrow/dataset/file_base.h>
31#include <arrow/dataset/file_parquet.h>
32#include <arrow/dataset/plan.h>
33#include <arrow/dataset/scanner.h>
34
35#include <arrow/io/interfaces.h>
36#include <arrow/io/memory.h>
37
38#include <arrow/result.h>
39#include <arrow/status.h>
40#include <arrow/table.h>
41
42#include <arrow/ipc/api.h>
43
44#include <arrow/util/future.h>
45#include <arrow/util/range.h>
46#include <arrow/util/thread_pool.h>
47#include <arrow/util/vector.h>
48
49#include <iostream>
50#include <memory>
51#include <utility>
52
53// Demonstrate various operators in Arrow Streaming Execution Engine
54
55namespace cp = ::arrow::compute;
56namespace ac = ::arrow::acero;
57
58constexpr char kSep[] = "******";
59
60void PrintBlock(const std::string& msg) {
61 std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
62}
63
64template <typename TYPE,
65 typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
66 arrow::is_boolean_type<TYPE>::value |
67 arrow::is_temporal_type<TYPE>::value>::type>
68arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
69 const std::vector<typename TYPE::c_type>& values) {
70 using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
71 ArrowBuilderType builder;
72 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
73 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
74 return builder.Finish();
75}
76
77template <class TYPE>
78arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
79 const std::vector<std::string>& values) {
80 using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
81 ArrowBuilderType builder;
82 ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
83 ARROW_RETURN_NOT_OK(builder.AppendValues(values));
84 return builder.Finish();
85}
86
87arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
88    const arrow::ArrayVector& array_vector, const arrow::FieldVector& field_vector) {
89  // Combine the arrays into a StructArray, then flatten it into a RecordBatch
90  ARROW_ASSIGN_OR_RAISE(auto struct_result,
91                        arrow::StructArray::Make(array_vector, field_vector));
92  return arrow::RecordBatch::FromStructArray(struct_result);
93}
94
95/// \brief Create a sample table
96/// The table's contents will be (note: column "a" contains 0, not null,
97/// at the positions below; see the comment in the function body):
98/// a,b
99/// 1,null
100/// 2,true
101/// 0,true
102/// 3,false
103/// 0,true
104/// 4,false
105/// 5,null
106/// 6,false
107/// 7,false
108/// 8,true
109/// \return The created table
110arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
111  // quiet_NaN() evaluates to 0 for integer types, so this is not a null sentinel
112  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
113  ARROW_ASSIGN_OR_RAISE(auto int64_array,
114                        GetArrayDataSample<arrow::Int64Type>(
115                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
115
116 arrow::BooleanBuilder boolean_builder;
117 std::shared_ptr<arrow::BooleanArray> bool_array;
118
119 std::vector<uint8_t> bool_values = {false, true, true, false, true,
120 false, false, false, false, true};
121 std::vector<bool> is_valid = {false, true, true, true, true,
122 true, false, true, true, true};
123
124 ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
125
126 ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
127
128 ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
129
130 auto record_batch =
131 arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
132 arrow::field("b", arrow::boolean())}),
133 10, {int64_array, bool_array});
134 ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
135 return table;
136}
137
138/// \brief Create a sample dataset
139/// \return An in-memory dataset based on GetTable()
140arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
141 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
142 auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
143 return ds;
144}
145
146arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
147 const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
148  // build a RecordBatch from the fields and arrays, then wrap it in an ExecBatch
149 ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
150 cp::ExecBatch batch{*res_batch};
151 return batch;
152}
153
154// (Doc section: BatchesWithSchema Definition)
155struct BatchesWithSchema {
156 std::vector<cp::ExecBatch> batches;
157 std::shared_ptr<arrow::Schema> schema;
158  // This method uses internal arrow utilities to
159  // convert a vector of ExecBatches to an AsyncGenerator of optional batches
160 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
161 auto opt_batches = ::arrow::internal::MapVector(
162 [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
163 batches);
164 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
165 gen = arrow::MakeVectorGenerator(std::move(opt_batches));
166 return gen;
167 }
168};
169// (Doc section: BatchesWithSchema Definition)
170
171// (Doc section: MakeBasicBatches Definition)
172arrow::Result<BatchesWithSchema> MakeBasicBatches() {
173 BatchesWithSchema out;
174 auto field_vector = {arrow::field("a", arrow::int32()),
175 arrow::field("b", arrow::boolean())};
176 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
177 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
178 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
179
180 ARROW_ASSIGN_OR_RAISE(auto b1_bool,
181 GetArrayDataSample<arrow::BooleanType>({false, true}));
182 ARROW_ASSIGN_OR_RAISE(auto b2_bool,
183 GetArrayDataSample<arrow::BooleanType>({true, false, true}));
184 ARROW_ASSIGN_OR_RAISE(auto b3_bool,
185 GetArrayDataSample<arrow::BooleanType>({false, true, false}));
186
187 ARROW_ASSIGN_OR_RAISE(auto b1,
188 GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
189 ARROW_ASSIGN_OR_RAISE(auto b2,
190 GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
191 ARROW_ASSIGN_OR_RAISE(auto b3,
192 GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
193
194 out.batches = {b1, b2, b3};
195 out.schema = arrow::schema(field_vector);
196 return out;
197}
198// (Doc section: MakeBasicBatches Definition)
199
200arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
201 BatchesWithSchema out;
202 auto field = arrow::field("a", arrow::int32());
203 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
204 ARROW_ASSIGN_OR_RAISE(auto b2_int,
205 GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
206 ARROW_ASSIGN_OR_RAISE(auto b3_int,
207 GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
208 ARROW_ASSIGN_OR_RAISE(auto b4_int,
209 GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
210 ARROW_ASSIGN_OR_RAISE(auto b5_int,
211 GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
212 ARROW_ASSIGN_OR_RAISE(auto b6_int,
213 GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
214 ARROW_ASSIGN_OR_RAISE(auto b7_int,
215 GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
216 ARROW_ASSIGN_OR_RAISE(auto b8_int,
217 GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
218
219 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
220 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
221 ARROW_ASSIGN_OR_RAISE(auto b3,
222 GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
223 ARROW_ASSIGN_OR_RAISE(auto b4,
224 GetExecBatchFromVectors({field, field, field, field},
225 {b4_int, b5_int, b6_int, b7_int}));
226 out.batches = {b1, b2, b3, b4};
227 out.schema = arrow::schema({field});
228 return out;
229}
230
231arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
232 BatchesWithSchema out;
233 auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
234 ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
235 ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
236 ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
237 ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
238 {"alpha", "beta", "alpha"}));
239 ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
240 {"alpha", "gamma", "alpha"}));
241 ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
242 {"gamma", "beta", "alpha"}));
243 ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
244 ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
245 ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
246 out.batches = {b1, b2, b3};
247
248 size_t batch_count = out.batches.size();
249 for (int repeat = 1; repeat < multiplicity; ++repeat) {
250 for (size_t i = 0; i < batch_count; ++i) {
251 out.batches.push_back(out.batches[i]);
252 }
253 }
254
255 out.schema = arrow::schema(fields);
256 return out;
257}
258
259arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
260 // collect sink_reader into a Table
261 std::shared_ptr<arrow::Table> response_table;
262 ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));
263
264 std::cout << "Results : " << response_table->ToString() << std::endl;
265
266 return arrow::Status::OK();
267}
268
269// (Doc section: Scan Example)
270
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how a scan operation can be applied on a dataset.
275/// Operations (project, filter) can be applied within the scan so that
276/// the input data is processed as it is read. The output is obtained as a table
277arrow::Status ScanSinkExample() {
278 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280 auto options = std::make_shared<arrow::dataset::ScanOptions>();
281 options->projection = cp::project({}, {}); // create empty projection
282
283 // construct the scan node
284 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286 ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288 return ExecutePlanAndCollectAsTable(std::move(scan));
289}
290// (Doc section: Scan Example)
291
292// (Doc section: Source Example)
293
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed. In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309 ac::Declaration source{"source", std::move(source_node_options)};
310
311 return ExecutePlanAndCollectAsTable(std::move(source));
312}
313// (Doc section: Source Example)
314
315// (Doc section: Table Source Example)
316
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table. This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328  // the table is emitted in batches of at most max_batch_size rows
329 int max_batch_size = 2;
330 auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332 ac::Declaration source{"table_source", std::move(table_source_options)};
333
334 return ExecutePlanAndCollectAsTable(std::move(source));
335}
336// (Doc section: Table Source Example)
337
338// (Doc section: Filter Example)
339
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the execution plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349 auto options = std::make_shared<arrow::dataset::ScanOptions>();
350  // specify the filter.  This filter keeps rows where the
351  // value of the "a" column is greater than 3.
352 cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353 // set filter for scanner : on-disk / push-down filtering.
354 // This step can be skipped if you are not reading from disk.
355 options->filter = filter_expr;
356 // empty projection
357 options->projection = cp::project({}, {});
358
359 // construct the scan node
360 std::cout << "Initialized Scanning Options" << std::endl;
361
362 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363 std::cout << "Scan node options created" << std::endl;
364
365 ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367 // pipe the scan node into the filter node
368 // Need to set the filter in scan node options and filter node options.
369 // At scan node it is used for on-disk / push-down filtering.
370 // At filter node it is used for in-memory filtering.
371 ac::Declaration filter{
372 "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374 return ExecutePlanAndCollectAsTable(std::move(filter));
375}
376
377// (Doc section: Filter Example)
378
379// (Doc section: Project Example)
380
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390 auto options = std::make_shared<arrow::dataset::ScanOptions>();
391 // projection
392 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393 options->projection = cp::project({}, {});
394
395 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397 ac::Declaration scan{"scan", std::move(scan_node_options)};
398 ac::Declaration project{
399 "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401 return ExecutePlanAndCollectAsTable(std::move(project));
402}
403
404// (Doc section: Project Example)
405
406// This is a variation of ScanProjectSinkExample introducing how to use the
407// Declaration::Sequence function
408arrow::Status ScanProjectSequenceSinkExample() {
409 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
410
411 auto options = std::make_shared<arrow::dataset::ScanOptions>();
412 // projection
413 cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
414 options->projection = cp::project({}, {});
415
416 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
417
418 // (Doc section: Project Sequence Example)
419 // Inputs do not have to be passed to the project node when using Sequence
420 ac::Declaration plan =
421 ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
422 {"project", ac::ProjectNodeOptions({a_times_2})}});
423 // (Doc section: Project Sequence Example)
424
425 return ExecutePlanAndCollectAsTable(std::move(plan));
426}
427
428// (Doc section: Scalar Aggregate Example)
429
430/// \brief An example showing an aggregation node to aggregate an entire table
431///
432/// Source-Aggregation-Table
433/// This example shows how an aggregation operation can be applied on an
434/// execution plan resulting in a scalar output. The source node loads the
435/// data and the aggregation (a sum over column 'a')
436/// is applied on this data. The output is collected into a table (that will
437/// have exactly one row)
438arrow::Status SourceScalarAggregateSinkExample() {
439 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
440
441 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
442
443 ac::Declaration source{"source", std::move(source_node_options)};
444 auto aggregate_options =
445 ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
446 ac::Declaration aggregate{
447 "aggregate", {std::move(source)}, std::move(aggregate_options)};
448
449 return ExecutePlanAndCollectAsTable(std::move(aggregate));
450}
451// (Doc section: Scalar Aggregate Example)
452
453// (Doc section: Group Aggregate Example)
454
455/// \brief An example showing an aggregation node to perform a group-by operation
456///
457/// Source-Aggregation-Table
458/// This example shows how an aggregation operation can be applied on an
459/// execution plan resulting in grouped output. The source node loads the
460/// data and the aggregation (counting non-null values in column 'a',
461/// grouped by column 'b') is applied on this data. The output is collected
462/// into a table that will contain one row for each unique combination of group keys.
463arrow::Status SourceGroupAggregateSinkExample() {
464 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
465
466  // no sink generator is needed here; DeclarationToTable provides the sink
467
468 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
469
470 ac::Declaration source{"source", std::move(source_node_options)};
471 auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
472 auto aggregate_options =
473 ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
474 /*keys=*/{"b"}};
475 ac::Declaration aggregate{
476 "aggregate", {std::move(source)}, std::move(aggregate_options)};
477
478 return ExecutePlanAndCollectAsTable(std::move(aggregate));
479}
480// (Doc section: Group Aggregate Example)
481
482// (Doc section: ConsumingSink Example)
483
484/// \brief An example showing a consuming sink node
485///
486/// Source-Consuming-Sink
487/// This example shows how the data can be consumed within the execution plan
488/// by using a ConsumingSink node. There is no data output from this execution plan.
489arrow::Status SourceConsumingSinkExample() {
490 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
491
492 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
493
494 ac::Declaration source{"source", std::move(source_node_options)};
495
496 std::atomic<uint32_t> batches_seen{0};
497 arrow::Future<> finish = arrow::Future<>::Make();
498 struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
499 CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
500 : batches_seen(batches_seen), finish(std::move(finish)) {}
501
502 arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
503 ac::BackpressureControl* backpressure_control,
504 ac::ExecPlan* plan) override {
505 // This will be called as the plan is started (before the first call to Consume)
506 // and provides the schema of the data coming into the node, controls for pausing /
507 // resuming input, and a pointer to the plan itself which can be used to access
508 // other utilities such as the thread indexer or async task scheduler.
509 return arrow::Status::OK();
510 }
511
512 arrow::Status Consume(cp::ExecBatch batch) override {
513 (*batches_seen)++;
514 return arrow::Status::OK();
515 }
516
517 arrow::Future<> Finish() override {
518 // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
519 // output file handles and flushing remaining work
520 return arrow::Future<>::MakeFinished();
521 }
522
523 std::atomic<uint32_t>* batches_seen;
524 arrow::Future<> finish;
525 };
526 std::shared_ptr<CustomSinkNodeConsumer> consumer =
527 std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
528
529 ac::Declaration consuming_sink{"consuming_sink",
530 {std::move(source)},
531 ac::ConsumingSinkNodeOptions(std::move(consumer))};
532
533 // Since we are consuming the data within the plan there is no output and we simply
534 // run the plan to completion instead of collecting into a table.
535 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
536
537 std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
538 << std::endl;
539 return arrow::Status::OK();
540}
541// (Doc section: ConsumingSink Example)
542
543// (Doc section: OrderBySink Example)
544
545arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
546 std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
547 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
548 // translate sink_gen (async) to sink_reader (sync)
549 std::shared_ptr<arrow::RecordBatchReader> sink_reader =
550 ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());
551
552 // validate the ExecPlan
553 ARROW_RETURN_NOT_OK(plan->Validate());
554 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
555 // start the ExecPlan
556 plan->StartProducing();
557
558 // collect sink_reader into a Table
559 std::shared_ptr<arrow::Table> response_table;
560
561 ARROW_ASSIGN_OR_RAISE(response_table,
562 arrow::Table::FromRecordBatchReader(sink_reader.get()));
563
564 std::cout << "Results : " << response_table->ToString() << std::endl;
565
566  // stop producing
567  plan->StopProducing();
568  // wait for the plan to finish and propagate its final status
569  auto future = plan->finished();
570  return future.status();
571}
572
573/// \brief An example showing an order-by node
574///
575/// Source-OrderBy-Sink
576/// In this example, the data enters through the source node
577/// and the data is ordered in the sink node. The order can be
578/// ASCENDING or DESCENDING and it is configurable. The output
579/// is obtained as a table from the sink node.
580arrow::Status SourceOrderBySinkExample() {
581 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
582 ac::ExecPlan::Make(*cp::threaded_exec_context()));
583
584 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
585
586 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
587
588 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
589 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
590 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
591
592 ARROW_RETURN_NOT_OK(ac::MakeExecNode(
593 "order_by_sink", plan.get(), {source},
594 ac::OrderBySinkNodeOptions{
595 cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));
596
597 return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
598}
599
600// (Doc section: OrderBySink Example)
601
602// (Doc section: HashJoin Example)
603
604/// \brief An example showing a hash join node
605///
606/// Source-HashJoin-Table
607/// This example shows how a source node gets the data and how a self-join
608/// is applied on the data. The join options are configurable. The output
609/// is collected into a table.
610arrow::Status SourceHashJoinSinkExample() {
611 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
612
613 ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
614 ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
615
616 ac::HashJoinNodeOptions join_opts{
617 ac::JoinType::INNER,
618 /*left_keys=*/{"str"},
619 /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};
620
621 ac::Declaration hashjoin{
622 "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};
623
624 return ExecutePlanAndCollectAsTable(std::move(hashjoin));
625}
626
627// (Doc section: HashJoin Example)
628
629// (Doc section: KSelect Example)
630
631/// \brief An example showing a select-k node
632///
633/// Source-KSelect
634/// This example shows how the top (or bottom) K elements can be
635/// selected from the input. The output node is a modified
636/// sink node where output can be obtained as a table.
637arrow::Status SourceKSelectExample() {
638 ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
639 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
640 ac::ExecPlan::Make(*cp::threaded_exec_context()));
641 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
642
643 ARROW_ASSIGN_OR_RAISE(
644 ac::ExecNode * source,
645 ac::MakeExecNode("source", plan.get(), {},
646 ac::SourceNodeOptions{input.schema, input.gen()}));
647
648 cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});
649
650 ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
651 ac::SelectKSinkNodeOptions{options, &sink_gen}));
652
653 auto schema = arrow::schema(
654 {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});
655
656 return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
657}
658
659// (Doc section: KSelect Example)
660
661// (Doc section: Write Example)
662
663/// \brief An example showing a write node
664/// \param file_path The destination to write to
665///
666/// Scan-Filter-Write
667/// This example shows how a scan node can be used to load the data
668/// and how, after processing, it can be written to disk.
669arrow::Status ScanFilterWriteExample(const std::string& file_path) {
670 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
671
672 auto options = std::make_shared<arrow::dataset::ScanOptions>();
673 // empty projection
674 options->projection = cp::project({}, {});
675
676 auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
677
678 ac::Declaration scan{"scan", std::move(scan_node_options)};
679
680 arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
681
682 std::string root_path = "";
683 std::string uri = "file://" + file_path;
684 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
685 arrow::fs::FileSystemFromUri(uri, &root_path));
686
687 auto base_path = root_path + "/parquet_dataset";
688  // Uncomment the following line if running this example repeatedly
689 // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
690 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
691
692 // The partition schema determines which fields are part of the partitioning.
693 auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
694 // We'll use Hive-style partitioning,
695 // which creates directories with "key=value" pairs.
696
697 auto partitioning =
698 std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
699 // We'll write Parquet files.
700 auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
701
702 arrow::dataset::FileSystemDatasetWriteOptions write_options;
703 write_options.file_write_options = format->DefaultWriteOptions();
704 write_options.filesystem = filesystem;
705 write_options.base_dir = base_path;
706 write_options.partitioning = partitioning;
707 write_options.basename_template = "part{i}.parquet";
708
709 arrow::dataset::WriteNodeOptions write_node_options{write_options};
710
711 ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};
712
713 // Since the write node has no output we simply run the plan to completion and the
714 // data should be written
715 ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));
716
717 std::cout << "Dataset written to " << base_path << std::endl;
718 return arrow::Status::OK();
719}
720
721// (Doc section: Write Example)
722
723// (Doc section: Union Example)
724
725/// \brief An example showing a union node
726///
727/// Source-Union-Table
728/// This example shows how a union operation can be applied on two
729/// data sources. The output is collected into a table.
730arrow::Status SourceUnionSinkExample() {
731 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
732
733 ac::Declaration lhs{"source",
734 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
735 lhs.label = "lhs";
736 ac::Declaration rhs{"source",
737 ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
738 rhs.label = "rhs";
739 ac::Declaration union_plan{
740 "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};
741
742 return ExecutePlanAndCollectAsTable(std::move(union_plan));
743}
744
745// (Doc section: Union Example)
746
747// (Doc section: Table Sink Example)
748
749/// \brief An example showing a table sink node
750///
751/// TableSink Example
752/// This example shows how a table_sink can be used
753/// in an execution plan. This includes a source node
754/// receiving data as batches and the table sink node
755/// which emits the output as a table.
756arrow::Status TableSinkExample() {
757 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
758 ac::ExecPlan::Make(*cp::threaded_exec_context()));
759
760 ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
761
762 auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
763
764 ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
765 ac::MakeExecNode("source", plan.get(), {}, source_node_options));
766
767 std::shared_ptr<arrow::Table> output_table;
768 auto table_sink_options = ac::TableSinkNodeOptions{&output_table};
769
770 ARROW_RETURN_NOT_OK(
771 ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
772 // validate the ExecPlan
773 ARROW_RETURN_NOT_OK(plan->Validate());
774 std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
775 // start the ExecPlan
776 plan->StartProducing();
777
778  // Wait for the plan to finish
779  auto finished = plan->finished();
780  ARROW_RETURN_NOT_OK(finished.status());
781 std::cout << "Results : " << output_table->ToString() << std::endl;
782 return arrow::Status::OK();
783}
784
785// (Doc section: Table Sink Example)
786
787// (Doc section: RecordBatchReaderSource Example)
788
789/// \brief An example showing the usage of a RecordBatchReader as the data source.
790///
791/// RecordBatchReaderSourceSink Example
792/// This example shows how a record_batch_reader_source can be used
793/// in an execution plan. This includes the source node
794/// receiving data from a TableRecordBatchReader.
795
796arrow::Status RecordBatchReaderSourceSinkExample() {
797 ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
798 std::shared_ptr<arrow::RecordBatchReader> reader =
799 std::make_shared<arrow::TableBatchReader>(table);
800 ac::Declaration reader_source{"record_batch_reader_source",
801 ac::RecordBatchReaderSourceNodeOptions{reader}};
802 return ExecutePlanAndCollectAsTable(std::move(reader_source));
803}
804
805// (Doc section: RecordBatchReaderSource Example)
806
807enum ExampleMode {
808 SOURCE_SINK = 0,
809 TABLE_SOURCE_SINK = 1,
810 SCAN = 2,
811 FILTER = 3,
812 PROJECT = 4,
813 SCALAR_AGGREGATION = 5,
814 GROUP_AGGREGATION = 6,
815 CONSUMING_SINK = 7,
816 ORDER_BY_SINK = 8,
817 HASHJOIN = 9,
818 KSELECT = 10,
819 WRITE = 11,
820 UNION = 12,
821 TABLE_SOURCE_TABLE_SINK = 13,
822 RECORD_BATCH_READER_SOURCE = 14,
823 PROJECT_SEQUENCE = 15
824};
825
826int main(int argc, char** argv) {
827 if (argc < 3) {
828 // Fake success for CI purposes.
829 return EXIT_SUCCESS;
830 }
831
832 std::string base_save_path = argv[1];
833 int mode = std::atoi(argv[2]);
834 arrow::Status status;
835 // ensure arrow::dataset node factories are in the registry
836 arrow::dataset::internal::Initialize();
837 switch (mode) {
838 case SOURCE_SINK:
839 PrintBlock("Source Sink Example");
840 status = SourceSinkExample();
841 break;
842 case TABLE_SOURCE_SINK:
843 PrintBlock("Table Source Sink Example");
844 status = TableSourceSinkExample();
845 break;
846 case SCAN:
847 PrintBlock("Scan Example");
848 status = ScanSinkExample();
849 break;
850 case FILTER:
851 PrintBlock("Filter Example");
852 status = ScanFilterSinkExample();
853 break;
854 case PROJECT:
855 PrintBlock("Project Example");
856 status = ScanProjectSinkExample();
857 break;
858 case PROJECT_SEQUENCE:
859 PrintBlock("Project Example (using Declaration::Sequence)");
860 status = ScanProjectSequenceSinkExample();
861 break;
862 case GROUP_AGGREGATION:
863      PrintBlock("Group Aggregate Example");
864 status = SourceGroupAggregateSinkExample();
865 break;
866 case SCALAR_AGGREGATION:
867      PrintBlock("Scalar Aggregate Example");
868 status = SourceScalarAggregateSinkExample();
869 break;
870 case CONSUMING_SINK:
871 PrintBlock("Consuming-Sink Example");
872 status = SourceConsumingSinkExample();
873 break;
874 case ORDER_BY_SINK:
875 PrintBlock("OrderBy Example");
876 status = SourceOrderBySinkExample();
877 break;
878 case HASHJOIN:
879 PrintBlock("HashJoin Example");
880 status = SourceHashJoinSinkExample();
881 break;
882 case KSELECT:
883 PrintBlock("KSelect Example");
884 status = SourceKSelectExample();
885 break;
886 case WRITE:
887 PrintBlock("Write Example");
888 status = ScanFilterWriteExample(base_save_path);
889 break;
890 case UNION:
891 PrintBlock("Union Example");
892 status = SourceUnionSinkExample();
893 break;
894 case TABLE_SOURCE_TABLE_SINK:
895 PrintBlock("TableSink Example");
896 status = TableSinkExample();
897 break;
898 case RECORD_BATCH_READER_SOURCE:
899 PrintBlock("RecordBatchReaderSource Example");
900 status = RecordBatchReaderSourceSinkExample();
901 break;
902 default:
903 break;
904 }
905
906 if (status.ok()) {
907 return EXIT_SUCCESS;
908 } else {
909 std::cout << "Error occurred: " << status.message() << std::endl;
910 return EXIT_FAILURE;
911 }
912}