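Quick Start Guide
Arrow Java provides several building blocks. Data types describe the types of values; ValueVectors are sequences of typed values; fields describe the types of columns in tabular data; schemas describe a sequence of columns in tabular data; and VectorSchemaRoot represents tabular data. Arrow also provides readers and writers for loading data from storage and persisting data to it.
Create a ValueVector
ValueVectors represent a sequence of values of the same type. They are also known as "arrays" in the columnar format.
Example: create a vector of 32-bit integers representing [1, null, 2]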
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
try(
BufferAllocator allocator = new RootAllocator();
IntVector intVector = new IntVector("fixed-size-primitive-layout", allocator);
){
intVector.allocateNew(3);
intVector.set(0,1);
intVector.setNull(1);
intVector.set(2,2);
intVector.setValueCount(3);
System.out.println("Vector created in memory: " + intVector);
}
Vector created in memory: [1, null, 2]
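The vector name used above, "fixed-size-primitive-layout", refers to the way the Arrow columnar format stores such values: a validity bitmap (one bit per slot, set when the value is not null) next to a buffer of fixed-width values. The following is a minimal conceptual sketch of that idea in plain Java. `FixedSizeLayoutSketch` and its methods are hypothetical names chosen for illustration; they are not part of the Arrow API, and the real `IntVector` manages off-heap buffers through a `BufferAllocator`.

```java
import java.util.StringJoiner;

// Conceptual sketch only: a hypothetical class, not part of Arrow's API.
class FixedSizeLayoutSketch {
    private final byte[] validity; // 1 bit per slot; a set bit means "not null"
    private final int[] values;    // one 32-bit slot per value, even for nulls

    FixedSizeLayoutSketch(int capacity) {
        this.validity = new byte[(capacity + 7) / 8];
        this.values = new int[capacity];
    }

    void set(int index, int value) {
        values[index] = value;
        validity[index / 8] |= (byte) (1 << (index % 8)); // mark the slot as valid
    }

    void setNull(int index) {
        validity[index / 8] &= (byte) ~(1 << (index % 8)); // clear the validity bit
    }

    boolean isNull(int index) {
        return (validity[index / 8] & (1 << (index % 8))) == 0;
    }

    Integer get(int index) {
        return isNull(index) ? null : values[index];
    }

    byte validityByte(int byteIndex) {
        return validity[byteIndex];
    }

    String render() {
        StringJoiner joiner = new StringJoiner(", ", "[", "]");
        for (int i = 0; i < values.length; i++) {
            joiner.add(String.valueOf(get(i)));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        FixedSizeLayoutSketch vector = new FixedSizeLayoutSketch(3);
        vector.set(0, 1);
        vector.setNull(1);
        vector.set(2, 2);
        System.out.println(vector.render());        // [1, null, 2]
        System.out.println(vector.validityByte(0)); // 5, i.e. bits 0 and 2 are set
    }
}
```

Keeping nulls in a separate bitmap means every value occupies a slot of the same width, so a value can be located by index alone.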
Example: create a vector of UTF-8 encoded strings representing ["one", "two", "three"]
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
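import org.apache.arrow.vector.VarCharVector;
import java.nio.charset.StandardCharsets;
try(
BufferAllocator allocator = new RootAllocator();
VarCharVector varCharVector = new VarCharVector("variable-size-primitive-layout", allocator);
){
varCharVector.allocateNew(3);
// Encode explicitly as UTF-8 instead of relying on the platform default charset.
varCharVector.set(0, "one".getBytes(StandardCharsets.UTF_8));
varCharVector.set(1, "two".getBytes(StandardCharsets.UTF_8));
varCharVector.set(2, "three".getBytes(StandardCharsets.UTF_8));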
varCharVector.setValueCount(3);
System.out.println("Vector created in memory: " + varCharVector);
}
Vector created in memory: [one, two, three]
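Variable-size values such as strings cannot be located by index arithmetic alone, so the columnar format pairs one contiguous data buffer with an offsets buffer of n + 1 entries. The sketch below illustrates that idea in plain Java. `VariableSizeLayoutSketch` is a hypothetical class for illustration, not Arrow's `VarCharVector`, and it leaves out the validity bitmap that handles nulls.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Conceptual sketch only: a hypothetical class, not part of Arrow's API.
class VariableSizeLayoutSketch {
    final int[] offsets; // n + 1 entries; value i spans data[offsets[i], offsets[i + 1])
    final byte[] data;   // the UTF-8 bytes of all values, stored back to back

    VariableSizeLayoutSketch(String... strings) {
        offsets = new int[strings.length + 1];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < strings.length; i++) {
            byte[] utf8 = strings[i].getBytes(StandardCharsets.UTF_8);
            out.write(utf8, 0, utf8.length);
            offsets[i + 1] = offsets[i] + utf8.length;
        }
        data = out.toByteArray();
    }

    String get(int index) {
        int start = offsets[index];
        int length = offsets[index + 1] - start;
        return new String(data, start, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        VariableSizeLayoutSketch vector = new VariableSizeLayoutSketch("one", "two", "three");
        System.out.println(Arrays.toString(vector.offsets)); // [0, 3, 6, 11]
        System.out.println(vector.get(2));                   // three
    }
}
```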
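Create a Field
Fields denote the particular columns of tabular data. They consist of a name, a data type, a flag indicating whether the column can have null values, and optional key-value metadata.
Example: create a field named "document" of string type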
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.util.HashMap;
import java.util.Map;
Map<String, String> metadata = new HashMap<>();
metadata.put("A", "Id card");
metadata.put("B", "Passport");
metadata.put("C", "Visa");
Field document = new Field("document",
new FieldType(true, new ArrowType.Utf8(), /*dictionary*/ null, metadata),
/*children*/ null);
System.out.println("Field created: " + document + ", Metadata: " + document.getMetadata());
Field created: document: Utf8, Metadata: {A=Id card, B=Passport, C=Visa}
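Create a Schema
Schemas hold a sequence of fields together with some optional metadata.
Example: create a schema describing datasets with two columns: an int32 column "A" and a UTF-8 encoded string column "B"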
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import java.util.HashMap;
import java.util.Map;
import static java.util.Arrays.asList;
Map<String, String> metadata = new HashMap<>();
metadata.put("K1", "V1");
metadata.put("K2", "V2");
Field a = new Field("A", FieldType.nullable(new ArrowType.Int(32, true)), /*children*/ null);
Field b = new Field("B", FieldType.nullable(new ArrowType.Utf8()), /*children*/ null);
Schema schema = new Schema(asList(a, b), metadata);
System.out.println("Schema created: " + schema);
Schema created: Schema<A: Int(32, true), B: Utf8>(metadata: {K1=V1, K2=V2})
Create a VectorSchemaRoot
A VectorSchemaRoot combines ValueVectors with a Schema to represent tabular data.
Example: create a dataset of names (strings) and ages (32-bit signed integers)
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import java.nio.charset.StandardCharsets;
import static java.util.Arrays.asList;
Field age = new Field("age",
FieldType.nullable(new ArrowType.Int(32, true)),
/*children*/null
);
Field name = new Field("name",
FieldType.nullable(new ArrowType.Utf8()),
/*children*/null
);
Schema schema = new Schema(asList(age, name), /*metadata*/ null);
try(
BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
IntVector ageVector = (IntVector) root.getVector("age");
VarCharVector nameVector = (VarCharVector) root.getVector("name");
){
ageVector.allocateNew(3);
ageVector.set(0, 10);
ageVector.set(1, 20);
ageVector.set(2, 30);
nameVector.allocateNew(3);
nameVector.set(0, "Dave".getBytes(StandardCharsets.UTF_8));
nameVector.set(1, "Peter".getBytes(StandardCharsets.UTF_8));
nameVector.set(2, "Mary".getBytes(StandardCharsets.UTF_8));
root.setRowCount(3);
System.out.println("VectorSchemaRoot created: \n" + root.contentToTSVString());
}
VectorSchemaRoot created:
age name
10 Dave
20 Peter
30 Mary
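As a rough mental model of what the example builds, tabular data in a columnar layout is a list of named columns that all share one row count, and rows only appear when the data is rendered. The sketch below shows this in plain Java with ordinary lists. `ColumnarTableSketch` and `toTsv` are hypothetical names for illustration; the code does not use Arrow and is not claimed to match the exact output format of `contentToTSVString()`.

```java
import java.util.Arrays;
import java.util.List;

// Conceptual sketch only: a hypothetical class, not part of Arrow's API.
class ColumnarTableSketch {
    private final List<String> names;
    private final List<? extends List<?>> columns;
    private final int rowCount;

    ColumnarTableSketch(List<String> names, List<? extends List<?>> columns) {
        if (names.size() != columns.size()) {
            throw new IllegalArgumentException("one name per column is required");
        }
        int rows = columns.isEmpty() ? 0 : columns.get(0).size();
        for (List<?> column : columns) {
            if (column.size() != rows) {
                throw new IllegalArgumentException("all columns must have the same length");
            }
        }
        this.names = names;
        this.columns = columns;
        this.rowCount = rows;
    }

    // Walks the columns row by row to produce a tab-separated rendering.
    String toTsv() {
        StringBuilder sb = new StringBuilder(String.join("\t", names)).append('\n');
        for (int row = 0; row < rowCount; row++) {
            for (int col = 0; col < columns.size(); col++) {
                if (col > 0) {
                    sb.append('\t');
                }
                sb.append(columns.get(col).get(row));
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ColumnarTableSketch table = new ColumnarTableSketch(
            Arrays.asList("age", "name"),
            Arrays.asList(Arrays.asList(10, 20, 30), Arrays.asList("Dave", "Peter", "Mary")));
        System.out.print(table.toTsv());
    }
}
```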
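Interprocess Communication (IPC)
Arrow data can be written to and read from disk, and both operations can be done in a streaming and/or random-access fashion, depending on application requirements.
Write data to an Arrow file
Example: write the dataset from the previous example to an Arrow IPC file (random access)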
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import static java.util.Arrays.asList;
Field age = new Field("age",
FieldType.nullable(new ArrowType.Int(32, true)),
/*children*/ null);
Field name = new Field("name",
FieldType.nullable(new ArrowType.Utf8()),
/*children*/ null);
Schema schema = new Schema(asList(age, name));
try(
BufferAllocator allocator = new RootAllocator();
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
IntVector ageVector = (IntVector) root.getVector("age");
VarCharVector nameVector = (VarCharVector) root.getVector("name");
){
ageVector.allocateNew(3);
ageVector.set(0, 10);
ageVector.set(1, 20);
ageVector.set(2, 30);
nameVector.allocateNew(3);
nameVector.set(0, "Dave".getBytes(StandardCharsets.UTF_8));
nameVector.set(1, "Peter".getBytes(StandardCharsets.UTF_8));
nameVector.set(2, "Mary".getBytes(StandardCharsets.UTF_8));
root.setRowCount(3);
File file = new File("random_access_file.arrow");
try (
FileOutputStream fileOutputStream = new FileOutputStream(file);
ArrowFileWriter writer = new ArrowFileWriter(root, /*provider*/ null, fileOutputStream.getChannel());
) {
writer.start();
writer.writeBatch();
writer.end();
System.out.println("Record batches written: " + writer.getRecordBlocks().size()
+ ". Number of rows written: " + root.getRowCount());
} catch (IOException e) {
e.printStackTrace();
}
}
Record batches written: 1. Number of rows written: 3
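One way to sanity-check the written file without Arrow on the classpath: the Arrow IPC file (random access) format begins and ends with the ASCII magic string `ARROW1`, and the footer that makes random access possible sits just before the trailing magic. The sketch below checks only those magic bytes. `ArrowFileMagicSketch` and `hasArrowFileMagic` are hypothetical names for illustration; the check does not validate the footer or any record batch, and it reads the whole file into memory, which is reasonable only for small files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

// Conceptual sketch only: a hypothetical helper, not part of Arrow's API.
class ArrowFileMagicSketch {
    private static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.US_ASCII);

    // True when the content both starts and ends with the "ARROW1" magic bytes.
    static boolean hasArrowFileMagic(byte[] content) {
        if (content.length < 2 * MAGIC.length) {
            return false;
        }
        byte[] head = Arrays.copyOfRange(content, 0, MAGIC.length);
        byte[] tail = Arrays.copyOfRange(content, content.length - MAGIC.length, content.length);
        return Arrays.equals(head, MAGIC) && Arrays.equals(tail, MAGIC);
    }

    static boolean hasArrowFileMagic(Path path) {
        try {
            return hasArrowFileMagic(Files.readAllBytes(path));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path path = Paths.get(args.length > 0 ? args[0] : "random_access_file.arrow");
        if (!Files.exists(path)) {
            System.out.println("File not found: " + path);
            return;
        }
        System.out.println(path + " has Arrow file magic: " + hasArrowFileMagic(path));
    }
}
```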
Read data from an Arrow file
Example: read the dataset from the previous example from an Arrow IPC file (random access)
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
try(
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(new File("random_access_file.arrow"));
ArrowFileReader reader = new ArrowFileReader(fileInputStream.getChannel(), allocator);
){
System.out.println("Record batches in file: " + reader.getRecordBlocks().size());
for (ArrowBlock arrowBlock : reader.getRecordBlocks()) {
reader.loadRecordBatch(arrowBlock);
VectorSchemaRoot root = reader.getVectorSchemaRoot();
System.out.println("VectorSchemaRoot read: \n" + root.contentToTSVString());
}
} catch (IOException e) {
e.printStackTrace();
}
Record batches in file: 1
VectorSchemaRoot read:
age name
10 Dave
20 Peter
30 Mary
More examples are available in the Arrow Java Cookbook.