表格数据

虽然数组(又称:ValueVector)表示同构值的一维序列,但数据通常以异构数据的二维集合形式出现(例如数据库表、CSV 文件……)。Arrow 提供了几种抽象来方便且高效地处理此类数据。

字段(Fields)

字段用于表示表格数据的特定列。一个字段,即 Field 的一个实例,包含字段名称、数据类型以及一些可选的键值元数据。

// Create a column "document" of string type with metadata
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

Map<String, String> metadata = new HashMap<>();
metadata.put("A", "Id card");
metadata.put("B", "Passport");
metadata.put("C", "Visa");
Field document = new Field("document", new FieldType(true, new ArrowType.Utf8(), /*dictionary*/ null, metadata), /*children*/ null);

模式(Schemas)

一个 Schema 描述了由任意数量列组成的整体结构。它包含一个字段序列以及一些可选的模式级元数据(除了每个字段的元数据外)。

// Create a schema describing datasets with two columns:
// a int32 column "A" and a utf8-encoded string column "B"
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import static java.util.Arrays.asList;

Map<String, String> metadata = new HashMap<>();
metadata.put("K1", "V1");
metadata.put("K2", "V2");
Field a = new Field("A", FieldType.nullable(new ArrowType.Int(32, true)), null);
Field b = new Field("B", FieldType.nullable(new ArrowType.Utf8()), null);
Schema schema = new Schema(asList(a, b), metadata);

VectorSchemaRoot

一个 VectorSchemaRoot 是数据批次的容器。作为流水线的一部分,数据批次在 VectorSchemaRoot 中流动。

注意

VectorSchemaRoot 与其他 Arrow 实现中的表或记录批次(Record Batch)有几分相似,因为它们都是二维数据集,但它们的用法不同。

推荐的用法是基于已知的模式创建一个单一的 VectorSchemaRoot,并在批次流中反复向该 root 填充数据,而不是每次都创建一个新的实例(以 FlightArrowFileWriter 为例)。因此,在任何时间点,VectorSchemaRoot 可能包含数据,也可能不包含数据(例如数据已被传输到下游或尚未填充)。

以下是创建 VectorSchemaRoot 的示例

BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
bitVector.allocateNew();
varCharVector.allocateNew();
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);

List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(fields, vectors);

可以通过 VectorLoaderVectorUnloader 将数据加载到 VectorSchemaRoot 中或从其中卸载。它们处理 VectorSchemaRoot 与 ArrowRecordBatch(记录批次 IPC 消息的一种表示)之间的转换。例如

// create a VectorSchemaRoot root1 and convert its data into recordBatch
VectorSchemaRoot root1 = new VectorSchemaRoot(fields, vectors);
VectorUnloader unloader = new VectorUnloader(root1);
ArrowRecordBatch recordBatch = unloader.getRecordBatch();

// create a VectorSchemaRoot root2 and load the recordBatch
VectorSchemaRoot root2 = VectorSchemaRoot.create(root1.getSchema(), allocator);
VectorLoader loader = new VectorLoader(root2);
loader.load(recordBatch);

可以在不复制数据的情况下从现有的 root 中切分出新的 VectorSchemaRoot

// 0 indicates start index (inclusive) and 5 indicated length (exclusive).
VectorSchemaRoot newRoot = vectorSchemaRoot.slice(0, 5);

表(Table)

一个 Table 是一种不可变的表格数据结构,与 VectorSchemaRoot 非常相似,因为它也是基于 ValueVectors 和模式构建的。与 VectorSchemaRoot 不同,Table 不是为批处理设计的。以下是上述示例的一个版本,展示了如何创建一个 Table,而不是 VectorSchemaRoot

BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
bitVector.allocateNew();
varCharVector.allocateNew();
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);

List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
Table table = new Table(vectors);

有关更多信息,请参阅 Table 文档。