Tabular Datasets#

See also

Dataset API reference

The Arrow Datasets library provides functionality to efficiently work with tabular, potentially larger-than-memory, and multi-file datasets. This includes:

  • A unified interface that supports different sources and file formats and different file systems (local, cloud).

  • Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, …).

  • Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.

The file formats currently supported are Parquet, Feather / Arrow IPC, CSV and ORC (note that ORC datasets can currently only be read and not yet written). The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.

Reading Datasets#

For the examples below, let's create a small dataset consisting of a directory with two Parquet files:

// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}

(See the full example at the bottom: Full Example.)

Dataset discovery#

An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we'll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path:

// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

We also pass the filesystem to use and the file format to use for reading. This lets us choose between (for example) reading local files or files in Amazon S3, or between Parquet and CSV.

In addition to searching a base directory, we can list file paths manually.
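
As a sketch, this can be done with the FileSystemDatasetFactory::Make overload that accepts an explicit vector of file paths instead of a FileSelector (the paths below are hypothetical placeholders):

```cpp
// A sketch, assuming the filesystem/format variables from the examples above.
// The paths are hypothetical; they must exist on the given filesystem.
std::vector<std::string> paths = {"/data/part-0.parquet", "/data/part-1.parquet"};
ARROW_ASSIGN_OR_RAISE(
    auto factory, ds::FileSystemDatasetFactory::Make(filesystem, paths, format,
                                                     ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
```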

Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directory to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():

// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}

…and infers the dataset's schema (by default from the first file):

std::cout << dataset->schema()->ToString() << std::endl;

Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method:

// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Note

Depending on the size of the dataset, this can require a lot of memory; see Filtering data below on filtering/projecting.

Reading different file formats#

The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, the Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}

…then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:

auto format = std::make_shared<ds::IpcFileFormat>();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                   .ValueOrDie();

Customizing file formats#

arrow::dataset::FileFormat objects have properties that control how files are read. For example:

auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");

will configure column "a" to be dictionary-encoded on read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change things like reading comma-separated or tab-separated data.
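
As a sketch, configuring a CSV format to read tab-separated data could look like this (parse_options is an arrow::csv::ParseOptions, from arrow/dataset/file_csv.h):

```cpp
// A sketch: a CSV dataset format configured for tab-separated data.
auto csv_format = std::make_shared<ds::CsvFileFormat>();
csv_format->parse_options.delimiter = '\t';
```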

Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() offers fine-grained control over data scanning. For example, for CSV files, we can change which values are converted into Boolean true and false at scan time.
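
For CSV, a sketch of this might look as follows (assuming the scan_builder from the examples above; CsvFragmentScanOptions lives in arrow/dataset/file_csv.h):

```cpp
// A sketch: treat "yes"/"no" as boolean true/false when scanning CSV fragments.
auto csv_scan_options = std::make_shared<ds::CsvFragmentScanOptions>();
csv_scan_options->convert_options.true_values = {"yes"};
csv_scan_options->convert_options.false_values = {"no"};
ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(csv_scan_options));
```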

Filtering data#

So far, we've been reading the entire dataset, but if we need only a subset of the data, reading data we don't need can waste time or memory. The arrow::dataset::Scanner offers control over what data gets read.

In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read:

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.

A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate will not be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed.

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Projecting columns#

In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns based on evaluating expressions.

In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:

// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:

// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Note

When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn't ultimately selected, Arrow will still read that column to evaluate the filter.

Reading and writing partitioned data#

So far, we've been working with datasets consisting of a flat directory of files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. Instead of having to read and then filter the data, we can organize the files into a nested directory structure to define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. We can then filter data more efficiently by using that information to avoid loading files that don't match the filter.

For example, a dataset partitioned by year and month may have the following layout:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

The partitioning scheme above uses "/key=value/" directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.

Let's create a small partitioned dataset. For this, we'll use Arrow's dataset writing functionality.

// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

The above created a directory with two subdirectories ("part=a" and "part=b"), and the Parquet files written in those directories no longer include the "part" column.

When reading this dataset, we now specify that it should use a Hive-like partitioning scheme:

// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:

$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...

We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:

// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}

Different partitioning schemes#

The above example used a Hive-like directory scheme, such as "/year=2009/month=11/day=15". We specified this by passing the Hive partitioning factory. In this case, the types of the partition keys are inferred from the file paths.

It is also possible to directly construct the partitioning and explicitly define the schema of the partition keys. For example:

auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));

Arrow supports another partitioning scheme, "directory partitioning", where the segments in the file path represent the values of the partition keys without including the names (the field names are implicit in each segment's index). For example, given field names "year", "month", and "day", one path might be "/2019/11/15".

Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:

auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});

Directory partitioning also supports providing a full schema rather than inferring types from the file paths.
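
As a sketch, constructing a directory partitioning directly with an explicit schema could look like this:

```cpp
// A sketch: directory partitioning with an explicit schema, so the key types
// don't have to be inferred from the file paths.
auto part = std::make_shared<ds::DirectoryPartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));
```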

Partitioning performance considerations#

Partitioning datasets has two aspects that affect performance: it increases the number of files and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.

Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size, since each file has some shared metadata. For example, each Parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files. If you partition a dataset by date with a year of data, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. This fine of partitioning often leads to small files that mostly consist of metadata.

Partitioned datasets create nested folder structures, and those allow us to prune which files are loaded in a scan. However, this adds overhead to discovering files in the dataset, as we'll need to recursively "list directory" to find the data files. Too-fine partitions can cause problems here: partitioning a dataset by date for a year's worth of data will require 365 list calls to find all the files; adding another column with cardinality 1,000 will make that 365,365 calls.

The most optimal partitioning layout will depend on your data, access patterns, and which systems will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:

  • Avoid files smaller than 20MB and larger than 2GB.

  • Avoid partitioning layouts with more than 10,000 distinct partitions.

For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to be a significant portion of the file size. In most cases, Arrow's file writer provides sensible defaults for group sizing.
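
If the defaults aren't suitable, row group sizing can be bounded when writing with Datasets; a sketch (assuming the min_rows_per_group/max_rows_per_group options on arrow::dataset::FileSystemDatasetWriteOptions, available in recent Arrow versions):

```cpp
// A sketch: bound row group sizes when writing a dataset, to avoid groups so
// small that metadata becomes a large fraction of the file.
ds::FileSystemDatasetWriteOptions write_options;
write_options.min_rows_per_group = 100 * 1000;   // accumulate small batches
write_options.max_rows_per_group = 1024 * 1024;  // cap the group size
```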

Reading from other data sources#

Reading in-memory data#

If you already have data in memory that you'd like to use with the Datasets API (e.g. to filter/project data, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset:

auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan();

In the example here, we used the InMemoryDataset to write our example data to local disk, which was then used in the rest of the example:

// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

Reading from cloud storage#

In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.
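
For example, a sketch of obtaining an S3 filesystem to pass to the dataset factory (assumes Arrow was built with S3 support and that credentials are available in the environment):

```cpp
// A sketch: connect to S3 and use the result anywhere a fs::FileSystem is
// expected. Requires <arrow/filesystem/s3fs.h> and an Arrow build with S3.
ARROW_RETURN_NOT_OK(fs::EnsureS3Initialized());
ARROW_ASSIGN_OR_RAISE(auto s3fs, fs::S3FileSystem::Make(fs::S3Options::Defaults()));
// e.g. ds::FileSystemDatasetFactory::Make(s3fs, selector, format, options)
```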

See the filesystem docs for more details on the available filesystems.

A note on transactions & ACID guarantees#

The dataset API offers no transaction support or any ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes, or writes concurring with reads, may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer, a temporary directory for new files, or separate storage of the file list instead of relying on directory discovery.
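
For instance, one such approach is giving each writer its own basename template; a sketch (MakeUniqueId is a hypothetical helper, e.g. backed by a UUID library):

```cpp
// A sketch: avoid collisions between concurrent writers by embedding a
// writer-unique token in the basename template. MakeUniqueId() is a
// hypothetical helper, not part of Arrow.
ds::FileSystemDatasetWriteOptions write_options;
write_options.basename_template = "part-" + MakeUniqueId() + "-{i}.parquet";
```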

Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has completed, it is still possible for part of the file to be lost if there is a sudden power loss immediately after the write call.

Most file formats have magic numbers which are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format does not have any such concept, and a partially written CSV file may be detected as valid.

Full Example#

  1// Licensed to the Apache Software Foundation (ASF) under one
  2// or more contributor license agreements. See the NOTICE file
  3// distributed with this work for additional information
  4// regarding copyright ownership. The ASF licenses this file
  5// to you under the Apache License, Version 2.0 (the
  6// "License"); you may not use this file except in compliance
  7// with the License. You may obtain a copy of the License at
  8//
  9// https://apache.org/licenses/LICENSE-2.0
 10//
 11// Unless required by applicable law or agreed to in writing,
 12// software distributed under the License is distributed on an
 13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 14// KIND, either express or implied. See the License for the
 15// specific language governing permissions and limitations
 16// under the License.
 17
 18// This example showcases various ways to work with Datasets. It's
 19// intended to be paired with the documentation.
 20
 21#include <arrow/api.h>
 22#include <arrow/compute/cast.h>
 23#include <arrow/dataset/dataset.h>
 24#include <arrow/dataset/discovery.h>
 25#include <arrow/dataset/file_base.h>
 26#include <arrow/dataset/file_ipc.h>
 27#include <arrow/dataset/file_parquet.h>
 28#include <arrow/dataset/scanner.h>
 29#include <arrow/filesystem/filesystem.h>
 30#include <arrow/ipc/writer.h>
 31#include <arrow/util/iterator.h>
 32#include <parquet/arrow/writer.h>
 33#include "arrow/compute/expression.h"
 34
 35#include <iostream>
 36#include <vector>
 37
 38namespace ds = arrow::dataset;
 39namespace fs = arrow::fs;
 40namespace cp = arrow::compute;
 41
 42/**
 43 * \brief Run Example
 44 *
 45 * ./debug/dataset-documentation-example file:///<some_path>/<some_directory> parquet
 46 */
 47
 48// (Doc section: Reading Datasets)
 49// Generate some data for the rest of this example.
 50arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
 51  auto schema =
 52      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
 53                     arrow::field("c", arrow::int64())});
 54  std::shared_ptr<arrow::Array> array_a;
 55  std::shared_ptr<arrow::Array> array_b;
 56  std::shared_ptr<arrow::Array> array_c;
 57  arrow::NumericBuilder<arrow::Int64Type> builder;
 58  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
 59  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
 60  builder.Reset();
 61  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
 62  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
 63  builder.Reset();
 64  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
 65  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
 66  return arrow::Table::Make(schema, {array_a, array_b, array_c});
 67}
 68
 69// Set up a dataset by writing two Parquet files.
 70arrow::Result<std::string> CreateExampleParquetDataset(
 71    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
 72  auto base_path = root_path + "/parquet_dataset";
 73  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
 74  // Create an Arrow Table
 75  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
 76  // Write it into two Parquet files
 77  ARROW_ASSIGN_OR_RAISE(auto output,
 78                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
 79  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 80      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
 81  ARROW_ASSIGN_OR_RAISE(output,
 82                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
 83  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
 84      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
 85  return base_path;
 86}
 87// (Doc section: Reading Datasets)
 88
 89// (Doc section: Reading different file formats)
 90// Set up a dataset by writing two Feather files.
 91arrow::Result<std::string> CreateExampleFeatherDataset(
 92    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
 93  auto base_path = root_path + "/feather_dataset";
 94  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
 95  // Create an Arrow Table
 96  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
 97  // Write it into two Feather files
 98  ARROW_ASSIGN_OR_RAISE(auto output,
 99                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
100  ARROW_ASSIGN_OR_RAISE(auto writer,
101                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
102  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
103  ARROW_RETURN_NOT_OK(writer->Close());
104  ARROW_ASSIGN_OR_RAISE(output,
105                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
106  ARROW_ASSIGN_OR_RAISE(writer,
107                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
108  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
109  ARROW_RETURN_NOT_OK(writer->Close());
110  return base_path;
111}
112// (Doc section: Reading different file formats)
113
114// (Doc section: Reading and writing partitioned data)
115// Set up a dataset by writing files with partitioning
116arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read the dataset, applying a column projection
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read the dataset, applying a column projection
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #3)

arrow::Status RunDatasetDocumentation(const std::string& format_name,
                                      const std::string& uri, const std::string& mode) {
  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  std::string root_path;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(uri, &root_path));

  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleFeatherDataset(fs, root_path));
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleParquetDataset(fs, root_path));
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path,
                          CreateExampleParquetHivePartitionedDataset(fs, root_path));
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return arrow::Status::ExecutionError("Dataset creation failed.");
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    ARROW_ASSIGN_OR_RAISE(table, ScanWholeDataset(fs, format, base_path));
  } else if (mode == "filter") {
    ARROW_ASSIGN_OR_RAISE(table, FilterAndSelectDataset(fs, format, base_path));
  } else if (mode == "project") {
    ARROW_ASSIGN_OR_RAISE(table, ProjectDataset(fs, format, base_path));
  } else if (mode == "select_project") {
    ARROW_ASSIGN_OR_RAISE(table, SelectAndProjectDataset(fs, format, base_path));
  } else if (mode == "partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, ScanPartitionedDataset(fs, format, base_path));
  } else if (mode == "filter_partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, FilterPartitionedDataset(fs, format, base_path));
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return arrow::Status::ExecutionError("Dataset reading failed.");
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";

  auto status = RunDatasetDocumentation(format_name, uri, mode);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}