读取和写入 CSV 文件#

Arrow 提供了一个快速的 CSV 读取器,允许提取外部数据来创建 Arrow 表或 Arrow RecordBatches 流。

读取 CSV 文件#

CSV 文件中的数据可以使用 TableReader 作为单个 Arrow 表读取,或者使用 StreamingReader 作为 RecordBatches 流式传输。请参阅权衡,了解这两种方法之间的权衡。

这两个读取器都需要一个表示输入文件的 arrow::io::InputStream 实例。它们的行为可以使用 ReadOptionsParseOptionsConvertOptions 的组合进行自定义。

TableReader#

#include "arrow/csv/api.h"

{
   // ...
   arrow::io::IOContext io_context = arrow::io::default_io_context();
   std::shared_ptr<arrow::io::InputStream> input = ...;

   auto read_options = arrow::csv::ReadOptions::Defaults();
   auto parse_options = arrow::csv::ParseOptions::Defaults();
   auto convert_options = arrow::csv::ConvertOptions::Defaults();

   // Instantiate TableReader from input stream and options
   auto maybe_reader =
     arrow::csv::TableReader::Make(io_context,
                                   input,
                                   read_options,
                                   parse_options,
                                   convert_options);
   if (!maybe_reader.ok()) {
     // Handle TableReader instantiation error...
   }
   std::shared_ptr<arrow::csv::TableReader> reader = *maybe_reader;

   // Read table from CSV file
   auto maybe_table = reader->Read();
   if (!maybe_table.ok()) {
     // Handle CSV read error
     // (for example a CSV syntax error or failed type conversion)
   }
   std::shared_ptr<arrow::Table> table = *maybe_table;
}

StreamingReader#

#include "arrow/csv/api.h"

{
   // ...
   arrow::io::IOContext io_context = arrow::io::default_io_context();
   std::shared_ptr<arrow::io::InputStream> input = ...;

   auto read_options = arrow::csv::ReadOptions::Defaults();
   auto parse_options = arrow::csv::ParseOptions::Defaults();
   auto convert_options = arrow::csv::ConvertOptions::Defaults();

   // Instantiate StreamingReader from input stream and options
   auto maybe_reader =
     arrow::csv::StreamingReader::Make(io_context,
                                       input,
                                       read_options,
                                       parse_options,
                                       convert_options);
   if (!maybe_reader.ok()) {
     // Handle StreamingReader instantiation error...
   }
   std::shared_ptr<arrow::csv::StreamingReader> reader = *maybe_reader;

   // Set aside a RecordBatch pointer for re-use while streaming
   std::shared_ptr<RecordBatch> batch;

   while (true) {
       // Attempt to read the first RecordBatch
       arrow::Status status = reader->ReadNext(&batch);

       if (!status.ok()) {
         // Handle read error
       }

       if (batch == NULL) {
         // Handle end of file
         break;
       }

       // Do something with the batch
   }
}

权衡#

使用 TableReader 还是 StreamingReader 的选择最终取决于用例,但需要注意一些权衡

  1. **内存使用:** TableReader 一次将所有数据加载到内存中,并且根据数据量,可能需要比 StreamingReader 多得多的内存,后者一次只加载一个 RecordBatch。这对用户来说可能是最重要的权衡。

  2. **速度:**读取 CSV 的全部内容时,TableReader 通常比 StreamingReader 更快,因为它可以更好地利用可用的内核。有关更多详细信息,请参阅性能

  3. **灵活性:** StreamingReader 可能被认为不如 TableReader 灵活,因为它只对读取的第一个块执行类型推断,之后类型被冻结,后续块中任何无法转换为这些类型的数据都将导致错误。请注意,可以通过将 ReadOptions::block_size 设置为足够大的值或使用 ConvertOptions::column_types 显式设置所需的数据类型来解决此问题。

写入 CSV 文件#

CSV 文件被写入到 OutputStream

#include <arrow/csv/api.h>
{
    // Oneshot write
    // ...
    std::shared_ptr<arrow::io::OutputStream> output = ...;
    auto write_options = arrow::csv::WriteOptions::Defaults();
    if (WriteCSV(table, write_options, output.get()).ok()) {
        // Handle writer error...
    }
}
{
    // Write incrementally
    // ...
    std::shared_ptr<arrow::io::OutputStream> output = ...;
    auto write_options = arrow::csv::WriteOptions::Defaults();
    auto maybe_writer = arrow::csv::MakeCSVWriter(output, schema, write_options);
    if (!maybe_writer.ok()) {
        // Handle writer instantiation error...
    }
    std::shared_ptr<arrow::ipc::RecordBatchWriter> writer = *maybe_writer;

    // Write batches...
    if (!writer->WriteRecordBatch(*batch).ok()) {
        // Handle write error...
    }

    if (!writer->Close().ok()) {
        // Handle close error...
    }
    if (!output->Close().ok()) {
        // Handle file close error...
    }
}

注意

写入器尚不支持所有 Arrow 类型。

列名#

可以通过三种方法从 CSV 文件推断列名

  • 默认情况下,列名是从 CSV 文件的第一行读取的

  • 如果设置了 ReadOptions::column_names,它会强制表中的列名使用这些值(CSV 文件中的第一行将作为数据读取)

  • 如果 ReadOptions::autogenerate_column_names 为 true,则将使用模式“f0”、“f1”...自动生成列名(CSV 文件中的第一行将作为数据读取)

列选择#

默认情况下,Arrow 读取 CSV 文件中的所有列。您可以使用 ConvertOptions::include_columns 选项缩小列的选择范围。如果 CSV 文件中缺少 ConvertOptions::include_columns 中的某些列,则会发出错误,除非 ConvertOptions::include_missing_columns 为 true,在这种情况下,假设缺少的列包含全空值。

与列名的交互#

如果同时指定了 ReadOptions::column_namesConvertOptions::include_columns,则假设 ReadOptions::column_names 映射到 CSV 列,并且 ConvertOptions::include_columns 是 Arrow 表中将包含的这些列名的子集。

数据类型#

默认情况下,CSV 读取器会为每一列推断最合适的数据类型。类型推断按顺序考虑以下数据类型

可以通过设置 ConvertOptions::column_types 选项来覆盖所选列的类型推断。显式数据类型可以从以下列表中选择

  • 空值

  • 所有整数类型

  • Float32 和 Float64

  • Decimal128

  • 布尔值

  • Date32 和 Date64

  • Time32 和 Time64

  • Timestamp

  • Binary 和 Large Binary

  • String 和 Large String(可选 UTF8 输入验证)

  • 定长二进制 (Fixed-Size Binary)

  • 字典,索引类型为 Int32,值类型为以下之一:Binary、String、LargeBinary、LargeString、Int32、UInt32、Int64、UInt64、Float32、Float64、Decimal128

其他数据类型不支持从 CSV 值转换,将会报错。

字典推断#

如果启用了类型推断并且 ConvertOptions::auto_dict_encode 为 true,CSV 读取器将首先尝试将类似字符串的列转换为字典编码的类似字符串的数组。当达到 ConvertOptions::auto_dict_max_cardinality 中的阈值时,它将切换到普通的类似字符串的数组。

时间戳推断/解析#

如果启用了类型推断,CSV 读取器将首先尝试将类似字符串的列解释为时间戳。如果所有行都具有某个时区偏移量(例如 Z+0100),即使偏移量不一致,则推断的类型也将是 UTC 时间戳。如果没有行具有时区偏移量,则推断的类型将是没有时区的时间戳。包含/不包含偏移量的行的混合将导致字符串列。

如果类型被显式指定为带/不带时区的时间戳,则读取器将在该列中对不带/带时区偏移量值的读取报错。请注意,这意味着当前无法让读取器将不带时区偏移量的时间戳列解析为特定时区中的本地时间;而是将该列解析为不带时区的时间戳,然后使用 assume_timezone 计算函数转换值。

指定类型

输入 CSV

结果类型

(推断)

2021-01-01T00:00:00

timestamp[s]

2021-01-01T00:00:00Z

timestamp[s, UTC]

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

string

timestamp[s]

2021-01-01T00:00:00

timestamp[s]

2021-01-01T00:00:00Z

(错误)

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

timestamp[s, UTC]

2021-01-01T00:00:00

(错误)

2021-01-01T00:00:00Z

timestamp[s, UTC]

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

(错误)

timestamp[s, America/New_York]

2021-01-01T00:00:00

(错误)

2021-01-01T00:00:00Z

timestamp[s, America/New_York]

2021-01-01T00:00:00+0100

2021-01-01T00:00:00
2021-01-01T00:00:00Z

(错误)

空值#

空值是根据存储在 ConvertOptions::null_values 中的拼写识别的。ConvertOptions::Defaults() 工厂方法将初始化一些常用的空值拼写,例如 N/A

字符编码#

CSV 文件应采用 UTF8 编码。但是,Binary 列接受非 UTF8 数据。

写入选项#

可以通过 WriteOptions 自定义写入的 CSV 文件的格式。目前可用的选项很少;未来版本中将添加更多选项。

性能#

默认情况下,TableReader 将并行读取以利用计算机上的所有 CPU 内核。您可以在 ReadOptions::use_threads 中更改此设置。在高性能台式机或笔记本电脑上,每个内核至少应达到 100 MB/s 的速度(以源 CSV 字节数衡量,而不是目标 Arrow 数据字节数)。