读取和写入 CSV 文件#
Arrow 提供了一个快速的 CSV 读取器,允许摄取外部数据以创建 Arrow 表或 Arrow RecordBatches 流。
另请参阅
读取 CSV 文件#
CSV 文件中的数据可以使用 TableReader 作为单个 Arrow 表读取,或者使用 StreamingReader 作为 RecordBatches 流式传输。有关这两种方法之间的权衡讨论,请参阅 权衡。
这两个读取器都需要一个表示输入文件的 arrow::io::InputStream 实例。它们的行为可以通过 ReadOptions、ParseOptions 和 ConvertOptions 的组合进行自定义。
TableReader#
#include "arrow/csv/api.h"
{
// ...
arrow::io::IOContext io_context = arrow::io::default_io_context();
std::shared_ptr<arrow::io::InputStream> input = ...;
auto read_options = arrow::csv::ReadOptions::Defaults();
auto parse_options = arrow::csv::ParseOptions::Defaults();
auto convert_options = arrow::csv::ConvertOptions::Defaults();
// Instantiate TableReader from input stream and options
auto maybe_reader =
arrow::csv::TableReader::Make(io_context,
input,
read_options,
parse_options,
convert_options);
if (!maybe_reader.ok()) {
// Handle TableReader instantiation error...
}
std::shared_ptr<arrow::csv::TableReader> reader = *maybe_reader;
// Read table from CSV file
auto maybe_table = reader->Read();
if (!maybe_table.ok()) {
// Handle CSV read error
// (for example a CSV syntax error or failed type conversion)
}
std::shared_ptr<arrow::Table> table = *maybe_table;
}
StreamingReader#
#include "arrow/csv/api.h"
{
// ...
arrow::io::IOContext io_context = arrow::io::default_io_context();
std::shared_ptr<arrow::io::InputStream> input = ...;
auto read_options = arrow::csv::ReadOptions::Defaults();
auto parse_options = arrow::csv::ParseOptions::Defaults();
auto convert_options = arrow::csv::ConvertOptions::Defaults();
// Instantiate StreamingReader from input stream and options
auto maybe_reader =
arrow::csv::StreamingReader::Make(io_context,
input,
read_options,
parse_options,
convert_options);
if (!maybe_reader.ok()) {
// Handle StreamingReader instantiation error...
}
std::shared_ptr<arrow::csv::StreamingReader> reader = *maybe_reader;
// Set aside a RecordBatch pointer for re-use while streaming
std::shared_ptr<RecordBatch> batch;
while (true) {
// Attempt to read the first RecordBatch
arrow::Status status = reader->ReadNext(&batch);
if (!status.ok()) {
// Handle read error
}
if (batch == NULL) {
// Handle end of file
break;
}
// Do something with the batch
}
}
权衡#
使用 TableReader 还是 StreamingReader 最终取决于用例,但需要注意以下几点权衡:
内存使用:
TableReader一次性将所有数据加载到内存中,根据数据量,可能比StreamingReader需要更多的内存,后者一次只加载一个RecordBatch。这可能是用户最显著的权衡。速度: 当读取 CSV 的全部内容时,
TableReader往往比StreamingReader更快,因为它能更好地利用可用的核心。有关更多详细信息,请参阅 性能。灵活性:
StreamingReader可能被认为不如TableReader灵活,因为它只对读取的第一个块进行类型推断,此后类型被冻结,任何后续块中无法转换为这些类型的数据都将导致错误。请注意,这可以通过将ReadOptions::block_size设置为足够大的值,或者通过使用ConvertOptions::column_types显式设置所需数据类型来解决。
写入 CSV 文件#
CSV 文件写入到 OutputStream。
#include <arrow/csv/api.h>
{
// Oneshot write
// ...
std::shared_ptr<arrow::io::OutputStream> output = ...;
auto write_options = arrow::csv::WriteOptions::Defaults();
if (WriteCSV(table, write_options, output.get()).ok()) {
// Handle writer error...
}
}
{
// Write incrementally
// ...
std::shared_ptr<arrow::io::OutputStream> output = ...;
auto write_options = arrow::csv::WriteOptions::Defaults();
auto maybe_writer = arrow::csv::MakeCSVWriter(output, schema, write_options);
if (!maybe_writer.ok()) {
// Handle writer instantiation error...
}
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer = *maybe_writer;
// Write batches...
if (!writer->WriteRecordBatch(*batch).ok()) {
// Handle write error...
}
if (!writer->Close().ok()) {
// Handle close error...
}
if (!output->Close().ok()) {
// Handle file close error...
}
}
注意
写入器尚不支持所有 Arrow 类型。
列名#
有三种可能的方法从 CSV 文件中推断列名
默认情况下,列名从 CSV 文件的第一行读取
如果设置了
ReadOptions::column_names,它会强制表中的列名为这些值(CSV 文件的第一行被读取为数据)如果
ReadOptions::autogenerate_column_names为 true,列名将自动生成模式“f0”、“f1”...(CSV 文件的第一行被读取为数据)
列选择#
默认情况下,Arrow 读取 CSV 文件中的所有列。您可以使用 ConvertOptions::include_columns 选项缩小列的选择范围。如果 ConvertOptions::include_columns 中的某些列在 CSV 文件中缺失,将发出错误,除非 ConvertOptions::include_missing_columns 为 true,在这种情况下,缺失的列被假定为包含所有空值。
与列名的交互#
如果同时指定了 ReadOptions::column_names 和 ConvertOptions::include_columns,则假定 ReadOptions::column_names 映射到 CSV 列,并且 ConvertOptions::include_columns 是这些列名的一个子集,将作为 Arrow 表的一部分。
数据类型#
默认情况下,CSV 读取器为每个列推断最合适的数据类型。类型推断按顺序考虑以下数据类型:
Null
Int64
Boolean
Date32
Time32(带秒单位)
Timestamp(带秒单位)
Timestamp(带纳秒单位)
Float64
Dictionary<String>(如果
ConvertOptions::auto_dict_encode为 true)Dictionary<Binary>(如果
ConvertOptions::auto_dict_encode为 true)String
Binary
可以通过设置 ConvertOptions::column_types 选项来覆盖选定列的类型推断。可以从以下列表中选择显式数据类型:
Null
所有整数类型
Float32 和 Float64
Decimal128
Boolean
Date32 和 Date64
Time32 和 Time64
Timestamp
Binary 和 Large Binary
String 和 Large String(可选 UTF8 输入验证)
Fixed-Size Binary
Duration(与模式单位匹配的数字字符串,例如,“60000”表示 duration[ms])
字典,索引类型为 Int32,值类型为以下之一:Binary、String、LargeBinary、LargeString、Int32、UInt32、Int64、UInt64、Float32、Float64、Decimal128
其他数据类型不支持从 CSV 值转换,并将报错。
字典推断#
如果启用了类型推断且 ConvertOptions::auto_dict_encode 为 true,CSV 读取器首先尝试将类字符串列转换为字典编码的类字符串数组。当达到 ConvertOptions::auto_dict_max_cardinality 中的阈值时,它将切换到纯类字符串数组。
时间戳推断/解析#
如果启用了类型推断,CSV 读取器首先尝试将类字符串列解释为时间戳。如果所有行都有时区偏移(例如 Z 或 +0100),即使偏移量不一致,则推断的类型将是 UTC 时间戳。如果没有行有时区偏移,则推断的类型将是没有时区的时间戳。带有/不带偏移的混合行将导致字符串列。
如果类型显式指定为带/不带时区的时间戳,则读取器将对该列中不带/带时区偏移的值报错。请注意,这意味着当前无法让读取器将不带时区偏移的时间戳列解析为特定时区的本地时间;相反,将该列解析为不带时区的时间戳,然后使用 assume_timezone 计算函数转换值。
指定类型 |
输入 CSV |
结果类型 |
|---|---|---|
(推断) |
|
timestamp[s] |
|
timestamp[s, UTC] |
|
|
||
2021-01-01T00:00:00
2021-01-01T00:00:00Z
|
string |
|
timestamp[s] |
|
timestamp[s] |
|
(错误) |
|
|
||
2021-01-01T00:00:00
2021-01-01T00:00:00Z
|
||
timestamp[s, UTC] |
|
(错误) |
|
timestamp[s, UTC] |
|
|
||
2021-01-01T00:00:00
2021-01-01T00:00:00Z
|
(错误) |
|
timestamp[s, America/New_York] |
|
(错误) |
|
timestamp[s, America/New_York] |
|
|
||
2021-01-01T00:00:00
2021-01-01T00:00:00Z
|
(错误) |
空值#
空值通过存储在 ConvertOptions::null_values 中的拼写识别。ConvertOptions::Defaults() 工厂方法将初始化一些常规的空值拼写,例如 N/A。
字符编码#
CSV 文件应使用 UTF8 编码。但是,Binary 列接受非 UTF8 数据。
写入选项#
写入的 CSV 文件的格式可以通过 WriteOptions 进行自定义。目前可用选项很少;未来版本将添加更多选项。
性能#
默认情况下,TableReader 将并行读取以利用机器上的所有 CPU 核心。您可以在 ReadOptions::use_threads 中更改此设置。在一个性能良好的台式机或笔记本电脑上,合理的预期是每个核心至少 100 MB/秒(以源 CSV 字节而不是目标 Arrow 数据字节衡量)。