Arrow 数据集#

Arrow C++ 提供了 Datasets 的概念和实现,用于处理碎片化数据,这些数据可能大于内存,原因可能是生成大量数据、从流中读取数据或在磁盘上拥有一个大型文件。 在本文中,您将

  1. 读取多分区的多文件数据集并将其放入表格中,

  2. 从表格中写出分区数据集。

先决条件#

在继续之前,请确保您已安装

  1. Arrow,您可以在此处设置:在您自己的项目中使用 Arrow C++

  2. 基本 Arrow 数据结构 中了解基本的 Arrow 数据结构

为了见证差异,阅读 Arrow 文件 I/O 也可能很有用。 但是,这不是必需的。

设置#

在运行一些计算之前,我们需要填补几个空白

  1. 我们需要包含必要的头文件。

  2. 需要一个 main() 将所有内容粘合在一起。

  3. 我们需要磁盘上的数据才能进行操作。

包含#

在编写 C++ 代码之前,我们需要一些包含。 我们将获得用于输出的 iostream,然后为我们将在本文中使用的每种文件类型导入 Arrow 的计算功能

#include <arrow/api.h>
#include <arrow/dataset/api.h>
// We use Parquet headers for setting up examples; they are not required for using
// datasets.
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <unistd.h>
#include <iostream>

Main()#

对于我们的粘合剂,我们将使用先前关于数据结构的教程中的 main() 模式

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}

就像我们之前使用它一样,它与 RunMain() 配对

arrow::Status RunMain() {

生成要读取的文件#

我们需要一些文件才能实际操作。 实际上,您可能会为自己的应用程序提供一些输入。 但是,在这里,我们希望在不提供或查找数据集的情况下进行探索,因此让我们生成一些数据集以使其易于遵循。 可以随意阅读此内容,但是本文将正确地介绍这些概念 – 现在只需将其复制进去,并意识到它以磁盘上的分区数据集结尾

// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  // This code should look familiar from the basic Arrow example, and is not the
  // focus of this example. However, we need data to work on it, and this makes that!
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
    const std::string& root_path) {
  // Much like CreateTable(), this is utility that gets us the dataset we'll be reading
  // from. Don't worry, we also write a dataset in the example proper.
  auto base_path = root_path + "parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, 2048));
  return base_path;
}

arrow::Status PrepareEnv() {
  // Get our environment prepared for reading, by setting up some quick writing.
  ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable())
  std::shared_ptr<arrow::fs::FileSystem> setup_fs;
  // Note this operates in the directory the executable is built in.
  char setup_path[256];
  char* result = getcwd(setup_path, 256);
  if (result == NULL) {
    return arrow::Status::IOError("Fetching PWD failed.");
  }

  ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path));
  ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, ""));

  return arrow::Status::OK();
}

为了实际拥有这些文件,请确保在 RunMain() 中调用的第一件事是我们的辅助函数 PrepareEnv(),该函数将在磁盘上获取一个数据集以供我们操作

  ARROW_RETURN_NOT_OK(PrepareEnv());

读取分区数据集#

读取数据集与读取单个文件是一项不同的任务。 由于需要能够解析多个文件和/或文件夹,因此该任务比读取单个文件需要更多的工作。 此过程可以分为以下步骤

  1. 获取本地 FS 的 fs::FileSystem 对象

  2. 创建一个 fs::FileSelector 并使用它来准备一个 dataset::FileSystemDatasetFactory

  3. 使用 dataset::FileSystemDatasetFactory 构建一个 dataset::Dataset

  4. 使用 dataset::Scanner 读入一个 Table

准备文件系统对象#

为了开始,我们需要能够与本地文件系统进行交互。 为了做到这一点,我们需要一个 fs::FileSystem 对象。 fs::FileSystem 是一种抽象,无论使用 Amazon S3、Google Cloud Storage 还是本地磁盘,都可以使用相同的接口 – 我们将使用本地磁盘。 因此,让我们声明它

  // First, we need a filesystem object, which lets us interact with our local
  // filesystem starting at a given path. For the sake of simplicity, that'll be
  // the current directory.
  std::shared_ptr<arrow::fs::FileSystem> fs;

对于此示例,我们将使我们的 FileSystem 的基本路径与可执行文件位于同一目录中。 fs::FileSystemFromUriOrPath() 让我们获得任何受支持文件系统的 fs::FileSystem 对象。 但是,在这里,我们将只传递我们的路径

  // Get the CWD, use it to make the FileSystem object.
  char init_path[256];
  char* result = getcwd(init_path, 256);
  if (result == NULL) {
    return arrow::Status::IOError("Fetching PWD failed.");
  }
  ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));

另请参阅

fs::FileSystem 适用于其他受支持的文件系统。

创建 FileSystemDatasetFactory#

fs::FileSystem 存储大量元数据,但是我们需要能够遍历它并解析该元数据。 在 Arrow 中,我们使用 FileSelector 来执行此操作

  // A file selector lets us actually traverse a multi-file dataset.
  arrow::fs::FileSelector selector;

这个 fs::FileSelector 尚未能够执行任何操作。 为了使用它,我们需要对其进行配置 – 我们将使其在 “parquet_dataset” 中开始任何选择,环境准备过程已在此处为我们留下数据集,并将 recursive 设置为 true,这允许遍历文件夹。

  selector.base_dir = "parquet_dataset";
  // Recursive is a safe bet if you don't know the nesting of your dataset.
  selector.recursive = true;

为了从 fs::FileSystem 获取 dataset::Dataset,我们需要准备一个 dataset::FileSystemDatasetFactory。这是一个很长但描述性的名称 – 它将为我们创建一个工厂,以便从我们的 fs::FileSystem 获取数据。首先,我们通过填充 dataset::FileSystemFactoryOptions 结构体来配置它。

  // Making an options object lets us configure our dataset reading.
  arrow::dataset::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema. We won't set any other options, defaults are fine.
  options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();

文件格式有很多种,我们必须选择一种在实际读取时期望的文件格式。Parquet 是我们磁盘上的格式,所以当然我们在读取时会要求使用这种格式。

  auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();

在设置好 fs::FileSystemfs::FileSelector、选项和文件格式后,我们可以创建 dataset::FileSystemDatasetFactory。这只需要传入我们准备好的所有内容,并将其分配给一个变量。

  // Now, we get a factory that will let us get our dataset -- we don't have the
  // dataset yet!
  ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                                          fs, selector, read_format, options));

使用 Factory 构建 Dataset#

通过设置 dataset::FileSystemDatasetFactory,我们可以使用 dataset::FileSystemDatasetFactory::Finish() 实际构建我们的 dataset::Dataset,就像基础教程中的 ArrayBuilder 一样。

  // Now we build our dataset from the factory.
  ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());

现在,我们在内存中有一个 dataset::Dataset 对象。这并不意味着整个数据集都已在内存中显现,而是意味着我们现在可以访问允许我们浏览和使用磁盘上的数据集的工具。例如,我们可以获取构成我们整个数据集的片段(文件),并将它们打印出来,以及一些小信息。

  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }

将 Dataset 移动到 Table#

我们可以使用 Datasets 的一种方法是将它们放入 Table 中,在那里我们可以对该 Table 执行任何我们已经学会可以对 Tables 执行的操作。

另请参阅

Acero: A C++ 流式执行引擎 用于执行,避免在内存中显现整个数据集。

为了将 Dataset's 内容移动到 Table 中,我们需要一个 dataset::Scanner,它扫描数据并将其输出到 Table。首先,我们从 dataset::Dataset 获取一个 dataset::ScannerBuilder

  // Scan dataset into a Table -- once this is done, you can do
  // normal table things with it, like computation and printing. However, now you're
  // also dedicated to being in memory.
  ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan());

当然,Builder 唯一的用处是让我们得到我们的 dataset::Scanner,所以让我们使用 dataset::ScannerBuilder::Finish()

  ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish());

现在我们有了一个工具来遍历我们的 dataset::Dataset,让我们用它来获取我们的 Tabledataset::Scanner::ToTable() 提供了我们正在寻找的内容,我们可以打印结果。

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable());
  std::cout << table->ToString();

这给我们留下了一个普通的 Table。同样,要处理 Datasets 而不移动到 Table,请考虑使用 Acero。

从 Table 将 Dataset 写入磁盘#

写入 dataset::Dataset 与写入单个文件是不同的任务。由于需要能够解析跨多个文件和文件夹的分区方案,因此该任务比写入单个文件需要更多的工作。这个过程可以分解为以下步骤:

  1. 准备一个 TableBatchReader

  2. 创建一个 dataset::Scanner 以从 TableBatchReader 中提取数据

  3. 准备 schema、分区和文件格式选项

  4. 设置 dataset::FileSystemDatasetWriteOptions – 一个配置我们的写入功能的结构体

  5. 将数据集写入磁盘

准备来自 Table 的数据以进行写入#

我们有一个 Table,我们想在磁盘上获得一个 dataset::Dataset。事实上,为了探索,我们将为数据集使用不同的分区方案 – 而不是像原始片段一样简单地分成两半,我们将根据每行在 “a” 列中的值进行分区。

为了开始,让我们得到一个 TableBatchReader!这使得写入 Dataset 变得非常容易,并且可以在任何需要将 Table 分解为 RecordBatches 流的任何地方使用。在这里,我们可以直接使用 TableBatchReader's 构造函数,使用我们的 table。

  // Now, let's get a table out to disk as a dataset!
  // We make a RecordBatchReader from our Table, then set up a scanner, which lets us
  // go to a file.
  std::shared_ptr<arrow::TableBatchReader> write_dataset =
      std::make_shared<arrow::TableBatchReader>(table);

创建 Scanner 以移动 Table 数据#

一旦有了数据源,写入 dataset::Dataset 的过程与读取它的过程类似,只是方向相反。之前,我们使用 dataset::Scanner 来扫描到 Table 中 – 现在,我们需要使用它从我们的 TableBatchReader 中读取数据。为了获得这个 dataset::Scanner,我们将基于我们的 TableBatchReader 创建一个 dataset::ScannerBuilder,然后使用这个 Builder 来构建一个 dataset::Scanner

  auto write_scanner_builder =
      arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset);
  ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())

准备 Schema、分区和文件格式变量#

因为我们想基于 “a” 列进行分区,所以我们需要声明它。在定义我们的分区 Schema 时,我们只需要一个包含 “a” 的 Field

  // The partition schema determines which fields are used as keys for partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});

这个 Schema 决定了分区的键是什么,但我们需要选择一个算法来处理这个键。我们将再次使用 Hive 风格,这次将我们的 schema 作为配置传递给它

  // We'll use Hive-style partitioning, which creates directories with "key=value"
  // pairs.
  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);

有几种文件格式可用,但 Parquet 通常与 Arrow 一起使用,所以我们将写回到该格式

  // Now, we declare we'll be writing Parquet files.
  auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();

配置 FileSystemDatasetWriteOptions#

为了写入磁盘,我们需要一些配置。我们将通过在 dataset::FileSystemDatasetWriteOptions 结构中设置值来实现。我们将尽可能地使用默认值进行初始化

  // This time, we make Options for writing, but do much more configuration.
  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  // Defaults to start.
  write_options.file_write_options = write_format->DefaultWriteOptions();

写入文件的重要一步是拥有一个 fs::FileSystem 作为目标。幸运的是,我们在设置读取时就有了它。这是一个简单的变量赋值

  // Use the filesystem we already have.
  write_options.filesystem = fs;

Arrow 可以创建目录,但它需要一个目录名,所以让我们给它一个,称之为 “write_dataset”

  // Write to the folder "write_dataset" in current directory.
  write_options.base_dir = "write_dataset";

我们之前创建了一个分区方法,声明我们将使用 Hive 风格 – 这就是我们实际将其传递给写入函数的地方

  // Use the partitioning declared above.
  write_options.partitioning = partitioning;

其中一部分是 Arrow 将会分割文件,从而防止它们过大而无法处理。这使得数据集一开始就被碎片化。为了设置这一点,我们需要每个目录中片段的基本名称 – 在本例中,我们将使用 “part{i}.parquet”,这意味着第三个文件(在同一目录中)将被称为 “part3.parquet”,例如

  // Define what the name for the files making up the dataset will be.
  write_options.basename_template = "part{i}.parquet";

有时,数据将被多次写入同一位置,并且将接受覆盖。由于我们可能希望多次运行此应用程序,我们将设置 Arrow 来覆盖现有数据 – 如果我们不这样做,Arrow 将在第一次运行此应用程序后由于看到现有数据而中止

  // Set behavior to overwrite existing data -- specifically, this lets this example
  // be run more than once, and allows whatever code you have to overwrite what's there.
  write_options.existing_data_behavior =
      arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;

将数据集写入磁盘#

一旦配置了 dataset::FileSystemDatasetWriteOptions,并且准备好了一个 dataset::Scanner 来解析数据,我们可以将 Options 和 dataset::Scanner 传递给 dataset::FileSystemDataset::Write() 以写入磁盘

  // Write to disk!
  ARROW_RETURN_NOT_OK(
      arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));

您可以查看您的磁盘,以查看您是否写入了一个文件夹,其中包含 “a” 的每个值的子文件夹,每个子文件夹都有 Parquet 文件!

结束程序#

最后,我们只需返回 Status::OK(),以便 main() 知道我们已经完成,并且一切正常,就像之前的教程一样。

  return arrow::Status::OK();
}

这样,您就完成了读取和写入分区数据集!通过一些配置,这种方法适用于任何受支持的数据集格式。例如,纽约出租车数据集是一个众所周知的例子,您可以在这里找到它。现在您可以获得映射的、大于内存的数据以供使用!

这意味着现在我们必须能够在不一次性将所有数据拉入内存的情况下处理这些数据。为此,请尝试 Acero。

另请参阅

Acero:一个 C++ 流式执行引擎 有关 Acero 的更多信息。

请参考以下内容获取完整代码的副本

 19// (Doc section: Includes)
 20#include <arrow/api.h>
 21#include <arrow/dataset/api.h>
 22// We use Parquet headers for setting up examples; they are not required for using
 23// datasets.
 24#include <parquet/arrow/reader.h>
 25#include <parquet/arrow/writer.h>
 26
 27#include <unistd.h>
 28#include <iostream>
 29// (Doc section: Includes)
 30
 31// (Doc section: Helper Functions)
 32// Generate some data for the rest of this example.
 33arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
 34  // This code should look familiar from the basic Arrow example, and is not the
 35  // focus of this example. However, we need data to work on it, and this makes that!
 36  auto schema =
 37      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
 38                     arrow::field("c", arrow::int64())});
 39  std::shared_ptr<arrow::Array> array_a;
 40  std::shared_ptr<arrow::Array> array_b;
 41  std::shared_ptr<arrow::Array> array_c;
 42  arrow::NumericBuilder<arrow::Int64Type> builder;
 43  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
 44  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
 45  builder.Reset();
 46  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
 47  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
 48  builder.Reset();
 49  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
 50  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
 51  return arrow::Table::Make(schema, {array_a, array_b, array_c});
 52}
 53
 54// Set up a dataset by writing two Parquet files.
 55arrow::Result<std::string> CreateExampleParquetDataset(
 56    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
 57    const std::string& root_path) {
 58  // Much like CreateTable(), this is utility that gets us the dataset we'll be reading
 59  // from. Don't worry, we also write a dataset in the example proper.
 60  auto base_path = root_path + "parquet_dataset";
 61  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
 62  // Create an Arrow Table
 63  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
 64  // Write it into two Parquet files
 65  ARROW_ASSIGN_OR_RAISE(auto output,
 66                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
 67  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 68      *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048));
 69  ARROW_ASSIGN_OR_RAISE(output,
 70                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
 71  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 72      *table->Slice(5), arrow::default_memory_pool(), output, 2048));
 73  return base_path;
 74}
 75
 76arrow::Status PrepareEnv() {
 77  // Get our environment prepared for reading, by setting up some quick writing.
 78  ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable())
 79  std::shared_ptr<arrow::fs::FileSystem> setup_fs;
 80  // Note this operates in the directory the executable is built in.
 81  char setup_path[256];
 82  char* result = getcwd(setup_path, 256);
 83  if (result == NULL) {
 84    return arrow::Status::IOError("Fetching PWD failed.");
 85  }
 86
 87  ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path));
 88  ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, ""));
 89
 90  return arrow::Status::OK();
 91}
 92// (Doc section: Helper Functions)
 93
 94// (Doc section: RunMain)
 95arrow::Status RunMain() {
 96  // (Doc section: RunMain)
 97  // (Doc section: PrepareEnv)
 98  ARROW_RETURN_NOT_OK(PrepareEnv());
 99  // (Doc section: PrepareEnv)
100
101  // (Doc section: FileSystem Declare)
102  // First, we need a filesystem object, which lets us interact with our local
103  // filesystem starting at a given path. For the sake of simplicity, that'll be
104  // the current directory.
105  std::shared_ptr<arrow::fs::FileSystem> fs;
106  // (Doc section: FileSystem Declare)
107
108  // (Doc section: FileSystem Init)
109  // Get the CWD, use it to make the FileSystem object.
110  char init_path[256];
111  char* result = getcwd(init_path, 256);
112  if (result == NULL) {
113    return arrow::Status::IOError("Fetching PWD failed.");
114  }
115  ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));
116  // (Doc section: FileSystem Init)
117
118  // (Doc section: FileSelector Declare)
119  // A file selector lets us actually traverse a multi-file dataset.
120  arrow::fs::FileSelector selector;
121  // (Doc section: FileSelector Declare)
122  // (Doc section: FileSelector Config)
123  selector.base_dir = "parquet_dataset";
124  // Recursive is a safe bet if you don't know the nesting of your dataset.
125  selector.recursive = true;
126  // (Doc section: FileSelector Config)
127  // (Doc section: FileSystemFactoryOptions)
128  // Making an options object lets us configure our dataset reading.
129  arrow::dataset::FileSystemFactoryOptions options;
130  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
131  // schema. We won't set any other options, defaults are fine.
132  options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();
133  // (Doc section: FileSystemFactoryOptions)
134  // (Doc section: File Format Setup)
135  auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
136  // (Doc section: File Format Setup)
137  // (Doc section: FileSystemDatasetFactory Make)
138  // Now, we get a factory that will let us get our dataset -- we don't have the
139  // dataset yet!
140  ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
141                                          fs, selector, read_format, options));
142  // (Doc section: FileSystemDatasetFactory Make)
143  // (Doc section: FileSystemDatasetFactory Finish)
144  // Now we build our dataset from the factory.
145  ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());
146  // (Doc section: FileSystemDatasetFactory Finish)
147  // (Doc section: Dataset Fragments)
148  // Print out the fragments
149  ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments());
150  for (const auto& fragment : fragments) {
151    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
152    std::cout << "Partition expression: "
153              << (*fragment)->partition_expression().ToString() << std::endl;
154  }
155  // (Doc section: Dataset Fragments)
156  // (Doc section: Read Scan Builder)
157  // Scan dataset into a Table -- once this is done, you can do
158  // normal table things with it, like computation and printing. However, now you're
159  // also dedicated to being in memory.
160  ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan());
161  // (Doc section: Read Scan Builder)
162  // (Doc section: Read Scanner)
163  ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish());
164  // (Doc section: Read Scanner)
165  // (Doc section: To Table)
166  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable());
167  std::cout << table->ToString();
168  // (Doc section: To Table)
169
170  // (Doc section: TableBatchReader)
171  // Now, let's get a table out to disk as a dataset!
172  // We make a RecordBatchReader from our Table, then set up a scanner, which lets us
173  // go to a file.
174  std::shared_ptr<arrow::TableBatchReader> write_dataset =
175      std::make_shared<arrow::TableBatchReader>(table);
176  // (Doc section: TableBatchReader)
177  // (Doc section: WriteScanner)
178  auto write_scanner_builder =
179      arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset);
180  ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())
181  // (Doc section: WriteScanner)
182  // (Doc section: Partition Schema)
183  // The partition schema determines which fields are used as keys for partitioning.
184  auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});
185  // (Doc section: Partition Schema)
186  // (Doc section: Partition Create)
187  // We'll use Hive-style partitioning, which creates directories with "key=value"
188  // pairs.
189  auto partitioning =
190      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
191  // (Doc section: Partition Create)
192  // (Doc section: Write Format)
193  // Now, we declare we'll be writing Parquet files.
194  auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
195  // (Doc section: Write Format)
196  // (Doc section: Write Options)
197  // This time, we make Options for writing, but do much more configuration.
198  arrow::dataset::FileSystemDatasetWriteOptions write_options;
199  // Defaults to start.
200  write_options.file_write_options = write_format->DefaultWriteOptions();
201  // (Doc section: Write Options)
202  // (Doc section: Options FS)
203  // Use the filesystem we already have.
204  write_options.filesystem = fs;
205  // (Doc section: Options FS)
206  // (Doc section: Options Target)
207  // Write to the folder "write_dataset" in current directory.
208  write_options.base_dir = "write_dataset";
209  // (Doc section: Options Target)
210  // (Doc section: Options Partitioning)
211  // Use the partitioning declared above.
212  write_options.partitioning = partitioning;
213  // (Doc section: Options Partitioning)
214  // (Doc section: Options Name Template)
215  // Define what the name for the files making up the dataset will be.
216  write_options.basename_template = "part{i}.parquet";
217  // (Doc section: Options Name Template)
218  // (Doc section: Options File Behavior)
219  // Set behavior to overwrite existing data -- specifically, this lets this example
220  // be run more than once, and allows whatever code you have to overwrite what's there.
221  write_options.existing_data_behavior =
222      arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
223  // (Doc section: Options File Behavior)
224  // (Doc section: Write Dataset)
225  // Write to disk!
226  ARROW_RETURN_NOT_OK(
227      arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));
228  // (Doc section: Write Dataset)
229  // (Doc section: Ret)
230  return arrow::Status::OK();
231}
232// (Doc section: Ret)
233// (Doc section: Main)
234int main() {
235  arrow::Status st = RunMain();
236  if (!st.ok()) {
237    std::cerr << st << std::endl;
238    return 1;
239  }
240  return 0;
241}
242// (Doc section: Main)