读取和写入 Parquet 文件#
另请参阅
Parquet 格式是一种针对复杂数据的高效列式存储格式。Parquet C++ 实现是 Apache Arrow 项目的一部分,并受益于与 Arrow C++ 类和设施的紧密集成。
读取 Parquet 文件#
arrow::FileReader 类将数据读取到 Arrow Table(表)和 Record Batch(记录批次)中。
StreamReader 类允许使用 C++ 输入流的方式按列和按行读取字段。提供此方法是为了方便使用和确保类型安全。当然,当数据必须在文件被增量读取和写入时进行流式处理时,它也非常有用。
请注意,由于类型检查以及列值是逐个处理的,StreamReader 的性能不会那么好。
FileReader#
要将 Parquet 数据读取到 Arrow 结构中,请使用 arrow::FileReader。要构造它,需要一个表示输入文件的 ::arrow::io::RandomAccessFile 实例。要一次性读取整个文件,请使用 arrow::FileReader::ReadTable()。
// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input;
ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(path_to_file));
// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_ASSIGN_OR_RAISE(arrow_reader, parquet::arrow::OpenFile(input, pool));
// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, arrow_reader->ReadTable());
通过 arrow::FileReaderBuilder 辅助类可以使用更细粒度的选项,该类接受 ReaderProperties 和 ArrowReaderProperties 类。
要作为批次流进行读取,请使用 arrow::FileReader::GetRecordBatchReader() 方法来获取 arrow::RecordBatchReader。它将使用 ArrowReaderProperties 中设置的批次大小。
// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"
arrow::MemoryPool* pool = arrow::default_memory_pool();
// Configure general Parquet reader settings
auto reader_properties = parquet::ReaderProperties(pool);
reader_properties.set_buffer_size(4096 * 4);
reader_properties.enable_buffered_stream();
// Configure Arrow-specific Parquet reader settings
auto arrow_reader_props = parquet::ArrowReaderProperties();
arrow_reader_props.set_batch_size(128 * 1024); // default 64 * 1024
parquet::arrow::FileReaderBuilder reader_builder;
ARROW_RETURN_NOT_OK(
reader_builder.OpenFile(path_to_file, /*memory_map=*/false, reader_properties));
reader_builder.memory_pool(pool);
reader_builder.properties(arrow_reader_props);
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ARROW_ASSIGN_OR_RAISE(rb_reader, arrow_reader->GetRecordBatchReader());
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *rb_reader) {
// Operate on each batch...
}
另请参阅
对于读取多文件数据集或下推过滤器以修剪行组,请参阅 Tabular Datasets(表格数据集)。
性能与内存效率#
对于远程文件系统,请使用读取合并(预缓冲)来减少 API 调用次数。
auto arrow_reader_props = parquet::ArrowReaderProperties();
reader_properties.set_prebuffer(true);
默认设置通常已调整以获得良好的性能,但并行列解码默认是关闭的。可以在 ArrowReaderProperties 的构造函数中启用它。
auto arrow_reader_props = parquet::ArrowReaderProperties(/*use_threads=*/true);
如果内存效率比性能更重要,那么:
不要在
parquet::ArrowReaderProperties中开启读取合并(预缓冲)。使用
arrow::FileReader::GetRecordBatchReader()分批读取数据。在
parquet::ReaderProperties中开启enable_buffered_stream。
此外,如果您知道某些列包含许多重复值,可以将它们读取为字典编码列。这可以通过 ArrowReaderProperties 上的 set_read_dictionary 设置来启用。如果这些文件是用 Arrow C++ 编写且激活了 store_schema,则原始 Arrow 模式(Schema)将自动被读取并覆盖此设置。
StreamReader#
StreamReader 允许使用标准 C++ 输入运算符读取 Parquet 文件,这确保了类型安全。
请注意,类型必须与模式完全匹配,即如果模式字段是无符号 16 位整数,则必须提供 uint16_t 类型。
异常用于标识错误。在以下情况下会抛出 ParquetException:
尝试通过提供不正确的类型来读取字段。
尝试读取超出行末尾。
尝试读取超出文件末尾。
#include "arrow/io/file.h"
#include "parquet/stream_reader.h"
{
std::shared_ptr<arrow::io::ReadableFile> infile;
PARQUET_ASSIGN_OR_THROW(
infile,
arrow::io::ReadableFile::Open("test.parquet"));
parquet::StreamReader stream{parquet::ParquetFileReader::Open(infile)};
std::string article;
float price;
uint32_t quantity;
while ( !stream.eof() )
{
stream >> article >> price >> quantity >> parquet::EndRow;
// ...
}
}
写入 Parquet 文件#
WriteTable#
arrow::WriteTable() 函数将整个 ::arrow::Table 写入输出文件。
// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, GetTable());
// Choose compression
std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
ArrowWriterProperties::Builder().store_schema()->build();
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
ARROW_RETURN_NOT_OK(
parquet::arrow::WriteTable(*table.get(), arrow::default_memory_pool(), outfile,
/*chunk_size=*/64 * 1024, props, arrow_props));
注意
在 C++ 中,列压缩默认是关闭的。请参阅下方了解如何在写入器属性中选择压缩编解码器。
要逐批次写入数据,请使用 arrow::FileWriter。
// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;
// Data is in RBR
std::shared_ptr<arrow::RecordBatchReader> batch_stream;
ARROW_ASSIGN_OR_RAISE(batch_stream, GetRBR());
// Choose compression
std::shared_ptr<WriterProperties> props =
WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();
// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
ArrowWriterProperties::Builder().store_schema()->build();
// Create a writer
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
std::unique_ptr<parquet::arrow::FileWriter> writer;
ARROW_ASSIGN_OR_RAISE(
writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(),
arrow::default_memory_pool(), outfile,
props, arrow_props));
// Write each batch as a row_group
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *batch_stream) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
ARROW_ASSIGN_OR_RAISE(auto table,
arrow::Table::FromRecordBatches(batch->schema(), {batch}));
ARROW_RETURN_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
}
// Write file footer and close
ARROW_RETURN_NOT_OK(writer->Close());
StreamWriter#
StreamWriter 允许使用标准 C++ 输出运算符写入 Parquet 文件,类似于使用 StreamReader 类进行读取。这种类型安全的方法还确保写入行时不会省略字段,并允许通过使用 EndRowGroup 流修饰符自动(在一定数据量后)或显式创建新的行组。
异常用于标识错误。在以下情况下会抛出 ParquetException:
尝试使用不正确的类型写入字段。
尝试在一行中写入过多的字段。
尝试跳过必需字段。
#include "arrow/io/file.h"
#include "parquet/stream_writer.h"
{
std::shared_ptr<arrow::io::FileOutputStream> outfile;
PARQUET_ASSIGN_OR_THROW(
outfile,
arrow::io::FileOutputStream::Open("test.parquet"));
parquet::WriterProperties::Builder builder;
std::shared_ptr<parquet::schema::GroupNode> schema;
// Set up builder with required compression type etc.
// Define schema.
// ...
parquet::StreamWriter os{
parquet::ParquetFileWriter::Open(outfile, schema, builder.build())};
// Loop over some data structure which provides the required
// fields to be written and write each row.
for (const auto& a : getArticles())
{
os << a.name() << a.price() << a.quantity() << parquet::EndRow;
}
}
写入器属性#
要配置 Parquet 文件的写入方式,请使用 WriterProperties::Builder。
#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"
using parquet::WriterProperties;
using parquet::ParquetVersion;
using parquet::ParquetDataPageVersion;
using arrow::Compression;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
.max_row_group_length(64 * 1024)
.created_by("My Application")
.version(ParquetVersion::PARQUET_2_6)
.data_page_version(ParquetDataPageVersion::V2)
.compression(Compression::SNAPPY)
.build();
max_row_group_length 设置每个行组的最大行数上限,其优先级高于在写入方法中传递的 chunk_size。
您可以使用 version 设置要写入的 Parquet 版本,这决定了哪些逻辑类型可用。此外,您可以使用 data_page_version 设置数据页版本。默认是 V1;设置为 V2 将允许更优的压缩(跳过那些没有空间效益的页面压缩),但并非所有读取器都支持此数据页版本。
压缩默认是关闭的,但为了充分利用 Parquet,您还应该选择一种压缩编解码器。您可以为整个文件选择一个,也可以为各个列分别选择。如果您选择了混合方式,则文件级选项将应用于没有特定压缩编解码器的列。有关选项,请参阅 ::arrow::Compression。
列数据编码同样可以应用于文件级或列级。默认情况下,写入器将尝试对所有支持的列进行字典编码,除非字典变得太大。这种行为可以在文件级或列级通过 disable_dictionary() 进行更改。如果不使用字典编码,它将回退到为该列或整个文件设置的编码;默认为 Encoding::PLAIN,但这可以通过 encoding() 进行更改。
#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"
using parquet::WriterProperties;
using arrow::Compression;
using parquet::Encoding;
std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
.compression(Compression::SNAPPY) // Fallback
->compression("colA", Compression::ZSTD) // Only applies to column "colA"
->encoding(Encoding::BIT_PACKED) // Fallback
->encoding("colB", Encoding::RLE) // Only applies to column "colB"
->disable_dictionary("colB") // Never dictionary-encode column "colB"
->build();
默认情况下,所有列都启用了统计信息。您可以使用构建器上的 disable_statistics 禁用所有列或特定列的统计信息。有一个 max_statistics_size,它限制了可用于最小值和最大值的最大字节数,这对字符串或二进制 blob 等类型很有用。如果列使用 enable_write_page_index 启用了页面索引,则它不会将统计信息写入页面标题,因为它已在 ColumnIndex 中重复。
还有一些特定于 Arrow 的设置,可以通过 parquet::ArrowWriterProperties 进行配置。
#include "parquet/arrow/writer.h"
using parquet::ArrowWriterProperties;
std::shared_ptr<ArrowWriterProperties> arrow_props = ArrowWriterProperties::Builder()
.enable_deprecated_int96_timestamps() // default False
->store_schema() // default False
->build();
这些选项主要决定了 Arrow 类型如何转换为 Parquet 类型。开启 store_schema 将使写入器将序列化的 Arrow 模式存储在文件元数据中。由于 Parquet 模式和 Arrow 模式之间没有双射关系,存储 Arrow 模式允许 Arrow 读取器更忠实地重建原始数据。这种从 Parquet 类型回溯到原始 Arrow 类型的映射包括:
读取带有原始时区信息的的时间戳(Parquet 不支持时区);
从存储类型读取 Arrow 类型(例如从 int64 列读取 Duration);
将字符串和二进制列读回具有 64 位偏移量的大变体;
将列作为字典编码读回(Arrow 列和序列化的 Parquet 版本是否进行字典编码是独立的)。
支持的 Parquet 特性#
Parquet 格式有许多特性,Parquet C++ 支持其中的一部分。
页面类型#
页面类型 |
备注 |
|---|---|
DATA_PAGE |
|
DATA_PAGE_V2 |
|
DICTIONARY_PAGE |
不支持的页面类型: INDEX_PAGE。读取 Parquet 文件时,此类页面将被忽略。
压缩#
压缩编解码器 |
备注 |
|---|---|
SNAPPY |
|
GZIP |
|
BROTLI |
|
LZ4 |
(1) |
ZSTD |
(1) 在读取侧,Parquet C++ 能够解压普通的 LZ4 块格式以及由参考 Parquet 实现使用的特定于 Hadoop 的 LZ4 格式。在写入侧,Parquet C++ 始终生成特定于 Hadoop 的 LZ4 格式。
不支持的压缩编解码器: LZO。
编码#
编码 |
读取 |
写入 |
备注 |
|---|---|---|---|
PLAIN |
✓ |
✓ |
|
PLAIN_DICTIONARY |
✓ |
✓ |
|
BIT_PACKED |
✓ |
✓ |
(1) |
RLE |
✓ |
✓ |
(1) |
RLE_DICTIONARY |
✓ |
✓ |
(2) |
BYTE_STREAM_SPLIT |
✓ |
✓ |
|
DELTA_BINARY_PACKED |
✓ |
✓ |
|
DELTA_BYTE_ARRAY |
✓ |
✓ |
|
DELTA_LENGTH_BYTE_ARRAY |
✓ |
✓ |
(1) 仅支持用于编码定义和重复级别以及布尔值。
(2) 在写入路径上,仅当在
WriterProperties::version()中选择了 Parquet 格式 2.4 或更高版本时,才会启用 RLE_DICTIONARY。
类型#
物理类型#
物理类型 |
映射的 Arrow 类型 |
备注 |
|---|---|---|
BOOLEAN |
Boolean |
|
INT32 |
Int32 / 其他 |
(1) |
INT64 |
Int64 / 其他 |
(1) |
INT96 |
Timestamp(纳秒) |
(2) |
FLOAT |
Float32 |
|
DOUBLE |
Float64 |
|
BYTE_ARRAY |
Binary / LargeBinary / BinaryView |
(1) |
FIXED_LENGTH_BYTE_ARRAY |
FixedSizeBinary / 其他 |
(1) |
(1) 可以映射到其他 Arrow 类型,具体取决于逻辑类型(请参阅下表)。
(2) 在写入侧,必须启用
ArrowWriterProperties::support_deprecated_int96_timestamps()。
逻辑类型#
特定的逻辑类型可以覆盖给定物理类型的默认 Arrow 类型映射。
逻辑类型 |
物理类型 |
映射的 Arrow 类型 |
备注 |
|---|---|---|---|
NULL |
任何 |
Null |
(1) |
INT |
INT32 |
Int8 / UInt8 / Int16 / UInt16 / Int32 / UInt32 |
|
INT |
INT64 |
Int64 / UInt64 |
|
DECIMAL |
INT32 / INT64 / BYTE_ARRAY / FIXED_LENGTH_BYTE_ARRAY |
Decimal32/ Decimal64 / Decimal128 / Decimal256 |
(2) |
DATE |
INT32 |
Date32 |
(3) |
TIME |
INT32 |
Time32(毫秒) |
|
TIME |
INT64 |
Time64(微秒或纳秒) |
|
TIMESTAMP |
INT64 |
Timestamp(毫秒、微秒或纳秒) |
|
STRING |
BYTE_ARRAY |
String / LargeString / StringView |
|
LIST |
任何 |
List / LargeList |
(4) |
MAP |
任何 |
Map |
(5) |
FLOAT16 |
FIXED_LENGTH_BYTE_ARRAY |
HalfFloat |
|
UUID |
FIXED_LENGTH_BYTE_ARRAY |
Extension ( |
(6) |
JSON |
BYTE_ARRAY |
Extension ( |
(6) |
GEOMETRY |
BYTE_ARRAY |
Extension ( |
(6) (7) |
GEOGRAPHY |
BYTE_ARRAY |
Extension ( |
(6) (7) |
(1) 在写入侧,生成 Parquet 物理类型 INT32。
(2) 在写入侧,始终发出 FIXED_LENGTH_BYTE_ARRAY,除非
store_decimal_as_integer设置为 true。(3) 在写入侧,Arrow Date64 也会映射到 Parquet DATE INT32。
(4) 在写入侧,Arrow FixedSizedList 也会映射到 Parquet LIST。
(5) 在读取侧,具有多个值的键不会被去重,这与 Parquet 规范相矛盾。
(6) 要求
ArrowReaderProperties中的arrow_extensions_enabled为true。当为false时,将读取底层存储类型。(7) 要求已注册
geoarrow.wkb扩展类型。
不支持的逻辑类型: BSON。如果在读取 Parquet 文件时遇到此类类型,则使用默认的物理类型映射(例如,Parquet BSON 列可能会作为 Arrow Binary 或 FixedSizeBinary 读取)。
转换类型#
虽然转换类型在 Parquet 格式中已被弃用(它们已被逻辑类型取代),但 Parquet C++ 实现会识别并发出它们,以最大限度地提高与其他 Parquet 实现的兼容性。
特殊情况#
Arrow Extension 类型将作为其存储类型写出。它仍然可以在读取时使用 Parquet 元数据重建(请参阅下文“Roundtripping Arrow types”)。某些扩展类型具有 Parquet 逻辑类型等效项(例如 UUID、JSON、GEOMETRY、GEOGRAPHY)。即使在 Parquet 元数据中没有存储 Arrow 模式,如果设置了 ArrowReaderProperties 中的适当选项,它们也会自动创建。
Arrow Dictionary 类型将作为其值类型写出。它仍然可以在读取时使用 Parquet 元数据重建。
Roundtripping Arrow 类型和模式#
虽然 Arrow 类型和 Parquet 类型之间没有双射关系,但可以将 Arrow 模式作为 Parquet 文件元数据的一部分进行序列化。这是通过 ArrowWriterProperties::store_schema() 启用的。
在读取路径上,序列化的模式将被自动识别,并将重建原始 Arrow 数据,根据需要转换 Parquet 数据。
例如,当将 Arrow LargeList 序列化为 Parquet 时:
数据被写为 Parquet LIST。
读回时,如果写入文件时启用了
ArrowWriterProperties::store_schema(),则 Parquet LIST 数据被解码为 Arrow LargeList;否则,它被解码为 Arrow List。
Parquet 字段 ID#
Parquet 格式支持一个可选的整数 字段 ID,可以分配给给定的字段。这用于例如 Apache Iceberg 规范中。
在写入侧,如果 PARQUET:field_id 作为元数据键存在于 Arrow 字段上,则其值将被解析为非负整数,并用作相应 Parquet 字段的字段 ID。
在读取侧,Arrow 会将此类字段 ID 转换为相应 Arrow 字段上名为 PARQUET:field_id 的元数据键。
序列化细节#
Arrow 模式被序列化为 Arrow IPC 模式消息,然后进行 base64 编码,并存储在 Parquet 文件元数据的 ARROW:schema 元数据键下。
限制#
不支持写入或读回带有空条目的 FixedSizedList 数据。
加密#
Parquet C++ 实现了加密规范中指定的所有特性,但列索引和布隆过滤器模块的加密除外。
更具体地说,Parquet C++ 支持:
AES_GCM_V1 和 AES_GCM_CTR_V1 加密算法。
用于页脚、ColumnMetaData、数据页、字典页、数据页头、字典页头模块类型的 AAD 后缀。不支持其他模块类型(ColumnIndex, OffsetIndex, BloomFilter Header, BloomFilter Bitset)。
EncryptionWithFooterKey 和 EncryptionWithColumnKey 模式。
加密页脚和明文页脚模式。
配置#
Parquet 加密使用一个 parquet::encryption::CryptoFactory,它可以访问密钥管理系统 (KMS),该系统存储实际的加密密钥,并按密钥 ID 引用。Parquet 加密配置仅使用密钥 ID,不使用实际密钥。
Parquet 元数据加密通过 parquet::encryption::EncryptionConfiguration 进行配置。
// Set write options with encryption configuration.
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string("footerKeyId"));
如果 encryption_config->uniform_encryption 设置为 true,则所有列都使用与 Parquet 元数据相同的密钥进行加密。否则,各个列使用通过 encryption_config->column_keys 配置的单独密钥进行加密。此字段期望格式为 "columnKeyID1:colName1,colName2;columnKeyID3:colName3..." 的字符串。
// Set write options with encryption configuration.
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
std::string("footerKeyId"));
encryption_config->column_keys = "columnKeyId: i, s, m, l";
请参阅完整的 Parquet 列加密示例。
注意
具有嵌套字段(结构体或映射数据类型)的列可以整体加密,也可以仅加密单个字段。为根列名配置加密密钥以使用该密钥加密所有嵌套字段,或为单个叶子嵌套字段配置密钥。
按照惯例,映射列 m 的键和值字段名称分别为 m.key_value.key 和 m.key_value.value。结构体列 s 的内部字段 f 名称为 s.f。
在上述示例中,通过分别为列 m 和 s 配置该密钥,所有内部字段都使用相同的密钥加密。
杂项#
功能 |
读取 |
写入 |
备注 |
|---|---|---|---|
列索引 |
✓ |
✓ |
(1) |
偏移索引 |
✓ |
✓ |
(1) |
布隆过滤器 |
✓ |
✓ |
(1) |
CRC 校验和 |
✓ |
✓ |
(1) 提供了对列索引、偏移索引和布隆过滤器结构的访问,但当前数据读取 API 尚未使用它们。