Arrow 文件 I/O#

Apache Arrow 提供文件 I/O 函数,以便从应用程序的开始到结束都方便使用 Arrow。在本文中,您将

  1. 将 Arrow 文件读取到 RecordBatch 中,并在之后将其写回

  2. 将 CSV 文件读取到 Table 中,并在之后将其写回

  3. 将 Parquet 文件读取到 Table 中,并在之后将其写回



  1. 安装了 Arrow,您可以在此处设置:在您自己的项目中使用 Arrow C++

  2. 了解来自 基本 Arrow 数据结构 的基本 Arrow 数据结构

  3. 一个运行最终应用程序的目录 - 此程序将生成一些文件,因此请做好准备。


在编写一些文件 I/O 之前,我们需要填补几个空白

  1. 我们需要包含必要的头文件。

  2. 需要一个 main() 将所有内容粘合在一起。

  3. 我们需要一些文件来进行操作。


在编写 C++ 代码之前,我们需要一些包含文件。我们将获取 iostream 用于输出,然后为本文中将使用的每种文件类型导入 Arrow 的 I/O 功能

#include <arrow/api.h>
#include <arrow/csv/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>

#include <iostream>


对于我们的粘合剂,我们将使用之前数据结构教程中的 main() 模式

int main() {
  arrow::Status st = RunMain();
  if (!st.ok()) {
    std::cerr << st << std::endl;
    return 1;
  return 0;

它与 RunMain() 配对,就像我们之前使用它时一样

arrow::Status RunMain() {


我们需要一些文件来实际操作。在实践中,您可能会有一些输入用于您自己的应用程序。但是,在这里,我们希望探索执行 I/O,因此让我们生成一些文件以使操作易于遵循。为了创建这些文件,我们将定义一个辅助函数,我们将在最开始运行它。请随时通读此内容,但本文稍后将解释所使用的概念。请注意,我们正在使用之前教程中的日期/月/年数据。目前,只需复制函数即可

arrow::Status GenInitialFile() {
  // Make a couple 8-bit integer arrays and a 16-bit integer array -- just like
  // basic Arrow example.
  arrow::Int8Builder int8builder;
  int8_t days_raw[5] = {1, 12, 17, 23, 28};
  ARROW_RETURN_NOT_OK(int8builder.AppendValues(days_raw, 5));
  std::shared_ptr<arrow::Array> days;
  ARROW_ASSIGN_OR_RAISE(days, int8builder.Finish());

  int8_t months_raw[5] = {1, 3, 5, 7, 1};
  ARROW_RETURN_NOT_OK(int8builder.AppendValues(months_raw, 5));
  std::shared_ptr<arrow::Array> months;
  ARROW_ASSIGN_OR_RAISE(months, int8builder.Finish());

  arrow::Int16Builder int16builder;
  int16_t years_raw[5] = {1990, 2000, 1995, 2000, 1995};
  ARROW_RETURN_NOT_OK(int16builder.AppendValues(years_raw, 5));
  std::shared_ptr<arrow::Array> years;
  ARROW_ASSIGN_OR_RAISE(years, int16builder.Finish());

  // Get a vector of our Arrays
  std::vector<std::shared_ptr<arrow::Array>> columns = {days, months, years};

  // Make a schema to initialize the Table with
  std::shared_ptr<arrow::Field> field_day, field_month, field_year;
  std::shared_ptr<arrow::Schema> schema;

  field_day = arrow::field("Day", arrow::int8());
  field_month = arrow::field("Month", arrow::int8());
  field_year = arrow::field("Year", arrow::int16());

  schema = arrow::schema({field_day, field_month, field_year});
  // With the schema and data, create a Table
  std::shared_ptr<arrow::Table> table;
  table = arrow::Table::Make(schema, columns);

  // Write out test files in IPC, CSV, and Parquet for the example to use.
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.arrow"));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc_writer,
                        arrow::ipc::MakeFileWriter(outfile, schema));

  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.csv"));
  ARROW_ASSIGN_OR_RAISE(auto csv_writer,
                        arrow::csv::MakeCSVWriter(outfile, table->schema()));

  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_in.parquet"));
      parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), outfile, 5));

  return arrow::Status::OK();

要使您其余代码中的文件正常运行,请确保在 RunMain() 中的第一行调用 GenInitialFile() 以初始化环境

  // Generate initial files for each format with a helper function -- don't worry,
  // we'll also write a table in this example.

使用 Arrow 文件进行 I/O#


  1. 读取文件

    1. 打开文件

    2. 将文件绑定到 ipc::RecordBatchFileReader

    3. 将文件读取到 RecordBatch

  2. 写入文件

    1. 获取 io::FileOutputStream

    2. RecordBatch 写入文件


要实际读取文件,我们需要获得某种指向它的方法。在 Arrow 中,这意味着我们将获得一个 io::ReadableFile 对象 - 就像 ArrayBuilder 可以清除并创建新数组一样,我们可以将其重新分配给新文件,因此我们将在整个示例中使用此实例

  // First, we have to set up a ReadableFile object, which just lets us point our
  // readers to the right data on disk. We'll be reusing this object, and rebinding
  // it to multiple files throughout the example.
  std::shared_ptr<arrow::io::ReadableFile> infile;

一个 io::ReadableFile 本身功能有限 - 我们实际上使用 io::ReadableFile::Open() 将其绑定到文件。对于我们在这里的目的,默认参数就足够了

  // Get "test_in.arrow" into our file pointer
  ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open(
                                    "test_in.arrow", arrow::default_memory_pool()));

打开 Arrow 文件读取器#

一个 io::ReadableFile 太通用,无法提供所有功能来读取 Arrow 文件。我们需要使用它来获取一个 ipc::RecordBatchFileReader 对象。此对象实现了读取正确格式的 Arrow 文件所需的所有逻辑。我们通过 ipc::RecordBatchFileReader::Open() 获取一个

  // Open up the file with the IPC features of the library, gives us a reader object.
  ARROW_ASSIGN_OR_RAISE(auto ipc_reader, arrow::ipc::RecordBatchFileReader::Open(infile));

将打开的 Arrow 文件读取到 RecordBatch#

我们必须使用 RecordBatch 读取 Arrow 文件,因此我们将获取一个 RecordBatch。一旦我们有了它,我们就可以实际读取文件了。Arrow 文件可以有多个 RecordBatches,因此我们必须传递一个索引。此文件只有一个,因此传递 0

  // Using the reader, we can read Record Batches. Note that this is specific to IPC;
  // for other formats, we focus on Tables, but here, RecordBatches are used.
  std::shared_ptr<arrow::RecordBatch> rbatch;
  ARROW_ASSIGN_OR_RAISE(rbatch, ipc_reader->ReadRecordBatch(0));

准备 FileOutputStream#

对于输出,我们需要一个 io::FileOutputStream。就像我们的 io::ReadableFile 一样,我们将重复使用它,因此请做好准备。我们打开文件的方式与读取时相同

  // Just like with input, we get an object for the output file.
  std::shared_ptr<arrow::io::FileOutputStream> outfile;
  // Bind it to "test_out.arrow"
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.arrow"));

从 RecordBatch 写入 Arrow 文件#

现在,我们获取之前读取的 RecordBatch,并使用它以及我们的目标文件来创建一个 ipc::RecordBatchWriteripc::RecordBatchWriter 需要两件事

  1. 目标文件

  2. 我们 RecordBatchSchema(如果我们需要写入更多相同格式的 RecordBatches)。

Schema 来自我们现有的 RecordBatch,目标文件是我们刚刚创建的输出流。

  // Set up a writer with the output file -- and the schema! We're defining everything
  // here, loading to fire.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::ipc::RecordBatchWriter> ipc_writer,
                        arrow::ipc::MakeFileWriter(outfile, rbatch->schema()));

我们可以只使用我们的 RecordBatch 调用 ipc::RecordBatchWriter::WriteRecordBatch() 来填充我们的文件

  // Write the record batch.

特别是对于 IPC,写入器必须关闭,因为它预计可能会写入多个批次。为此

  // Specifically for IPC, the writer needs to be explicitly closed.

现在我们已经读取和写入了一个 IPC 文件!

使用 CSV 进行 I/O#


  1. 读取文件

    1. 打开文件

    2. 准备表格

    3. 使用 csv::TableReader 读取文件

  2. 写入文件

    1. 获取 io::FileOutputStream

    2. Table 写入文件

打开 CSV 文件#

对于 CSV 文件,我们需要打开一个 io::ReadableFile,就像 Arrow 文件一样,并重用我们之前的 io::ReadableFile 对象来执行此操作

  // Bind our input file to "test_in.csv"
  ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("test_in.csv"));


CSV 可以读取到 Table 中,因此声明一个指向 Table 的指针

  std::shared_ptr<arrow::Table> csv_table;

将 CSV 文件读取到表格#

CSV 读取器具有需要传递的选项结构体 - 幸运的是,这些选项都有默认值,我们可以直接传递。有关其他选项的参考,请访问此处:文件格式。它没有特殊的定界符并且很小,因此我们可以使用默认值创建读取器。

  // The CSV reader has several objects for various options. For now, we'll use defaults.
      auto csv_reader,
          arrow::io::default_io_context(), infile, arrow::csv::ReadOptions::Defaults(),
          arrow::csv::ParseOptions::Defaults(), arrow::csv::ConvertOptions::Defaults()));

CSV 读取器准备就绪后,我们可以使用其csv::TableReader::Read() 方法填充我们的Table

  // Read the table.
  ARROW_ASSIGN_OR_RAISE(csv_table, csv_reader->Read())

从 Table 写入 CSV 文件#

写入 Table 的 CSV 操作与写入 RecordBatch 的 IPC 操作完全相同,只是使用我们的 Table,并使用 ipc::RecordBatchWriter::WriteTable() 而不是 ipc::RecordBatchWriter::WriteRecordBatch()。请注意,使用的是相同的写入器类 - 我们使用 ipc::RecordBatchWriter::WriteTable() 进行写入,因为我们拥有一个 Table。我们将以文件为目标,使用我们的 Table’s Schema,然后写入 Table

  // Bind our output file to "test_out.csv"
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.csv"));
  // The CSV writer has simpler defaults, review API documentation for more complex usage.
  ARROW_ASSIGN_OR_RAISE(auto csv_writer,
                        arrow::csv::MakeCSVWriter(outfile, csv_table->schema()));
  // Not necessary, but a safe practice.

现在,我们已经读取和写入了一个 CSV 文件!

使用 Parquet 进行文件 I/O#


  1. 读取文件

    1. 打开文件

    2. 准备 parquet::arrow::FileReader

    3. 将文件读取到 Table

  2. 写入文件

    1. Table 写入文件

打开 Parquet 文件#

再次说明,此文件格式 Parquet 需要一个 io::ReadableFile,我们已经有了,并且需要在文件上调用 io::ReadableFile::Open() 方法

  // Bind our input file to "test_in.parquet"
  ARROW_ASSIGN_OR_RAISE(infile, arrow::io::ReadableFile::Open("test_in.parquet"));

设置 Parquet 读取器#

与往常一样,我们需要一个读取器来实际读取文件。我们一直从 Arrow 命名空间获取每个文件格式的读取器。这次,我们进入 Parquet 命名空间以获取 parquet::arrow::FileReader

  std::unique_ptr<parquet::arrow::FileReader> reader;

现在,要设置我们的读取器,我们调用 parquet::arrow::OpenFile()。是的,即使我们使用了 io::ReadableFile::Open(),这也是必要的。请注意,我们通过引用传递 parquet::arrow::FileReader,而不是将其分配给输出

  // Note that Parquet's OpenFile() takes the reader by reference, rather than returning
  // a reader.
      parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));

将 Parquet 文件读取到 Table#

有了准备好的 parquet::arrow::FileReader,我们可以将其读取到 Table,但我们必须通过引用传递 Table,而不是将其输出

  std::shared_ptr<arrow::Table> parquet_table;
  // Read the table.

从 Table 写入 Parquet 文件#

对于单次写入,写入 Parquet 文件不需要写入器对象。相反,我们提供我们的表格,指向它将用于任何必要内存消耗的内存池,告诉它在哪里写入,以及如果需要将文件分解成块时所需的块大小。

  // Parquet writing does not need a declared writer object. Just get the output
  // file bound, then pass in the table, memory pool, output, and chunk size for
  // breaking up the Table on-disk.
  ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open("test_out.parquet"));
      *parquet_table, arrow::default_memory_pool(), outfile, 5));


最后,我们只需返回 Status::OK(),这样 main() 就会知道我们已完成,并且一切正常。就像第一个教程一样。

  return arrow::Status::OK();

有了这些,您已经使用 Arrow 读取和写入 IPC、CSV 和 Parquet,并且可以正确加载数据并写入输出!现在,我们可以进入下一篇文章中使用计算函数处理数据。


