展开描述
用于将 Avro 数据读取到 Arrow 数组的核心功能
实现主要读取器接口和记录解码逻辑。Avro 阅读器
将 Apache Avro 编码数据读取为 Arrow 的 RecordBatch 格式的工具。
§限制
-
不支持分支数大于 127 的 Avro 联合类型。 将 Avro 联合类型解码为 Arrow
UnionArray时,Arrow 将联合类型标识符存储在 8 位有符号缓冲区 (i8) 中。这意味着实际限制为 127 个不同的分支 ID。解析为超过 127 个分支的输入将返回错误。如果您确实需要更多,请根据 Arrow 格式规范,将 schema 建模为 联合的联合。参阅:Arrow Columnar Format — 紧凑联合 (“类型缓冲区:8 位有符号;超过 127 种可能类型的联合可以建模为联合的联合”)。
此模块公开了三层 API 表面,从高到低
ReaderBuilder:配置 Avro 的读取方式(批次大小、严格联合处理、字符串表示、读取器 schema 等),并生成- 一个用于从任何
BufRead读取 Avro 对象容器文件 (OCF) 的Reader,或 - 一个用于 单对象编码 Avro 字节和 Confluent Schema Registry 帧消息的低级
Decoder。
- 一个用于从任何
Reader:一个方便的、同步的迭代器,用于迭代从 OCF 输入解码的RecordBatch。实现Iterator<Item = Result<RecordBatch, ArrowError>>和RecordBatchReader。Decoder:一个基于推送的行解码器,它消耗 SOE 帧的 Avro 字节,并在批次填满时生成就绪的RecordBatch值。这适用于与异步字节流、网络协议或其他自定义数据源集成。
§编码及何时使用何种类型
- 对象容器文件 (OCF):一种自描述文件格式,包含一个带有写入器 schema、可选压缩编解码器和同步标记的头部,后跟一个或多个数据块。此格式使用
Reader。参阅 Avro 1.11.1 规范(“Object Container Files”)。https://avro.apache.org/docs/1.11.1/specification/#object-container-files - 单对象编码:一种流友好的帧格式,每个记录体前缀为 2 字节标记
0xC3 0x01,后跟写入器 schema 的 8 字节小端 CRC-64-AVRO Rabin 指纹,然后是 Avro 二进制体。使用带有已填充SchemaStore的Decoder将指纹解析为完整 schema。参阅 Avro 1.11.1 规范中的“单对象编码”。https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding - Confluent Schema Registry 线格式:一个 1 字节魔法值
0x00,一个 4 字节大端 schema ID,然后是 Avro 编码体。使用带有SchemaStore的Decoder,该SchemaStore配置为FingerprintAlgorithm::Id,并且条目以Fingerprint::Id为键。参阅 Confluent 的“线格式”文档。https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format - Apicurio Schema Registry 线格式:一个 1 字节魔法值
0x00,一个 8 字节大端 全局 schema ID,然后是 Avro 编码体。使用带有SchemaStore的Decoder,该SchemaStore配置为FingerprintAlgorithm::Id64,并且条目以Fingerprint::Id64为键。参阅 Apicurio 的“Avro SerDe”文档。https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry
§基本文件使用(OCF)
使用 ReaderBuilder::build 从任何 BufRead 构建一个 Reader。下面的 doctest 使用 AvroWriter 在内存中创建一个微小的 OCF,然后将其读回。
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;
// Build a minimal Arrow schema and batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;
// Write an Avro OCF to memory
let buffer: Vec<u8> = Vec::new();
let mut writer = AvroWriter::new(buffer, schema.clone())?;
writer.write(&batch)?;
writer.finish()?;
let bytes = writer.into_inner();
// Read it back with ReaderBuilder
let mut reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = reader.next().unwrap()?;
assert_eq!(out.num_rows(), 3);§流式使用(单对象 / Confluent / Apicurio)
Decoder 允许您通过定期调用 Decoder::decode 传入新数据,并在至少一行完成后调用 Decoder::flush 获取 RecordBatch,从而将 Avro 解码与 任何 字节源集成。
以下示例展示了如何使用 futures 工具从任意 bytes::Bytes 流中解码。注意:这只是为了说明,为简单起见,它保留了一个内存中的 Bytes 缓冲区——实际应用程序通常维护一个滚动缓冲区。
use bytes::{Buf, Bytes};
use futures::{Stream, StreamExt};
use std::task::{Poll, ready};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use arrow_avro::reader::Decoder;
/// Decode a stream of Avro-framed bytes into RecordBatch values.
fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
mut decoder: Decoder,
mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
let mut buffered = Bytes::new();
futures::stream::poll_fn(move |cx| {
loop {
if buffered.is_empty() {
buffered = match ready!(input.poll_next_unpin(cx)) {
Some(b) => b,
None => break, // EOF
};
}
// Feed as much as possible
let decoded = match decoder.decode(buffered.as_ref()) {
Ok(n) => n,
Err(e) => return Poll::Ready(Some(Err(e))),
};
let read = buffered.len();
buffered.advance(decoded);
if decoded != read {
// decoder made partial progress; request more bytes
break
}
}
// Return a batch if one or more rows are complete
Poll::Ready(decoder.flush().transpose())
})
}§为 单对象编码(Rabin 指纹)构建和使用 Decoder
下面的 doctest 使用 Avro 写入器(无手动变长整数)为写入器 schema({"type":"record","name":"User","fields":[{"name":"id","type":"long"}]})写入一个单对象帧记录,然后将其解码为 RecordBatch。
use std::sync::Arc;
use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::schema::{AvroSchema, SchemaStore, SCHEMA_METADATA_KEY, FingerprintStrategy};
use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
use arrow_avro::reader::ReaderBuilder;
// Register the writer schema (Rabin fingerprint by default).
let mut store = SchemaStore::new();
let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
{"name":"id","type":"long"}]}"#.to_string());
let _fp = store.register(avro_schema.clone())?;
// Create a single-object framed record { id: 42 } with the Avro writer.
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), avro_schema.json_string.clone());
let arrow = Schema::new_with_metadata(vec![Field::new("id", DataType::Int64, false)], md);
let batch = RecordBatch::try_new(
Arc::new(arrow.clone()),
vec![Arc::new(Int64Array::from(vec![42])) as ArrayRef],
)?;
let mut w = WriterBuilder::new(arrow)
.with_fingerprint_strategy(FingerprintStrategy::Rabin) // SOE prefix
.build::<_, AvroSoeFormat>(Vec::new())?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // C3 01 + fp + Avro body
// Decode with a `Decoder`
let mut dec = ReaderBuilder::new()
.with_writer_schema_store(store)
.with_batch_size(1024)
.build_decoder()?;
dec.decode(&frame)?;
let out = dec.flush()?.expect("one batch");
assert_eq!(out.num_rows(), 1);有关 2 字节标记和小端 CRC-64-AVRO 指纹的详细信息,请参阅 Avro 1.11.1 “Single object encoding”:https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
§为 Confluent Schema Registry 帧构建和使用 Decoder
Confluent 线格式为:1 字节魔法值 0x00,然后是一个 4 字节大端 schema ID,然后是 Avro 主体。下面的 doctest 为相同的 schema ID 制作了两条消息,并将它们解码为一个包含两行的 RecordBatch。
use std::sync::Arc;
use std::collections::HashMap;
use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm, SCHEMA_METADATA_KEY, FingerprintStrategy};
use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
use arrow_avro::reader::ReaderBuilder;
// Set up a store keyed by numeric IDs (Confluent).
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let schema_id = 7u32;
let avro_schema = AvroSchema::new(r#"{"type":"record","name":"User","fields":[
{"name":"id","type":"long"}, {"name":"name","type":"string"}]}"#.to_string());
store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
// Write two Confluent-framed messages {id:1,name:"a"} and {id:2,name:"b"}.
fn msg(id: i64, name: &str, schema: &AvroSchema, schema_id: u32) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), schema.json_string.clone());
let arrow = Schema::new_with_metadata(
vec![Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, false)],
md,
);
let batch = RecordBatch::try_new(
Arc::new(arrow.clone()),
vec![
Arc::new(Int64Array::from(vec![id])) as ArrayRef,
Arc::new(StringArray::from(vec![name])) as ArrayRef,
],
)?;
let mut w = WriterBuilder::new(arrow)
.with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) // 0x00 + ID + body
.build::<_, AvroSoeFormat>(Vec::new())?;
w.write(&batch)?; w.finish()?;
Ok(w.into_inner())
}
let m1 = msg(1, "a", &avro_schema, schema_id)?;
let m2 = msg(2, "b", &avro_schema, schema_id)?;
// Decode both into a single batch.
let mut dec = ReaderBuilder::new()
.with_writer_schema_store(store)
.with_batch_size(1024)
.build_decoder()?;
dec.decode(&m1)?;
dec.decode(&m2)?;
let batch = dec.flush()?.expect("batch");
assert_eq!(batch.num_rows(), 2);参阅 Confluent 的“线格式”说明:魔法字节 0x00,4 字节 大端 schema ID,然后是 Avro 编码载荷。https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
§Schema 解析(读取器 vs. 写入器 schema)
Avro 支持使用 字段别名、默认值 和 数字提升 等规则,将使用一个 schema (“写入器”) 写入的数据解析为另一个 schema (“读取器”)。实际上,这允许您随着时间的推移演进 schema,同时保持与旧数据的兼容性。
规范背景:参阅 Avro 的 Schema Resolution(别名、默认值)和 Confluent Wire format(魔法值 0x00 + 大端 schema ID + Avro 主体)。https://avro.apache.org/docs/1.11.1/specification/#schema-resolution https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
§OCF 示例:通过读取器 schema 重命名字段并添加默认值
下面我们写入一个 OCF,其 写入器 schema 包含字段 id: long, name: string。然后我们使用一个 读取器 schema 读取它,该 schema
- 通过
aliases将name重命名 为full_name,并且 - 添加
is_active: boolean并带有 默认 值true。
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::AvroSchema;
// Writer (past version): { id: long, name: string }
let writer_arrow = Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]);
let batch = RecordBatch::try_new(
Arc::new(writer_arrow.clone()),
vec![
Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
],
)?;
// Write an OCF entirely in memory
let mut w = AvroWriter::new(Vec::<u8>::new(), writer_arrow)?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();
// Reader (current version):
// - record name "topLevelRecord" matches the crate's default for OCF
// - rename `name` -> `full_name` using aliases (optional)
let reader_json = r#"
{
"type": "record",
"name": "topLevelRecord",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "full_name", "type": ["null","string"], "aliases": ["name"], "default": null },
{ "name": "is_active", "type": "boolean", "default": true }
]
}"#;
let mut reader = ReaderBuilder::new()
.with_reader_schema(AvroSchema::new(reader_json.to_string()))
.build(Cursor::new(bytes))?;
let out = reader.next().unwrap()?;
assert_eq!(out.num_rows(), 2);§Confluent 单对象示例:将 过去的 写入器版本解析为主题的 当前 读取器 schema
在此场景中,读取器 schema 是主题的 当前 schema,而注册在 Confluent ID 1 和 2 下的两个 写入器 schema 代表 过去的版本。解码器使用读取器 schema 解析这两个版本。
use std::sync::Arc;
use std::collections::HashMap;
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{
AvroSchema, Fingerprint, FingerprintAlgorithm, SchemaStore,
SCHEMA_METADATA_KEY, FingerprintStrategy,
};
use arrow_array::{ArrayRef, Int32Array, Int64Array, StringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Reader: current topic schema (no reader-added fields)
// {"type":"record","name":"User","fields":[
// {"name":"id","type":"long"},
// {"name":"name","type":"string"}]}
let reader_schema = AvroSchema::new(
r#"{"type":"record","name":"User",
"fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]}"#
.to_string(),
);
// Register two *writer* schemas under Confluent IDs 0 and 1
let writer_v0 = AvroSchema::new(
r#"{"type":"record","name":"User",
"fields":[{"name":"id","type":"int"},{"name":"name","type":"string"}]}"#
.to_string(),
);
let writer_v1 = AvroSchema::new(
r#"{"type":"record","name":"User",
"fields":[{"name":"id","type":"long"},{"name":"name","type":"string"},
{"name":"email","type":["null","string"],"default":null}]}"#
.to_string(),
);
let id_v0: u32 = 0;
let id_v1: u32 = 1;
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id); // integer IDs
store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
// Write two Confluent-framed messages using each writer version
// frame0: writer v0 body {id:1001_i32, name:"v0-alice"}
let mut md0 = HashMap::new();
md0.insert(SCHEMA_METADATA_KEY.to_string(), writer_v0.json_string.clone());
let arrow0 = Schema::new_with_metadata(
vec![Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false)], md0);
let batch0 = RecordBatch::try_new(
Arc::new(arrow0.clone()),
vec![Arc::new(Int32Array::from(vec![1001])) as ArrayRef,
Arc::new(StringArray::from(vec!["v0-alice"])) as ArrayRef])?;
let mut w0 = arrow_avro::writer::WriterBuilder::new(arrow0)
.with_fingerprint_strategy(FingerprintStrategy::Id(id_v0))
.build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?;
w0.write(&batch0)?; w0.finish()?;
let frame0 = w0.into_inner(); // 0x00 + id_v0 + body
// frame1: writer v1 body {id:2002_i64, name:"v1-bob", email: Some("bob@example.com")}
let mut md1 = HashMap::new();
md1.insert(SCHEMA_METADATA_KEY.to_string(), writer_v1.json_string.clone());
let arrow1 = Schema::new_with_metadata(
vec![Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
Field::new("email", DataType::Utf8, true)], md1);
let batch1 = RecordBatch::try_new(
Arc::new(arrow1.clone()),
vec![Arc::new(Int64Array::from(vec![2002])) as ArrayRef,
Arc::new(StringArray::from(vec!["v1-bob"])) as ArrayRef,
Arc::new(StringArray::from(vec![Some("bob@example.com")])) as ArrayRef])?;
let mut w1 = arrow_avro::writer::WriterBuilder::new(arrow1)
.with_fingerprint_strategy(FingerprintStrategy::Id(id_v1))
.build::<_, arrow_avro::writer::format::AvroSoeFormat>(Vec::new())?;
w1.write(&batch1)?; w1.finish()?;
let frame1 = w1.into_inner(); // 0x00 + id_v1 + body
// Build a streaming Decoder that understands Confluent framing
let mut decoder = ReaderBuilder::new()
.with_reader_schema(reader_schema)
.with_writer_schema_store(store)
.with_batch_size(8) // small demo batches
.build_decoder()?;
// Decode each whole frame, then drain completed rows with flush()
let mut total_rows = 0usize;
let consumed0 = decoder.decode(&frame0)?;
assert_eq!(consumed0, frame0.len(), "decoder must consume the whole frame");
while let Some(batch) = decoder.flush()? { total_rows += batch.num_rows(); }
let consumed1 = decoder.decode(&frame1)?;
assert_eq!(consumed1, frame1.len(), "decoder must consume the whole frame");
while let Some(batch) = decoder.flush()? { total_rows += batch.num_rows(); }
// We sent 2 records so we should get 2 rows (possibly one per flush)
assert_eq!(total_rows, 2);
Ok(())
}§Schema 演变和批次边界
当输入帧带有 schema 指纹(单对象或 Confluent)时,Decoder 支持流中间的 schema 更改。当观察到新的指纹时
- 如果当前
RecordBatch为空,解码器会立即切换到新 schema。 - 如果不是,解码器会首先完成当前批次,然后才进行切换。
因此,Decoder::flush 生成的批次的 schema 可能会随时间变化,并且 Decoder 特意 不 实现 RecordBatchReader。相比之下,Reader (OCF) 对整个文件只有一个写入器 schema,因此实现了 RecordBatchReader。
§性能与内存
batch_size控制每个RecordBatch的最大行数。更大的批次可以分摊每个批次的开销;更小的批次可以减少峰值内存使用和延迟。- 当启用
utf8_view时,字符串列使用 Arrow 的StringViewArray,这可以减少短字符串的分配。 - 对于 OCF,数据块可能被压缩;
Reader将使用文件头部中指定的编解码器解压缩,并将未压缩的字节馈送给行Decoder。
§错误处理
- 不完整的输入会返回带“Unexpected EOF”的解析错误;调用者通常会提供更多字节并重试。
- 如果指纹对于提供的
SchemaStore是未知的,解码将失败并返回描述性错误。请预先填充 store 以避免此问题。
模块§
结构体§
- 解码器
- 一个低级、基于推送的解码器,将 Avro 字节转换为 Arrow
RecordBatch。 - 阅读器
- 一个高级 Avro 对象容器文件 阅读器。
- 阅读器
构建器 - 一个用于配置和构建 Avro 阅读器和解码器的构建器。