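Tabular Datasets

See also

Dataset API reference

The Arrow Datasets library provides functionality to efficiently work with tabular datasets that may be larger than memory and spread across multiple files. This includes:

  • A unified interface that supports different sources, file formats, and file systems (local, cloud).

  • Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, and so on).

  • Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optional parallel reading.

The currently supported file formats are Parquet, Feather / Arrow IPC, CSV, and ORC (note that ORC datasets can currently only be read, not yet written). The goal is to expand support to other file formats and data sources (such as database connections) in the future.

Reading Datasets

For the examples below, let's create a small dataset consisting of a directory with two Parquet files: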

// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}

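(See the full example at the bottom: Full Example.)

Dataset discovery

An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we'll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path: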

// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

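We also pass in the filesystem to use and the file format to read with. This lets us choose between, for example, reading local files or files in Amazon S3, or between Parquet and CSV.

In addition to searching a base directory, we can list file paths manually.

Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directories to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():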

// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}

...and infers the dataset's schema (by default, from the first file):

std::cout << dataset->schema()->ToString() << std::endl;

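Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method, as the ScanWholeDataset function shown earlier in this section does.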


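Note

Depending on the size of your dataset, this can require a lot of memory; see Filtering data below on filtering and projecting.

Reading different file formats

The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, the Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned for the future.

If we save the table as Feather files instead of Parquet files: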

// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}

...then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:

// Feather files use the Arrow IPC file format.
auto format = std::make_shared<ds::IpcFileFormat>();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                   .ValueOrDie();

Customizing file formats

arrow::dataset::FileFormat objects have properties that control how files are read. For example:

auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");

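will configure column "a" to be dictionary-encoded when read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change settings such as whether the data is comma-separated or tab-separated.

Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() offers fine-grained control over data scanning. For example, for CSV files, we can change which values are converted into Boolean true and false at scan time.

Filtering data

So far, we've been reading the entire dataset, but if we need only a subset of the data, this can waste time and memory. The arrow::dataset::Scanner offers control over what data to read.

In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read: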

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

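Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.

A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate are not included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed. The FilterAndSelectDataset function above applies such a filter (b < 4).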


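Projecting columns

In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns by evaluating expressions.

In this case, we pass a vector of expressions used to construct the column values and a vector of names for the columns: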

// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

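This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema: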

// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

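Note

When combining filters and projections, Arrow will determine all the columns that need to be read. For example, if you filter on a column that isn't ultimately selected, Arrow will still read that column to evaluate the filter.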
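
As a rough illustration of the note above, the set of columns a scan has to read can be thought of as the union of the columns named by the projection and those named by the filter. The sketch below is plain C++ and does not use Arrow; `ColumnsToRead` is a hypothetical helper, and it assumes the referenced column names have already been extracted from the expressions.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Hypothetical helper (not Arrow's implementation): the columns a scan has to
// materialize are the union of those referenced by the projection and those
// referenced by the filter.
std::set<std::string> ColumnsToRead(const std::vector<std::string>& projected,
                                    const std::vector<std::string>& filtered) {
  std::set<std::string> needed(projected.begin(), projected.end());
  needed.insert(filtered.begin(), filtered.end());
  return needed;
}
```

For instance, projecting only "a" while filtering on "b" yields both columns, even though "b" does not appear in the result.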

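Reading and writing partitioned data

So far, we've been working with datasets consisting of flat directories of files. Often, a dataset has one or more columns that are frequently filtered on. Instead of reading the data and then filtering it, we can organize the files into a nested directory structure and define a partitioned dataset, in which the subdirectory names carry information about which subset of the data is stored in that directory. We can then filter data more efficiently by using that information to avoid loading files that don't match the filter.

For example, a dataset partitioned by year and month may have the following layout: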

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

The above partitioning scheme uses "/key=value/" directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.
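
To make the convention concrete, here is a minimal sketch in plain C++ of how key/value pairs can be recovered from such a path. `ParseHiveSegments` is a hypothetical helper, not part of Arrow, and it keeps the values as raw strings instead of inferring their types.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of Arrow): split a path on '/' and collect
// every "key=value" segment, in order of appearance.
std::vector<std::pair<std::string, std::string>> ParseHiveSegments(
    const std::string& path) {
  std::vector<std::pair<std::string, std::string>> pairs;
  std::size_t start = 0;
  while (start <= path.size()) {
    std::size_t end = path.find('/', start);
    if (end == std::string::npos) end = path.size();
    const std::string segment = path.substr(start, end - start);
    const std::size_t eq = segment.find('=');
    // Keep only segments that have a non-empty key before '='.
    if (eq != std::string::npos && eq > 0) {
      pairs.emplace_back(segment.substr(0, eq), segment.substr(eq + 1));
    }
    start = end + 1;
  }
  return pairs;
}
```

Applied to dataset_name/year=2007/month=01/data0.parquet, this yields the pairs ("year", "2007") and ("month", "01"); the file name contains no '=' and is skipped.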

Let's create a small partitioned dataset. For this, we'll use Arrow's dataset writing functionality.

// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

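The above created a directory with two subdirectories ("part=a" and "part=b"), and the Parquet files written in those directories no longer include the "part" column.

When reading this dataset, we now specify that it should use a Hive-like partitioning scheme: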

// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Although the partition fields are not included in the actual Parquet files, they are added back to the resulting table when scanning this dataset:

$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...

We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:

// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
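
The idea of skipping files by partition value can be sketched in plain C++, independently of Arrow. In the sketch below, `FragmentInfo` and `PruneByEquality` are hypothetical names, only equality filters are handled, and partition values are compared as strings; Arrow's own expression simplification is more general than this.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a dataset fragment: a file path plus the
// partition key/value pairs recovered from its directory names.
struct FragmentInfo {
  std::string path;
  std::map<std::string, std::string> partition;
};

// Keep the fragments that could contain rows where `key == value`. A fragment
// is skipped only when its partition fixes `key` to a different value; if
// `key` is not a partition key, the file still has to be read and filtered.
std::vector<FragmentInfo> PruneByEquality(const std::vector<FragmentInfo>& fragments,
                                          const std::string& key,
                                          const std::string& value) {
  std::vector<FragmentInfo> kept;
  for (const auto& fragment : fragments) {
    const auto it = fragment.partition.find(key);
    if (it == fragment.partition.end() || it->second == value) {
      kept.push_back(fragment);
    }
  }
  return kept;
}
```

With the two fragments from the example output (part=a and part=b), filtering on part == "b" keeps only the second file, so the first is never opened.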

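Different partitioning schemes

The above example uses a Hive-like directory scheme, such as "/year=2009/month=11/day=15". We specified this by passing the Hive partitioning factory. In this case, the types of the partition keys are inferred from the file paths.

It is also possible to construct the partitioning directly and define the schema of the partition keys explicitly. For example: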

auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));

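Arrow supports another partitioning scheme, "directory partitioning", where the segments in the file path represent the values of the partition keys without including the names (the field names are implicit in the segment's index). For example, given the field names "year", "month", and "day", one path might be "/2019/11/15".

Since the names are not included in the file paths, they must be specified when constructing a directory partitioning: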

auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});

Directory partitioning also supports providing a full schema rather than inferring types from file paths.
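
The positional mapping used by directory partitioning can be illustrated with a short sketch in plain C++. `ParseDirectorySegments` is a hypothetical helper, not part of Arrow; it assumes `path` names a directory (no trailing file name) and keeps the values as strings.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper (not part of Arrow): pair the i-th non-empty segment of
// `path` with the i-th field name. Segments beyond the listed field names are
// ignored.
std::map<std::string, std::string> ParseDirectorySegments(
    const std::string& path, const std::vector<std::string>& field_names) {
  std::map<std::string, std::string> values;
  std::size_t index = 0;
  std::size_t start = 0;
  while (start < path.size() && index < field_names.size()) {
    std::size_t end = path.find('/', start);
    if (end == std::string::npos) end = path.size();
    if (end > start) {  // skip empty segments, e.g. the one before a leading '/'
      values[field_names[index]] = path.substr(start, end - start);
      ++index;
    }
    start = end + 1;
  }
  return values;
}
```

Given the field names "year", "month", and "day", the path "/2019/11/15" maps to year = "2019", month = "11", and day = "15".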

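Partitioning performance considerations

Partitioning a dataset affects performance in two ways: it increases the number of files, and it creates a directory structure around the files. Both have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.

Because partitions split the dataset into multiple files, partitioned datasets can be read and written in parallel. However, each additional file adds a little processing overhead for filesystem interaction. It also increases the overall dataset size, since each file carries some shared metadata. For example, each Parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files. If you partition a year of data by date, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. Partitioning this finely often leads to a large number of small files that consist mostly of metadata.

Partitioned datasets create nested folder structures, which allow us to prune the files loaded in a scan. However, this adds overhead to discovering the files in the dataset, as we need to recursively "list directory" to find the data files. Overly fine partitions can cause problems here: partitioning a year's worth of data by date requires 365 list calls to find all the files; adding another column with cardinality 1,000 makes that 365,365 calls.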
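
The figures quoted above follow from simple arithmetic, which the sketch below spells out in plain C++. The function names are hypothetical, and the model assumes a two-level layout in which every combination of partition values is present.

```cpp
#include <cassert>
#include <cstdint>

// Number of innermost partition directories in a two-level layout. Each one
// holds at least one file, so this is also the minimum number of files.
std::int64_t LeafPartitions(std::int64_t outer, std::int64_t inner) {
  assert(outer >= 0 && inner >= 0);
  return outer * inner;
}

// Directory listings needed to discover the data files: one per outer
// directory plus one per inner directory. Listing the dataset root would add
// one more call; it is left out here to match the figures in the text.
std::int64_t ListCalls(std::int64_t outer, std::int64_t inner) {
  assert(outer >= 0 && inner >= 0);
  return outer + outer * inner;
}
```

With 365 dates and a second key of cardinality 1,000, `LeafPartitions` gives 365,000 and `ListCalls` gives 365 + 365,000 = 365,365; with dates alone (no second level), `ListCalls(365, 0)` gives 365.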

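The optimal partitioning layout depends on your data, your access patterns, and the systems that will read the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:

  • Avoid files smaller than 20MB and larger than 2GB.

  • Avoid partitioning layouts with more than 10,000 distinct partitions.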
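
As a rough aid, the two guidelines can be turned into a sanity check over a planned layout. The sketch below is plain C++ with hypothetical names; it assumes that "MB" and "GB" mean binary units (2^20 and 2^30 bytes), which the guidelines themselves do not specify.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Assumption: "MB" and "GB" are read as binary units.
constexpr std::int64_t kMinFileBytes = 20LL * 1024 * 1024;
constexpr std::int64_t kMaxFileBytes = 2LL * 1024 * 1024 * 1024;
constexpr std::int64_t kMaxPartitions = 10000;

// Hypothetical helper: return one warning per file outside the size range,
// plus one warning if the partition count exceeds the limit.
std::vector<std::string> CheckLayout(const std::vector<std::int64_t>& file_sizes,
                                     std::int64_t partition_count) {
  std::vector<std::string> warnings;
  for (const std::int64_t size : file_sizes) {
    if (size < kMinFileBytes) {
      warnings.push_back("file smaller than 20MB: " + std::to_string(size) + " bytes");
    } else if (size > kMaxFileBytes) {
      warnings.push_back("file larger than 2GB: " + std::to_string(size) + " bytes");
    }
  }
  if (partition_count > kMaxPartitions) {
    warnings.push_back("more than 10,000 partitions: " +
                       std::to_string(partition_count));
  }
  return warnings;
}
```

These thresholds are guidelines for avoiding worst cases, not hard limits, so a warning here is a prompt to reconsider the layout and not an error.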

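Similar guidelines apply to file formats that have a notion of groups within a file, such as Parquet. Row groups can provide parallelism when reading and allow data to be skipped based on statistics, but very small groups can cause metadata to make up a significant portion of the file size. Arrow's file writer provides sensible defaults for group sizing in most cases.

Reading from other data sources

Reading in-memory data

If you already have data in memory that you'd like to use with the Datasets API (for example, to filter or project data, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset: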

auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan();

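In the example, we used an InMemoryDataset to write our example data to local disk, where it is used by the rest of the example (see CreateExampleParquetHivePartitionedDataset under Reading and writing partitioned data).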


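Reading from cloud storage

In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.

See the filesystem docs for more details on the available filesystems.

A note on transactions & ACID guarantees

The Dataset API offers no transaction support or ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes, or writes that run concurrently with reads, may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer, using a temporary directory for new files, or storing the file list separately instead of relying on directory discovery.

Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has completed, part of the file can still be lost if there is a sudden power loss immediately after the write call.

Most file formats have magic numbers that are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format has no such concept, and a partially written CSV file may be detected as valid.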
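
As an illustration of the magic-number idea, Parquet files begin and end with the 4-byte marker "PAR1". The sketch below, in plain C++ with a hypothetical helper name, checks only for those markers in an in-memory byte string; it does not validate the footer, and it does not handle files written with an encrypted footer, which use a different marker.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: report whether `bytes` carries the "PAR1" marker at
// both ends. A file cut off before the trailing marker was written fails
// this check.
bool LooksLikeCompleteParquet(const std::string& bytes) {
  static const std::string kMagic = "PAR1";
  // Leading marker + 4-byte footer length + trailing marker is the smallest
  // size this check accepts.
  if (bytes.size() < 12) return false;
  return bytes.compare(0, kMagic.size(), kMagic) == 0 &&
         bytes.compare(bytes.size() - kMagic.size(), kMagic.size(), kMagic) == 0;
}
```

A reader that finds the trailing marker missing can treat the file as an incomplete write and skip it; no comparable check exists for CSV.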

Full Example

  1// Licensed to the Apache Software Foundation (ASF) under one
  2// or more contributor license agreements. See the NOTICE file
  3// distributed with this work for additional information
  4// regarding copyright ownership. The ASF licenses this file
  5// to you under the Apache License, Version 2.0 (the
  6// "License"); you may not use this file except in compliance
  7// with the License. You may obtain a copy of the License at
  8//
  9// https://apache.org/licenses/LICENSE-2.0
 10//
 11// Unless required by applicable law or agreed to in writing,
 12// software distributed under the License is distributed on an
 13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 14// KIND, either express or implied. See the License for the
 15// specific language governing permissions and limitations
 16// under the License.
 17
 18// This example showcases various ways to work with Datasets. It's
 19// intended to be paired with the documentation.
 20
 21#include <arrow/api.h>
 22#include <arrow/compute/api.h>
 23#include <arrow/compute/cast.h>
 24#include <arrow/dataset/dataset.h>
 25#include <arrow/dataset/discovery.h>
 26#include <arrow/dataset/file_base.h>
 27#include <arrow/dataset/file_ipc.h>
 28#include <arrow/dataset/file_parquet.h>
 29#include <arrow/dataset/scanner.h>
 30#include <arrow/filesystem/filesystem.h>
 31#include <arrow/ipc/writer.h>
 32#include <arrow/util/iterator.h>
 33#include <parquet/arrow/writer.h>
 34#include "arrow/compute/expression.h"
 35
 36#include <iostream>
 37#include <vector>
 38
 39namespace ds = arrow::dataset;
 40namespace fs = arrow::fs;
 41namespace cp = arrow::compute;
 42
 43/**
 44 * \brief Run Example
 45 *
 46 * ./debug/dataset-documentation-example file:///<some_path>/<some_directory> parquet
 47 */
 48
 49// (Doc section: Reading Datasets)
 50// Generate some data for the rest of this example.
 51arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
 52  auto schema =
 53      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
 54                     arrow::field("c", arrow::int64())});
 55  std::shared_ptr<arrow::Array> array_a;
 56  std::shared_ptr<arrow::Array> array_b;
 57  std::shared_ptr<arrow::Array> array_c;
 58  arrow::NumericBuilder<arrow::Int64Type> builder;
 59  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
 60  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
 61  builder.Reset();
 62  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
 63  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
 64  builder.Reset();
 65  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
 66  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
 67  return arrow::Table::Make(schema, {array_a, array_b, array_c});
 68}
 69
 70// Set up a dataset by writing two Parquet files.
 71arrow::Result<std::string> CreateExampleParquetDataset(
 72    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
 73  auto base_path = root_path + "/parquet_dataset";
 74  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
 75  // Create an Arrow Table
 76  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
 77  // Write it into two Parquet files
 78  ARROW_ASSIGN_OR_RAISE(auto output,
 79                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
 80  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 81      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
 82  ARROW_ASSIGN_OR_RAISE(output,
 83                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
 84  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 85      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
 86  return base_path;
 87}
 88// (Doc section: Reading Datasets)
 89
 90// (Doc section: Reading different file formats)
 91// Set up a dataset by writing two Feather files.
 92arrow::Result<std::string> CreateExampleFeatherDataset(
 93    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
 94  auto base_path = root_path + "/feather_dataset";
 95  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
 96  // Create an Arrow Table
 97  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
 98  // Write it into two Feather files
 99  ARROW_ASSIGN_OR_RAISE(auto output,
100                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
101  ARROW_ASSIGN_OR_RAISE(auto writer,
102                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
103  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
104  ARROW_RETURN_NOT_OK(writer->Close());
105  ARROW_ASSIGN_OR_RAISE(output,
106                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
107  ARROW_ASSIGN_OR_RAISE(writer,
108                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
109  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
110  ARROW_RETURN_NOT_OK(writer->Close());
111  return base_path;
112}
113// (Doc section: Reading different file formats)
114
115// (Doc section: Reading and writing partitioned data)
116// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Project columns with expressions; no row filter is applied here
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Keep the values of column "a" unchanged (it is output as "a_renamed").
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Build the projection: every original column plus one derived column
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #3)

arrow::Status RunDatasetDocumentation(const std::string& format_name,
                                      const std::string& uri, const std::string& mode) {
  ARROW_RETURN_NOT_OK(arrow::compute::Initialize());

  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  std::string root_path;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(uri, &root_path));

  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleFeatherDataset(fs, root_path));
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleParquetDataset(fs, root_path));
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path,
                          CreateExampleParquetHivePartitionedDataset(fs, root_path));
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return arrow::Status::ExecutionError("Dataset creation failed.");
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    ARROW_ASSIGN_OR_RAISE(table, ScanWholeDataset(fs, format, base_path));
  } else if (mode == "filter") {
    ARROW_ASSIGN_OR_RAISE(table, FilterAndSelectDataset(fs, format, base_path));
  } else if (mode == "project") {
    ARROW_ASSIGN_OR_RAISE(table, ProjectDataset(fs, format, base_path));
  } else if (mode == "select_project") {
    ARROW_ASSIGN_OR_RAISE(table, SelectAndProjectDataset(fs, format, base_path));
  } else if (mode == "partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, ScanPartitionedDataset(fs, format, base_path));
  } else if (mode == "filter_partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, FilterPartitionedDataset(fs, format, base_path));
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return arrow::Status::ExecutionError("Dataset reading failed.");
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";

  auto status = RunDatasetDocumentation(format_name, uri, mode);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}
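
The example above writes files into Hive-style "key=value" directories and later filters on the partition column, so that files under non-matching directories are never read. The sketch below illustrates that idea using only the C++ standard library. It is not Arrow's implementation: `ParseHivePath` and `PruneByPartition` are hypothetical helpers introduced for illustration, and relative paths such as `part=a/part0.parquet` are assumed examples of what the writer might produce.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical helper (not an Arrow API): collect the "key=value" directory
// segments of a relative path such as "part=a/part0.parquet".
// The final segment is treated as the file name and is never parsed.
std::map<std::string, std::string> ParseHivePath(const std::string& path) {
  std::map<std::string, std::string> values;
  std::size_t start = 0;
  for (std::size_t end = path.find('/'); end != std::string::npos;
       start = end + 1, end = path.find('/', start)) {
    const std::string segment = path.substr(start, end - start);
    const std::size_t eq = segment.find('=');
    // Ignore directories that are not of the form key=value.
    if (eq != std::string::npos && eq > 0) {
      values[segment.substr(0, eq)] = segment.substr(eq + 1);
    }
  }
  return values;
}

// Hypothetical helper (not an Arrow API): keep only the paths whose partition
// value for `key` equals `wanted`. Paths without that key are dropped, which
// mirrors the effect of skipping files whose partition does not match a filter.
std::vector<std::string> PruneByPartition(const std::vector<std::string>& paths,
                                          const std::string& key,
                                          const std::string& wanted) {
  assert(!key.empty());  // Precondition: a partition key must be named.
  std::vector<std::string> kept;
  for (const auto& path : paths) {
    const auto values = ParseHivePath(path);
    const auto it = values.find(key);
    if (it != values.end() && it->second == wanted) {
      kept.push_back(path);
    }
  }
  return kept;
}
```

Under these assumptions, calling `PruneByPartition` with the paths `part=a/part0.parquet` and `part=b/part0.parquet`, the key `part`, and the value `b` keeps only the second path. This is the same selection that `FilterPartitionedDataset` expresses with `cp::equal(cp::field_ref("part"), cp::literal("b"))`, although Arrow evaluates it against each fragment's partition expression rather than against path strings.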