展开描述
将数据转换为 Apache Arrow 内存格式和 Apache Avro,或从其转换。
此 crate 提供
- 一个
reader,用于将 Avro(对象容器文件、Avro 单对象编码和 Confluent Schema Registry 线格式)解码为 ArrowRecordBatches, - 以及一个
writer,用于将 ArrowRecordBatches 编码为 Avro(OCF 或 SOE)。
如果您是 Arrow 或 Avro 的新手,请参阅
- Arrow 项目网站:https://arrow.apache.org/
- Avro 1.11.1 规范:https://avro.apache.org/docs/1.11.1/specification/
§示例:OCF (对象容器文件) 往返 (可运行)
下面的示例创建了一个 Arrow 表,将一个 Avro OCF 完全写入内存,然后将其读回。OCF 是一种自描述文件格式,它将 Avro 模式嵌入到头部中,并带有可选的压缩和块同步标记。规范:https://avro.apache.org/docs/1.11.1/specification/#object-container-files
use std::io::Cursor;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::AvroWriter;
use arrow_avro::reader::ReaderBuilder;
// Build a tiny Arrow batch
let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)?;
// Write an Avro **Object Container File** (OCF) to a Vec<u8>
let sink: Vec<u8> = Vec::new();
let mut w = AvroWriter::new(sink, schema.clone())?;
w.write(&batch)?;
w.finish()?;
let bytes = w.into_inner();
assert!(!bytes.is_empty());
// Read it back
let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
let out = r.next().unwrap()?;
assert_eq!(out.num_rows(), 3);§快速入门:SOE (单对象编码) 往返 (可运行)
Avro 单对象编码 (SOE) 用一个 2 字节标记 0xC3 0x01 和一个 8 字节小端 CRC-64-AVRO Rabin 指纹包裹 Avro 主体,然后是 Avro 主体。规范:https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding
此示例注册写入器模式(计算 Rabin 指纹),写入一个单行 Avro 主体(使用 AvroStreamWriter),构造 SOE 帧,然后将其解码回 Arrow。
use std::collections::HashMap;
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use arrow_avro::writer::{AvroStreamWriter, WriterBuilder};
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{AvroSchema, SchemaStore, FingerprintStrategy, SCHEMA_METADATA_KEY};
// Writer schema: { "type":"record","name":"User","fields":[{"name":"x","type":"long"}] }
let writer_json = r#"{"type":"record","name":"User","fields":[{"name":"x","type":"long"}]}"#;
let mut store = SchemaStore::new(); // Rabin CRC‑64‑AVRO by default
let _fp = store.register(AvroSchema::new(writer_json.to_string()))?;
// Build an Arrow schema that references the same Avro JSON
let mut md = HashMap::new();
md.insert(SCHEMA_METADATA_KEY.to_string(), writer_json.to_string());
let schema = Schema::new_with_metadata(
vec![Field::new("x", DataType::Int64, false)],
md,
);
// One‑row batch: { x: 7 }
let batch = RecordBatch::try_new(
Arc::new(schema.clone()),
vec![Arc::new(Int64Array::from(vec![7])) as ArrayRef],
)?;
// Stream‑write a single record; the writer adds **SOE** (C3 01 + Rabin) automatically.
let sink: Vec<u8> = Vec::new();
let mut w: AvroStreamWriter<Vec<u8>> = WriterBuilder::new(schema.clone())
.with_fingerprint_strategy(FingerprintStrategy::Rabin)
.build(sink)?;
w.write(&batch)?;
w.finish()?;
let frame = w.into_inner(); // already: C3 01 + 8B LE Rabin + Avro body
assert!(frame.len() > 10);
// Decode
let mut dec = ReaderBuilder::new()
.with_writer_schema_store(store)
.build_decoder()?;
dec.decode(&frame)?;
let out = dec.flush()?.expect("one row");
assert_eq!(out.num_rows(), 1);§模块
reader:将 Avro(OCF、SOE、Confluent)读取为 ArrowRecordBatches。writer:将 ArrowRecordBatches 写入为 Avro(OCF、SOE、Confluent、Apicurio)。schema:Avro 模式解析/指纹/注册表。compression:用于 OCF 块压缩的编解码器(即 Deflate、Snappy、Zstandard、BZip2 和 XZ)。codec:内部 Avro-Arrow 类型转换和行解码/编码计划。
§特性
OCF 压缩(默认启用)
deflate— 启用 DEFLATE 块压缩(通过flate2)。snappy— 启用 Snappy 块压缩,带 4 字节 BE CRC32(根据 Avro)。zstd— 启用 Zstandard 块压缩。bzip2— 启用 BZip2 块压缩。xz— 启用 XZ/LZMA 块压缩。
模式指纹和辅助工具(可选)
md5— 启用 MD5 写入器模式指纹。sha256— 启用 SHA-256 写入器模式指纹。small_decimals— 支持小 Avro 十进制数的紧凑 Arrow 表示(Decimal32和Decimal64)。avro_custom_types— 将使用 Arrow 特定的逻辑类型(如arrow.duration-nanos、arrow.duration-micros、arrow.duration-millis或arrow.duration-seconds)注释的 Avro 字段解释为 ArrowDuration(TimeUnit)。canonical_extension_types— 启用对arrow-schema中 Arrow 规范扩展类型的支持,以便arrow-avro可以在 Avro↔Arrow 映射期间遵守它们。
备注
- OCF 压缩编解码器仅适用于 对象容器文件;它们不影响 Avro 单对象编码。
模块§
- 编解码器
- Avro 和 Arrow 类型之间的数据类型转换
- 压缩
- Avro 的压缩编解码器实现
- 读取器
- 将 Avro 数据读入 Arrow 数组的核心功能
- 模式
- Avro 模式解析和表示
- 写入器
- 将 Arrow 数组写入为 Avro 数据的核心功能
Trait§
- Avro
字段 扩展 - AvroField 的扩展 trait,用于添加 Utf8View 支持