读取和写入 Parquet 文件#

Parquet 格式是一种用于复杂数据的节省空间的列式存储格式。Parquet C++ 实现是 Apache Arrow 项目的一部分,并受益于与 Arrow C++ 类和工具的紧密集成。

读取 Parquet 文件#

arrow::FileReader 类将数据读取到 Arrow 表和记录批次中。

StreamReader 类允许使用 C++ 输入流方法逐列逐行读取字段。此方法提供易用性和类型安全。当然,当数据必须在文件增量读取和写入时进行流式传输时,它也很有用。

请注意,由于类型检查和列值一次处理一个的事实,StreamReader 的性能不会那么好。

FileReader#

要将 Parquet 数据读取到 Arrow 结构中,请使用 arrow::FileReader。要构造,它需要一个表示输入文件的 ::arrow::io::RandomAccessFile 实例。要一次性读取整个文件,请使用 arrow::FileReader::ReadTable()

// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"

arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::io::RandomAccessFile> input;
ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open(path_to_file));

// Open Parquet file reader
std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_ASSIGN_OR_RAISE(arrow_reader, parquet::arrow::OpenFile(input, pool));

// Read entire file as a single Arrow table
std::shared_ptr<arrow::Table> table;
ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));

更细粒度的选项可通过 arrow::FileReaderBuilder 辅助类获得,该类接受 ReaderPropertiesArrowReaderProperties 类。

要以批次流的形式读取,请使用 arrow::FileReader::GetRecordBatchReader() 方法检索 arrow::RecordBatchReader。它将使用 ArrowReaderProperties 中设置的批次大小。

// #include "arrow/io/api.h"
// #include "parquet/arrow/reader.h"

arrow::MemoryPool* pool = arrow::default_memory_pool();

// Configure general Parquet reader settings
auto reader_properties = parquet::ReaderProperties(pool);
reader_properties.set_buffer_size(4096 * 4);
reader_properties.enable_buffered_stream();

// Configure Arrow-specific Parquet reader settings
auto arrow_reader_props = parquet::ArrowReaderProperties();
arrow_reader_props.set_batch_size(128 * 1024);  // default 64 * 1024

parquet::arrow::FileReaderBuilder reader_builder;
ARROW_RETURN_NOT_OK(
    reader_builder.OpenFile(path_to_file, /*memory_map=*/false, reader_properties));
reader_builder.memory_pool(pool);
reader_builder.properties(arrow_reader_props);

std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());

std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ARROW_ASSIGN_OR_RAISE(rb_reader, arrow_reader->GetRecordBatchReader());

for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *rb_reader) {
  // Operate on each batch...
}

另请参阅

要读取多文件数据集或将过滤器下推以修剪行组,请参阅 表格数据集

性能和内存效率#

对于远程文件系统,使用读取合并(预缓冲)以减少 API 调用次数

auto arrow_reader_props = parquet::ArrowReaderProperties();
reader_properties.set_prebuffer(true);

默认值通常经过调整以获得良好的性能,但默认情况下禁用并行列解码。在 ArrowReaderProperties 的构造函数中启用它

auto arrow_reader_props = parquet::ArrowReaderProperties(/*use_threads=*/true);

如果内存效率比性能更重要,那么

  1. 不要在 parquet::ArrowReaderProperties 中打开读取合并(预缓冲)。

  2. 使用 arrow::FileReader::GetRecordBatchReader() 分批读取数据。

  3. parquet::ReaderProperties 中打开 enable_buffered_stream

此外,如果您知道某些列包含许多重复值,您可以将它们读取为字典编码列。这通过 ArrowReaderProperties 上的 set_read_dictionary 设置启用。如果文件是用 Arrow C++ 编写的并且 store_schema 已激活,那么原始 Arrow 模式将自动读取并覆盖此设置。

StreamReader#

StreamReader 允许使用标准 C++ 输入运算符读取 Parquet 文件,从而确保类型安全。

请注意,类型必须与模式完全匹配,即如果模式字段是无符号 16 位整数,则必须提供 uint16_t 类型。

异常用于表示错误。ParquetException 在以下情况下抛出

  • 尝试通过提供不正确的类型来读取字段。

  • 尝试读取超出行尾。

  • 尝试读取超出文件末尾。

#include "arrow/io/file.h"
#include "parquet/stream_reader.h"

{
   std::shared_ptr<arrow::io::ReadableFile> infile;

   PARQUET_ASSIGN_OR_THROW(
      infile,
      arrow::io::ReadableFile::Open("test.parquet"));

   parquet::StreamReader stream{parquet::ParquetFileReader::Open(infile)};

   std::string article;
   float price;
   uint32_t quantity;

   while ( !stream.eof() )
   {
      stream >> article >> price >> quantity >> parquet::EndRow;
      // ...
   }
}

写入 Parquet 文件#

WriteTable#

arrow::WriteTable() 函数将整个 ::arrow::Table 写入输出文件。

// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;

ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, GetTable());

// Choose compression
std::shared_ptr<WriterProperties> props =
    WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();

// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
    ArrowWriterProperties::Builder().store_schema()->build();

std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));

ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table.get(),
                                               arrow::default_memory_pool(), outfile,
                                               /*chunk_size=*/3, props, arrow_props));

注意

C++ 中默认关闭列压缩。有关如何在写入器属性中选择压缩编解码器,请参阅下文

要按批次写入数据,请使用 arrow::FileWriter

// #include "parquet/arrow/writer.h"
// #include "arrow/util/type_fwd.h"
using parquet::ArrowWriterProperties;
using parquet::WriterProperties;

// Data is in RBR
std::shared_ptr<arrow::RecordBatchReader> batch_stream;
ARROW_ASSIGN_OR_RAISE(batch_stream, GetRBR());

// Choose compression
std::shared_ptr<WriterProperties> props =
    WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build();

// Opt to store Arrow schema for easier reads back into Arrow
std::shared_ptr<ArrowWriterProperties> arrow_props =
    ArrowWriterProperties::Builder().store_schema()->build();

// Create a writer
std::shared_ptr<arrow::io::FileOutputStream> outfile;
ARROW_ASSIGN_OR_RAISE(outfile, arrow::io::FileOutputStream::Open(path_to_file));
std::unique_ptr<parquet::arrow::FileWriter> writer;
ARROW_ASSIGN_OR_RAISE(
    writer, parquet::arrow::FileWriter::Open(*batch_stream->schema().get(),
                                             arrow::default_memory_pool(), outfile,
                                             props, arrow_props));

// Write each batch as a row_group
for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *batch_stream) {
  ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
  ARROW_ASSIGN_OR_RAISE(auto table,
                        arrow::Table::FromRecordBatches(batch->schema(), {batch}));
  ARROW_RETURN_NOT_OK(writer->WriteTable(*table.get(), batch->num_rows()));
}

// Write file footer and close
ARROW_RETURN_NOT_OK(writer->Close());

StreamWriter#

StreamWriter 允许使用标准 C++ 输出运算符写入 Parquet 文件,类似于使用 StreamReader 类进行读取。这种类型安全的方法还确保行写入时不会遗漏字段,并允许自动创建新的行组(在一定数据量之后)或通过使用 EndRowGroup 流修饰符显式创建。

异常用于表示错误。ParquetException 在以下情况下抛出

  • 尝试使用不正确的类型写入字段。

  • 尝试在一行中写入过多字段。

  • 尝试跳过必填字段。

#include "arrow/io/file.h"
#include "parquet/stream_writer.h"

{
   std::shared_ptr<arrow::io::FileOutputStream> outfile;

   PARQUET_ASSIGN_OR_THROW(
      outfile,
      arrow::io::FileOutputStream::Open("test.parquet"));

   parquet::WriterProperties::Builder builder;
   std::shared_ptr<parquet::schema::GroupNode> schema;

   // Set up builder with required compression type etc.
   // Define schema.
   // ...

   parquet::StreamWriter os{
      parquet::ParquetFileWriter::Open(outfile, schema, builder.build())};

   // Loop over some data structure which provides the required
   // fields to be written and write each row.
   for (const auto& a : getArticles())
   {
      os << a.name() << a.price() << a.quantity() << parquet::EndRow;
   }
}

写入器属性#

要配置如何写入 Parquet 文件,请使用 WriterProperties::Builder

#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"

using parquet::WriterProperties;
using parquet::ParquetVersion;
using parquet::ParquetDataPageVersion;
using arrow::Compression;

std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
   .max_row_group_length(64 * 1024)
   .created_by("My Application")
   .version(ParquetVersion::PARQUET_2_6)
   .data_page_version(ParquetDataPageVersion::V2)
   .compression(Compression::SNAPPY)
   .build();

max_row_group_length 设置每个行组的最大行数上限,它优先于写入方法中传递的 chunk_size

您可以使用 version 设置要写入的 Parquet 版本,这决定了可用的逻辑类型。此外,您可以使用 data_page_version 设置数据页版本。默认情况下是 V1;设置为 V2 将允许更优化的压缩(跳过没有空间效益的压缩页面),但并非所有读取器都支持此数据页版本。

默认情况下压缩是关闭的,但要充分利用 Parquet,您还应该选择一个压缩编解码器。您可以为整个文件选择一个,也可以为单个列选择一个。如果您选择混合,文件级选项将应用于没有特定压缩编解码器的列。有关选项,请参阅 ::arrow::Compression

列数据编码同样可以应用于文件级或列级。默认情况下,写入器将尝试对所有支持的列进行字典编码,除非字典变得太大。此行为可以在文件级或列级通过 disable_dictionary() 更改。当不使用字典编码时,它将回退到为列或整个文件设置的编码;默认情况下是 Encoding::PLAIN,但这可以通过 encoding() 更改。

#include "parquet/arrow/writer.h"
#include "arrow/util/type_fwd.h"

using parquet::WriterProperties;
using arrow::Compression;
using parquet::Encoding;

std::shared_ptr<WriterProperties> props = WriterProperties::Builder()
  .compression(Compression::SNAPPY)        // Fallback
  ->compression("colA", Compression::ZSTD) // Only applies to column "colA"
  ->encoding(Encoding::BIT_PACKED)         // Fallback
  ->encoding("colB", Encoding::RLE)        // Only applies to column "colB"
  ->disable_dictionary("colB")             // Never dictionary-encode column "colB"
  ->build();

默认情况下,所有列都启用统计信息。您可以使用构建器上的 disable_statistics 禁用所有列或特定列的统计信息。有一个 max_statistics_size 限制了可用于最小值和最大值的最大字节数,这对于字符串或二进制大对象等类型很有用。如果列使用 enable_write_page_index 启用了页索引,则它不会将统计信息写入页眉,因为它在 ColumnIndex 中重复。

还有一些 Arrow 特定的设置可以使用 parquet::ArrowWriterProperties 进行配置

#include "parquet/arrow/writer.h"

using parquet::ArrowWriterProperties;

std::shared_ptr<ArrowWriterProperties> arrow_props = ArrowWriterProperties::Builder()
   .enable_deprecated_int96_timestamps() // default False
   ->store_schema() // default False
   ->build();

这些选项主要决定 Arrow 类型如何转换为 Parquet 类型。打开 store_schema 将导致写入器将序列化的 Arrow 模式存储在文件元数据中。由于 Parquet 模式和 Arrow 模式之间没有双射,存储 Arrow 模式允许 Arrow 读取器更忠实地重新创建原始数据。此从 Parquet 类型到原始 Arrow 类型的映射包括

  • 读取带有原始时区信息的时间戳(Parquet 不支持时区);

  • 从其存储类型读取 Arrow 类型(例如,从 int64 列读取 Duration);

  • 将字符串和二进制列读回具有 64 位偏移量的大变体;

  • 将列读回为字典编码(Arrow 列和序列化的 Parquet 版本是否字典编码是独立的)。

支持的 Parquet 功能#

Parquet 格式有许多功能,Parquet C++ 支持其中的一部分。

页面类型#

页面类型

备注

DATA_PAGE

DATA_PAGE_V2

DICTIONARY_PAGE

不支持的页面类型: INDEX_PAGE。读取 Parquet 文件时,此类型的页面将被忽略。

压缩#

压缩编解码器

备注

SNAPPY

GZIP

BROTLI

LZ4

(1)

ZSTD

  • (1) 在读取端,Parquet C++ 能够解压缩常规 LZ4 块格式和 参考 Parquet 实现使用的临时 Hadoop LZ4 格式。在写入端,Parquet C++ 始终生成临时 Hadoop LZ4 格式。

不支持的压缩编解码器: LZO。

编码#

编码

读取

写入

备注

PLAIN

PLAIN_DICTIONARY

BIT_PACKED

(1)

RLE

(1)

RLE_DICTIONARY

(2)

BYTE_STREAM_SPLIT

DELTA_BINARY_PACKED

DELTA_BYTE_ARRAY

DELTA_LENGTH_BYTE_ARRAY

  • (1) 仅支持编码定义和重复级别,以及布尔值。

  • (2) 在写入路径上,仅当在 WriterProperties::version() 中选择 Parquet 格式版本 2.4 或更高版本时才启用 RLE_DICTIONARY。

类型#

物理类型#

物理类型

映射的 Arrow 类型

备注

BOOLEAN

Boolean

INT32

Int32 / 其他

(1)

INT64

Int64 / 其他

(1)

INT96

时间戳 (纳秒)

(2)

FLOAT

Float32

DOUBLE

Float64

BYTE_ARRAY

Binary / LargeBinary / BinaryView

(1)

FIXED_LENGTH_BYTE_ARRAY

FixedSizeBinary / 其他

(1)

  • (1) 可以映射到其他 Arrow 类型,具体取决于逻辑类型(参见下表)。

  • (2) 在写入端,必须启用 ArrowWriterProperties::support_deprecated_int96_timestamps()

逻辑类型#

特定的逻辑类型可以覆盖给定物理类型的默认 Arrow 类型映射。

逻辑类型

物理类型

映射的 Arrow 类型

备注

NULL

任何

Null

(1)

INT

INT32

Int8 / UInt8 / Int16 / UInt16 / Int32 / UInt32

INT

INT64

Int64 / UInt64

DECIMAL

INT32 / INT64 / BYTE_ARRAY / FIXED_LENGTH_BYTE_ARRAY

Decimal32/ Decimal64 / Decimal128 / Decimal256

(2)

DATE

INT32

Date32

(3)

TIME

INT32

Time32 (毫秒)

TIME

INT64

Time64 (微秒或纳秒)

TIMESTAMP

INT64

时间戳 (毫秒、微秒或纳秒)

STRING

BYTE_ARRAY

String / LargeString / StringView

LIST

任何

List / LargeList

(4)

MAP

任何

Map

(5)

FLOAT16

FIXED_LENGTH_BYTE_ARRAY

HalfFloat

UUID

FIXED_LENGTH_BYTE_ARRAY

扩展 (arrow.uuid)

(6)

JSON

BYTE_ARRAY

扩展 (arrow.json)

(6)

GEOMETRY

BYTE_ARRAY

扩展 (geoarrow.wkb)

(6) (7)

GEOGRAPHY

BYTE_ARRAY

扩展 (geoarrow.wkb)

(6) (7)

  • (1) 在写入端,生成 Parquet 物理类型 INT32。

  • (2) 在写入端,总是发出 FIXED_LENGTH_BYTE_ARRAY,除非 store_decimal_as_integer 设置为 true。

  • (3) 在写入端,Arrow Date64 也映射到 Parquet DATE INT32。

  • (4) 在写入端,Arrow FixedSizedList 也映射到 Parquet LIST。

  • (5) 在读取端,具有多个值的键不会被去重,这与 Parquet 规范相矛盾。

  • (6) 要求 ArrowReaderProperties 中的 arrow_extensions_enabledtrue。当为 false 时,读取底层存储类型。

  • (7) 要求注册 geoarrow.wkb 扩展类型。

不支持的逻辑类型: BSON。如果在读取 Parquet 文件时遇到此类类型,则使用默认的物理类型映射(例如,Parquet BSON 列可以读取为 Arrow Binary 或 FixedSizeBinary)。

转换类型#

尽管转换类型在 Parquet 格式中已弃用(它们已被逻辑类型取代),但 Parquet C++ 实现识别并发出它们,以最大限度地提高与其他 Parquet 实现的兼容性。

特殊情况#

Arrow 扩展类型作为其存储类型写入。它仍然可以在读取时使用 Parquet 元数据重新创建(参见下面的“往返 Arrow 类型”)。某些扩展类型具有 Parquet LogicalType 等效项(例如,UUID、JSON、GEOMETRY、GEOGRAPHY)。如果 ArrowReaderProperties 中设置了适当的选项,即使 Parquet 元数据中没有存储 Arrow 模式,它们也会自动创建。

Arrow 字典类型作为其值类型写入。它仍然可以在读取时使用 Parquet 元数据重新创建(参见下面的“往返 Arrow 类型”)。

往返 Arrow 类型和模式#

虽然 Arrow 类型和 Parquet 类型之间没有双射,但可以将 Arrow 模式序列化为 Parquet 文件元数据的一部分。这通过 ArrowWriterProperties::store_schema() 启用。

在读取路径上,序列化的模式将被自动识别,并重新创建原始 Arrow 数据,根据需要转换 Parquet 数据。

例如,当将 Arrow LargeList 序列化为 Parquet 时

  • 数据作为 Parquet LIST 写入

  • 读回时,如果写入文件时启用了 ArrowWriterProperties::store_schema(),则 Parquet LIST 数据被解码为 Arrow LargeList;否则,它被解码为 Arrow List。

Parquet 字段 ID#

Parquet 格式支持可选的整数字段 ID,可以将其分配给给定字段。例如,这在 Apache Iceberg 规范中使用。

在写入端,如果 PARQUET:field_id 作为 Arrow 字段上的元数据键存在,则其值被解析为非负整数,并用作相应 Parquet 字段的字段 ID。

在读取端,Arrow 会将此类字段 ID 转换为相应 Arrow 字段上的元数据键 PARQUET:field_id

序列化细节#

Arrow 模式被序列化为 Arrow IPC 模式消息,然后进行 base64 编码,并存储在 Parquet 文件元数据中的 ARROW:schema 元数据键下。

限制#

不支持写入或读取带有空条目的 FixedSizedList 数据。

加密#

Parquet C++ 实现了 加密规范中指定的所有功能,除了列索引和布隆过滤器模块的加密。

更具体地说,Parquet C++ 支持

  • AES_GCM_V1 和 AES_GCM_CTR_V1 加密算法。

  • AAD 后缀用于 Footer、ColumnMetaData、Data Page、Dictionary Page、Data PageHeader、Dictionary PageHeader 模块类型。不支持其他模块类型(ColumnIndex、OffsetIndex、BloomFilter Header、BloomFilter Bitset)。

  • EncryptionWithFooterKey 和 EncryptionWithColumnKey 模式。

  • Encrypted Footer 和 Plaintext Footer 模式。

配置#

Parquet 加密使用 parquet::encryption::CryptoFactory,它有权访问密钥管理系统 (KMS),该系统存储实际的加密密钥,并由密钥 ID 引用。Parquet 加密配置仅使用密钥 ID,不使用实际密钥。

Parquet 元数据加密通过 parquet::encryption::EncryptionConfiguration 进行配置

// Set write options with encryption configuration.
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
    std::string("footerKeyId"));

如果 encryption_config->uniform_encryption 设置为 true,则所有列都使用与 Parquet 元数据相同的密钥加密。否则,单个列将使用 encryption_config->column_keys 配置的单个密钥加密。此字段需要格式为 "columnKeyID1:colName1,colName2;columnKeyID3:colName3..." 的字符串。

// Set write options with encryption configuration.
auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
    std::string("footerKeyId"));
encryption_config->column_keys =
    "columnKeyId: i, s.a, s.b, m.key_value.key, m.key_value.value, l.list.element";

请参阅完整的Parquet 列加密示例

注意

加密具有嵌套字段(结构体、映射或列表数据类型)的列需要内部字段的列密钥,而不是外部列本身。为外部列配置列密钥会导致此错误(此处列名为 col

OSError: Encrypted column col not in file schema

通常,映射列 m 的键和值字段分别具有名称 m.key_value.keym.key_value.value。列表列 l 的内部字段具有名称 l.list.element。结构体列 s 的内部字段 f 具有名称 s.f

杂项#

功能

读取

写入

备注

列索引

(1)

偏移索引

(1)

布隆过滤器

(2)

CRC 校验和

  • (1) 提供了对列和偏移索引结构的访问,但数据读取 API 目前不使用它们。

  • (2) 提供了创建、序列化和反序列化布隆过滤器的 API,但它们未集成到数据读取 API 中。