模块阅读器

模块 reader 

源代码
展开描述

用于将 Avro 数据读取到 Arrow 数组的核心功能

实现主要读取器接口和记录解码逻辑。Avro 阅读器

将 Apache Avro 编码数据读取为 Arrow 的 RecordBatch 格式的工具。

§限制

  • 不支持分支数大于 127 的 Avro 联合类型。 将 Avro 联合类型解码为 Arrow UnionArray 时,Arrow 将联合类型标识符存储在 8 位有符号缓冲区 (i8) 中。这意味着实际限制为 127 个不同的分支 ID。解析为超过 127 个分支的输入将返回错误。如果您确实需要更多,请根据 Arrow 格式规范,将 schema 建模为 联合的联合

    参阅:Arrow Columnar Format — 紧凑联合 (“类型缓冲区:8 位有符号;超过 127 种可能类型的联合可以建模为联合的联合”)。

此模块公开了三层 API 表面,从高到低

  • ReaderBuilder:配置 Avro 的读取方式(批次大小、严格联合处理、字符串表示、读取器 schema 等),并生成
    • 一个用于从任何 BufRead 读取 Avro 对象容器文件 (OCF)Reader,或
    • 一个用于 单对象编码 Avro 字节和 Confluent Schema Registry 帧消息的低级 Decoder
  • Reader:一个方便的、同步的迭代器,用于迭代从 OCF 输入解码的 RecordBatch。实现 Iterator<Item = Result<RecordBatch, ArrowError>>RecordBatchReader
  • Decoder:一个基于推送的行解码器,它消耗 SOE 帧的 Avro 字节,并在批次填满时生成就绪的 RecordBatch 值。这适用于与异步字节流、网络协议或其他自定义数据源集成。

§编码及何时使用何种类型

§基本文件使用(OCF)

使用 ReaderBuilder::build 从任何 BufRead 构建一个 Reader。下面的 doctest 使用 AvroWriter 在内存中创建一个微小的 OCF,然后将其读回。

use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;

// Build a minimal Arrow schema and batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;

// Write an Avro OCF to memory
let buffer: Vec<u8> = Vec::new();
let mut writer = AvroWriter::new(buffer, schema.clone())?;
writer.write(&batch)?;
writer.finish()?;
let bytes = writer.into_inner();

// Read it back with ReaderBuilder
let mut reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = reader.next().unwrap()?;
assert_eq!(out.num_rows(), 3);

§流式使用(单对象 / Confluent / Apicurio)

Decoder 允许您通过定期调用 Decoder::decode 传入新数据,并在至少一行完成后调用 Decoder::flush 获取 RecordBatch,从而将 Avro 解码与 任何 字节源集成。

以下示例展示了如何使用 futures 工具从任意 bytes::Bytes 流中解码。注意:这只是为了说明,为简单起见,它保留了一个内存中的 Bytes 缓冲区——实际应用程序通常维护一个滚动缓冲区。

use bytes::{Buf, Bytes};
use futures::{Stream, StreamExt};
use std::task::{Poll, ready};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use arrow_avro::reader::Decoder;

/// Decode a stream of Avro-framed bytes into RecordBatch values.
fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
    mut decoder: Decoder,
    mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    let mut buffered = Bytes::new();
    futures::stream::poll_fn(move |cx| {
        loop {
            if buffered.is_empty() {
                buffered = match ready!(input.poll_next_unpin(cx)) {
                    Some(b) => b,
                    None => break, // EOF
                };
            }
            // Feed as much as possible
            let decoded = match decoder.decode(buffered.as_ref()) {
                Ok(n) => n,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            let read = buffered.len();
            buffered.advance(decoded);
            if decoded != read {
                // decoder made partial progress; request more bytes
                break
            }
        }
        // Return a batch if one or more rows are complete
        Poll::Ready(decoder.flush().transpose())
    })
}

§单对象编码(Rabin 指纹)构建和使用 Decoder

下面的 doctest 使用 Avro 写入器(无手动变长整数)为写入器 schema({"type":"record","name":"User","fields":[{"name":"id","type":"long"}]}写入一个单对象帧记录,然后将其解码为 RecordBatch

use std::sync::Arc;
use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY, FingerprintStrategy};
use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
use arrow_avro::reader::ReaderBuilder;

// Register the writer schema (Rabin fingerprint by default).
let mut store = SchemaStore::new();
let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
  {"name":"id","type":"long"}]}"#.to_string());
let _fp = store.register(avro_schema.clone())?;

// Create a single-object framed record { id: 42 } with the Avro writer.
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), avro_schema.json_string.clone());
let arrow = Schema::new_with_metadata(vec![Field::new("id", DataType::Int64, false)], md);
let batch = RecordBatch::try_new(
    Arc::new(arrow.clone()),
    vec![Arc::new(Int64Array::from(vec![42])) as ArrayRef],
)?;
let mut w = WriterBuilder::new(arrow)
    .with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix
    .build::<_, AvroSoeFormat>(Vec::new())?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // C3 01 + fp + Avro body

// Decode with a `Decoder`
let mut dec = ReaderBuilder::new()
  .with_writer_schema_store(store)
  .with_batch_size(1024)
  .build_decoder()?;

dec.decode(&frame)?;
let out = dec.flush()?.expect("one batch");
assert_eq!(out.num_rows(), 1);

有关 2 字节标记和小端 CRC-64-AVRO 指纹的详细信息,请参阅 Avro 1.11.1 “Single object encoding”:https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding

§Confluent Schema Registry 帧构建和使用 Decoder

Confluent 线格式为:1 字节魔法值 0x00,然后是一个 4 字节大端 schema ID,然后是 Avro 主体。下面的 doctest 为相同的 schema ID 制作了两条消息,并将它们解码为一个包含两行的 RecordBatch

use std::sync::Arc;
use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy};
use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
use arrow_avro::reader::ReaderBuilder;

// Set up a store keyed by numeric IDs (Confluent).
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let schema_id = 7u32;
let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
  {"name":"id","type":"long"}, {"name":"name","type":"string"}]}"#.to_string());
store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;

// Write two Confluent-framed messages {id:1,name:"a"} and {id:2,name:"b"}.
fn msg(id: i64, name: &str, schema: &AvroSchema, schema_id: u32) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let mut md = HashMap::new();
    md.insert(SCHEMA_METADATA_KEY.to_string(), schema.json_string.clone());
    let arrow = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)],
        md,
    );
    let batch = RecordBatch::try_new(
        Arc::new(arrow.clone()),
        vec![
          Arc::new(Int64Array::from(vec![id])) as ArrayRef,
          Arc::new(StringArray::from(vec![name])) as ArrayRef,
        ],
    )?;
    let mut w = WriterBuilder::new(arrow)
        .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) // 0x00 + ID + body
        .build::<_, AvroSoeFormat>(Vec::new())?;
    w.write(&batch)?; w.finish()?;
    Ok(w.into_inner())
}
let m1 = msg(1, "a", &avro_schema, schema_id)?;
let m2 = msg(2, "b", &avro_schema, schema_id)?;

// Decode both into a single batch.
let mut dec = ReaderBuilder::new()
  .with_writer_schema_store(store)
  .with_batch_size(1024)
  .build_decoder()?;
dec.decode(&m1)?;
dec.decode(&m2)?;
let batch = dec.flush()?.expect("batch");
assert_eq!(batch.num_rows(), 2);

参阅 Confluent 的“线格式”说明:魔法字节 0x00,4 字节 大端 schema ID,然后是 Avro 编码载荷。https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

§Schema 解析(读取器 vs. 写入器 schema)

Avro 支持使用 字段别名默认值数字提升 等规则,将使用一个 schema (“写入器”) 写入的数据解析为另一个 schema (“读取器”)。实际上,这允许您随着时间的推移演进 schema,同时保持与旧数据的兼容性。

规范背景:参阅 Avro 的 Schema Resolution(别名、默认值)和 Confluent Wire format(魔法值 0x00 + 大端 schema ID + Avro 主体)。https://avro.apache.org/docs/1.11.1/specification/#schema-resolution https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format

§OCF 示例:通过读取器 schema 重命名字段并添加默认值

下面我们写入一个 OCF,其 写入器 schema 包含字段 id: long, name: string。然后我们使用一个 读取器 schema 读取它,该 schema

  • 通过 aliasesname 重命名full_name,并且
  • 添加 is_active: boolean 并带有 默认true
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::AvroSchema;

// Writer (past version): { id: long, name: string }
let writer_arrow = Schema::new(vec![
    Field::new("id", DataType::Int64, false),
    Field::new("name", DataType::Utf8, false),
]);
let batch = RecordBatch::try_new(
    Arc::new(writer_arrow.clone()),
    vec![
        Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
        Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
    ],
)?;

// Write an OCF entirely in memory
let mut w = AvroWriter::new(Vec::<u8>::new(), writer_arrow)?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();

// Reader (current version):
//  - record name "topLevelRecord" matches the crate's default for OCF
//  - rename `name` -> `full_name` using aliases (optional)
let reader_json = r#"
{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
    { "name": "is_active", "type": "boolean", "default": true }
  ]
}"#;

let mut reader = ReaderBuilder::new()
  .with_reader_schema(AvroSchema::new(reader_json.to_string()))
  .build(Cursor::new(bytes))?;

let out = reader.next().unwrap()?;
assert_eq!(out.num_rows(), 2);

§Confluent 单对象示例:将 过去的 写入器版本解析为主题的 当前 读取器 schema

在此场景中,读取器 schema 是主题的 当前 schema,而注册在 Confluent ID 12 下的两个 写入器 schema 代表 过去的版本。解码器使用读取器 schema 解析这两个版本。

use std::sync::Arc;
use std::collections::HashMap;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, SchemaStore,
    SCHEMA_METADATA_KEY, FingerprintStrategy,
};
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Reader: current topic schema (no reader-added fields)
    //   {"type":"record","name":"User","fields":[
    //     {"name":"id","type":"long"},
    //     {"name":"name","type":"string"}]}
    let reader_schema = AvroSchema::new(
        r#"{"type":"record","name":"User",
            "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#
            .to_string(),
    );

    // Register two *writer* schemas under Confluent IDs 0 and 1
    let writer_v0 = AvroSchema::new(
        r#"{"type":"record","name":"User",
            "fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}"#
            .to_string(),
    );
    let writer_v1 = AvroSchema::new(
        r#"{"type":"record","name":"User",
            "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},
                      {"name":"email","type":["null","string"],"default":null}]}"#
            .to_string(),
    );

    let id_v0: u32 = 0;
    let id_v1: u32 = 1;

    let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); // integer IDs
    store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
    store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;

    // Write two Confluent-framed messages using each writer version
    // frame0: writer v0 body {id:1001_i32, name:"v0-alice"}
    let mut md0 = HashMap::new();
    md0.insert(SCHEMA_METADATA_KEY.to_string(), writer_v0.json_string.clone());
    let arrow0 = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int32, false),
             Field::new("name", DataType::Utf8, false)], md0);
    let batch0 = RecordBatch::try_new(
        Arc::new(arrow0.clone()),
        vec![Arc::new(Int32Array::from(vec![1001])) as ArrayRef,
             Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?;
    let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0)
        .with_fingerprint_strategy(FingerprintStrategy::Id(id_v0))
        .build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?;
    w0.write(&batch0)?; w0.finish()?;
    let frame0 = w0.into_inner(); // 0x00 + id_v0 + body

    // frame1: writer v1 body {id:2002_i64, name:"v1-bob", email: Some("bob@example.com")}
    let mut md1 = HashMap::new();
   md1.insert(SCHEMA_METADATA_KEY.to_string(), writer_v1.json_string.clone());
    let arrow1 = Schema::new_with_metadata(
        vec![Field::new("id", DataType::Int64, false),
             Field::new("name", DataType::Utf8, false),
             Field::new("email", DataType::Utf8, true)], md1);
    let batch1 = RecordBatch::try_new(
        Arc::new(arrow1.clone()),
        vec![Arc::new(Int64Array::from(vec![2002])) as ArrayRef,
             Arc::new(StringArray::from(vec!["v1-bob"])) as ArrayRef,
             Arc::new(StringArray::from(vec![Some("bob@example.com")])) as ArrayRef])?;
    let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1)
        .with_fingerprint_strategy(FingerprintStrategy::Id(id_v1))
        .build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?;
    w1.write(&batch1)?; w1.finish()?;
    let frame1 = w1.into_inner(); // 0x00 + id_v1 + body

    // Build a streaming Decoder that understands Confluent framing
    let mut decoder = ReaderBuilder::new()
        .with_reader_schema(reader_schema)
        .with_writer_schema_store(store)
        .with_batch_size(8) // small demo batches
        .build_decoder()?;

    // Decode each whole frame, then drain completed rows with flush()
    let mut total_rows = 0usize;

    let consumed0 = decoder.decode(&frame0)?;
    assert_eq!(consumed0, frame0.len(), "decoder must consume the whole frame");
    while let Some(batch) = decoder.flush()? { total_rows += batch.num_rows(); }

    let consumed1 = decoder.decode(&frame1)?;
    assert_eq!(consumed1, frame1.len(), "decoder must consume the whole frame");
    while let Some(batch) = decoder.flush()? { total_rows += batch.num_rows(); }

    // We sent 2 records so we should get 2 rows (possibly one per flush)
    assert_eq!(total_rows, 2);
    Ok(())
}

§Schema 演变和批次边界

当输入帧带有 schema 指纹(单对象或 Confluent)时,Decoder 支持流中间的 schema 更改。当观察到新的指纹时

  • 如果当前 RecordBatch 为空,解码器会立即切换到新 schema。
  • 如果不是,解码器会首先完成当前批次,然后才进行切换。

因此,Decoder::flush 生成的批次的 schema 可能会随时间变化,并且 Decoder 特意 实现 RecordBatchReader。相比之下,Reader (OCF) 对整个文件只有一个写入器 schema,因此实现了 RecordBatchReader

§性能与内存

  • batch_size 控制每个 RecordBatch 的最大行数。更大的批次可以分摊每个批次的开销;更小的批次可以减少峰值内存使用和延迟。
  • 当启用 utf8_view 时,字符串列使用 Arrow 的 StringViewArray,这可以减少短字符串的分配。
  • 对于 OCF,数据块可能被压缩;Reader 将使用文件头部中指定的编解码器解压缩,并将未压缩的字节馈送给行 Decoder

§错误处理

  • 不完整的输入会返回带“Unexpected EOF”的解析错误;调用者通常会提供更多字节并重试。
  • 如果指纹对于提供的 SchemaStore 是未知的,解码将失败并返回描述性错误。请预先填充 store 以避免此问题。

模块§

block 🔒
Block 的解码器
cursor 🔒
header 🔒
Header 的解码器
record 🔒
Arrow 类型的 Avro 解码器。
vlq 🔒

结构体§

解码器
一个低级、基于推送的解码器,将 Avro 字节转换为 Arrow RecordBatch
阅读器
一个高级 Avro 对象容器文件 阅读器。
阅读器构建器
一个用于配置和构建 Avro 阅读器和解码器的构建器。

函数§

is_不完整_数据 🔒