行转列转换#

固定模式#

以下示例将结构体数组转换为 arrow::Table 实例,然后将其转换回原始结构体数组。

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <arrow/api.h>
#include <arrow/result.h>

#include <cstdint>
#include <iomanip>
#include <iostream>
#include <vector>

using arrow::DoubleBuilder;
using arrow::Int64Builder;
using arrow::ListBuilder;

// While we want to use columnar data structures to build efficient operations, we
// often receive data in a row-wise fashion from other systems. In the following,
// we want to give a brief introduction to the classes provided by Apache Arrow by
// showing how to transform row-wise data into a columnar table.
//
// The table contains an id for a product, the number of components in the product
// and the cost of each component.
//
// The data in this example is stored in the following struct:
// Row-wise record describing a single product. A vector of these is the input
// of VectorToColumnarTable and the output of ColumnarTableToVector.
struct data_row {
  // Identifier of the product.
  int64_t id;
  // Number of components in the product.
  int64_t components;
  // Cost of each component of the product.
  std::vector<double> component_cost;
};

// Transforming a vector of structs into a columnar Table.
//
// The final representation should be an `arrow::Table` which in turn
// is made up of an `arrow::Schema` and a list of
// `arrow::ChunkedArray` instances. As the first step, we will iterate
// over the data and build up the arrays incrementally.  For this
// task, we provide `arrow::ArrayBuilder` classes that help in the
// construction of the final `arrow::Array` instances.
//
// For each type, Arrow has a specially typed builder class. For the primitive
// values `id` and `components` we can use the `arrow::Int64Builder`. For the
// `component_cost` vector, we need to have two builders, a top-level
// `arrow::ListBuilder` that builds the array of offsets and a nested
// `arrow::DoubleBuilder` that constructs the underlying values array that
// is referenced by the offsets in the former array.
arrow::Result<std::shared_ptr<arrow::Table>> VectorToColumnarTable(
    const std::vector<struct data_row>& rows) {
  // Every builder below allocates from Arrow's default memory pool.
  arrow::MemoryPool* const memory_pool = arrow::default_memory_pool();

  // One builder per output column. The list column needs two cooperating
  // builders: the outer one records where each list starts, the inner one
  // collects the flattened double values.
  Int64Builder ids(memory_pool);
  Int64Builder component_counts(memory_pool);
  ListBuilder cost_lists(memory_pool, std::make_shared<DoubleBuilder>(memory_pool));
  // The nested value builder belongs to `cost_lists`; this is just a typed,
  // non-owning handle to it.
  auto* const cost_values = static_cast<DoubleBuilder*>(cost_lists.value_builder());

  // Feed the rows into the builders. Appending can fail (for instance when
  // memory runs out), so every returned Status is propagated to the caller.
  for (const data_row& entry : rows) {
    ARROW_RETURN_NOT_OK(ids.Append(entry.id));
    ARROW_RETURN_NOT_OK(component_counts.Append(entry.components));

    // Open the list slot for this row first, then push its elements into the
    // nested builder.
    ARROW_RETURN_NOT_OK(cost_lists.Append());
    const std::vector<double>& costs = entry.component_cost;
    ARROW_RETURN_NOT_OK(cost_values->AppendValues(costs.data(), costs.size()));
  }

  // Turn each builder into an immutable array. Finishing the list builder
  // also finishes its nested value builder, so that one is not finished
  // separately.
  std::shared_ptr<arrow::Array> id_column;
  ARROW_RETURN_NOT_OK(ids.Finish(&id_column));
  std::shared_ptr<arrow::Array> count_column;
  ARROW_RETURN_NOT_OK(component_counts.Finish(&count_column));
  std::shared_ptr<arrow::Array> cost_column;
  ARROW_RETURN_NOT_OK(cost_lists.Finish(&cost_column));

  // Describe the three columns and bundle schema and arrays into a table.
  const std::vector<std::shared_ptr<arrow::Field>> fields = {
      arrow::field("id", arrow::int64()),
      arrow::field("components", arrow::int64()),
      arrow::field("component_cost", arrow::list(arrow::float64()))};
  const auto table_schema = std::make_shared<arrow::Schema>(fields);

  // The table shares ownership of the arrays it is made from, so it stays
  // usable after the local handles above go out of scope.
  const std::vector<std::shared_ptr<arrow::Array>> columns = {id_column, count_column,
                                                              cost_column};
  return arrow::Table::Make(table_schema, columns);
}

// Converts a table with the schema (id: int64, components: int64,
// component_cost: list<float64>) back into row-wise `data_row` structs.
//
// Returns Status::Invalid when `table` is null or its schema differs from the
// expected one. Null entries are not supported: as a simplification every cell
// is assumed to hold a valid value.
arrow::Result<std::vector<data_row>> ColumnarTableToVector(
    const std::shared_ptr<arrow::Table>& table) {
  if (table == nullptr) {
    return arrow::Status::Invalid("Table is null");
  }

  // The table must conform to the schema we know how to unpack; comparing the
  // schemas is enough to establish that.
  std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
      arrow::field("id", arrow::int64()), arrow::field("components", arrow::int64()),
      arrow::field("component_cost", arrow::list(arrow::float64()))};
  auto expected_schema = std::make_shared<arrow::Schema>(schema_vector);

  if (!expected_schema->Equals(*table->schema())) {
    // The table doesn't have the expected schema thus we cannot directly
    // convert it to our target representation.
    return arrow::Status::Invalid("Schemas are not matching!");
  }

  std::vector<data_row> rows;
  if (table->num_rows() == 0) {
    // Nothing to convert; an empty table may not even have a chunk to look at.
    return rows;
  }
  rows.reserve(static_cast<std::vector<data_row>::size_type>(table->num_rows()));

  // A column may be split into several chunks. The code below reads a single
  // array per column, so concatenate the chunks first; with a non-empty table
  // every column then consists of exactly one chunk.
  ARROW_ASSIGN_OR_RAISE(auto combined, table->CombineChunks());

  // The schema check above makes these downcasts safe.
  auto ids = std::static_pointer_cast<arrow::Int64Array>(combined->column(0)->chunk(0));
  auto components =
      std::static_pointer_cast<arrow::Int64Array>(combined->column(1)->chunk(0));
  auto component_cost =
      std::static_pointer_cast<arrow::ListArray>(combined->column(2)->chunk(0));
  auto component_cost_values =
      std::static_pointer_cast<arrow::DoubleArray>(component_cost->values());

  // The list column stores all costs in one flat values array; row i owns the
  // half-open range [value_offset(i), value_offset(i + 1)) of that array.
  const double* ccv_ptr = component_cost_values->raw_values();
  const int64_t num_rows = ids->length();
  for (int64_t i = 0; i < num_rows; i++) {
    const double* first = ccv_ptr + component_cost->value_offset(i);
    const double* last = ccv_ptr + component_cost->value_offset(i + 1);
    // The cost vector is built in place so it is moved, not copied, into `rows`.
    rows.push_back(
        {ids->Value(i), components->Value(i), std::vector<double>(first, last)});
  }

  return rows;
}

// Round-trips a small sample through VectorToColumnarTable and
// ColumnarTableToVector and prints the converted rows to stdout.
//
// Returns the first error reported by either conversion, or Status::Invalid
// when the round trip changed the number of rows.
arrow::Status RunRowConversion() {
  const std::vector<data_row> original_rows = {
      {1, 1, {10.0}}, {2, 3, {11.0, 12.0, 13.0}}, {3, 2, {15.0, 25.0}}};

  ARROW_ASSIGN_OR_RAISE(auto table, VectorToColumnarTable(original_rows));
  ARROW_ASSIGN_OR_RAISE(auto converted_rows, ColumnarTableToVector(table));

  // Report a mismatch through the Status channel so the check is also active
  // in builds where assertions are compiled out.
  if (original_rows.size() != converted_rows.size()) {
    return arrow::Status::Invalid("Round trip changed the number of rows: expected ",
                                  original_rows.size(), ", got ",
                                  converted_rows.size());
  }

  // Print out contents of table, should get
  // ID Components Component prices
  // 1  1          10
  // 2  3          11  12  13
  // 3  2          15  25
  std::cout << std::left << std::setw(3) << "ID " << std::left << std::setw(11)
            << "Components " << std::left << std::setw(15) << "Component prices "
            << '\n';
  for (const auto& row : converted_rows) {
    std::cout << std::left << std::setw(3) << row.id << std::left << std::setw(11)
              << row.components;
    for (const auto& cost : row.component_cost) {
      std::cout << std::left << std::setw(4) << cost;
    }
    std::cout << '\n';
  }
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  auto status = RunRowConversion();
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}

动态模式#

在许多情况下,我们需要在 Arrow 数据与行数据之间进行转换,而行数据的模式在编译时是未知的。为了帮助实现这些转换,此库提供了一些实用程序。

以下示例显示了如何实现 rapidjson::Document 和 Arrow 对象之间的转换。您可以在 apache/arrow 中阅读完整的代码示例。

编写转换为 Arrow#

要将行转换为 Arrow 记录批,我们将为所有列设置数组构建器,然后为每个字段迭代行值并追加到构建器。我们假设我们已经知道目标模式,该模式可能是由另一个系统提供的或在另一个函数中推断出来的。在转换期间推断模式是一个具有挑战性的命题;许多系统将检查前 N 行以推断模式(如果尚不存在)。

在顶层,我们定义了一个函数 ConvertToRecordBatch:

495arrow::Result<std::shared_ptr<arrow::RecordBatch>> ConvertToRecordBatch(
496    const std::vector<rapidjson::Document>& rows, std::shared_ptr<arrow::Schema> schema) {
497  // RecordBatchBuilder will create array builders for us for each field in our
498  // schema. By passing the number of output rows (`rows.size()`) we can
499  // pre-allocate the correct size of arrays, except of course in the case of
500  // string, byte, and list arrays, which have dynamic lengths.
501  std::unique_ptr<arrow::RecordBatchBuilder> batch_builder;
502  ARROW_ASSIGN_OR_RAISE(
503      batch_builder,
504      arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool(), rows.size()));
505
506  // Inner converter will take rows and be responsible for appending values
507  // to provided array builders.
508  JsonValueConverter converter(rows);
509  for (int i = 0; i < batch_builder->num_fields(); ++i) {
510    std::shared_ptr<arrow::Field> field = schema->field(i);
511    arrow::ArrayBuilder* builder = batch_builder->GetField(i);
512    ARROW_RETURN_NOT_OK(converter.Convert(*field.get(), builder));
513  }
514
515  std::shared_ptr<arrow::RecordBatch> batch;
516  ARROW_ASSIGN_OR_RAISE(batch, batch_builder->Flush());
517
518  // Use RecordBatch::ValidateFull() to make sure arrays were correctly constructed.
519  DCHECK_OK(batch->ValidateFull());
520  return batch;
521}  // ConvertToRecordBatch

首先,我们使用 arrow::RecordBatchBuilder,它方便地为模式中的每个字段创建构建器。然后,我们遍历模式的字段,获取构建器,并在我们的 JsonValueConverter 上调用 Convert()(将在后面讨论)。最后,我们调用 batch->ValidateFull(),它检查我们数组的完整性以确保正确执行转换,这对于调试新的转换实现很有用。

下一层,JsonValueConverter 负责将提供的字段的行值追加到提供的数组构建器。为了专门针对每种数据类型进行逻辑处理,它实现了 Visit 方法并调用 arrow::VisitTypeInline()。(有关类型访问者的更多信息,请参阅 访问者模式。)

在该类的末尾是私有方法 FieldValues(),它返回跨行当前字段的列值的迭代器。在扁平化的基于行的结构(例如值向量)中,这可能很容易实现。但是,如果模式嵌套,例如 JSON 文档的情况,则需要一个特殊的迭代器来导航嵌套级别。有关 DocValuesIterator 的实现细节,请参阅完整示例。

323class JsonValueConverter {
324 public:
325  explicit JsonValueConverter(const std::vector<rapidjson::Document>& rows)
326      : rows_(rows), array_levels_(0) {}
327
328  JsonValueConverter(const std::vector<rapidjson::Document>& rows,
329                     const std::vector<std::string>& root_path, int64_t array_levels)
330      : rows_(rows), root_path_(root_path), array_levels_(array_levels) {}
331
332  /// \brief For field passed in, append corresponding values to builder
333  arrow::Status Convert(const arrow::Field& field, arrow::ArrayBuilder* builder) {
334    return Convert(field, field.name(), builder);
335  }
336
337  /// \brief For field passed in, append corresponding values to builder
338  arrow::Status Convert(const arrow::Field& field, const std::string& field_name,
339                        arrow::ArrayBuilder* builder) {
340    field_name_ = field_name;
341    builder_ = builder;
342    ARROW_RETURN_NOT_OK(arrow::VisitTypeInline(*field.type().get(), this));
343    return arrow::Status::OK();
344  }
345
346  // Default implementation
347  arrow::Status Visit(const arrow::DataType& type) {
348    return arrow::Status::NotImplemented(
349        "Cannot convert json value to Arrow array of type ", type.ToString());
350  }
351
352  arrow::Status Visit(const arrow::Int64Type& type) {
353    arrow::Int64Builder* builder = static_cast<arrow::Int64Builder*>(builder_);
354    for (const auto& maybe_value : FieldValues()) {
355      ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
356      if (value->IsNull()) {
357        ARROW_RETURN_NOT_OK(builder->AppendNull());
358      } else {
359        if (value->IsUint()) {
360          ARROW_RETURN_NOT_OK(builder->Append(value->GetUint()));
361        } else if (value->IsInt()) {
362          ARROW_RETURN_NOT_OK(builder->Append(value->GetInt()));
363        } else if (value->IsUint64()) {
364          ARROW_RETURN_NOT_OK(builder->Append(value->GetUint64()));
365        } else if (value->IsInt64()) {
366          ARROW_RETURN_NOT_OK(builder->Append(value->GetInt64()));
367        } else {
368          return arrow::Status::Invalid("Value is not an integer");
369        }
370      }
371    }
372    return arrow::Status::OK();
373  }
374
375  arrow::Status Visit(const arrow::DoubleType& type) {
376    arrow::DoubleBuilder* builder = static_cast<arrow::DoubleBuilder*>(builder_);
377    for (const auto& maybe_value : FieldValues()) {
378      ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
379      if (value->IsNull()) {
380        ARROW_RETURN_NOT_OK(builder->AppendNull());
381      } else {
382        ARROW_RETURN_NOT_OK(builder->Append(value->GetDouble()));
383      }
384    }
385    return arrow::Status::OK();
386  }
387
388  arrow::Status Visit(const arrow::StringType& type) {
389    arrow::StringBuilder* builder = static_cast<arrow::StringBuilder*>(builder_);
390    for (const auto& maybe_value : FieldValues()) {
391      ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
392      if (value->IsNull()) {
393        ARROW_RETURN_NOT_OK(builder->AppendNull());
394      } else {
395        ARROW_RETURN_NOT_OK(builder->Append(value->GetString()));
396      }
397    }
398    return arrow::Status::OK();
399  }
400
401  arrow::Status Visit(const arrow::BooleanType& type) {
402    arrow::BooleanBuilder* builder = static_cast<arrow::BooleanBuilder*>(builder_);
403    for (const auto& maybe_value : FieldValues()) {
404      ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
405      if (value->IsNull()) {
406        ARROW_RETURN_NOT_OK(builder->AppendNull());
407      } else {
408        ARROW_RETURN_NOT_OK(builder->Append(value->GetBool()));
409      }
410    }
411    return arrow::Status::OK();
412  }
413
414  arrow::Status Visit(const arrow::StructType& type) {
415    arrow::StructBuilder* builder = static_cast<arrow::StructBuilder*>(builder_);
416
417    std::vector<std::string> child_path(root_path_);
418    if (field_name_.size() > 0) {
419      child_path.push_back(field_name_);
420    }
421    auto child_converter = JsonValueConverter(rows_, child_path, array_levels_);
422
423    for (int i = 0; i < type.num_fields(); ++i) {
424      std::shared_ptr<arrow::Field> child_field = type.field(i);
425      std::shared_ptr<arrow::ArrayBuilder> child_builder = builder->child_builder(i);
426
427      ARROW_RETURN_NOT_OK(
428          child_converter.Convert(*child_field.get(), child_builder.get()));
429    }
430
431    // Make null bitmap
432    for (const auto& maybe_value : FieldValues()) {
433      ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
434      ARROW_RETURN_NOT_OK(builder->Append(!value->IsNull()));
435    }
436
437    return arrow::Status::OK();
438  }
439
440  arrow::Status Visit(const arrow::ListType& type) {
441    arrow::ListBuilder* builder = static_cast<arrow::ListBuilder*>(builder_);
442
443    // Values and offsets needs to be interleaved in ListBuilder, so first collect the
444    // values
445    std::unique_ptr<arrow::ArrayBuilder> tmp_value_builder;
446    ARROW_ASSIGN_OR_RAISE(tmp_value_builder,
447                          arrow::MakeBuilder(builder->value_builder()->type()));
448    std::vector<std::string> child_path(root_path_);
449    child_path.push_back(field_name_);
450    auto child_converter = JsonValueConverter(rows_, child_path, array_levels_ + 1);
451    ARROW_RETURN_NOT_OK(
452        child_converter.Convert(*type.value_field().get(), "", tmp_value_builder.get()));
453
454    std::shared_ptr<arrow::Array> values_array;
455    ARROW_RETURN_NOT_OK(tmp_value_builder->Finish(&values_array));
456    std::shared_ptr<arrow::ArrayData> values_data = values_array->data();
457
458    arrow::ArrayBuilder* value_builder = builder->value_builder();
459    int64_t offset = 0;
460    for (const auto& maybe_value : FieldValues()) {
461      ARROW_ASSIGN_OR_RAISE(auto value, maybe_value);
462      ARROW_RETURN_NOT_OK(builder->Append(!value->IsNull()));
463      if (!value->IsNull() && value->Size() > 0) {
464        ARROW_RETURN_NOT_OK(
465            value_builder->AppendArraySlice(*values_data.get(), offset, value->Size()));
466        offset += value->Size();
467      }
468    }
469
470    return arrow::Status::OK();
471  }
472
473 private:
474  std::string field_name_;
475  arrow::ArrayBuilder* builder_;
476  const std::vector<rapidjson::Document>& rows_;
477  std::vector<std::string> root_path_;
478  int64_t array_levels_;
479
480  /// Return a flattened iterator over values at nested location
481  arrow::Iterator<const rapidjson::Value*> FieldValues() {
482    std::vector<std::string> path(root_path_);
483    if (field_name_.size() > 0) {
484      path.push_back(field_name_);
485    }
486    auto iter = DocValuesIterator(rows_, std::move(path), array_levels_);
487    auto fn = [iter]() mutable -> arrow::Result<const rapidjson::Value*> {
488      return iter.Next();
489    };
490
491    return arrow::MakeFunctionIterator(fn);
492  }
493};  // JsonValueConverter

编写从 Arrow 转换#

要从 Arrow 记录批转换为行,我们将以较小的批次处理表,访问批次的每个字段并逐列填充输出行。

在顶层,我们定义了 ArrowToDocumentConverter,它提供将 Arrow 批和表转换为行的 API。在许多情况下,以较小的批次执行转换为行操作比一次执行整个表更有效。因此,我们定义了一个 ConvertToVector 方法来转换单个批次,然后在另一个转换方法中,我们使用 arrow::TableBatchReader 迭代表的切片。这将返回 Arrow 的迭代器类型(arrow::Iterator),因此行可以逐个处理或收集到容器中。

179class ArrowToDocumentConverter {
180 public:
181  /// Convert a single batch of Arrow data into Documents
182  arrow::Result<std::vector<rapidjson::Document>> ConvertToVector(
183      std::shared_ptr<arrow::RecordBatch> batch) {
184    RowBatchBuilder builder{batch->num_rows()};
185
186    for (int i = 0; i < batch->num_columns(); ++i) {
187      builder.SetField(batch->schema()->field(i).get());
188      ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*batch->column(i).get(), &builder));
189    }
190
191    return std::move(builder).Rows();
192  }
193
194  /// Convert an Arrow table into an iterator of Documents
195  arrow::Iterator<rapidjson::Document> ConvertToIterator(
196      std::shared_ptr<arrow::Table> table, size_t batch_size) {
197    // Use TableBatchReader to divide table into smaller batches. The batches
198    // created are zero-copy slices with *at most* `batch_size` rows.
199    auto batch_reader = std::make_shared<arrow::TableBatchReader>(*table);
200    batch_reader->set_chunksize(batch_size);
201
202    auto read_batch = [this](const std::shared_ptr<arrow::RecordBatch>& batch)
203        -> arrow::Result<arrow::Iterator<rapidjson::Document>> {
204      ARROW_ASSIGN_OR_RAISE(auto rows, ConvertToVector(batch));
205      return arrow::MakeVectorIterator(std::move(rows));
206    };
207
208    auto nested_iter = arrow::MakeMaybeMapIterator(
209        read_batch, arrow::MakeIteratorFromReader(std::move(batch_reader)));
210
211    return arrow::MakeFlattenIterator(std::move(nested_iter));
212  }
213};  // ArrowToDocumentConverter

下一层,输出行由 RowBatchBuilder 填充。RowBatchBuilder 实现 Visit() 方法,但为了节省代码,我们使用 arrow::enable_if_primitive_ctype 为具有基本 C 等价物(布尔值、整数和浮点数)的数组类型编写了一个模板方法。有关其他类型谓词,请参阅类型特征。

 57class RowBatchBuilder {
 58 public:
 59  explicit RowBatchBuilder(int64_t num_rows) : field_(nullptr) {
 60    // Reserve all of the space required up-front to avoid unnecessary resizing
 61    rows_.reserve(num_rows);
 62
 63    for (int64_t i = 0; i < num_rows; ++i) {
 64      rows_.push_back(rapidjson::Document());
 65      rows_[i].SetObject();
 66    }
 67  }
 68
 69  /// \brief Set which field to convert.
 70  void SetField(const arrow::Field* field) { field_ = field; }
 71
 72  /// \brief Retrieve converted rows from builder.
 73  std::vector<rapidjson::Document> Rows() && { return std::move(rows_); }
 74
 75  // Default implementation
 76  arrow::Status Visit(const arrow::Array& array) {
 77    return arrow::Status::NotImplemented(
 78        "Cannot convert to json document for array of type ", array.type()->ToString());
 79  }
 80
 81  // Handles booleans, integers, floats
 82  template <typename ArrayType, typename DataClass = typename ArrayType::TypeClass>
 83  arrow::enable_if_primitive_ctype<DataClass, arrow::Status> Visit(
 84      const ArrayType& array) {
 85    assert(static_cast<int64_t>(rows_.size()) == array.length());
 86    for (int64_t i = 0; i < array.length(); ++i) {
 87      if (!array.IsNull(i)) {
 88        rapidjson::Value str_key(field_->name(), rows_[i].GetAllocator());
 89        rows_[i].AddMember(str_key, array.Value(i), rows_[i].GetAllocator());
 90      }
 91    }
 92    return arrow::Status::OK();
 93  }
 94
 95  arrow::Status Visit(const arrow::StringArray& array) {
 96    assert(static_cast<int64_t>(rows_.size()) == array.length());
 97    for (int64_t i = 0; i < array.length(); ++i) {
 98      if (!array.IsNull(i)) {
 99        rapidjson::Value str_key(field_->name(), rows_[i].GetAllocator());
100        std::string_view value_view = array.Value(i);
101        rapidjson::Value value;
102        value.SetString(value_view.data(),
103                        static_cast<rapidjson::SizeType>(value_view.size()),
104                        rows_[i].GetAllocator());
105        rows_[i].AddMember(str_key, value, rows_[i].GetAllocator());
106      }
107    }
108    return arrow::Status::OK();
109  }
110
111  arrow::Status Visit(const arrow::StructArray& array) {
112    const arrow::StructType* type = array.struct_type();
113
114    assert(static_cast<int64_t>(rows_.size()) == array.length());
115
116    RowBatchBuilder child_builder(rows_.size());
117    for (int i = 0; i < type->num_fields(); ++i) {
118      const arrow::Field* child_field = type->field(i).get();
119      child_builder.SetField(child_field);
120      ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*array.field(i).get(), &child_builder));
121    }
122    std::vector<rapidjson::Document> rows = std::move(child_builder).Rows();
123
124    for (int64_t i = 0; i < array.length(); ++i) {
125      if (!array.IsNull(i)) {
126        rapidjson::Value str_key(field_->name(), rows_[i].GetAllocator());
127        // Must copy value to new allocator
128        rapidjson::Value row_val;
129        row_val.CopyFrom(rows[i], rows_[i].GetAllocator());
130        rows_[i].AddMember(str_key, row_val, rows_[i].GetAllocator());
131      }
132    }
133    return arrow::Status::OK();
134  }
135
136  arrow::Status Visit(const arrow::ListArray& array) {
137    assert(static_cast<int64_t>(rows_.size()) == array.length());
138    // First create rows from values
139    std::shared_ptr<arrow::Array> values = array.values();
140    RowBatchBuilder child_builder(values->length());
141    const arrow::Field* value_field = array.list_type()->value_field().get();
142    std::string value_field_name = value_field->name();
143    child_builder.SetField(value_field);
144    ARROW_RETURN_NOT_OK(arrow::VisitArrayInline(*values.get(), &child_builder));
145
146    std::vector<rapidjson::Document> rows = std::move(child_builder).Rows();
147
148    int64_t values_i = 0;
149    for (int64_t i = 0; i < array.length(); ++i) {
150      if (array.IsNull(i)) continue;
151
152      rapidjson::Document::AllocatorType& allocator = rows_[i].GetAllocator();
153      auto array_len = array.value_length(i);
154
155      rapidjson::Value value;
156      value.SetArray();
157      value.Reserve(array_len, allocator);
158
159      for (int64_t j = 0; j < array_len; ++j) {
160        rapidjson::Value row_val;
161        // Must copy value to new allocator
162        row_val.CopyFrom(rows[values_i][value_field_name], allocator);
163        value.PushBack(row_val, allocator);
164        ++values_i;
165      }
166
167      rapidjson::Value str_key(field_->name(), allocator);
168      rows_[i].AddMember(str_key, value, allocator);
169    }
170
171    return arrow::Status::OK();
172  }
173
174 private:
175  const arrow::Field* field_;
176  std::vector<rapidjson::Document> rows_;
177};  // RowBatchBuilder