Crate arrow_avro

Crate arrow_avro 

源代码
展开描述

将数据转换为 Apache Arrow 内存格式和 Apache Avro,或从其转换。

此 crate 提供

  • 一个 reader,用于将 Avro(对象容器文件、Avro 单对象编码和 Confluent Schema Registry 线格式)解码为 Arrow RecordBatches,
  • 以及一个 writer,用于将 Arrow RecordBatches 编码为 Avro(OCF 或 SOE)。

如果您是 Arrow 或 Avro 的新手,请参阅

§示例:OCF (对象容器文件) 往返 (可运行)

下面的示例创建了一个 Arrow 表,将一个 Avro OCF 完全写入内存,然后将其读回。OCF 是一种自描述文件格式,它将 Avro 模式嵌入到头部中,并带有可选的压缩和块同步标记。规范:https://avro.apache.org/docs/1.11.1/specification/#object-container-files

use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;

// Build a tiny Arrow batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;

// Write an Avro **Object Container File** (OCF) to a Vec<u8>
let sink: Vec<u8> = Vec::new();
let mut w = AvroWriter::new(sink, schema.clone())?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();
assert!(!bytes.is_empty());

// Read it back
let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = r.next().unwrap()?;
assert_eq!(out.num_rows(), 3);

§快速入门:SOE (单对象编码) 往返 (可运行)

Avro 单对象编码 (SOE) 用一个 2 字节标记 0xC3 0x01 和一个 8 字节小端 CRC-64-AVRO Rabin 指纹包裹 Avro 主体,然后是 Avro 主体。规范:https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding

此示例注册写入器模式(计算 Rabin 指纹),写入一个单行 Avro 主体(使用 AvroStreamWriter),构造 SOE 帧,然后将其解码回 Arrow。

use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy, SCHEMA_METADATA_KEY};

// Writer schema: { "type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
let writer_json = r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;

// Build an Arrow schema that references the same Avro JSON
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
let schema = Schema::new_with_metadata(
    vec![Field::new("x", DataType::Int64, false)],
    md,
);

// One‑row batch: { x: 7 }
let batch = RecordBatch::try_new(
    Arc::new(schema.clone()),
    vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
)?;

// Stream‑write a single record; the writer adds **SOE** (C3 01 + Rabin) automatically.
let sink: Vec<u8> = Vec::new();
let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
    .with_fingerprint_strategy(FingerprintStrategy::Rabin)
    .build(sink)?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
assert!(frame.len() > 10);

// Decode
let mut dec = ReaderBuilder::new()
  .with_writer_schema_store(store)
  .build_decoder()?;
dec.decode(&frame)?;
let out = dec.flush()?.expect("one row");
assert_eq!(out.num_rows(), 1);

§模块

  • reader:将 Avro(OCF、SOE、Confluent)读取为 Arrow RecordBatches。
  • writer:将 Arrow RecordBatches 写入为 Avro(OCF、SOE、Confluent、Apicurio)。
  • schema:Avro 模式解析/指纹/注册表。
  • compression:用于 OCF 块压缩的编解码器(即 Deflate、Snappy、Zstandard、BZip2 和 XZ)。
  • codec:内部 Avro-Arrow 类型转换和行解码/编码计划。

§特性

OCF 压缩(默认启用)

  • deflate — 启用 DEFLATE 块压缩(通过 flate2)。
  • snappy — 启用 Snappy 块压缩,带 4 字节 BE CRC32(根据 Avro)。
  • zstd — 启用 Zstandard 块压缩。
  • bzip2 — 启用 BZip2 块压缩。
  • xz — 启用 XZ/LZMA 块压缩。

模式指纹和辅助工具(可选)

  • md5 — 启用 MD5 写入器模式指纹。
  • sha256 — 启用 SHA-256 写入器模式指纹。
  • small_decimals — 支持小 Avro 十进制数的紧凑 Arrow 表示(Decimal32Decimal64)。
  • avro_custom_types — 将使用 Arrow 特定的逻辑类型(如 arrow.duration-nanosarrow.duration-microsarrow.duration-millisarrow.duration-seconds)注释的 Avro 字段解释为 Arrow Duration(TimeUnit)
  • canonical_extension_types — 启用对 arrow-schema 中 Arrow 规范扩展类型的支持,以便 arrow-avro 可以在 Avro↔Arrow 映射期间遵守它们。

备注

  • OCF 压缩编解码器仅适用于 对象容器文件;它们不影响 Avro 单对象编码。

模块§

编解码器
Avro 和 Arrow 类型之间的数据类型转换
压缩
Avro 的压缩编解码器实现
读取器
将 Avro 数据读入 Arrow 数组的核心功能
模式
Avro 模式解析和表示
写入器
将 Arrow 数组写入为 Avro 数据的核心功能

Trait§

Avro字段扩展
AvroField 的扩展 trait,用于添加 Utf8View 支持