数据集¶
警告
实验性功能:Java 模块 dataset 目前处于早期开发阶段。在 Apache Arrow 成熟之前,API 可能会在每个发行版中发生更改。
Dataset 是 Apache Arrow 中的一个通用层,用于查询不同格式或采用不同分区策略的数据。通常情况下,待查询的数据被假定位于传统文件系统中,但 Arrow Dataset 的设计初衷不仅限于查询文件,还可以扩展到为所有可能的数据源提供服务,例如进程间通信或来自其他网络位置的数据等。
入门指南¶
目前支持的文件格式包括:
Apache Arrow (
.arrow)Apache ORC (
.orc)Apache Parquet (
.parquet)逗号分隔值 (
.csv)行分隔 JSON 值 (
.json)
下面展示了在 Java 中使用 Dataset 查询 Parquet 文件的最简单示例:
// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
List<ArrowRecordBatch> batches = new ArrayList<>();
while (reader.loadNextBatch()) {
try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
final VectorUnloader unloader = new VectorUnloader(root);
batches.add(unloader.getRecordBatch());
}
}
// do something with read record batches, for example:
analyzeArrowData(batches);
// finished the analysis of the data, close all resources:
AutoCloseables.close(batches);
} catch (Exception e) {
e.printStackTrace();
}
注意
ArrowRecordBatch 是一种低级复合 Arrow 数据交换格式,不提供直接从中读取类型化数据的 API。建议使用工具 VectorLoader 将其加载到具备模式感知能力的容器 VectorSchemaRoot 中,用户可以通过该容器在 Java 中方便地访问解码后的数据。
仅当 ScanOptions 中的 batchSize 参数设置为小于记录批次(recordbatch)中行数的值时,该参数才会生效。
另请参阅
使用 VectorSchemaRoot 加载记录批次。
模式 (Schema)¶
在实际读取数据之前,可以通过方法 DatasetFactory#inspect() 检查待查询数据的模式。例如:
// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
// inspect schema
Schema schema = factory.inspect();
对于某些与用户定义模式兼容的数据格式,用户可以使用方法 DatasetFactory#inspect(Schema schema) 来创建数据集。
Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);
否则,当调用无参方法 DatasetFactory#inspect() 时,模式将从数据源自动推断。其结果与 DatasetFactory#inspect() 的结果一致。
此外,如果在扫描期间指定了投影(请参阅下一节 投影(列子集)),则可以通过方法 Scanner::schema() 获取输出数据的实际模式。
Scanner scanner = dataset.newScan(
new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();
投影(列子集)¶
用户可以在 ScanOptions 中指定投影。例如:
String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));
如果不需要投影,则可以在 ScanOptions 中省略可选的投影参数。
ScanOptions options = new ScanOptions(32768, Optional.empty());
或者使用快捷构造函数:
ScanOptions options = new ScanOptions(32768);
这样,在扫描期间将输出所有列。
投影(生成新列)和过滤器¶
用户可以使用 Substrait 在 ScanOptions 中指定投影(新列)或过滤器。例如:
ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
.columns(Optional.empty())
.substraitExpressionFilter(substraitExpressionFilter)
.substraitExpressionProjection(getSubstraitExpressionProjection())
.build();
另请参阅
- 使用扩展表达式执行投影和筛选
使用 Substrait 的投影和过滤器。
从 HDFS 读取数据¶
FileSystemDataset 支持从非本地文件系统读取数据。Apache Arrow Java 官方版本中已包含 HDFS 支持,无需重新编译源代码即可直接使用。
要使用 Dataset API 访问 HDFS 数据,请将常规 HDFS URI 传递给 FilesSystemDatasetFactory。
String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
原生内存管理¶
为了获得更好的性能并降低代码复杂度,Java FileSystemDataset 在内部通过 JNI 依赖于 C++ 的 arrow::dataset::FileSystemDataset。因此,所有从 FileSystemDataset 读取的 Arrow 数据都应在 JVM 堆外分配。为了管理这部分内存,我们为用户提供了一个工具类 NativeMemoryPool。
作为一个基础示例,通过使用可监听的 NativeMemoryPool,用户可以传递一个监听器来挂载到 C++ 缓冲区的分配/释放上。
AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
@Override
public void reserve(long size) {
reserved.getAndAdd(size);
}
@Override
public void unreserve(long size) {
reserved.getAndAdd(-size);
}
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
pool, FileFormat.PARQUET, uri);
此外,为从数据集中读取的数据预留相同数量的 JVM 直接内存是非常常见的需求。为此,提供了一个内置工具类 DirectReservationListener。
NativeMemoryPool pool = NativeMemoryPool.createListenable(
DirectReservationListener.instance());
通过这种方式,一旦分配的 Arrow 缓冲区字节数达到 JVM 直接内存的限制,扫描期间就会抛出 OutOfMemoryError: Direct buffer memory。
注意
默认实例 NativeMemoryPool.getDefaultMemoryPool() 不会对缓冲区的分配/释放执行任何操作。在概念验证 (POC) 或测试场景下使用它是可以的,但在复杂环境的生产环境中使用时,建议通过使用可监听的内存池来管理内存。
注意
传递给 FileSystemDatasetFactory 构造函数的 BufferAllocator 实例也会感知所产生数据集实例的总体内存使用情况。一旦创建了 Java 缓冲区,传递的分配器将成为它们的父分配器。
使用说明¶
原生对象资源管理¶
由于依赖 JNI,所有与 FileSystemDataset 相关的组件都应手动关闭,或使用 try-with-resources 语法在用完后释放相应的原生对象。例如:
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory factory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = factory.finish();
Scanner scanner = dataset.newScan(options)
) {
// do something
} catch (Exception e) {
e.printStackTrace();
}
如果用户忘记关闭它们,可能会导致原生对象泄漏。
批次大小 (BatchSize)¶
ScanOptions 的 batchSize 参数是对单个批次大小的限制。
例如,尝试读取一个带有 Gzip 压缩且包含 3 个行组 (row groups) 的 Parquet 文件:
# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age: OPTIONAL INT64 R:0 D:1
name: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838
在这里,我们将 ScanOptions 中的 batchSize 设置为 32768。由于这大于下一个批次中的行数(因为第一个行组只有 4 行,所以该批次为 4 行),程序将仅获得 4 行。扫描器不会合并较小的批次以达到限制,但它会拆分较大的批次以保持在限制范围内。因此,如果行组包含超过 32768 行,它将被拆分为 32768 行或更小的块。