使用 Schema

让我们开始讨论表格数据。数据通常以二维异构数据集的形式出现(例如数据库表、CSV 文件……)。Arrow 提供了几个抽象来方便高效地处理此类数据。

创建字段

字段用于表示表格数据的特定列。

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

// Build the column description step by step: the Arrow type, the nullable
// field type wrapping it, and finally the named field (no child fields).
ArrowType utf8 = new ArrowType.Utf8();
FieldType nullableUtf8 = FieldType.nullable(utf8);
Field name = new Field("name", nullableUtf8, /*children=*/null);
System.out.print(name);
name: Utf8
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

// A nullable Int(32, true) column called "age"; it has no child fields.
ArrowType int32 = new ArrowType.Int(32, true);
FieldType nullableInt32 = FieldType.nullable(int32);
Field age = new Field("age", nullableInt32, /*children=*/null);
System.out.print(age);
age: Int(32, true)
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.util.ArrayList;
import java.util.List;

// Type of the list elements: a nullable Int(32, true).
FieldType intType = new FieldType(true, new ArrowType.Int(32, true), /*dictionary=*/null);
// Type of the list column itself; the element type is supplied via the child field below.
FieldType listType = new FieldType(true, new ArrowType.List(), /*dictionary=*/null);
Field childField = new Field("intCol", intType, /*children=*/null);
List<Field> childFields = new ArrayList<>();
childFields.add(childField);
// The list field takes its element description as its list of children.
Field points = new Field("points", listType, childFields);

System.out.print(points);
points: List<intCol: Int(32, true)>

创建 Schema

Schema 描述表格数据中的一系列列,由字段列表组成。

import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.util.ArrayList;
import java.util.List;
import static java.util.Arrays.asList;

// List column "points": its single child field describes the int elements.
List<Field> pointsChildren = new ArrayList<>();
pointsChildren.add(
    new Field("intCol", new FieldType(true, new ArrowType.Int(32, true), /*dictionary=*/null), null));
Field points =
    new Field("points", new FieldType(true, new ArrowType.List(), /*dictionary=*/null), pointsChildren);

// Scalar columns.
Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
Field document =
    new Field("document", new FieldType(true, new ArrowType.Utf8(), /*dictionary=*/null), null);
Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);

// The schema lists its columns in the order they are passed here.
List<Field> columns = asList(name, document, age, points);
Schema schemaPerson = new Schema(columns);

System.out.print(schemaPerson);
Schema<name: Utf8, document: Utf8, age: Int(32, true), points: List<intCol: Int(32, true)>>

向字段和 Schema 添加元数据

如果需要向 Field 添加元数据,可以在构造 FieldType 时传入一个元数据 Map:

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.util.HashMap;
import java.util.Map;

// Key/value pairs to attach to the field.
Map<String, String> metadata = new HashMap<>();
metadata.put("A", "Id card");
metadata.put("B", "Passport");
metadata.put("C", "Visa");
// The metadata map is passed as the fourth FieldType constructor argument.
Field document = new Field("document", new FieldType(true, new ArrowType.Utf8(), /*dictionary=*/null, metadata), null);

System.out.print(document.getMetadata());
{A=Id card, B=Passport, C=Visa}

如果需要向 Schema 添加元数据,可以将元数据 Map 作为第二个参数传给 Schema 的构造函数:

import org.apache.arrow.vector.types.pojo.Schema;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Arrays.asList;

// Schema-level key/value metadata.
Map<String, String> metadataSchema = new HashMap<>();
metadataSchema.put("Key-1", "Value-1");

// List column "points": its single child field describes the int elements.
List<Field> pointsChildren = new ArrayList<>();
pointsChildren.add(
    new Field("intCol", new FieldType(true, new ArrowType.Int(32, true), /*dictionary=*/null), null));
Field points =
    new Field("points", new FieldType(true, new ArrowType.List(), /*dictionary=*/null), pointsChildren);

// Scalar columns.
Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
Field document =
    new Field("document", new FieldType(true, new ArrowType.Utf8(), /*dictionary=*/null), null);
Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);

// Same column list as a plain schema, with the metadata map as the second argument.
List<Field> columns = asList(name, document, age, points);
Schema schemaPerson = new Schema(columns, metadataSchema);

System.out.print(schemaPerson);
Schema<name: Utf8, document: Utf8, age: Int(32, true), points: List<intCol: Int(32, true)>>(metadata: {Key-1=Value-1})

创建 VectorSchemaRoot

VectorSchemaRoot 在某种程度上类似于其他 Arrow 实现中的表和记录批次,因为它们都是 2D 数据集,但用法不同。

让我们用一小批记录填充 VectorSchemaRoot:

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.complex.impl.UnionListWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import static java.util.Arrays.asList;

// Schema with three columns: name (Utf8), age (Int(32, true)) and points (list of Int(32, true)).
Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
Field age = new Field("age", FieldType.nullable(new ArrowType.Int(32, true)), null);
FieldType intType = new FieldType(true, new ArrowType.Int(32, true), null);
FieldType listType = new FieldType(true, new ArrowType.List(), null);
Field childField = new Field("intCol", intType, null);
List<Field> childFields = new ArrayList<>();
childFields.add(childField);
Field points = new Field("points", listType, childFields);
Schema schema = new Schema(asList(name, age, points));
// try-with-resources closes the root first and then the allocator.
try (
    BufferAllocator allocator = new RootAllocator();
    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)
) {
    // "name" column: encode explicitly as UTF-8 so the stored bytes match the
    // Utf8 column type regardless of the JVM's default charset.
    VarCharVector nameVector = (VarCharVector) root.getVector("name");
    nameVector.allocateNew(3);
    nameVector.set(0, "David".getBytes(StandardCharsets.UTF_8));
    nameVector.set(1, "Gladis".getBytes(StandardCharsets.UTF_8));
    nameVector.set(2, "Juan".getBytes(StandardCharsets.UTF_8));
    nameVector.setValueCount(3);

    // "age" column.
    IntVector ageVector = (IntVector) root.getVector("age");
    ageVector.allocateNew(3);
    ageVector.set(0, 10);
    ageVector.set(1, 20);
    ageVector.set(2, 30);
    ageVector.setValueCount(3);

    // "points" column: three rows, each a list of three consecutive ints from data.
    ListVector listVector = (ListVector) root.getVector("points");
    UnionListWriter listWriter = listVector.getWriter();
    int[] data = new int[] { 4, 8, 12, 10, 20, 30, 5, 10, 15 };
    int dataIndex = 0;
    for (int i = 0; i < 3; i++) {
        listWriter.setPosition(i);
        listWriter.startList();
        for (int j = 0; j < 3; j++) {
            listWriter.writeInt(data[dataIndex]);
            dataIndex++;
        }
        // NOTE(review): 2 does not match the 3 ints written per list, yet the printed
        // output shows 3 elements per row; confirm whether this call is needed at all.
        listWriter.setValueCount(2);
        listWriter.endList();
    }
    listVector.setValueCount(3);

    // All three columns hold 3 values, so the root has 3 rows.
    root.setRowCount(3);

    System.out.print(root.contentToTSVString());
} catch (Exception e) {
    e.printStackTrace();
}
name    age    points
David    10    [4,8,12]
Gladis    20    [10,20,30]
Juan    30    [5,10,15]