Arrow 计算#

Apache Arrow 提供了计算函数,以方便高效和可移植的数据处理。在本文中,您将使用 Arrow 的计算功能来

  1. 计算列的总和

  2. 计算两列的元素级总和

  3. 在列中搜索值

先决条件#

在继续之前,请确保您已安装

  1. Arrow,您可以在此处进行设置:在您自己的项目中使用 Arrow C++。如果您自己编译 Arrow,请确保编译时启用了计算模块(即,-DARROW_COMPUTE=ON),请参阅可选组件

  2. 了解来自 基本 Arrow 数据结构 的基本 Arrow 数据结构

设置#

在运行一些计算之前,我们需要填补几个空白

  1. 我们需要包含必要的头文件。

  2. 需要一个 main() 来将它们粘合在一起。

  3. 我们需要数据来进行操作。

包含#

在编写 C++ 代码之前,我们需要一些包含。我们将获取 iostream 以进行输出,然后导入 Arrow 的计算功能

#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <iostream>

Main()#

对于我们的粘合,我们将使用先前关于数据结构的教程中的 main() 模式

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}

就像我们之前使用它一样,它与 RunMain() 配对

arrow::Status RunMain() {

生成用于计算的表#

在开始之前,我们将初始化一个 Table,其中包含两个列用于操作。我们将使用来自 基本 Arrow 数据结构 的方法,因此如果有什么令人困惑,请回头查看

  // Create a couple 32-bit integer arrays.
  arrow::Int32Builder int32builder;
  int32_t some_nums_raw[5] = {34, 624, 2223, 5654, 4356};
  ARROW_RETURN_NOT_OK(int32builder.AppendValues(some_nums_raw, 5));
  std::shared_ptr<arrow::Array> some_nums;
  ARROW_ASSIGN_OR_RAISE(some_nums, int32builder.Finish());

  int32_t more_nums_raw[5] = {75342, 23, 64, 17, 736};
  ARROW_RETURN_NOT_OK(int32builder.AppendValues(more_nums_raw, 5));
  std::shared_ptr<arrow::Array> more_nums;
  ARROW_ASSIGN_OR_RAISE(more_nums, int32builder.Finish());

  // Make a table out of our pair of arrays.
  std::shared_ptr<arrow::Field> field_a, field_b;
  std::shared_ptr<arrow::Schema> schema;

  field_a = arrow::field("A", arrow::int32());
  field_b = arrow::field("B", arrow::int32());

  schema = arrow::schema({field_a, field_b});

  std::shared_ptr<arrow::Table> table;
  table = arrow::Table::Make(schema, {some_nums, more_nums}, 5);

计算数组上的总和#

使用计算函数有两个一般步骤,我们在此处将其分开

  1. 准备用于输出的 Datum

  2. 调用 compute::Sum(),这是一个方便函数,用于对 Array 进行求和

  3. 检索和打印输出

使用 Datum 准备输出内存#

完成计算后,我们需要一个地方来存储我们的结果。在 Arrow 中,用于此类输出的对象称为 Datum。此对象用于在计算函数中传递输入和输出,并且可以包含许多不同形状的 Arrow 数据结构。我们需要它来从计算函数中检索输出。

  // The Datum class is what all compute functions output to, and they can take Datums
  // as inputs, as well.
  arrow::Datum sum;

调用 Sum()#

在这里,我们将获取我们的 Table,其中包含列“A”和“B”,并对列“A”求和。对于求和,有一个方便函数,称为 compute::Sum(),它可以降低计算接口的复杂性。我们将查看下一个计算的更复杂版本。对于给定的函数,请参阅 计算函数,以查看是否有方便函数。compute::Sum() 接受给定的 ArrayChunkedArray – 在这里,我们使用 Table::GetColumnByName() 传入列 A。然后,它输出到 Datum。将所有这些放在一起,我们得到这个

  // Here, we can use arrow::compute::Sum. This is a convenience function, and the next
  // computation won't be so simple. However, using these where possible helps
  // readability.
  ARROW_ASSIGN_OR_RAISE(sum, arrow::compute::Sum({table->GetColumnByName("A")}));

从 Datum 获取结果#

上一步给我们留下了一个 Datum,其中包含我们的总和。但是,我们不能直接打印它 – 它在保存任意 Arrow 数据结构方面的灵活性意味着我们必须仔细检索我们的数据。首先,为了了解其中的内容,我们可以检查它是什么样的数据结构,然后检查正在保存的原始类型

  // Get the kind of Datum and what it holds -- this is a Scalar, with int64.
  std::cout << "Datum kind: " << sum.ToString()
            << " content type: " << sum.type()->ToString() << std::endl;

这应报告 Datum 存储一个带有 64 位整数的 Scalar。只是为了看看值是什么,我们可以像这样打印出来,结果是 12891

  // Note that we explicitly request a scalar -- the Datum cannot simply give what it is,
  // you must ask for the correct type.
  std::cout << sum.scalar_as<arrow::Int64Scalar>().value << std::endl;

现在我们已经使用了 compute::Sum() 并从中获得了我们想要的东西!

使用 CallFunction() 计算元素级数组加法#

下一层复杂性使用了 compute::Sum() 很有帮助地隐藏的内容:compute::CallFunction()。对于此示例,我们将探讨如何使用更强大的 compute::CallFunction() 和“add”计算函数。模式保持相似

  1. 准备 Datum 以进行输出

  2. 使用“add”调用 compute::CallFunction()

  3. 检索和打印输出

使用 Datum 准备输出内存#

再次,我们需要一个 Datum 用于我们获得的任何输出

  arrow::Datum element_wise_sum;

将 CallFunction() 与“add”一起使用#

compute::CallFunction() 的第一个参数是所需函数的名称,第二个参数是该函数的数据输入,以向量形式传递。现在,我们想要对 “A” 和 “B” 列进行逐元素相加。因此,我们将请求 “add” 函数,传入 “A” 和 “B” 列,并将输出写入我们的 Datum。将这些组合起来,就得到以下代码:

  // Get element-wise sum of both columns A and B in our Table. Note that here we use
  // CallFunction(), which takes the name of the function as the first argument.
  ARROW_ASSIGN_OR_RAISE(element_wise_sum, arrow::compute::CallFunction(
                                              "add", {table->GetColumnByName("A"),
                                                      table->GetColumnByName("B")}));

另请参阅

可用函数列表,了解可以与 compute::CallFunction() 一起使用的其他函数。

从 Datum 获取结果#

同样,Datum 需要仔细处理。当我们知道它包含什么时,处理起来会容易得多。这个 Datum 包含一个带有 32 位整数的 ChunkedArray,但我们可以打印出来确认。

  // Get the kind of Datum and what it holds -- this is a ChunkedArray, with int32.
  std::cout << "Datum kind: " << element_wise_sum.ToString()
            << " content type: " << element_wise_sum.type()->ToString() << std::endl;

由于它是一个 ChunkedArray,我们从 Datum 请求它 – ChunkedArray 有一个 ChunkedArray::ToString() 方法,所以我们将用它来打印其内容。

  // This time, we get a ChunkedArray, not a scalar.
  std::cout << element_wise_sum.chunked_array()->ToString() << std::endl;

输出看起来像这样

Datum kind: ChunkedArray content type: int32
[
  [
    75376,
    647,
    2287,
    5671,
    5092
  ]
]

现在,我们使用了 compute::CallFunction(),而不是便捷函数!这使得可以进行更广泛的计算。

使用 CallFunction() 和 Options 查找值#

还剩一类计算。compute::CallFunction() 使用向量作为数据输入,但计算通常需要额外的参数来执行函数。为了提供这些参数,计算函数可以与结构体关联,在这些结构体中可以定义它们的参数。您可以在 此处 检查给定函数使用哪个结构体。对于这个例子,我们将使用 “index” 计算函数在 “A” 列中搜索一个值。这个过程有三个步骤,而不是之前的两个步骤:

  1. 准备用于输出的 Datum

  2. 准备 compute::IndexOptions

  3. 使用 “index” 和 compute::IndexOptions 调用 compute::CallFunction()

  4. 检索和打印输出

使用 Datum 为输出准备内存#

我们需要一个 Datum 来存储我们得到的任何输出。

  // Use an options struct to set up searching for 2223 in column A (the third item).
  arrow::Datum third_item;

使用 IndexOptions 配置 “index”#

对于这个探索,我们将使用 “index” 函数 – 这是一种搜索方法,它返回输入值的索引。为了传递这个输入值,我们需要一个 compute::IndexOptions 结构体。所以,让我们创建该结构体。

  // An options struct is used in lieu of passing an arbitrary amount of arguments.
  arrow::compute::IndexOptions index_options;

在一个搜索函数中,需要一个目标值。在这里,我们将使用 2223,即 A 列中的第三项,并相应地配置我们的结构体。

  // We need an Arrow Scalar, not a raw value.
  index_options.value = arrow::MakeScalar(2223);

将 CallFunction() 与 “index” 和 IndexOptions 一起使用#

为了实际运行该函数,我们再次使用 compute::CallFunction(),这次将我们的 IndexOptions 结构体通过引用作为第三个参数传递。与之前一样,第一个参数是函数名称,第二个参数是我们的数据输入。

  ARROW_ASSIGN_OR_RAISE(
      third_item, arrow::compute::CallFunction("index", {table->GetColumnByName("A")},
                                               &index_options));

从 Datum 获取结果#

最后一次,让我们看看我们的 Datum 有什么!这将是一个带有 64 位整数的 Scalar,输出将为 2。

  // Get the kind of Datum and what it holds -- this is a Scalar, with int64
  std::cout << "Datum kind: " << third_item.ToString()
            << " content type: " << third_item.type()->ToString() << std::endl;
  // We get a scalar -- the location of 2223 in column A, which is 2 in 0-based indexing.
  std::cout << third_item.scalar_as<arrow::Int64Scalar>().value << std::endl;

结束程序#

最后,我们只需返回 arrow::Status::OK(),以便 main() 知道我们已经完成,并且一切正常,就像之前的教程一样。

  return arrow::Status::OK();
}

有了这个,您已经使用了三种主要类型的计算函数 – 有和没有便捷函数,然后是有 Options 结构体。现在您可以处理您需要的任何 Table,并解决您遇到的任何可以放入内存的数据问题!

这意味着现在我们必须看看如何在下一篇文章中通过 Arrow Datasets 处理大于内存的数据集。

请参阅下文以获取完整代码的副本

 19// (Doc section: Includes)
 20#include <arrow/api.h>
 21#include <arrow/compute/api.h>
 22
 23#include <iostream>
 24// (Doc section: Includes)
 25
 26// (Doc section: RunMain)
 27arrow::Status RunMain() {
 28  // (Doc section: RunMain)
 29  // (Doc section: Create Tables)
 30  // Create a couple 32-bit integer arrays.
 31  arrow::Int32Builder int32builder;
 32  int32_t some_nums_raw[5] = {34, 624, 2223, 5654, 4356};
 33  ARROW_RETURN_NOT_OK(int32builder.AppendValues(some_nums_raw, 5));
 34  std::shared_ptr<arrow::Array> some_nums;
 35  ARROW_ASSIGN_OR_RAISE(some_nums, int32builder.Finish());
 36
 37  int32_t more_nums_raw[5] = {75342, 23, 64, 17, 736};
 38  ARROW_RETURN_NOT_OK(int32builder.AppendValues(more_nums_raw, 5));
 39  std::shared_ptr<arrow::Array> more_nums;
 40  ARROW_ASSIGN_OR_RAISE(more_nums, int32builder.Finish());
 41
 42  // Make a table out of our pair of arrays.
 43  std::shared_ptr<arrow::Field> field_a, field_b;
 44  std::shared_ptr<arrow::Schema> schema;
 45
 46  field_a = arrow::field("A", arrow::int32());
 47  field_b = arrow::field("B", arrow::int32());
 48
 49  schema = arrow::schema({field_a, field_b});
 50
 51  std::shared_ptr<arrow::Table> table;
 52  table = arrow::Table::Make(schema, {some_nums, more_nums}, 5);
 53  // (Doc section: Create Tables)
 54
 55  // (Doc section: Sum Datum Declaration)
 56  // The Datum class is what all compute functions output to, and they can take Datums
 57  // as inputs, as well.
 58  arrow::Datum sum;
 59  // (Doc section: Sum Datum Declaration)
 60  // (Doc section: Sum Call)
 61  // Here, we can use arrow::compute::Sum. This is a convenience function, and the next
 62  // computation won't be so simple. However, using these where possible helps
 63  // readability.
 64  ARROW_ASSIGN_OR_RAISE(sum, arrow::compute::Sum({table->GetColumnByName("A")}));
 65  // (Doc section: Sum Call)
 66  // (Doc section: Sum Datum Type)
 67  // Get the kind of Datum and what it holds -- this is a Scalar, with int64.
 68  std::cout << "Datum kind: " << sum.ToString()
 69            << " content type: " << sum.type()->ToString() << std::endl;
 70  // (Doc section: Sum Datum Type)
 71  // (Doc section: Sum Contents)
 72  // Note that we explicitly request a scalar -- the Datum cannot simply give what it is,
 73  // you must ask for the correct type.
 74  std::cout << sum.scalar_as<arrow::Int64Scalar>().value << std::endl;
 75  // (Doc section: Sum Contents)
 76
 77  // (Doc section: Add Datum Declaration)
 78  arrow::Datum element_wise_sum;
 79  // (Doc section: Add Datum Declaration)
 80  // (Doc section: Add Call)
 81  // Get element-wise sum of both columns A and B in our Table. Note that here we use
 82  // CallFunction(), which takes the name of the function as the first argument.
 83  ARROW_ASSIGN_OR_RAISE(element_wise_sum, arrow::compute::CallFunction(
 84                                              "add", {table->GetColumnByName("A"),
 85                                                      table->GetColumnByName("B")}));
 86  // (Doc section: Add Call)
 87  // (Doc section: Add Datum Type)
 88  // Get the kind of Datum and what it holds -- this is a ChunkedArray, with int32.
 89  std::cout << "Datum kind: " << element_wise_sum.ToString()
 90            << " content type: " << element_wise_sum.type()->ToString() << std::endl;
 91  // (Doc section: Add Datum Type)
 92  // (Doc section: Add Contents)
 93  // This time, we get a ChunkedArray, not a scalar.
 94  std::cout << element_wise_sum.chunked_array()->ToString() << std::endl;
 95  // (Doc section: Add Contents)
 96
 97  // (Doc section: Index Datum Declare)
 98  // Use an options struct to set up searching for 2223 in column A (the third item).
 99  arrow::Datum third_item;
100  // (Doc section: Index Datum Declare)
101  // (Doc section: IndexOptions Declare)
102  // An options struct is used in lieu of passing an arbitrary amount of arguments.
103  arrow::compute::IndexOptions index_options;
104  // (Doc section: IndexOptions Declare)
105  // (Doc section: IndexOptions Assign)
106  // We need an Arrow Scalar, not a raw value.
107  index_options.value = arrow::MakeScalar(2223);
108  // (Doc section: IndexOptions Assign)
109  // (Doc section: Index Call)
110  ARROW_ASSIGN_OR_RAISE(
111      third_item, arrow::compute::CallFunction("index", {table->GetColumnByName("A")},
112                                               &index_options));
113  // (Doc section: Index Call)
114  // (Doc section: Index Inspection)
115  // Get the kind of Datum and what it holds -- this is a Scalar, with int64
116  std::cout << "Datum kind: " << third_item.ToString()
117            << " content type: " << third_item.type()->ToString() << std::endl;
118  // We get a scalar -- the location of 2223 in column A, which is 2 in 0-based indexing.
119  std::cout << third_item.scalar_as<arrow::Int64Scalar>().value << std::endl;
120  // (Doc section: Index Inspection)
121  // (Doc section: Ret)
122  return arrow::Status::OK();
123}
124// (Doc section: Ret)
125
126// (Doc section: Main)
127int main() {
128  arrow::Status st = RunMain();
129  if (!st.ok()) {
130    std::cerr << st << std::endl;
131    return 1;
132  }
133  return 0;
134}
135// (Doc section: Main)