数据集#
警告
实验性:Java 模块 dataset
目前处于早期开发阶段。在 Apache Arrow 成熟之前,API 可能会在每个版本中发生变化。
数据集是 Apache Arrow 中的一个通用层,用于查询不同格式或不同分区策略的数据。通常,要查询的数据应该位于传统文件系统中,但是 Arrow 数据集的设计不仅用于查询文件,还可以扩展到服务所有可能的数据源,例如来自进程间通信或其他网络位置等。
入门#
目前支持的文件格式有
Apache Arrow (
.arrow
)Apache ORC (
.orc
)Apache Parquet (
.parquet
)逗号分隔值 (
.csv
)行分隔的 JSON 值 (
.json
)
以下展示了在 Java 中使用数据集查询 Parquet 文件的最简单示例
// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory datasetFactory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()
) {
List<ArrowRecordBatch> batches = new ArrayList<>();
while (reader.loadNextBatch()) {
try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
final VectorUnloader unloader = new VectorUnloader(root);
batches.add(unloader.getRecordBatch());
}
}
// do something with read record batches, for example:
analyzeArrowData(batches);
// finished the analysis of the data, close all resources:
AutoCloseables.close(batches);
} catch (Exception e) {
e.printStackTrace();
}
注意
ArrowRecordBatch
是一种低级复合 Arrow 数据交换格式,它不提供直接从中读取类型化数据的 API。建议使用实用程序 VectorLoader
将其加载到模式感知容器 VectorSchemaRoot
中,用户可以通过它方便地在 Java 中访问解码后的数据。
ScanOptions batchSize
参数仅在其设置为小于记录批次中的行数时才生效。
另请参阅
使用 VectorSchemaRoot 加载记录批次。
模式#
在实际读取数据之前,可以通过方法 DatasetFactory#inspect()
检查要查询数据的模式。例如
// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
// inspect schema
Schema schema = factory.inspect();
对于某些与用户定义模式兼容的数据格式,用户可以使用方法 DatasetFactory#inspect(Schema schema)
创建数据集
Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);
否则,当调用无参数方法 DatasetFactory#inspect()
时,将从数据源自动推断模式。与 DatasetFactory#inspect()
的结果相同。
此外,如果在扫描期间指定了投影器(请参阅下一节 投影(列的子集)),则可以在方法 Scanner::schema()
中获取输出数据的实际模式
Scanner scanner = dataset.newScan(
new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();
投影(列的子集)#
用户可以在 ScanOptions 中指定投影。例如
String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));
如果不需要投影,请在 ScanOptions 中省略可选的投影参数
ScanOptions options = new ScanOptions(32768, Optional.empty());
或者使用快捷构造函数
ScanOptions options = new ScanOptions(32768);
然后在扫描期间将发出所有列。
投影(生成新列)和过滤器#
用户可以使用 Substrait 在 ScanOptions 中指定投影(新列)或过滤器。例如
ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
.columns(Optional.empty())
.substraitExpressionFilter(substraitExpressionFilter)
.substraitExpressionProjection(getSubstraitExpressionProjection())
.build();
另请参阅
- 使用扩展表达式执行投影和过滤
使用 Substrait 进行投影和过滤。
从 HDFS 读取数据#
FileSystemDataset
支持从非本地文件系统读取数据。HDFS 支持包含在官方 Apache Arrow Java 包发行版中,可以直接使用,无需重新构建源代码。
要使用数据集 API 访问 HDFS 数据,请将常规 HDFS URI 传递给 FilesSystemDatasetFactory
String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
原生内存管理#
为了获得更好的性能并降低代码复杂性,Java FileSystemDataset
在内部通过 JNI 依赖于 C++ arrow::dataset::FileSystemDataset
。因此,从 FileSystemDataset
读取的所有 Arrow 数据都应该在 JVM 堆外分配。为了管理这部分内存,向用户提供了一个实用程序类 NativeMemoryPool
。
作为一个基本示例,通过使用可侦听的 NativeMemoryPool
,用户可以传递一个侦听器来钩住 C++ 缓冲区分配/释放
AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
@Override
public void reserve(long size) {
reserved.getAndAdd(size);
}
@Override
public void unreserve(long size) {
reserved.getAndAdd(-size);
}
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
pool, FileFormat.PARQUET, uri);
此外,为从数据集中读取的数据保留相同数量的 JVM 直接内存也是一种非常常见的情况。为此,提供了一个内置的实用程序类 DirectReservationListener
NativeMemoryPool pool = NativeMemoryPool.createListenable(
DirectReservationListener.instance());
这样,一旦 Arrow 缓冲区的分配字节数达到 JVM 直接内存的限制,在扫描过程中将抛出 OutOfMemoryError: Direct buffer memory
。
注意
默认实例 NativeMemoryPool.getDefaultMemoryPool()
在缓冲区分配/释放时不执行任何操作。在 POC 或测试的情况下使用它是可以的,但在复杂环境中的生产使用中,建议使用可侦听的内存池来管理内存。
注意
传递给 FileSystemDatasetFactory
构造函数的BufferAllocator
实例也了解所生成数据集实例的总体内存使用情况。创建 Java 缓冲区后,传递的分配器将成为其父分配器。
使用说明#
原生对象资源管理#
作为依赖 JNI 的另一个结果,所有与 FileSystemDataset
相关的组件都应手动关闭或使用 try-with-resources 在使用后释放相应的原生对象。例如
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
BufferAllocator allocator = new RootAllocator();
DatasetFactory factory = new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(),
FileFormat.PARQUET, uri);
Dataset dataset = factory.finish();
Scanner scanner = dataset.newScan(options)
) {
// do something
} catch (Exception e) {
e.printStackTrace();
}
如果用户忘记关闭它们,则可能会导致原生对象泄漏。
批量大小(BatchSize)#
ScanOptions
的 batchSize
参数是对单个批次大小的限制。
例如,让我们尝试读取一个 gzip 压缩且具有 3 个行组的 Parquet 文件
# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age: OPTIONAL INT64 R:0 D:1
name: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838
这里,我们将 ScanOptions 中的 batchSize 设置为 32768。因为这大于下一个批次中的行数,即 4 行,因为第一个行组只有 4 行,所以程序只获取 4 行。扫描器不会合并较小的批次以达到限制,但它会拆分较大的批次以保持在限制以下。因此,如果行组的行数超过 32768,它将被拆分为 32768 行或更少的块。