Arrow 数据集#

Arrow C++ 提供 Datasets 的概念和实现,用于处理分片数据,这些数据可能大于内存,无论是由于生成大量数据、从流中读取还是磁盘上有大型文件。在本文中,您将:

  1. 读取一个多文件分区数据集并将其放入表中,

  2. 从表中写入一个分区数据集。

先决条件#

在继续之前,请确保您已具备:

  1. 已安装 Arrow,您可以在此处进行设置:在您自己的项目中使用 Arrow C++

  2. 通过 基本的 Arrow 数据结构 了解 Arrow 基本数据结构

为了观察差异,阅读 Arrow 文件 I/O 可能也很有用。然而,这不是必需的。

设置#

在运行一些计算之前,我们需要填补几个空白

  1. 我们需要包含必要的头文件。

  2. 需要一个 main() 函数来将所有内容连接起来。

  3. 我们需要磁盘上的数据来操作。

包含文件#

在编写 C++ 代码之前,我们需要一些头文件。我们将获取用于输出的 iostream,然后导入 Arrow 的计算功能,用于本文中将处理的每种文件类型。

#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
// We use Parquet headers for setting up examples; they are not required for using
// datasets.
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <unistd.h>
#include <iostream>

Main()#

对于连接代码,我们将使用之前关于数据结构教程中的 main() 模式。

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  }
  return 0;
}

它与 RunMain() 搭配使用,就像我们之前使用它一样。

arrow::Status RunMain() {

生成用于读取的文件#

我们需要一些文件来实际操作。在实践中,您可能会为自己的应用程序提供一些输入。然而,在这里,我们希望在不增加提供或查找数据集的开销的情况下进行探索,因此让我们生成一些,使其易于理解。您可以随意阅读此内容,但本文将正确地探讨这些概念——目前,只需将其复制,并意识到它最终会以磁盘上的分区数据集结束。

// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  // This code should look familiar from the basic Arrow example, and is not the
  // focus of this example. However, we need data to work on it, and this makes that!
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
    const std::string& root_path) {
  // Much like CreateTable(), this is utility that gets us the dataset we'll be reading
  // from. Don't worry, we also write a dataset in the example proper.
  auto base_path = root_path + "parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, 2048));
  return base_path;
}

arrow::Status PrepareEnv() {
  // Initilize the compute module to register the required kernels for Dataset
  ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
  // Get our environment prepared for reading, by setting up some quick writing.
  ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable())
  std::shared_ptr<arrow::fs::FileSystem> setup_fs;
  // Note this operates in the directory the executable is built in.
  char setup_path[256];
  char* result = getcwd(setup_path, 256);
  if (result == NULL) {
    return arrow::Status::IOError("Fetching PWD failed.");
  }

  ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path));
  ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, ""));

  return arrow::Status::OK();
}

为了实际拥有这些文件,请确保在 RunMain() 中调用的第一件事是我们的辅助函数 PrepareEnv(),它将在磁盘上为我们准备一个数据集来操作。

  ARROW_RETURN_NOT_OK(PrepareEnv());

读取分区数据集#

读取数据集与读取单个文件是不同的任务。由于需要能够解析多个文件和/或文件夹,此任务比读取单个文件需要更多工作。此过程可以分为以下步骤:

  1. 为本地文件系统获取一个 fs::FileSystem 对象。

  2. 创建一个 fs::FileSelector 并使用它来准备一个 dataset::FileSystemDatasetFactory

  3. 使用 dataset::FileSystemDatasetFactory 构建一个 dataset::Dataset

  4. 使用 dataset::Scanner 读取到 Table 中。

准备 FileSystem 对象#

为了开始,我们需要能够与本地文件系统交互。为此,我们需要一个 fs::FileSystem 对象。 fs::FileSystem 是一个抽象,它允许我们使用相同的接口,无论我们使用的是 Amazon S3、Google Cloud Storage 还是本地磁盘——我们将使用本地磁盘。所以,让我们声明它。

  // First, we need a filesystem object, which lets us interact with our local
  // filesystem starting at a given path. For the sake of simplicity, that'll be
  // the current directory.
  std::shared_ptr<arrow::fs::FileSystem> fs;

在这个例子中,我们的 FileSystem 的基本路径将存在于可执行文件的同一目录中。 fs::FileSystemFromUriOrPath() 允许我们为任何支持的文件系统类型获取 fs::FileSystem 对象。然而,在这里,我们只需传递我们的路径。

  // Get the CWD, use it to make the FileSystem object.
  char init_path[256];
  char* result = getcwd(init_path, 256);
  if (result == NULL) {
    return arrow::Status::IOError("Fetching PWD failed.");
  }
  ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));

另请参阅

fs::FileSystem 用于其他支持的文件系统。

创建 FileSystemDatasetFactory#

一个 fs::FileSystem 存储了大量的元数据,但我们需要能够遍历它并解析这些元数据。在 Arrow 中,我们使用 FileSelector 来完成此操作。

  // A file selector lets us actually traverse a multi-file dataset.
  arrow::fs::FileSelector selector;

这个 fs::FileSelector 暂时无法做任何事情。为了使用它,我们需要配置它——我们将让它在“parquet_dataset”中开始任何选择,这是环境准备过程为我们留下数据集的地方,并将 recursive 设置为 true,这允许遍历文件夹。

  selector.base_dir = "parquet_dataset";
  // Recursive is a safe bet if you don't know the nesting of your dataset.
  selector.recursive = true;

要从 fs::FileSystem 获取 dataset::Dataset,我们需要准备一个 dataset::FileSystemDatasetFactory。这是一个冗长但具有描述性的名称——它将为我们创建一个工厂,以便从我们的 fs::FileSystem 获取数据。首先,我们通过填充一个 dataset::FileSystemFactoryOptions 结构体来配置它。

  // Making an options object lets us configure our dataset reading.
  arrow::dataset::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema. We won't set any other options, defaults are fine.
  options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();

有许多文件格式,我们必须选择一种在实际读取时预期的格式。Parquet 是我们磁盘上的格式,所以我们自然会在读取时请求它。

  auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();

设置好 fs::FileSystemfs::FileSelector、选项和文件格式之后,我们就可以创建 dataset::FileSystemDatasetFactory 了。这只需要传入我们准备好的所有内容并将其分配给一个变量。

  // Now, we get a factory that will let us get our dataset -- we don't have the
  // dataset yet!
  ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                                          fs, selector, read_format, options));

使用工厂构建数据集#

通过设置 dataset::FileSystemDatasetFactory,我们就可以像在基本教程中使用 ArrayBuilder 一样,通过 dataset::FileSystemDatasetFactory::Finish() 实际构建我们的 dataset::Dataset

  // Now we build our dataset from the factory.
  ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());

现在,我们在内存中有一个 dataset::Dataset 对象。这并不意味着整个数据集都在内存中显现,而是我们现在可以访问工具来探索和使用磁盘上的数据集。例如,我们可以获取构成整个数据集的片段(文件),并打印出它们以及一些小信息。

  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }

将数据集移入表#

我们处理 Datasets 的一种方式是将它们放入 Table 中,在那里我们可以对该 Table 执行我们所学到的任何操作。

另请参阅

Acero: 一个 C++ 流式执行引擎 用于执行,避免在内存中显现整个数据集。

为了将 Dataset 的内容移动到 Table 中,我们需要一个 dataset::Scanner,它扫描数据并将其输出到 Table。首先,我们从 dataset::Dataset 获取一个 dataset::ScannerBuilder

  // Scan dataset into a Table -- once this is done, you can do
  // normal table things with it, like computation and printing. However, now you're
  // also dedicated to being in memory.
  ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan());

当然,Builder 的唯一用途是为我们提供 dataset::Scanner,所以让我们使用 dataset::ScannerBuilder::Finish()

  ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish());

现在我们有了在 dataset::Dataset 中移动的工具,让我们用它来获取我们的 Tabledataset::Scanner::ToTable() 正好提供了我们所需的功能,我们可以打印结果。

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable());
  std::cout << table->ToString();

这给我们留下了一个普通的 Table。同样,如果要在不将其移动到 Table 的情况下对 Datasets 进行操作,请考虑使用 Acero。

将数据集从表写入磁盘#

写入 dataset::Dataset 是一项与写入单个文件不同的任务。由于需要能够解析和处理跨多个文件和文件夹的分区方案,此任务比写入单个文件需要更多工作。此过程可以分解为以下步骤:

  1. 准备一个 TableBatchReader

  2. 创建一个 dataset::Scanner 以从 TableBatchReader 中拉取数据。

  3. 准备模式、分区和文件格式选项。

  4. 设置 dataset::FileSystemDatasetWriteOptions ——一个配置我们写入函数的结构体。

  5. 将数据集写入磁盘。

准备要写入的表数据#

我们有一个 Table,我们想在磁盘上获取一个 dataset::Dataset。实际上,为了探索,我们将对数据集使用不同的分区方案——而不是像原始片段那样只分成两半,我们将根据每行在“a”列中的值进行分区。

为了开始,我们来获取一个 TableBatchReader!这使得写入 Dataset 变得非常容易,并且可以在任何需要将 Table 分解为 RecordBatches 流的情况下使用。在这里,我们只需使用 TableBatchReader 的构造函数,并传入我们的表。

  // Now, let's get a table out to disk as a dataset!
  // We make a RecordBatchReader from our Table, then set up a scanner, which lets us
  // go to a file.
  std::shared_ptr<arrow::TableBatchReader> write_dataset =
      std::make_shared<arrow::TableBatchReader>(table);

创建用于移动表数据的扫描器#

一旦数据源可用,写入 dataset::Dataset 的过程与读取它的逆过程相似。之前,我们使用 dataset::Scanner 将数据扫描到 Table 中——现在,我们需要一个来从我们的 TableBatchReader 中读取数据。为了获取那个 dataset::Scanner,我们将基于我们的 TableBatchReader 创建一个 dataset::ScannerBuilder,然后使用该 Builder 来构建一个 dataset::Scanner

  auto write_scanner_builder =
      arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset);
  ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())

准备模式、分区和文件格式变量#

由于我们希望根据“a”列进行分区,我们需要声明这一点。在定义我们的分区 Schema 时,我们将只包含一个包含“a”的 Field

  // The partition schema determines which fields are used as keys for partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});

这个 Schema 决定了分区的键,但我们需要选择一个算法来处理这个键。我们将再次使用 Hive 风格,这次将我们的模式作为配置传递给它。

  // We'll use Hive-style partitioning, which creates directories with "key=value"
  // pairs.
  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);

有多种文件格式可用,但 Parquet 通常与 Arrow 一起使用,所以我们将写回该格式。

  // Now, we declare we'll be writing Parquet files.
  auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();

配置 FileSystemDatasetWriteOptions#

为了写入磁盘,我们需要一些配置。我们将通过在 dataset::FileSystemDatasetWriteOptions 结构体中设置值来实现。我们将尽可能使用默认值对其进行初始化。

  // This time, we make Options for writing, but do much more configuration.
  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  // Defaults to start.
  write_options.file_write_options = write_format->DefaultWriteOptions();

写入文件的一个重要步骤是拥有一个目标 fs::FileSystem。幸运的是,我们有一个在设置读取时准备好的。这是一个简单的变量赋值。

  // Use the filesystem we already have.
  write_options.filesystem = fs;

Arrow 可以创建目录,但它确实需要一个目录名称,所以让我们给它一个,称之为“write_dataset”。

  // Write to the folder "write_dataset" in current directory.
  write_options.base_dir = "write_dataset";

我们之前创建了一种分区方法,声明我们将使用 Hive 风格——这就是我们实际将其传递给写入函数的地方。

  // Use the partitioning declared above.
  write_options.partitioning = partitioning;

其中一部分将是 Arrow 将文件分割开来,从而防止它们过大而无法处理。这正是数据集最初碎片化的原因。为了设置这个,我们需要目录中每个片段的基本名称——在这种情况下,我们将使用“part{i}.parquet”,这意味着(同一目录中的)第三个文件将被称为“part3.parquet”,例如。

  // Define what the name for the files making up the dataset will be.
  write_options.basename_template = "part{i}.parquet";

有时,数据会被多次写入同一个位置,并且会接受覆盖。由于我们可能希望多次运行此应用程序,我们将设置 Arrow 覆盖现有数据——如果我们不这样做,Arrow 将在第一次运行此应用程序后由于看到现有数据而中止。

  // Set behavior to overwrite existing data -- specifically, this lets this example
  // be run more than once, and allows whatever code you have to overwrite what's there.
  write_options.existing_data_behavior =
      arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;

将数据集写入磁盘#

一旦 dataset::FileSystemDatasetWriteOptions 配置完成,并且 dataset::Scanner 准备好解析数据,我们就可以将选项和 dataset::Scanner 传递给 dataset::FileSystemDataset::Write() 以写入磁盘。

  // Write to disk!
  ARROW_RETURN_NOT_OK(
      arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));

您可以检查您的磁盘,看看您已经写入了一个文件夹,其中包含每个“a”值的子文件夹,每个子文件夹都包含 Parquet 文件!

结束程序#

最后,我们只需返回 Status::OK(),这样 main() 就知道我们已完成,并且一切正常,就像前面的教程一样。

  return arrow::Status::OK();
}

通过这些,您已经读取和写入了分区数据集!这种方法,经过一些配置,将适用于任何受支持的数据集格式。作为此类数据集的一个示例,NYC 出租车数据集是一个众所周知的示例,您可以在此处找到它。现在您可以将大于内存的数据映射用于使用!

这意味着现在我们必须能够在不一次性将所有数据拉入内存的情况下处理这些数据。为此,请尝试使用 Acero。

另请参阅

有关 Acero 的更多信息,请参阅 Acero: 一个 C++ 流式执行引擎

请参考以下内容以获取完整的代码副本。

 19// (Doc section: Includes)
 20#include <arrow/api.h>
 21#include <arrow/compute/api.h>
 22#include <arrow/dataset/api.h>
 23// We use Parquet headers for setting up examples; they are not required for using
 24// datasets.
 25#include <parquet/arrow/reader.h>
 26#include <parquet/arrow/writer.h>
 27
 28#include <unistd.h>
 29#include <iostream>
 30// (Doc section: Includes)
 31
 32// (Doc section: Helper Functions)
 33// Generate some data for the rest of this example.
 34arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
 35  // This code should look familiar from the basic Arrow example, and is not the
 36  // focus of this example. However, we need data to work on it, and this makes that!
 37  auto schema =
 38      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
 39                     arrow::field("c", arrow::int64())});
 40  std::shared_ptr<arrow::Array> array_a;
 41  std::shared_ptr<arrow::Array> array_b;
 42  std::shared_ptr<arrow::Array> array_c;
 43  arrow::NumericBuilder<arrow::Int64Type> builder;
 44  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
 45  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
 46  builder.Reset();
 47  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
 48  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
 49  builder.Reset();
 50  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
 51  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
 52  return arrow::Table::Make(schema, {array_a, array_b, array_c});
 53}
 54
 55// Set up a dataset by writing two Parquet files.
 56arrow::Result<std::string> CreateExampleParquetDataset(
 57    const std::shared_ptr<arrow::fs::FileSystem>& filesystem,
 58    const std::string& root_path) {
 59  // Much like CreateTable(), this is utility that gets us the dataset we'll be reading
 60  // from. Don't worry, we also write a dataset in the example proper.
 61  auto base_path = root_path + "parquet_dataset";
 62  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
 63  // Create an Arrow Table
 64  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
 65  // Write it into two Parquet files
 66  ARROW_ASSIGN_OR_RAISE(auto output,
 67                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
 68  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 69      *table->Slice(0, 5), arrow::default_memory_pool(), output, 2048));
 70  ARROW_ASSIGN_OR_RAISE(output,
 71                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
 72  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 73      *table->Slice(5), arrow::default_memory_pool(), output, 2048));
 74  return base_path;
 75}
 76
 77arrow::Status PrepareEnv() {
 78  // Initilize the compute module to register the required kernels for Dataset
 79  ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
 80  // Get our environment prepared for reading, by setting up some quick writing.
 81  ARROW_ASSIGN_OR_RAISE(auto src_table, CreateTable())
 82  std::shared_ptr<arrow::fs::FileSystem> setup_fs;
 83  // Note this operates in the directory the executable is built in.
 84  char setup_path[256];
 85  char* result = getcwd(setup_path, 256);
 86  if (result == NULL) {
 87    return arrow::Status::IOError("Fetching PWD failed.");
 88  }
 89
 90  ARROW_ASSIGN_OR_RAISE(setup_fs, arrow::fs::FileSystemFromUriOrPath(setup_path));
 91  ARROW_ASSIGN_OR_RAISE(auto dset_path, CreateExampleParquetDataset(setup_fs, ""));
 92
 93  return arrow::Status::OK();
 94}
 95// (Doc section: Helper Functions)
 96
 97// (Doc section: RunMain)
 98arrow::Status RunMain() {
 99  // (Doc section: RunMain)
100  // (Doc section: PrepareEnv)
101  ARROW_RETURN_NOT_OK(PrepareEnv());
102  // (Doc section: PrepareEnv)
103
104  // (Doc section: FileSystem Declare)
105  // First, we need a filesystem object, which lets us interact with our local
106  // filesystem starting at a given path. For the sake of simplicity, that'll be
107  // the current directory.
108  std::shared_ptr<arrow::fs::FileSystem> fs;
109  // (Doc section: FileSystem Declare)
110
111  // (Doc section: FileSystem Init)
112  // Get the CWD, use it to make the FileSystem object.
113  char init_path[256];
114  char* result = getcwd(init_path, 256);
115  if (result == NULL) {
116    return arrow::Status::IOError("Fetching PWD failed.");
117  }
118  ARROW_ASSIGN_OR_RAISE(fs, arrow::fs::FileSystemFromUriOrPath(init_path));
119  // (Doc section: FileSystem Init)
120
121  // (Doc section: FileSelector Declare)
122  // A file selector lets us actually traverse a multi-file dataset.
123  arrow::fs::FileSelector selector;
124  // (Doc section: FileSelector Declare)
125  // (Doc section: FileSelector Config)
126  selector.base_dir = "parquet_dataset";
127  // Recursive is a safe bet if you don't know the nesting of your dataset.
128  selector.recursive = true;
129  // (Doc section: FileSelector Config)
130  // (Doc section: FileSystemFactoryOptions)
131  // Making an options object lets us configure our dataset reading.
132  arrow::dataset::FileSystemFactoryOptions options;
133  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
134  // schema. We won't set any other options, defaults are fine.
135  options.partitioning = arrow::dataset::HivePartitioning::MakeFactory();
136  // (Doc section: FileSystemFactoryOptions)
137  // (Doc section: File Format Setup)
138  auto read_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
139  // (Doc section: File Format Setup)
140  // (Doc section: FileSystemDatasetFactory Make)
141  // Now, we get a factory that will let us get our dataset -- we don't have the
142  // dataset yet!
143  ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
144                                          fs, selector, read_format, options));
145  // (Doc section: FileSystemDatasetFactory Make)
146  // (Doc section: FileSystemDatasetFactory Finish)
147  // Now we build our dataset from the factory.
148  ARROW_ASSIGN_OR_RAISE(auto read_dataset, factory->Finish());
149  // (Doc section: FileSystemDatasetFactory Finish)
150  // (Doc section: Dataset Fragments)
151  // Print out the fragments
152  ARROW_ASSIGN_OR_RAISE(auto fragments, read_dataset->GetFragments());
153  for (const auto& fragment : fragments) {
154    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
155    std::cout << "Partition expression: "
156              << (*fragment)->partition_expression().ToString() << std::endl;
157  }
158  // (Doc section: Dataset Fragments)
159  // (Doc section: Read Scan Builder)
160  // Scan dataset into a Table -- once this is done, you can do
161  // normal table things with it, like computation and printing. However, now you're
162  // also dedicated to being in memory.
163  ARROW_ASSIGN_OR_RAISE(auto read_scan_builder, read_dataset->NewScan());
164  // (Doc section: Read Scan Builder)
165  // (Doc section: Read Scanner)
166  ARROW_ASSIGN_OR_RAISE(auto read_scanner, read_scan_builder->Finish());
167  // (Doc section: Read Scanner)
168  // (Doc section: To Table)
169  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, read_scanner->ToTable());
170  std::cout << table->ToString();
171  // (Doc section: To Table)
172
173  // (Doc section: TableBatchReader)
174  // Now, let's get a table out to disk as a dataset!
175  // We make a RecordBatchReader from our Table, then set up a scanner, which lets us
176  // go to a file.
177  std::shared_ptr<arrow::TableBatchReader> write_dataset =
178      std::make_shared<arrow::TableBatchReader>(table);
179  // (Doc section: TableBatchReader)
180  // (Doc section: WriteScanner)
181  auto write_scanner_builder =
182      arrow::dataset::ScannerBuilder::FromRecordBatchReader(write_dataset);
183  ARROW_ASSIGN_OR_RAISE(auto write_scanner, write_scanner_builder->Finish())
184  // (Doc section: WriteScanner)
185  // (Doc section: Partition Schema)
186  // The partition schema determines which fields are used as keys for partitioning.
187  auto partition_schema = arrow::schema({arrow::field("a", arrow::utf8())});
188  // (Doc section: Partition Schema)
189  // (Doc section: Partition Create)
190  // We'll use Hive-style partitioning, which creates directories with "key=value"
191  // pairs.
192  auto partitioning =
193      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
194  // (Doc section: Partition Create)
195  // (Doc section: Write Format)
196  // Now, we declare we'll be writing Parquet files.
197  auto write_format = std::make_shared<arrow::dataset::ParquetFileFormat>();
198  // (Doc section: Write Format)
199  // (Doc section: Write Options)
200  // This time, we make Options for writing, but do much more configuration.
201  arrow::dataset::FileSystemDatasetWriteOptions write_options;
202  // Defaults to start.
203  write_options.file_write_options = write_format->DefaultWriteOptions();
204  // (Doc section: Write Options)
205  // (Doc section: Options FS)
206  // Use the filesystem we already have.
207  write_options.filesystem = fs;
208  // (Doc section: Options FS)
209  // (Doc section: Options Target)
210  // Write to the folder "write_dataset" in current directory.
211  write_options.base_dir = "write_dataset";
212  // (Doc section: Options Target)
213  // (Doc section: Options Partitioning)
214  // Use the partitioning declared above.
215  write_options.partitioning = partitioning;
216  // (Doc section: Options Partitioning)
217  // (Doc section: Options Name Template)
218  // Define what the name for the files making up the dataset will be.
219  write_options.basename_template = "part{i}.parquet";
220  // (Doc section: Options Name Template)
221  // (Doc section: Options File Behavior)
222  // Set behavior to overwrite existing data -- specifically, this lets this example
223  // be run more than once, and allows whatever code you have to overwrite what's there.
224  write_options.existing_data_behavior =
225      arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore;
226  // (Doc section: Options File Behavior)
227  // (Doc section: Write Dataset)
228  // Write to disk!
229  ARROW_RETURN_NOT_OK(
230      arrow::dataset::FileSystemDataset::Write(write_options, write_scanner));
231  // (Doc section: Write Dataset)
232  // (Doc section: Ret)
233  return arrow::Status::OK();
234}
235// (Doc section: Ret)
236// (Doc section: Main)
237int main() {
238  arrow::Status st = RunMain();
239  if (!st.ok()) {
240    std::cerr << st << std::endl;
241    return 1;
242  }
243  return 0;
244}
245// (Doc section: Main)