pub struct Decoder {
active_decoder: RecordDecoder,
active_fingerprint: Option<Fingerprint>,
batch_size: usize,
remaining_capacity: usize,
cache: IndexMap<Fingerprint, RecordDecoder>,
fingerprint_algorithm: FingerprintAlgorithm,
pending_schema: Option<(Fingerprint, RecordDecoder)>,
awaiting_body: bool,
}展开描述
一个低级、基于推送的解码器,将 Avro 字节解码为 Arrow RecordBatch。
Decoder 专为流式传输场景设计
- 您可以使用
Self::decode馈送新接收的字节,可能多次,直到至少一行完成。 - 然后,您使用
Self::flush排空已完成的行,如果自上次刷新以来有任何行已完成,它会生成一个RecordBatch。
与专门用于 Avro 对象容器文件的 Reader 不同,Decoder 支持分帧的单对象输入和 Confluent Schema Registry 消息,当帧指示新的指纹时,会在流中切换模式。
§支持的前缀
在每个新行边界上,Decoder 尝试匹配以下“前缀”之一
- 单对象编码:魔术字节
0xC3 0x01+ 模式指纹(长度取决于配置的FingerprintAlgorithm);请参见SINGLE_OBJECT_MAGIC。 - Confluent 线缆格式:魔术字节
0x00+ 4 字节大端模式 ID;请参见CONFLUENT_MAGIC。
活动指纹决定了使用哪个缓存的行解码器来解码后续的记录体字节。
§模式切换语义
当观察到新指纹时
- 如果当前批次为空,解码器立即切换;
- 否则,当前批次将在下一次
flush时完成,然后解码器才会切换到新模式。这保证了一个RecordBatch永远不会混合具有不同模式的行。
§示例
构建和使用用于单对象编码的 Decoder
use arrow_avro::schema::{AvroSchema, SchemaStore};
use arrow_avro::reader::ReaderBuilder;
// Use a record schema at the top level so we can build an Arrow RecordBatch
let mut store = SchemaStore::new(); // Rabin fingerprinting by default
let avro = AvroSchema::new(
r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
);
let fp = store.register(avro)?;
// --- Hidden: write a single-object framed row {x:7} ---
let mut decoder = ReaderBuilder::new()
.with_writer_schema_store(store)
.with_batch_size(16)
.build_decoder()?;
let batch = decoder.flush()?.expect("one row");
assert_eq!(batch.num_rows(), 1);背景:Avro 的单对象编码定义为 0xC3 0x01 + 写入器模式的 8 字节小端 CRC-64-AVRO 指纹 + Avro 二进制主体。有关详细信息,请参阅 Avro 1.11.1 规范。 https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
构建和使用用于 Confluent Registry 消息的 Decoder
use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
use arrow_avro::reader::ReaderBuilder;
let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;
// --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
let mut decoder = ReaderBuilder::new()
.with_writer_schema_store(store)
.build_decoder()?;
let batch = decoder.flush()?.expect("two rows");
assert_eq!(batch.num_rows(), 2);字段§
§active_decoder: RecordDecoder§active_fingerprint: Option<Fingerprint>§batch_size: usize§remaining_capacity: usize§cache: IndexMap<Fingerprint, RecordDecoder>§fingerprint_algorithm: FingerprintAlgorithm§pending_schema: Option<(Fingerprint, RecordDecoder)>§awaiting_body: bool实现§
源代码§impl Decoder
impl Decoder
源代码pub fn schema(&self) -> SchemaRef
pub fn schema(&self) -> SchemaRef
返回此解码器解码的行的 Arrow 模式。
注意:对于单对象或 Confluent 帧,当输入指示新指纹时,模式可能会在行边界处更改。
源代码pub fn batch_size(&self) -> usize
pub fn batch_size(&self) -> usize
返回配置的每批最大行数。
fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError>
源代码fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>( &mut self, buf: &[u8], magic: &[u8; MAGIC_LEN], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>
fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>( &mut self, buf: &[u8], magic: &[u8; MAGIC_LEN], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>
此方法检查 buf 开头是否存在提供的 magic 字节,如果存在,则尝试读取后续的 N 字节指纹,并使用 fingerprint_from 将其转换为 Fingerprint。
fn handle_fingerprint<const N: usize>( &mut self, buf: &[u8], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>
fn apply_pending_schema(&mut self)
fn apply_pending_schema_if_batch_empty(&mut self)
fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError>
源代码pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
如果至少有一行完全解码,则生成一个 RecordBatch,如果没有新行可用,则返回 Ok(None)。
如果在解码当前批次的行时检测到模式更改,则在刷新此批次之后应用模式切换,因此下一批次(如果有)可能具有不同的模式。
源代码pub fn batch_is_full(&self) -> bool
pub fn batch_is_full(&self) -> bool
如果解码器已达到当前批次的容量,则返回 true。
源代码pub fn batch_is_empty(&self) -> bool
pub fn batch_is_empty(&self) -> bool
如果解码器尚未解码任何批次(即当前批次为空),则返回 true。