解码器

结构体 Decoder 

源代码
pub struct Decoder {
    active_decoder: RecordDecoder,
    active_fingerprint: Option<Fingerprint>,
    batch_size: usize,
    remaining_capacity: usize,
    cache: IndexMap<Fingerprint, RecordDecoder>,
    fingerprint_algorithm: FingerprintAlgorithm,
    pending_schema: Option<(Fingerprint, RecordDecoder)>,
    awaiting_body: bool,
}
展开描述

一个低级、基于推送的解码器,将 Avro 字节解码为 Arrow RecordBatch

Decoder 专为流式传输场景设计

  • 您可以使用 Self::decode 馈送新接收的字节,可能多次,直到至少一行完成。
  • 然后,您使用 Self::flush 排空已完成的行,如果自上次刷新以来有任何行已完成,它会生成一个 RecordBatch

与专门用于 Avro 对象容器文件Reader 不同,Decoder 支持分帧的单对象输入和 Confluent Schema Registry 消息,当帧指示新的指纹时,会在流中切换模式。

§支持的前缀

在每个新行边界上,Decoder 尝试匹配以下“前缀”之一

  • 单对象编码:魔术字节 0xC3 0x01 + 模式指纹(长度取决于配置的 FingerprintAlgorithm);请参见 SINGLE_OBJECT_MAGIC
  • Confluent 线缆格式:魔术字节 0x00 + 4 字节大端模式 ID;请参见 CONFLUENT_MAGIC

活动指纹决定了使用哪个缓存的行解码器来解码后续的记录体字节。

§模式切换语义

当观察到新指纹时

  • 如果当前批次为空,解码器立即切换;
  • 否则,当前批次将在下一次 flush 时完成,然后解码器才会切换到新模式。这保证了一个 RecordBatch 永远不会混合具有不同模式的行。

§示例

构建和使用用于单对象编码的 Decoder

use arrow_avro::schema::{AvroSchema, SchemaStore};
use arrow_avro::reader::ReaderBuilder;

// Use a record schema at the top level so we can build an Arrow RecordBatch
let mut store = SchemaStore::new(); // Rabin fingerprinting by default
let avro = AvroSchema::new(
    r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()
);
let fp = store.register(avro)?;

// --- Hidden: write a single-object framed row {x:7} ---

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .with_batch_size(16)
    .build_decoder()?;

let batch = decoder.flush()?.expect("one row");
assert_eq!(batch.num_rows(), 1);

背景:Avro 的单对象编码定义为 0xC3 0x01 + 写入器模式的 8 字节小端 CRC-64-AVRO 指纹 + Avro 二进制主体。有关详细信息,请参阅 Avro 1.11.1 规范。 https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding

构建和使用用于 Confluent Registry 消息的 Decoder

use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint, FingerprintAlgorithm};
use arrow_avro::reader::ReaderBuilder;

let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
store.set(Fingerprint::Id(1234), AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;

// --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---

let mut decoder = ReaderBuilder::new()
    .with_writer_schema_store(store)
    .build_decoder()?;
let batch = decoder.flush()?.expect("two rows");
assert_eq!(batch.num_rows(), 2);

字段§

§active_decoder: RecordDecoder§active_fingerprint: Option<Fingerprint>§batch_size: usize§remaining_capacity: usize§cache: IndexMap<Fingerprint, RecordDecoder>§fingerprint_algorithm: FingerprintAlgorithm§pending_schema: Option<(Fingerprint, RecordDecoder)>§awaiting_body: bool

实现§

源代码§

impl Decoder

源代码

pub fn schema(&self) -> SchemaRef

返回此解码器解码的行的 Arrow 模式。

注意:对于单对象或 Confluent 帧,当输入指示新指纹时,模式可能会在行边界处更改。

源代码

pub fn batch_size(&self) -> usize

返回配置的每批最大行数。

源代码

pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError>

将字节块馈送给解码器。

这将

  • 最多解码 Self::batch_size 行;
  • 返回从 data 消耗的输入字节数(如果需要更多字节,则可能为 0,如果前缀/主体跨越块边界,则可能小于 data.len());
  • 在您调用 Self::flush 之前推迟生成 RecordBatch
§返回

data 消耗的字节数。

§错误

如果出现以下情况,则返回错误

  • 输入指示未知指纹(未在提供的 SchemaStore 中存在);
  • Avro 主体格式错误;
  • 违反了严格模式的联合规则(请参见 ReaderBuilder::with_strict_mode)。
源代码

fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, ArrowError>

源代码

fn handle_prefix_common<const MAGIC_LEN: usize, const N: usize>( &mut self, buf: &[u8], magic: &[u8; MAGIC_LEN], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>

此方法检查 buf 开头是否存在提供的 magic 字节,如果存在,则尝试读取后续的 N 字节指纹,并使用 fingerprint_from 将其转换为 Fingerprint

源代码

fn handle_fingerprint<const N: usize>( &mut self, buf: &[u8], fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, ) -> Result<Option<usize>, ArrowError>

源代码

fn apply_pending_schema(&mut self)

源代码

fn apply_pending_schema_if_batch_empty(&mut self)

源代码

fn flush_and_reset(&mut self) -> Result<Option<RecordBatch>, ArrowError>

源代码

pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>

如果至少有一行完全解码,则生成一个 RecordBatch,如果没有新行可用,则返回 Ok(None)

如果在解码当前批次的行时检测到模式更改,则在刷新此批次之后应用模式切换,因此下一批次(如果有)可能具有不同的模式。

源代码

pub fn capacity(&self) -> usize

返回此解码器在满载之前可以添加的行数。

源代码

pub fn batch_is_full(&self) -> bool

如果解码器已达到当前批次的容量,则返回 true。

源代码

pub fn batch_is_empty(&self) -> bool

如果解码器尚未解码任何批次(即当前批次为空),则返回 true。

源代码

fn decode_block( &mut self, data: &[u8], count: usize, ) -> Result<(usize, usize), ArrowError>

源代码

fn flush_block(&mut self) -> Result<Option<RecordBatch>, ArrowError>

Trait 实现§

源代码§

impl Debug for Decoder

源代码§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

使用给定的格式化程序格式化值。 阅读更多

自动 Trait 实现§

通用实现§

§

impl<T> Any for T
where T: 'static + ?Sized,

§

fn type_id(&self) -> TypeId

获取 selfTypeId阅读更多
§

impl<T> Borrow<T> for T
where T: ?Sized,

§

fn borrow(&self) -> &T

从拥有的值进行不可变借用。 阅读更多
§

impl<T> BorrowMut<T> for T
where T: ?Sized,

§

fn borrow_mut(&mut self) -> &mut T

从拥有的值进行可变借用。 阅读更多
§

impl<T> From<T> for T

§

fn from(t: T) -> T

返回未更改的参数。

§

impl<T, U> Into<U> for T
where U: From<T>,

§

fn into(self) -> U

调用 U::from(self)

也就是说,此转换是 From<T> for U 的实现选择执行的任何操作。

§

impl<T> Same for T

§

type Output = T

应该总是 Self
§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

转换错误时返回的类型。
§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

执行转换。
§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

转换错误时返回的类型。
§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

执行转换。
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,