在 Arrow Rust 中宣布 arrow-avro


发布 2025年10月23日
作者 Connor Sanders (jecsand838)

arrow-avro 是一个新重写的 Rust crate,它能将 Apache Avro 数据直接读取和写入为 Arrow RecordBatch。它支持 Avro 对象容器文件 (OCF)、单对象编码 (SOE)、Confluent Schema Registry 线路格式 以及 Apicurio Registry 线路格式,并支持投影/演化、可调批处理大小和可选的 StringViewArray 以实现更快的字符串处理。其向量化设计减少了复制和缓存未命中,使批处理和流式管道更简单、更快。

动机

Apache Avro 的面向行设计对于一次编码一条记录非常有效,而 Apache Arrow 的列式布局则针对向量化分析进行了优化。一个主要的挑战在于如何在不重新引入行式开销的情况下在这些格式之间进行转换。逐行解码 Avro 然后构建 Arrow 数组会产生额外的分配和对缓存不友好的访问(这正是 Arrow 旨在避免的开销)。在实际应用中,这种开销通常出现在分析热路径中。例如,DataFusion 的 Avro 数据源 目前自带其自己的以行为中心的 Avro 到 Arrow 层。此实现导致了一个开放问题,即 使用上游 arrow-avro 读取器 来简化代码并加快扫描速度。此外,DataFusion 还有一个开放问题,即 支持 Avro 格式写入,该问题以开发上游 arrow-avro 写入器为前提。

为什么不使用现有的 apache-avro crate?

Rust 已经有一个成熟的通用 Avro crate,apache-avro。它将 Avro 记录读取和写入为 Avro 值类型,并提供对象容器文件读取器和写入器。它不直接解码为 Arrow 数组,因此任何 Arrow 集成都必须实现行然后构建列。

我们需要一种补充方法,该方法能逐列直接解码到 Arrow 构建器中并发出 RecordBatch。这将支持投影下推,同时保持执行端到端向量化。对于 Apache DataFusion 等项目,访问成熟的上游 Arrow 本机读取器和写入器将有助于简化代码路径并减少重复。

现代管道加剧了这种需求,因为 Avro 也用于传输,而不仅仅是文件。Kafka 生态系统通常使用 Confluent 的 Schema Registry 帧,许多服务采用 Avro 单对象编码格式。一种能够直接解码到 Arrow 批次(而不是通过每行值)的方法将使下游计算能够以流式速率保持向量化。

为什么这很重要

Apache Avro 是流处理器和云服务中的一流格式

简而言之:Arrow 用户会遇到磁盘(OCF)和传输(SOE)上的 Avro。用于 OCF、SOE 和 Confluent 帧的 Arrow 优先、向量化读/写器消除了普遍存在的瓶颈,并使管道端到端保持列式。

介绍 arrow-avro

arrow-avro 是一个高性能的 Rust crate,它以列优先、批处理导向的设计在 Avro 和 Arrow 之间进行转换。在读取端,它直接将 Avro 对象容器文件 (OCF)、单对象编码 (SOE) 和 Confluent Schema Registry 线路格式解码为 Arrow RecordBatch。同时,写入路径也提供了编码为 OCF 和 SOE 的格式。

该 crate 暴露了两个主要的读取 API:用于 OCF 输入的高级 Reader 和用于流式 SOE 帧的低级 Decoder。对于 SOE 和 Confluent/Apicurio 帧,提供了一个 SchemaStore,它将指纹或 schema ID 解析为完整的 Avro 写入器 schema,从而实现 schema 演化,同时保持解码路径的向量化。

在写入端,AvroWriter 生成 OCF(包括容器级压缩),而 AvroStreamWriter 生成用于单对象或 Confluent/Apicurio 编码的帧 Avro 消息,通过 WriterBuilder::with_fingerprint_strategy(...) 旋钮进行配置。

配置有意地保持最小但实用。例如,ReaderBuilder 暴露了涵盖批量文件摄取和流式系统的旋钮,而无需强制使用特定于格式的代码路径。

这如何反映 Arrow-rs 中的 Parquet

如果您在 Arrow-rs 中使用过 Parquet,您已经了解了这种模式。parquet crate 暴露了一个 parquet::arrow 模块,该模块直接读取和写入 Arrow RecordBatch。大多数用户在读取时使用 ParquetRecordBatchReaderBuilder,在写入时使用 ArrowWriter。您预先选择列,设置批处理大小,读取器会为您提供直接流入向量化操作符的 Arrow 批处理。这是 Rust 中广泛采用的“格式 crate + Arrow 本机桥接”方法。

arrow-avro 将同样的桥梁带到了 Avro。您会得到一个单一的 ReaderBuilder,它可以为 OCF 生成一个 Reader,或者为在线帧生成一个流式 Decoder。两者都返回 Arrow RecordBatch,这意味着引擎可以使投影和过滤靠近读取器,避免只构建行以便稍后将它们重新组装成列。对于不断演进的流,一个小的 SchemaStore 在解码之前解析指纹或 ID,因此输出的批次已经适合向量化执行。

这种模式很重要的原因很简单。Arrow 的列式格式旨在进行向量化工作和良好的缓存局部性。当格式读取器直接生成 Arrow 批次时,复制和分支式的逐行工作被最小化,从而保持下游操作符的快速。这与 parquet::arrow 在 Rust 中流行的原因相同,也是 arrow-avro 现在为 Avro 所实现的功能。

架构与技术概述

High-level `arrow-avro` architecture

在高层次上,arrow-avro 清晰地分为围绕 Arrow RecordBatch 构建的读写路径。读端将 Avro(OCF 文件或帧字节流)转换为批处理的 Arrow 数组,而写端则获取 Arrow 批处理并生成 OCF 文件或流帧。当使用 AvroStreamWriter 时,帧(SOE 或 Confluent)是流输出的一部分,基于配置的指纹策略;因此无需单独的帧工作。公共 API 和模块布局有意地很小,因此大多数应用程序只涉及一个构建器、一个读取器/解码器和(可选)一个用于模式演化的模式存储。

读取路径上,一切都始于ReaderBuilder。一个构建器可以创建一个用于对象容器文件(OCF)的Reader或一个用于SOE/Confluent/Apicurio帧的流式DecoderReader拉取OCF块并生成Arrow RecordBatch,而Decoder是基于推送的,即字节在到达时被馈入,然后在调用flush后作为完成的批次排出。两者都使用相同的模式驱动解码逻辑(每个列解码器带有投影/联合/可空性处理),因此文件和流式输入生成的批次使用更少的每行分配和最小的分支/冗余。此外,流式Decoder维护一个按指纹键入的每个模式记录解码器缓存,以避免在流交错模式版本时重新规划。这使得即使模式演变,稳定状态的解码也能保持快速。

读取 OCF 时,Reader 解析一个头部,然后迭代编码数据块。头部包含一个元数据映射,其中包含嵌入的 Avro 模式和可选的压缩(即 deflatesnappyzstdbzip2xz),以及一个用于分隔块的 16 字节同步标记。每个后续的 OCF 块都带有行数和编码负载。解析后的 OCF 头部和块结构也使用可变长度整数编码,其中带符号值使用 Zig-Zag 编码。arrow-avro 实现了一个小的 vlq(可变长度数量)模块,该模块在头部解析和块迭代期间使用。高效的 vlq 解码是 ReaderDecoder 能够保持向量化并避免不必要的逐行开销的原因之一。

写入路径上,WriterBuilder 会生成一个AvroWriter (OCF) 或一个AvroStreamWriter (SOE/Message)。with_compression(...) 旋钮用于 OCF 块压缩,而 with_fingerprint_strategy(...) 选择流帧,即 SOE 的 Rabin,Confluent 的 32 位模式 ID,或 Apicurio 的 64 位模式 ID。AvroStreamWriter 在编码时也会自动添加适当的前缀,从而消除了对可能昂贵的后处理步骤来包装输出 Avro SOE 的需求。

Schema 处理集中在schema模块中。AvroSchema 封装了一个有效的 Avro Schema JSON 字符串,支持计算 Fingerprint,并且可以作为写入器 schema 加载到SchemaStore 中。在运行时,Reader/Decoder 可以使用 SchemaStore 在解码前解析指纹,从而实现schema 解析FingerprintAlgorithm 捕获了指纹的派生方式(即 CRC‑64‑AVRO Rabin、MD5、SHA‑256 或注册表 ID),而 FingerprintStrategy 配置了 Writer 在编码 SOE 流时如何为每条记录添加前缀。这个 schema 模块是实现 SOE 和 Confluent/Apicurio 支持的粘合剂,而无需与特定的注册表客户端耦合。

arrow-avro 的核心是一个类型映射 Codec,库用它来构建编码器和解码器。Codec 捕获了每个 Avro 字段如何映射到 Arrow 以及如何编码或解码。Reader 逻辑为每个(写入器,读取器)模式对构建一个 Codec,解码器随后使用该 Codec 将 Avro 值的解析向量化,直接解析到正确的 Arrow 构建器中。Writer 逻辑使用相同的 Codec 映射来驱动预先计算的记录编码计划,这使得 Arrow 数组能够快速序列化到正确的 Avro 物理表示(即,小数作为字节而不是定长,枚举符号处理,联合分支标记等)。因为 Codec 在编码器和解码器中都告知联合和可空性决策,所以常见的 Avro 模式 ["null", T] 无缝地映射到 Arrow 可选字段并从中映射,而 Avro 联合使用 8 位类型 ID 映射到 Arrow 联合,开销极小。同时,启用 strict_mode 会在 Codec 中应用更严格的 Avro 解析规则,以帮助尽早发现模糊的联合。

最后,通过将容器和流帧(OCF 与 SOE)与编码和解码分开,该 crate 自然地与 Arrow-rs 的其余部分组合:您读取或写入 Arrow RecordBatch,根据需要选择 OCF 或 SOE 流,并且仅在流式路径上才连接指纹。这导致了一个紧凑的 API 表面,涵盖了批处理文件和高吞吐量流,而不会牺牲列式、向量化执行。

示例

解码 Confluent 帧 Kafka 流

use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{
    SchemaStore, AvroSchema, Fingerprint, FingerprintAlgorithm, CONFLUENT_MAGIC
};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Register writer schema under Confluent id=1.
    let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
    store.set(
        Fingerprint::Id(1),
        AvroSchema::new(r#"{"type":"record","name":"T","fields":[{"name":"x","type":"long"}]}"#.into()),
    )?;

    // Define reader schema to enable projection/schema evolution.
    let reader_schema = AvroSchema::new(r#"{"type":"record","name":"T","fields":[{"name":"x","type":"long"}]}"#.into());

    // Build Decoder using reader and writer schemas
    let mut decoder = ReaderBuilder::new()
        .with_reader_schema(reader_schema)
        .with_writer_schema_store(store)
        .build_decoder()?;

    // Simulate one frame: magic 0x00 + 4‑byte big‑endian schema ID + Avro body (x=1 encoded as zig‑zag/VLQ).
    let mut frame = Vec::from(CONFLUENT_MAGIC); frame.extend_from_slice(&1u32.to_be_bytes()); frame.extend_from_slice(&[2]);

    // Consume from decoder
    let _consumed = decoder.decode(&frame)?;
    while let Some(batch) = decoder.flush()? {
        println!("rows={}, cols={}", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}

SchemaStore 将传入的模式 ID 映射到正确的 Avro 写入器模式,以便解码器可以根据读取器模式执行投影/演化。Confluent 的线路格式在每条消息前加上一个魔术字节 0x00,后跟一个大端 4 字节的模式 ID。解码 Avro 消息后,Decoder::flush() 方法会生成适合向量化处理的 Arrow RecordBatch

一个更高级的示例可以在这里找到。

写入 Snappy 压缩的 Avro OCF 文件

use arrow_array::{Int64Array, RecordBatch};
use arrow_schema::{Schema, Field, DataType};
use arrow_avro::writer::{Writer, WriterBuilder};
use arrow_avro::writer::format::AvroOcfFormat;
use arrow_avro::compression::CompressionCodec;
use std::{sync::Arc, fs::File, io::BufWriter};

fn main() -> Result<(), Box<dyn std::error::Error>> {
  let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
  let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int64Array::from(vec![1,2,3]))],
  )?;
  let file = File::create("target/example.avro")?;

  // Choose OCF block compression (e.g., None, Deflate, Snappy, Zstd)
  let mut writer: Writer<_, AvroOcfFormat> = WriterBuilder::new(schema)
      .with_compression(Some(CompressionCodec::Snappy))
      .build(BufWriter::new(file))?;
  writer.write(&batch)?;
  writer.finish()?;
  Ok(())
}

上面的示例配置了一个 Avro OCF Writer。它使用 WriterBuilder::new(schema) 构造了一个 Writer<_, AvroOcfFormat>,并将一个 File 包装在一个 BufWriter 中以实现高效 I/O。对 .with_compression(Some(CompressionCodec::Snappy)) 的调用启用了块级 snappy 压缩。最后,writer.write(&batch)? 将批处理序列化为 Avro 编码块,而 writer.finish()? 则刷新并最终确定输出文件。

替代方案与基准

将 Avro 引入 Arrow 的方法从根本上分为两种

  1. 以行为中心的方法,这是通用 Avro 库(如 apache-avro)的典型做法,它一次反序列化一条记录到原生的 Rust 值(即 Value 或 Serde 类型),然后从这些值构建 Arrow 数组。
  2. 向量化方法,即 arrow-avro 提供的方法,直接解码到 Arrow 构建器/数组并发出 RecordBatch,从而避免了大多数逐行开销。

本节使用这些 Criterion 基准测试 比较了这两种方法的性能。

读取性能 (1M)

1M Row Read Violin Plot

读取性能 (10K)

10K Row Read Violin Plot

写入性能 (1M)

1M Row Write Violin Plot

写入性能 (10K)

10K Row Write Violin Plot

在所有基准测试中,小提琴图显示 arrow-avro 在读写路径上都具有更低的中位数和更紧密的分布。当逐行工作占主导地位时(即 10K 行场景),差距会扩大。在 1M 行时,分布仍然有利于 arrow-avro,这反映了直接解码到 Arrow 数组后更好的缓存局部性和更少的复制。总体行为与 apache-avro 的逐条记录迭代和 arrow-avro 的批处理设计一致。

下表列出了我们在图中报告的案例

  • 10K 对比 1M 行,针对多种数据形态。
  • 读取案例
    • f8完整模式,8K 批次大小。以 batch_size = 8192 解码所有四列。
    • f1完整模式,1K 批次大小。以 batch_size = 1024 解码所有四列。
    • p8投影 {id,name},8K 批次大小(下推)。以 batch_size = 8192 解码 idname投影的实现方式:
      • arrow-avro/p8:通过读取器模式(ReaderBuilder::with_reader_schema(...))进行投影,因此解码在 Arrow-first 读取器中按列下推。
      • apache-avro/p8:通过 Avro 读取器模式(AvroReader::with_schema(...))进行投影,因此 Avro 库仅解码投影字段。
    • np投影 {id,name},无下推,8K 批次大小。两个读取器都解码完整记录(所有四列),物化所有数组,然后在解码后投影到 {id,name}。这模拟了无法将投影推送到文件/编解码器读取器中的系统。
  • 写入案例
    • c(冷):每次迭代的模式转换。
    • h(热):Avro JSON“热”路径。
  • 计算出的 Apache-Avro 与 Arrow-Avro 中位数以及加速比。

基准测试中位数时间结果 (Apple Silicon Mac)

案例 apache-avro 中位数 arrow-avro 中位数 加速
R/f8/10K 2.60 毫秒 0.24 毫秒 10.83 倍
R/p8/10K 7.91 毫秒 0.24 毫秒 32.95 倍
R/f1/10K 2.65 毫秒 0.25 毫秒 10.60 倍
R/np/10K 2.62 毫秒 0.25 毫秒 10.48 倍
R/f8/1M 267.21 毫秒 27.91 毫秒 9.57 倍
R/p8/1M 791.79 毫秒 26.28 毫秒 30.13 倍
R/f1/1M 262.93 毫秒 28.25 毫秒 9.31 倍
R/np/1M 268.79 毫秒 27.69 毫秒 9.71 倍
W/c/10K 4.78 毫秒 0.27 毫秒 17.70 倍
W/h/10K 0.82 毫秒 0.28 毫秒 2.93 倍
W/c/1M 485.58 毫秒 36.97 毫秒 13.13 倍
W/h/1M 83.58 毫秒 36.75 毫秒 2.27 倍

结束语

arrow-avro 带来了一个专门构建的向量化桥梁,连接了 Arrow-rs 和 Avro,涵盖了对象容器文件(OCF)、单对象编码(SOE)以及 Confluent/Apicurio Schema Registry 线路格式。这意味着您现在可以为批处理文件和流式系统保持摄取路径的列式化。上述读取器和写入器 API 现已随 arrow-rs v57.0.0 版本提供,供您使用。

这项工作是正在进行的 Arrow-rs 努力的一部分,旨在在 Rust 中实现一流的 Avro 支持。我们非常乐意听取您关于实际用例、工作负载和集成的反馈。我们也欢迎贡献,无论是问题、基准测试还是 PR。要关注或提供帮助,请在 GitHub 上提出问题 和/或在 apache/arrow-rs 中跟踪 添加 Avro 支持

致谢

特别感谢

如果您对这篇博客文章有任何疑问,请随时联系作者 Connor Sanders