数据集#
警告
实验性:Java 模块 dataset
现处于早期开发阶段。在 Apache Arrow 的每个版本发布之前,API 可能会发生改变,直到它成熟。
数据集是 Apache Arrow 中用于查询不同格式或不同分区策略下数据的通用层。通常,要查询的数据应该位于传统文件系统中,但是 Arrow 数据集不仅用于查询文件,还可以扩展到服务所有可能的数据源,例如来自进程间通信或来自其他网络位置等。
入门#
当前支持的文件格式为
Apache Arrow (
.arrow
)Apache ORC (
.orc
)Apache Parquet (
.parquet
)逗号分隔值 (
.csv
)行分隔 JSON 值 (
.json
)
下面展示了使用数据集在 Java 中查询 Parquet 文件的最简单示例
// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
List<ArrowRecordBatch> batches = new ArrayList<>();
while (reader.loadNextBatch()) {
try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
final VectorUnloader unloader = new VectorUnloader(root);
batches.add(unloader.getRecordBatch());
}
}
// do something with read record batches, for example:
analyzeArrowData(batches);
// finished the analysis of the data, close all resources:
AutoCloseables.close(batches);
} catch (Exception e) {
e.printStackTrace();
}
注意
ArrowRecordBatch
是一个低级复合 Arrow 数据交换格式,它不提供 API 来直接读取类型化的数据。建议使用实用程序 VectorLoader
将其加载到模式感知容器 VectorSchemaRoot
中,用户可以通过它方便地在 Java 中访问解码后的数据。
如果 ScanOptions batchSize
参数设置为小于记录批次行数的值,则该参数才会生效。
另见
使用 VectorSchemaRoot 加载记录批次。
模式#
要查询的数据的模式可以通过方法 DatasetFactory#inspect()
在实际读取之前进行检查。例如
// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
// inspect schema
Schema schema = factory.inspect();
对于与用户定义模式兼容的某些数据格式,用户可以使用方法 DatasetFactory#inspect(Schema schema)
来创建数据集
Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);
否则,当调用非参数方法 DatasetFactory#inspect()
时,将从数据源自动推断模式。与 DatasetFactory#inspect()
的结果相同。
此外,如果在扫描期间指定了投影器(请参见下一节 投影(列子集)),则输出数据的实际模式可以在方法 Scanner::schema()
中获取
Scanner scanner = dataset.newScan(
new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();
投影(列子集)#
用户可以在 ScanOptions 中指定投影。例如
String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));
如果不需要投影,则在 ScanOptions 中省略可选投影参数
ScanOptions options = new ScanOptions(32768, Optional.empty());
或使用快捷构造函数
ScanOptions options = new ScanOptions(32768);
然后,在扫描期间将发出所有列。
投影(生成新列)和过滤器#
用户可以使用 Substrait 在 ScanOptions 中指定投影(新列)或过滤器。例如
ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
.columns(Optional.empty())
.substraitExpressionFilter(substraitExpressionFilter)
.substraitExpressionProjection(getSubstraitExpressionProjection())
.build();
另见
- 使用扩展表达式执行投影和过滤器
使用 Substrait 进行投影和过滤。
从 HDFS 读取数据#
FileSystemDataset
支持从非本地文件系统读取数据。HDFS 支持包含在官方 Apache Arrow Java 包版本中,可以直接使用,无需重新构建源代码。
要使用数据集 API 访问 HDFS 数据,请将通用 HDFS URI 传递给 FilesSystemDatasetFactory
String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
原生内存管理#
为了获得更好的性能并降低代码复杂度,Java FileSystemDataset
在内部通过 JNI 依赖于 C++ arrow::dataset::FileSystemDataset
。因此,从 FileSystemDataset
读取的所有 Arrow 数据应该在 JVM 堆之外分配。为了管理这部分内存,向用户提供了一个实用程序类 NativeMemoryPool
。
作为一个基本示例,通过使用可监听的 NativeMemoryPool
,用户可以传递一个监听器,该监听器挂钩到 C++ 缓冲区分配/释放
AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
@Override
public void reserve(long size) {
reserved.getAndAdd(size);
}
@Override
public void unreserve(long size) {
reserved.getAndAdd(-size);
}
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
pool, FileFormat.PARQUET, uri);
此外,为从数据集中读取的数据保留相同数量的 JVM 直接内存是一个非常常见的情况。为此,提供了一个内置实用程序类 DirectReservationListener
NativeMemoryPool pool = NativeMemoryPool.createListenable(
DirectReservationListener.instance());
这样,一旦 Arrow 缓冲区的分配字节数达到 JVM 直接内存的限制,在扫描期间将抛出 OutOfMemoryError: Direct buffer memory
。
注意
默认实例 NativeMemoryPool.getDefaultMemoryPool()
在缓冲区分配/释放时不执行任何操作。在 POC 或测试情况下使用它是可以的,但在复杂环境中的生产使用中,建议使用可监听的内存池来管理内存。
注意
传递给 FileSystemDatasetFactory
构造函数的 BufferAllocator
实例也了解所生成数据集实例的总内存使用情况。一旦创建 Java 缓冲区,传递的分配器将成为它们的父分配器。
使用说明#
原生对象资源管理#
作为依赖 JNI 的另一个结果,所有与 FileSystemDataset
相关的组件都应该手动关闭或使用 try-with-resources 在使用后释放相应的原生对象。例如
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory factory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = factory.finish();
Scanner scanner = dataset.newScan(options)
) {
// do something
} catch (Exception e) {
e.printStackTrace();
}
如果用户忘记关闭它们,则可能会导致原生对象泄漏。
批量大小#
batchSize
参数 ScanOptions
是单个批次大小的限制。
例如,让我们尝试读取一个使用 gzip 压缩和 3 个行组的 Parquet 文件
# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age: OPTIONAL INT64 R:0 D:1
name: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838
在这里,我们将 ScanOptions 中的 batchSize 设置为 32768。因为这大于下一个批次的行数,即 4 行(因为第一个行组只有 4 行),所以程序只获取了 4 行。扫描仪不会将较小的批次组合在一起以达到限制,但它会将大型批次拆分为小于限制的块。因此,如果行组超过 32768 行,它将被拆分为 32768 行或更少的块。