Reading/Writing IPC formats

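Arrow defines two binary formats for serializing record batches:

  • Streaming format: for sending an arbitrary number of record batches. This format must be processed from start to end and does not support random access.

  • File or random access format: for serializing a fixed number of record batches. It supports random access, which makes it very useful when combined with memory maps.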

Writing and Reading Streaming Format

First, let's populate a VectorSchemaRoot with a small batch of records:

BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);

List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);

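Now we can begin writing a stream containing some number of these batches. For this we use ArrowStreamWriter (the DictionaryProvider, which is used for any dictionary-encoded vectors, is optional and can be null):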

try (
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
) {
  // ... do write into the ArrowStreamWriter
}

Here we used an in-memory stream, but this could have been a socket or some other IO stream. Then we can do:

writer.start();
// write the first batch
writer.writeBatch();

// write another four batches.
for (int i = 0; i < 4; i++) {
  // repopulate the VectorSchemaRoot and write the next batch
  BitVector childVector1 = (BitVector)root.getVector(0);
  VarCharVector childVector2 = (VarCharVector)root.getVector(1);
  childVector1.reset();
  childVector2.reset();
  // ... do some populate work here, could be different for each batch
  writer.writeBatch();
}

writer.end();

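Note that the VectorSchemaRoot in the writer is a container that holds one batch at a time: batches flow through the VectorSchemaRoot as part of a pipeline. We therefore need to populate the data before each call to writeBatch, so that each later batch overwrites the previous one.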

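Now the ByteArrayOutputStream contains the complete stream, which holds 5 record batches. We can read such a stream with ArrowStreamReader. Note that the VectorSchemaRoot within the reader is loaded with new values on every call to loadNextBatch():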

try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  // This will be loaded with new values on every call to loadNextBatch
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  Schema schema = readRoot.getSchema();
  for (int i = 0; i < 5; i++) {
    reader.loadNextBatch();
    // ... do something with readRoot
  }
}
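
The loop above hard-codes the number of batches. When the count is not known in advance, loadNextBatch() returns a boolean that can drive the loop instead. The following is a minimal, self-contained sketch of a full round trip under some assumptions: the class name StreamRoundTrip and the method roundTrip are hypothetical names chosen for illustration, a single VarCharVector column stands in for the two columns used above, and an Arrow memory implementation (for example arrow-memory-netty or arrow-memory-unsafe) is assumed to be on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

// Hypothetical example class, not part of the Arrow library.
public class StreamRoundTrip {

  // Writes `batches` record batches of `rowsPerBatch` rows each to an in-memory
  // stream, reads the stream back, and returns the total number of rows read.
  static int roundTrip(int batches, int rowsPerBatch) throws IOException {
    try (BufferAllocator allocator = new RootAllocator();
         VectorSchemaRoot root =
             VectorSchemaRoot.of(new VarCharVector("varchar", allocator))) {
      VarCharVector vector = (VarCharVector) root.getVector(0);
      ByteArrayOutputStream out = new ByteArrayOutputStream();

      try (ArrowStreamWriter writer =
               new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out))) {
        writer.start();
        for (int b = 0; b < batches; b++) {
          // Reuse the same root: clear it, then fill it before each writeBatch.
          vector.reset();
          for (int i = 0; i < rowsPerBatch; i++) {
            vector.setSafe(i, ("b" + b + "r" + i).getBytes(StandardCharsets.UTF_8));
          }
          root.setRowCount(rowsPerBatch);
          writer.writeBatch();
        }
        writer.end();
      }

      int totalRows = 0;
      try (ArrowStreamReader reader =
               new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
        // loadNextBatch() returns false once the end of the stream is reached.
        while (reader.loadNextBatch()) {
          totalRows += reader.getVectorSchemaRoot().getRowCount();
        }
      }
      return totalRows;
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(roundTrip(5, 10));
  }
}
```

Driving the loop from the return value keeps the reader independent of how many batches the writer happened to produce, which matters when the stream comes from a socket rather than a buffer of known contents.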

Here we also give a simple example with dictionary-encoded vectors:

// create provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();

try (
  final VarCharVector dictVector = new VarCharVector("dict", allocator);
  final VarCharVector vector = new VarCharVector("vector", allocator);
) {
  // create dictionary vector
  dictVector.allocateNewSafe();
  dictVector.setSafe(0, "aa".getBytes(StandardCharsets.UTF_8));
  dictVector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
  dictVector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
  dictVector.setValueCount(3);

  // create dictionary
  Dictionary dictionary =
      new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
  provider.put(dictionary);

  // create original data vector
  vector.allocateNewSafe();
  vector.setSafe(0, "bb".getBytes(StandardCharsets.UTF_8));
  vector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
  vector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
  vector.setSafe(3, "aa".getBytes(StandardCharsets.UTF_8));
  vector.setValueCount(4);

  // get the encoded vector
  IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);

  ByteArrayOutputStream out = new ByteArrayOutputStream();

  // create VectorSchemaRoot
  List<Field> fields = Arrays.asList(encodedVector.getField());
  List<FieldVector> vectors = Arrays.asList(encodedVector);
  try (
    VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
    ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
  ) {
    // write data; the writer is closed automatically when the block exits
    writer.start();
    writer.writeBatch();
    writer.end();
  }

  // read data
  try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
    reader.loadNextBatch();
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    // get the encoded vector
    IntVector intVector = (IntVector) readRoot.getVector(0);

    // get dictionaries and decode the vector
    Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
    long dictionaryId = intVector.getField().getDictionary().getId();
    try (VarCharVector varCharVector =
        (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId))) {
      // ... use decoded vector
    }
  }
}

Writing and Reading Random Access Files

ArrowFileWriter has the same API as ArrowStreamWriter:

try (
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  ArrowFileWriter writer = new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
) {
  writer.start();
  // write the first batch
  writer.writeBatch();
  // write another four batches.
  for (int i = 0; i < 4; i++) {
    // ... do populate work
    writer.writeBatch();
  }
  writer.end();
}

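The difference between ArrowFileReader and ArrowStreamReader is that the input source must have a seek method for random access. Because we have access to the entire payload, we know the number of record batches in the file and can read any of them at random: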

try (ArrowFileReader reader = new ArrowFileReader(
    new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {

  // read the 4th batch (index 3)
  ArrowBlock block = reader.getRecordBlocks().get(3);
  reader.loadRecordBatch(block);
  VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
}
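
The same random access works against a file on disk, because FileChannel is seekable. The following is a minimal sketch under some assumptions: the class name FileRandomAccess and the method readBatchValue are hypothetical names chosen for illustration, the data is a single IntVector column where batch b holds the value b, and an Arrow memory implementation is assumed to be on the classpath.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;

// Hypothetical example class, not part of the Arrow library.
public class FileRandomAccess {

  // Writes `batches` single-row batches (batch b holds the value b) to a
  // temporary file, then loads only the batch at `index` and returns its value.
  static int readBatchValue(int batches, int index) throws IOException {
    Path path = Files.createTempFile("arrow-example", ".arrow");
    try (BufferAllocator allocator = new RootAllocator()) {
      try (VectorSchemaRoot root = VectorSchemaRoot.of(new IntVector("int", allocator));
           FileChannel channel = FileChannel.open(path, StandardOpenOption.WRITE);
           ArrowFileWriter writer =
               new ArrowFileWriter(root, /*DictionaryProvider=*/null, channel)) {
        IntVector vector = (IntVector) root.getVector(0);
        writer.start();
        for (int b = 0; b < batches; b++) {
          // Overwrite the previous batch in the shared root.
          vector.reset();
          vector.setSafe(0, b);
          root.setRowCount(1);
          writer.writeBatch();
        }
        writer.end();
      }

      try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
           ArrowFileReader reader = new ArrowFileReader(channel, allocator)) {
        // The file footer lists every record batch, so any block can be
        // loaded directly without reading the ones before it.
        List<ArrowBlock> blocks = reader.getRecordBlocks();
        reader.loadRecordBatch(blocks.get(index));
        IntVector readVector = (IntVector) reader.getVectorSchemaRoot().getVector(0);
        return readVector.get(0);
      }
    } finally {
      Files.deleteIfExists(path);
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println(readBatchValue(5, 3));
  }
}
```

Iterating over reader.getRecordBlocks() and calling loadRecordBatch on each block in turn reads the whole file in order, should sequential access be needed.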