Tabular Datasets#
The Arrow Datasets library provides functionality to efficiently work with tabular, potentially larger-than-memory, and multi-file datasets. This includes:
A unified interface that supports different sources and file formats and different file systems (local, cloud).
Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, ...).
Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.
The file formats currently supported are Parquet, Feather/Arrow IPC, CSV, and ORC (note that ORC datasets can currently only be read, not yet written). The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.
Reading Datasets#
For the examples below, let's create a small dataset consisting of a directory with two Parquet files:
// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
(See the full example at the bottom, as well as the note on transactions and ACID guarantees.)
Dataset discovery#
An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we'll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path:
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
We're also passing the filesystem to use and the file format to use for reading. This lets us choose between (say) reading local files or files in Amazon S3, or between Parquet and CSV.
In addition to searching a base directory, we can also list file paths manually, as sketched below.
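A minimal sketch of the explicit-path overload of arrow::dataset::FileSystemDatasetFactory::Make (the two paths here are hypothetical and must exist on the given filesystem):
std::vector<std::string> paths = {"/tmp/parquet_dataset/data1.parquet",
                                  "/tmp/parquet_dataset/data2.parquet"};
ARROW_ASSIGN_OR_RAISE(
    auto factory, ds::FileSystemDatasetFactory::Make(filesystem, paths, format,
                                                     ds::FileSystemFactoryOptions()));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());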
Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directory to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():
// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}
…and infers the dataset's schema (by default from the first file):
std::cout << dataset->schema()->ToString() << std::endl;
Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method:
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Note
Depending on the size of the dataset, this can require a lot of memory; see Filtering data below on filtering/projecting.
Reading different file formats#
The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, the Parquet, ORC, Feather/Arrow IPC, and CSV file formats are supported; more formats are planned in the future.
If we save the table as Feather files instead of Parquet files:
// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}
…then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:
auto format = std::make_shared<ds::IpcFileFormat>();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                   .ValueOrDie();
Customizing file formats#
arrow::dataset::FileFormat objects have properties that control how files are read. For example:
auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");
will configure column "a" to be dictionary-encoded on read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change things like reading comma-separated or tab-separated data, as sketched below.
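For instance, a minimal sketch of switching to tab-separated input (this assumes the CSV dataset module, arrow/dataset/file_csv.h, is available in your build):
auto csv_format = std::make_shared<ds::CsvFileFormat>();
csv_format->parse_options.delimiter = '\t';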
Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() offers fine-grained control over data scanning. For example, for CSV files, we can change which values get converted into Boolean true and false at scan time; see the sketch below.
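A sketch for the CSV case, given a scan_builder like the ones in the scanning examples (the tokens "T" and "F" are illustrative assumptions):
auto csv_scan_options = std::make_shared<ds::CsvFragmentScanOptions>();
csv_scan_options->convert_options.true_values = {"T"};
csv_scan_options->convert_options.false_values = {"F"};
ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(csv_scan_options));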
Filtering data#
So far, we've been reading the entire dataset, but if we only need a subset of the data, reading data we don't need can waste time or memory. The arrow::dataset::Scanner offers control over what data gets read.
In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read:
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Some formats, such as Parquet, can reduce I/O costs here by only reading the specified columns from the filesystem.
A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which don't match the filter predicate won't be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed.
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Projecting columns#
In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns based on evaluating expressions.
In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Note
When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn't ultimately selected, Arrow will still read that column in order to evaluate the filter.
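As a sketch, filtering on "b" while selecting only "a" still causes "b" to be read, but it won't appear in the result:
ARROW_RETURN_NOT_OK(scan_builder->Project({"a"}));
ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));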
Reading and writing partitioned data#
So far, we've been working with datasets consisting of flat directories with files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. By organizing the files into a nested directory structure, we can define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. We can then filter data more efficiently, using that information to avoid loading files that don't match the filter.
For example, a dataset partitioned by year and month may have the following layout:
dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...
The above partitioning scheme uses "/key=value/" directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.
Let's create a small partitioned dataset. For this, we'll use Arrow's dataset writing functionality.
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
The above created a directory with two subdirectories ("part=a" and "part=b"), and the Parquet files written into those directories no longer include the "part" column.
When reading this dataset, we now specify that it should use a Hive-like partitioning scheme:
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Although the partition fields aren't included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:
$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...
We can now filter on the partition keys, which avoids loading files altogether if they don't match the filter:
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
Different partitioning schemes#
The above examples use a Hive-like directory scheme, such as "/year=2009/month=11/day=15". We specified this by passing the Hive partitioning factory; in that case, the types of the partition keys are inferred from the file paths.
It is also possible to construct the partitioning directly and explicitly define the schema of the partition keys. For example:
auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));
Arrow supports another partitioning scheme, "directory partitioning", where the segments in the file path represent the values of the partition keys without including the names (the field names are implied by the segments' indices). For example, given field names "year", "month", and "day", one path might be "/2019/11/15".
Since the names are not included in the file paths, they must be specified when constructing a directory partitioning:
auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});
Directory partitioning also supports providing a full schema rather than inferring types from the file paths, as sketched below.
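A sketch of constructing a directory partitioning with an explicit schema instead of a factory:
auto part = std::make_shared<ds::DirectoryPartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));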
Partitioning performance considerations#
Partitioning datasets has two aspects that affect performance: it increases the number of files and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.
Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size since each file has some shared metadata; for example, each Parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files: if you partition a dataset by date with a year of data, you will have at least 365 files, and if you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. This degree of partitioning often leads to small files that mostly consist of metadata.
Partitioned datasets create nested folder structures that allow us to prune which files are loaded in a scan. However, this adds overhead to discovering the files in the dataset, as we need to recursively "list directory" to find the data files. Partitions that are too fine can cause problems here: partitioning a dataset by date for a year's worth of data requires 365 list calls to find all the files; adding another column with cardinality 1,000 brings that to 365,365 calls.
The most optimal partitioning layout will depend on your data, access patterns, and which systems will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:
Avoid files smaller than 20MB and larger than 2GB.
Avoid partitioning layouts with more than 10,000 distinct partitions.
For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to be a significant portion of the file size. In most cases, Arrow's file writer provides sensible defaults for group sizing.
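If you do need to tune group sizes when writing with the Datasets API, recent Arrow versions expose row-group limits on the write options; a sketch (availability and exact member names depend on your Arrow version):
ds::FileSystemDatasetWriteOptions write_options;
// ... set filesystem, base_dir, partitioning, basename_template as in the examples above ...
write_options.min_rows_per_group = 64 * 1024;    // accumulate at least 64Ki rows per group
write_options.max_rows_per_group = 1024 * 1024;  // cap row groups at 1Mi rows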
Reading from other data sources#
Reading in-memory data#
If you already have data in memory that you'd like to use with the Datasets API (e.g. to filter/project data, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset:
auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan();
In the example, we used the InMemoryDataset to write our example data to local disk, which was then used throughout the rest of the example:
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
Reading from cloud storage#
In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.
See the filesystem docs for more details on the available filesystems.
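As a sketch, assuming your Arrow build includes S3 support and credentials are already configured (the bucket name and path here are hypothetical), a filesystem can be created from a URI and passed wherever the examples above use a local filesystem:
std::string base_path;
auto s3fs = fs::FileSystemFromUri("s3://my-bucket/dataset_root", &base_path).ValueOrDie();
// `s3fs` and `base_path` can now be used with ds::FileSystemDatasetFactory::Make as above.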
A note on transactions & ACID guarantees#
The dataset API offers no transaction support or any ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes, or writes concurring with reads, may have unexpected behavior. Various approaches can be used to avoid operating on the same files, such as using a unique basename template for each writer (as sketched below), a temporary directory for new files, or storing the file list separately instead of relying on directory discovery.
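For instance, giving each writer its own basename template keeps concurrent writers from producing colliding file names (the prefix here is arbitrary):
ds::FileSystemDatasetWriteOptions write_options;
// ... other options as in the partitioned-write example ...
write_options.basename_template = "writer-1-part{i}.parquet";  // unique per writer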
Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has completed, it is possible for part of the file to be lost if there is a sudden power failure immediately after the write call.
Most file formats have magic numbers which are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format does not have any such concept, and a partially written CSV file may be detected as valid.
Full Example#
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>
#include "arrow/compute/expression.h"

#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;

/**
 * \brief Run Example
 *
 * ./debug/dataset-documentation-example file:///<some_path>/<some_directory> parquet
 */

// (Doc section: Reading Datasets)
// Generate some data for the rest of this example.
arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
arrow::Result<std::string> CreateExampleParquetDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Parquet files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.parquet"));
  ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
// (Doc section: Reading Datasets)

// (Doc section: Reading different file formats)
// Set up a dataset by writing two Feather files.
arrow::Result<std::string> CreateExampleFeatherDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
  // Write it into two Feather files
  ARROW_ASSIGN_OR_RAISE(auto output,
                        filesystem->OpenOutputStream(base_path + "/data1.feather"));
  ARROW_ASSIGN_OR_RAISE(auto writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  ARROW_ASSIGN_OR_RAISE(output,
                        filesystem->OpenOutputStream(base_path + "/data2.feather"));
  ARROW_ASSIGN_OR_RAISE(writer,
                        arrow::ipc::MakeFileWriter(output.get(), table->schema()));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
  ARROW_RETURN_NOT_OK(writer->Close());
  return base_path;
}
// (Doc section: Reading different file formats)

// (Doc section: Reading and writing partitioned data)
// Set up a dataset by writing files with partitioning
arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
  builder.Reset();
  ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ARROW_RETURN_NOT_OK(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->Project(
      {
          // Leave column "a" as-is.
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  ARROW_ASSIGN_OR_RAISE(
      auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                       ds::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Read specified columns with a row filter
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  // Print out the fragments
  ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
  for (const auto& fragment : fragments) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
                                          filesystem, selector, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ARROW_RETURN_NOT_OK(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  return scanner->ToTable();
}
// (Doc section: Reading and writing partitioned data #3)

arrow::Status RunDatasetDocumentation(const std::string& format_name,
                                      const std::string& uri, const std::string& mode) {
  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  std::string root_path;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(uri, &root_path));

  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleFeatherDataset(fs, root_path));
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleParquetDataset(fs, root_path));
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    ARROW_ASSIGN_OR_RAISE(base_path,
                          CreateExampleParquetHivePartitionedDataset(fs, root_path));
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return arrow::Status::ExecutionError("Dataset creating failed.");
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    ARROW_ASSIGN_OR_RAISE(table, ScanWholeDataset(fs, format, base_path));
  } else if (mode == "filter") {
    ARROW_ASSIGN_OR_RAISE(table, FilterAndSelectDataset(fs, format, base_path));
  } else if (mode == "project") {
    ARROW_ASSIGN_OR_RAISE(table, ProjectDataset(fs, format, base_path));
  } else if (mode == "select_project") {
    ARROW_ASSIGN_OR_RAISE(table, SelectAndProjectDataset(fs, format, base_path));
  } else if (mode == "partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, ScanPartitionedDataset(fs, format, base_path));
  } else if (mode == "filter_partitioned") {
    ARROW_ASSIGN_OR_RAISE(table, FilterPartitionedDataset(fs, format, base_path));
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return arrow::Status::ExecutionError("Dataset reading failed.");
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";

  auto status = RunDatasetDocumentation(format_name, uri, mode);
  if (!status.ok()) {
    std::cerr << status.ToString() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}