Reading/Writing IPC formats
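Arrow defines two binary formats for serializing record batches:
Streaming format: for sending an arbitrary number of record batches. The format must be processed from start to end and does not support random access.
File or Random Access format: for serializing a fixed number of record batches. Because it supports random access, it is very useful when used with memory maps.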
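One concrete way to see the difference between the two formats is to look at the raw bytes. The sketch below serializes the same one-batch root both ways. It assumes a recent Arrow Java release (`arrow-vector` plus an `arrow-memory` allocator implementation on the classpath); the class `IpcFormatMagic`, its methods and the `ints` column are hypothetical names used only for illustration. Per the Arrow columnar format specification, an IPC file begins and ends with the magic string `ARROW1`, while a stream starts directly with its schema message.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ArrowWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

// Hypothetical demo class: compares the leading/trailing bytes of the two IPC formats.
public class IpcFormatMagic {
    private static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.US_ASCII);

    // Serializes one 3-row batch of ints in the file format (true) or the stream format (false).
    public static byte[] serialize(boolean fileFormat) {
        Schema schema = new Schema(Collections.singletonList(
            Field.nullable("ints", new ArrowType.Int(32, true))));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (BufferAllocator allocator = new RootAllocator();
             VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
             ArrowWriter writer = fileFormat
                 ? new ArrowFileWriter(root, null, Channels.newChannel(out))
                 : new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
            IntVector ints = (IntVector) root.getVector("ints");
            ints.allocateNew(3);
            for (int i = 0; i < 3; i++) {
                ints.set(i, i);
            }
            root.setRowCount(3);
            writer.start();
            writer.writeBatch();
            writer.end();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // True if the payload begins with "ARROW1".
    public static boolean startsWithMagic(byte[] bytes) {
        return bytes.length >= MAGIC.length
            && Arrays.equals(Arrays.copyOfRange(bytes, 0, MAGIC.length), MAGIC);
    }

    // True if the payload ends with "ARROW1".
    public static boolean endsWithMagic(byte[] bytes) {
        return bytes.length >= MAGIC.length
            && Arrays.equals(Arrays.copyOfRange(bytes, bytes.length - MAGIC.length, bytes.length), MAGIC);
    }
}
```

The trailing magic and the footer just before it are what let a file reader locate record batches without scanning the whole payload; the stream format has no footer, which is why it has to be consumed front to back.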
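Writing and Reading Streaming Format
First, let's populate a VectorSchemaRoot with a small batch of records (the snippet assumes an existing BufferAllocator named allocator):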
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
for (int i = 0; i < 10; i++) {
    bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
    varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);
List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
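The snippet above relies on an existing allocator and omits its imports. A self-contained variant might look like the following sketch; `PopulateRoot` and `describe` are hypothetical names, and the explicit `allocateNew` and `root.setRowCount` calls are additions that are not in the original snippet (`setSafe` would also grow the buffers on demand).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;

// Hypothetical demo class: the population step above, made self-contained.
public class PopulateRoot {
    // Returns "rowCount:varchar value at row 3:bit value at row 3 (0 or 1)".
    public static String describe() {
        try (BufferAllocator allocator = new RootAllocator()) {
            BitVector bitVector = new BitVector("boolean", allocator);
            VarCharVector varCharVector = new VarCharVector("varchar", allocator);
            List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
            List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
            // The root takes ownership of both vectors and closes them.
            try (VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors)) {
                bitVector.allocateNew(10);
                varCharVector.allocateNew(10);
                for (int i = 0; i < 10; i++) {
                    bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
                    varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
                }
                // Sets the value count of every vector and the root's row count in one call.
                root.setRowCount(10);
                return root.getRowCount() + ":" + varCharVector.getObject(3) + ":" + bitVector.get(3);
            }
        }
    }
}
```

Calling `PopulateRoot.describe()` should yield `10:test3:1`, since row 3 holds `"test3"` and `3 % 2 != 0` sets the bit to 1.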
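Now we can start writing a stream containing some number of these batches. For this we use ArrowStreamWriter (the DictionaryProvider used for any dictionary-encoded vectors is optional and can be null):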
try (
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
) {
    // ... do write into the ArrowStreamWriter
}
Here we used an in-memory stream, but this could be a socket or some other IO stream. Then we can do:
writer.start();
// write the first batch
writer.writeBatch();

// write another four batches
for (int i = 0; i < 4; i++) {
    // reset the vectors so the root can be refilled with the next batch
    BitVector childVector1 = (BitVector) root.getVector(0);
    VarCharVector childVector2 = (VarCharVector) root.getVector(1);
    childVector1.reset();
    childVector2.reset();
    // ... populate the vectors here (contents may differ per batch), then call root.setRowCount(n)
    writer.writeBatch();
}

writer.end();
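Note that the VectorSchemaRoot in the writer is a container that holds one batch at a time, and batches flow through it as part of a pipeline. We therefore need to populate the data before each writeBatch call; each later batch overwrites the previous one in the root.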
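To make the populate-then-write order concrete, here is a minimal sketch assuming a single nullable 32-bit integer column; the class `ReuseRootPipeline` and the `ints` column are hypothetical. One root is refilled for each batch, its row count is updated, and only then is `writeBatch()` called. Reading the stream back shows that each batch kept the values the root held at the moment it was written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

// Hypothetical demo class: one root is reused for every batch that is written.
public class ReuseRootPipeline {
    // Writes 5 batches through a single root, then returns the first value of each batch read back.
    public static List<Integer> firstValuePerBatch() {
        Schema schema = new Schema(Collections.singletonList(
            Field.nullable("ints", new ArrowType.Int(32, true))));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        List<Integer> firsts = new ArrayList<>();
        try (BufferAllocator allocator = new RootAllocator()) {
            try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
                 ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
                IntVector ints = (IntVector) root.getVector("ints");
                writer.start();
                for (int batch = 0; batch < 5; batch++) {
                    // Overwrite the root's contents with this batch's values before writeBatch().
                    ints.allocateNew(3); // releases the previous batch's buffers first
                    for (int i = 0; i < 3; i++) {
                        ints.set(i, batch * 10 + i);
                    }
                    root.setRowCount(3);
                    writer.writeBatch(); // serializes whatever the root holds right now
                }
                writer.end();
            }
            try (ArrowStreamReader reader = new ArrowStreamReader(
                     new ByteArrayInputStream(out.toByteArray()), allocator)) {
                VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
                while (reader.loadNextBatch()) {
                    firsts.add(((IntVector) readRoot.getVector("ints")).get(0));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return firsts;
    }
}
```

Because every batch is serialized at the moment `writeBatch()` runs, overwriting the root afterwards does not affect batches already written; `firstValuePerBatch()` returns `[0, 10, 20, 30, 40]`.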
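Now the ByteArrayOutputStream contains the complete stream, which holds 5 record batches. We can read such a stream with ArrowStreamReader. Note that the VectorSchemaRoot within the reader is loaded with new values on every call to loadNextBatch():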
try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
    // This will be loaded with new values on every call to loadNextBatch
    VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
    Schema schema = readRoot.getSchema();
    for (int i = 0; i < 5; i++) {
        reader.loadNextBatch();
        // ... do something with readRoot
    }
}
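When the number of batches is not known up front, the return value of `loadNextBatch()` (it returns `false` once the end of the stream is reached) can drive the loop instead of a fixed count. The sketch below, using the hypothetical class `ReadUntilEnd` and a hypothetical `ints` column, writes batches of 1 to 5 rows and then checks that the reader keeps reusing the same `VectorSchemaRoot` instance while its row count changes per batch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.util.Collections;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

// Hypothetical demo class: reads a stream without knowing its batch count in advance.
public class ReadUntilEnd {
    // Returns {batchCount, totalRows, 1 if the reader kept the same root instance, else 0}.
    public static int[] summarize() {
        Schema schema = new Schema(Collections.singletonList(
            Field.nullable("ints", new ArrowType.Int(32, true))));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (BufferAllocator allocator = new RootAllocator()) {
            // Write 5 batches holding 1, 2, 3, 4 and 5 rows (15 rows in total).
            try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
                 ArrowStreamWriter writer = new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
                IntVector ints = (IntVector) root.getVector("ints");
                writer.start();
                for (int batch = 0; batch < 5; batch++) {
                    int rows = batch + 1;
                    ints.allocateNew(rows);
                    for (int i = 0; i < rows; i++) {
                        ints.set(i, i);
                    }
                    root.setRowCount(rows);
                    writer.writeBatch();
                }
                writer.end();
            }
            // loadNextBatch() returns false once the end-of-stream marker is reached.
            try (ArrowStreamReader reader = new ArrowStreamReader(
                     new ByteArrayInputStream(out.toByteArray()), allocator)) {
                VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
                int batches = 0;
                int totalRows = 0;
                boolean sameRoot = true;
                while (reader.loadNextBatch()) {
                    batches++;
                    totalRows += readRoot.getRowCount();
                    sameRoot &= reader.getVectorSchemaRoot() == readRoot;
                }
                return new int[] {batches, totalRows, sameRoot ? 1 : 0};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Since the root is reused, any values that must outlive the current batch have to be copied out before the next `loadNextBatch()` call.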
Here we also give a simple example with dictionary-encoded vectors:
// create provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();

try (
    final VarCharVector dictVector = new VarCharVector("dict", allocator);
    final VarCharVector vector = new VarCharVector("vector", allocator);
) {
    // create dictionary vector
    dictVector.allocateNewSafe();
    dictVector.setSafe(0, "aa".getBytes(StandardCharsets.UTF_8));
    dictVector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
    dictVector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
    dictVector.setValueCount(3);

    // create dictionary
    Dictionary dictionary =
        new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
    provider.put(dictionary);

    // create original data vector
    vector.allocateNewSafe();
    vector.setSafe(0, "bb".getBytes(StandardCharsets.UTF_8));
    vector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
    vector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
    vector.setSafe(3, "aa".getBytes(StandardCharsets.UTF_8));
    vector.setValueCount(4);

    // get the encoded vector
    IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);

    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // create VectorSchemaRoot (the root takes ownership of encodedVector and closes it)
    List<Field> fields = Arrays.asList(encodedVector.getField());
    List<FieldVector> vectors = Arrays.asList(encodedVector);
    try (
        VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
        ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
    ) {
        // write data
        writer.start();
        writer.writeBatch();
        writer.end();
    }

    // read data
    try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
        reader.loadNextBatch();
        VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
        // get the encoded vector
        IntVector intVector = (IntVector) readRoot.getVector(0);

        // get dictionaries and decode the vector
        Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
        long dictionaryId = intVector.getField().getDictionary().getId();
        try (VarCharVector varCharVector =
                 (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId))) {
            // ... use decoded vector
        }
    }
}
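To see what `DictionaryEncoder.encode` actually produces for the data above, the following sketch (hypothetical class `DictionaryIndices`) copies the indices out of the encoded vector. With the dictionary `aa`, `bb`, `cc` at positions 0, 1, 2, the values `bb`, `bb`, `cc`, `aa` should encode to `1, 1, 2, 0`. Like the example above, it assumes that a `null` index type yields a signed 32-bit `IntVector`.

```java
import java.nio.charset.StandardCharsets;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;

// Hypothetical demo class: shows the integer indices produced by dictionary encoding.
public class DictionaryIndices {
    public static int[] encodedIndices() {
        try (BufferAllocator allocator = new RootAllocator();
             VarCharVector dictVector = new VarCharVector("dict", allocator);
             VarCharVector vector = new VarCharVector("vector", allocator)) {
            // Dictionary values at positions 0, 1, 2.
            String[] dictValues = {"aa", "bb", "cc"};
            dictVector.allocateNew();
            for (int i = 0; i < dictValues.length; i++) {
                dictVector.setSafe(i, dictValues[i].getBytes(StandardCharsets.UTF_8));
            }
            dictVector.setValueCount(dictValues.length);
            Dictionary dictionary =
                new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));

            // Same original data as in the example above.
            String[] data = {"bb", "bb", "cc", "aa"};
            vector.allocateNew();
            for (int i = 0; i < data.length; i++) {
                vector.setSafe(i, data[i].getBytes(StandardCharsets.UTF_8));
            }
            vector.setValueCount(data.length);

            // Each value is replaced by its position in the dictionary.
            try (IntVector encoded = (IntVector) DictionaryEncoder.encode(vector, dictionary)) {
                int[] indices = new int[encoded.getValueCount()];
                for (int i = 0; i < indices.length; i++) {
                    indices[i] = encoded.get(i);
                }
                return indices;
            }
        }
    }
}
```

Only these small integer indices travel in the record batch; the dictionary itself is sent once through the DictionaryProvider, which is why the writer needs the provider and the reader exposes the dictionaries via `getDictionaryVectors()`.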
Writing and Reading Random Access Files
The ArrowFileWriter has the same API as ArrowStreamWriter:
try (
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ArrowFileWriter writer = new ArrowFileWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
) {
    writer.start();
    // write the first batch
    writer.writeBatch();
    // write another four batches
    for (int i = 0; i < 4; i++) {
        // ... do populate work
        writer.writeBatch();
    }
    writer.end();
}
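The difference between ArrowFileReader and ArrowStreamReader is that the input source must have a seek method for random access. Because we have access to the entire payload, we know the number of record batches in the file and can read any of them at random: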
try (ArrowFileReader reader = new ArrowFileReader(
        new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {
    // read the 4th batch (index 3)
    ArrowBlock block = reader.getRecordBlocks().get(3);
    reader.loadRecordBatch(block);
    VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
}
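Since a `FileChannel` implements `SeekableByteChannel`, the same reader also works against a file on disk. The sketch below is a minimal illustration, assuming write access to the system temp directory; `RandomAccessFromDisk` and the `ints` column are hypothetical. It writes five batches with `ArrowFileWriter`, then uses the block list from the file footer to load them in reverse order, something the stream format cannot do.

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;

// Hypothetical demo class: random access over an Arrow file on disk.
public class RandomAccessFromDisk {
    // Writes 5 batches to a temporary file, then loads them last-to-first via the footer's block list.
    public static List<Integer> firstValuesInReverse() {
        Schema schema = new Schema(Collections.singletonList(
            Field.nullable("ints", new ArrowType.Int(32, true))));
        List<Integer> firsts = new ArrayList<>();
        try (BufferAllocator allocator = new RootAllocator()) {
            Path path = Files.createTempFile("batches", ".arrow");
            try {
                try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
                     FileOutputStream fileOut = new FileOutputStream(path.toFile());
                     ArrowFileWriter writer = new ArrowFileWriter(root, null, fileOut.getChannel())) {
                    IntVector ints = (IntVector) root.getVector("ints");
                    writer.start();
                    for (int batch = 0; batch < 5; batch++) {
                        ints.allocateNew(2);
                        ints.set(0, batch * 10);
                        ints.set(1, batch * 10 + 1);
                        root.setRowCount(2);
                        writer.writeBatch();
                    }
                    writer.end(); // writes the footer listing every record block
                }
                // A FileChannel is a SeekableByteChannel, which is what ArrowFileReader requires.
                try (FileInputStream fileIn = new FileInputStream(path.toFile());
                     ArrowFileReader reader = new ArrowFileReader(fileIn.getChannel(), allocator)) {
                    List<ArrowBlock> blocks = reader.getRecordBlocks();
                    for (int i = blocks.size() - 1; i >= 0; i--) {
                        reader.loadRecordBatch(blocks.get(i));
                        IntVector ints = (IntVector) reader.getVectorSchemaRoot().getVector("ints");
                        firsts.add(ints.get(0));
                    }
                }
            } finally {
                Files.deleteIfExists(path);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return firsts;
    }
}
```

Reading back to front should give `[40, 30, 20, 10, 0]`; the same block list also tells you how many batches the file holds without loading any of them.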