数据集

警告

实验性功能:Java 模块 dataset 目前处于早期开发阶段。在 Apache Arrow 成熟之前,API 可能会在每个发行版中发生更改。

Dataset 是 Apache Arrow 中的一个通用层,用于查询不同格式或采用不同分区策略的数据。通常情况下,待查询的数据被假定位于传统文件系统中,但 Arrow Dataset 的设计初衷不仅限于查询文件,还可以扩展到为所有可能的数据源提供服务,例如进程间通信或来自其他网络位置的数据等。

入门指南

目前支持的文件格式包括:

  • Apache Arrow (.arrow)

  • Apache ORC (.orc)

  • Apache Parquet (.parquet)

  • 逗号分隔值 (.csv)

  • 行分隔 JSON 值 (.json)

下面展示了在 Java 中使用 Dataset 查询 Parquet 文件的最简单示例:

// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    List<ArrowRecordBatch> batches = new ArrayList<>();
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            final VectorUnloader unloader = new VectorUnloader(root);
            batches.add(unloader.getRecordBatch());
        }
    }

    // do something with read record batches, for example:
    analyzeArrowData(batches);

    // finished the analysis of the data, close all resources:
    AutoCloseables.close(batches);
} catch (Exception e) {
    e.printStackTrace();
}

注意

ArrowRecordBatch 是一种低级复合 Arrow 数据交换格式,不提供直接从中读取类型化数据的 API。建议使用工具 VectorLoader 将其加载到具备模式感知能力的容器 VectorSchemaRoot 中,用户可以通过该容器在 Java 中方便地访问解码后的数据。

仅当 ScanOptions 中的 batchSize 参数设置为小于记录批次(recordbatch)中行数的值时,该参数才会生效。

另请参阅

使用 VectorSchemaRoot 加载记录批次。

模式 (Schema)

在实际读取数据之前,可以通过方法 DatasetFactory#inspect() 检查待查询数据的模式。例如:

// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

// inspect schema
Schema schema = factory.inspect();

对于某些与用户定义模式兼容的数据格式,用户可以使用方法 DatasetFactory#inspect(Schema schema) 来创建数据集。

Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);

否则,当调用无参方法 DatasetFactory#inspect() 时,模式将从数据源自动推断。其结果与 DatasetFactory#inspect() 的结果一致。

此外,如果在扫描期间指定了投影(请参阅下一节 投影(列子集)),则可以通过方法 Scanner::schema() 获取输出数据的实际模式。

Scanner scanner = dataset.newScan(
    new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();

投影(列子集)

用户可以在 ScanOptions 中指定投影。例如:

String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));

如果不需要投影,则可以在 ScanOptions 中省略可选的投影参数。

ScanOptions options = new ScanOptions(32768, Optional.empty());

或者使用快捷构造函数:

ScanOptions options = new ScanOptions(32768);

这样,在扫描期间将输出所有列。

投影(生成新列)和过滤器

用户可以使用 Substrait 在 ScanOptions 中指定投影(新列)或过滤器。例如:

ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
             .columns(Optional.empty())
             .substraitExpressionFilter(substraitExpressionFilter)
             .substraitExpressionProjection(getSubstraitExpressionProjection())
             .build();

另请参阅

使用扩展表达式执行投影和筛选

使用 Substrait 的投影和过滤器。

从 HDFS 读取数据

FileSystemDataset 支持从非本地文件系统读取数据。Apache Arrow Java 官方版本中已包含 HDFS 支持,无需重新编译源代码即可直接使用。

要使用 Dataset API 访问 HDFS 数据,请将常规 HDFS URI 传递给 FilesSystemDatasetFactory

String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

原生内存管理

为了获得更好的性能并降低代码复杂度,Java FileSystemDataset 在内部通过 JNI 依赖于 C++ 的 arrow::dataset::FileSystemDataset。因此,所有从 FileSystemDataset 读取的 Arrow 数据都应在 JVM 堆外分配。为了管理这部分内存,我们为用户提供了一个工具类 NativeMemoryPool

作为一个基础示例,通过使用可监听的 NativeMemoryPool,用户可以传递一个监听器来挂载到 C++ 缓冲区的分配/释放上。

AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
  @Override
  public void reserve(long size) {
    reserved.getAndAdd(size);
  }

  @Override
  public void unreserve(long size) {
    reserved.getAndAdd(-size);
  }
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
    pool, FileFormat.PARQUET, uri);

此外,为从数据集中读取的数据预留相同数量的 JVM 直接内存是非常常见的需求。为此,提供了一个内置工具类 DirectReservationListener

NativeMemoryPool pool = NativeMemoryPool.createListenable(
    DirectReservationListener.instance());

通过这种方式,一旦分配的 Arrow 缓冲区字节数达到 JVM 直接内存的限制,扫描期间就会抛出 OutOfMemoryError: Direct buffer memory

注意

默认实例 NativeMemoryPool.getDefaultMemoryPool() 不会对缓冲区的分配/释放执行任何操作。在概念验证 (POC) 或测试场景下使用它是可以的,但在复杂环境的生产环境中使用时,建议通过使用可监听的内存池来管理内存。

注意

传递给 FileSystemDatasetFactory 构造函数的 BufferAllocator 实例也会感知所产生数据集实例的总体内存使用情况。一旦创建了 Java 缓冲区,传递的分配器将成为它们的父分配器。

使用说明

原生对象资源管理

由于依赖 JNI,所有与 FileSystemDataset 相关的组件都应手动关闭,或使用 try-with-resources 语法在用完后释放相应的原生对象。例如:

String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory factory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
    Dataset dataset = factory.finish();
    Scanner scanner = dataset.newScan(options)
) {

    // do something

} catch (Exception e) {
    e.printStackTrace();
}

如果用户忘记关闭它们,可能会导致原生对象泄漏。

批次大小 (BatchSize)

ScanOptionsbatchSize 参数是对单个批次大小的限制。

例如,尝试读取一个带有 Gzip 压缩且包含 3 个行组 (row groups) 的 Parquet 文件:

# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);

$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age:         OPTIONAL INT64 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838

在这里,我们将 ScanOptions 中的 batchSize 设置为 32768。由于这大于下一个批次中的行数(因为第一个行组只有 4 行,所以该批次为 4 行),程序将仅获得 4 行。扫描器不会合并较小的批次以达到限制,但它会拆分较大的批次以保持在限制范围内。因此,如果行组包含超过 32768 行,它将被拆分为 32768 行或更小的块。