数据集#

警告

实验性:Java 模块 dataset 现处于早期开发阶段。在 Apache Arrow 的每个版本发布之前,API 可能会发生改变,直到它成熟。

数据集是 Apache Arrow 中用于查询不同格式或不同分区策略下数据的通用层。通常,要查询的数据应该位于传统文件系统中,但是 Arrow 数据集不仅用于查询文件,还可以扩展到服务所有可能的数据源,例如来自进程间通信或来自其他网络位置等。

入门#

当前支持的文件格式为

  • Apache Arrow (.arrow)

  • Apache ORC (.orc)

  • Apache Parquet (.parquet)

  • 逗号分隔值 (.csv)

  • 行分隔 JSON 值 (.json)

下面展示了使用数据集在 Java 中查询 Parquet 文件的最简单示例

// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory datasetFactory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
    Dataset dataset = datasetFactory.finish();
    Scanner scanner = dataset.newScan(options);
    ArrowReader reader = scanner.scanBatches()
) {
    List<ArrowRecordBatch> batches = new ArrayList<>();
    while (reader.loadNextBatch()) {
        try (VectorSchemaRoot root = reader.getVectorSchemaRoot()) {
            final VectorUnloader unloader = new VectorUnloader(root);
            batches.add(unloader.getRecordBatch());
        }
    }

    // do something with read record batches, for example:
    analyzeArrowData(batches);

    // finished the analysis of the data, close all resources:
    AutoCloseables.close(batches);
} catch (Exception e) {
    e.printStackTrace();
}

注意

ArrowRecordBatch 是一个低级复合 Arrow 数据交换格式,它不提供 API 来直接读取类型化的数据。建议使用实用程序 VectorLoader 将其加载到模式感知容器 VectorSchemaRoot 中,用户可以通过它方便地在 Java 中访问解码后的数据。

如果 ScanOptions batchSize 参数设置为小于记录批次行数的值,则该参数才会生效。

另见

使用 VectorSchemaRoot 加载记录批次。

模式#

要查询的数据的模式可以通过方法 DatasetFactory#inspect() 在实际读取之前进行检查。例如

// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

// inspect schema
Schema schema = factory.inspect();

对于与用户定义模式兼容的某些数据格式,用户可以使用方法 DatasetFactory#inspect(Schema schema) 来创建数据集

Schema schema = createUserSchema()
Dataset dataset = factory.finish(schema);

否则,当调用非参数方法 DatasetFactory#inspect() 时,将从数据源自动推断模式。与 DatasetFactory#inspect() 的结果相同。

此外,如果在扫描期间指定了投影器(请参见下一节 投影(列子集)),则输出数据的实际模式可以在方法 Scanner::schema() 中获取

Scanner scanner = dataset.newScan(
    new ScanOptions(32768, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();

投影(列子集)#

用户可以在 ScanOptions 中指定投影。例如

String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(32768, Optional.of(projection));

如果不需要投影,则在 ScanOptions 中省略可选投影参数

ScanOptions options = new ScanOptions(32768, Optional.empty());

或使用快捷构造函数

ScanOptions options = new ScanOptions(32768);

然后,在扫描期间将发出所有列。

投影(生成新列)和过滤器#

用户可以使用 Substrait 在 ScanOptions 中指定投影(新列)或过滤器。例如

ByteBuffer substraitExpressionFilter = getSubstraitExpressionFilter();
ByteBuffer substraitExpressionProject = getSubstraitExpressionProjection();
// Use Substrait APIs to create an Expression and serialize to a ByteBuffer
ScanOptions options = new ScanOptions.Builder(/*batchSize*/ 32768)
             .columns(Optional.empty())
             .substraitExpressionFilter(substraitExpressionFilter)
             .substraitExpressionProjection(getSubstraitExpressionProjection())
             .build();

另见

使用扩展表达式执行投影和过滤器

使用 Substrait 进行投影和过滤。

从 HDFS 读取数据#

FileSystemDataset 支持从非本地文件系统读取数据。HDFS 支持包含在官方 Apache Arrow Java 包版本中,可以直接使用,无需重新构建源代码。

要使用数据集 API 访问 HDFS 数据,请将通用 HDFS URI 传递给 FilesSystemDatasetFactory

String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

原生内存管理#

为了获得更好的性能并降低代码复杂度,Java FileSystemDataset 在内部通过 JNI 依赖于 C++ arrow::dataset::FileSystemDataset。因此,从 FileSystemDataset 读取的所有 Arrow 数据应该在 JVM 堆之外分配。为了管理这部分内存,向用户提供了一个实用程序类 NativeMemoryPool

作为一个基本示例,通过使用可监听的 NativeMemoryPool,用户可以传递一个监听器,该监听器挂钩到 C++ 缓冲区分配/释放

AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
  @Override
  public void reserve(long size) {
    reserved.getAndAdd(size);
  }

  @Override
  public void unreserve(long size) {
    reserved.getAndAdd(-size);
  }
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
    pool, FileFormat.PARQUET, uri);

此外,为从数据集中读取的数据保留相同数量的 JVM 直接内存是一个非常常见的情况。为此,提供了一个内置实用程序类 DirectReservationListener

NativeMemoryPool pool = NativeMemoryPool.createListenable(
    DirectReservationListener.instance());

这样,一旦 Arrow 缓冲区的分配字节数达到 JVM 直接内存的限制,在扫描期间将抛出 OutOfMemoryError: Direct buffer memory

注意

默认实例 NativeMemoryPool.getDefaultMemoryPool() 在缓冲区分配/释放时不执行任何操作。在 POC 或测试情况下使用它是可以的,但在复杂环境中的生产使用中,建议使用可监听的内存池来管理内存。

注意

传递给 FileSystemDatasetFactory 构造函数的 BufferAllocator 实例也了解所生成数据集实例的总内存使用情况。一旦创建 Java 缓冲区,传递的分配器将成为它们的父分配器。

使用说明#

原生对象资源管理#

作为依赖 JNI 的另一个结果,所有与 FileSystemDataset 相关的组件都应该手动关闭或使用 try-with-resources 在使用后释放相应的原生对象。例如

String uri = "file:/opt/example.parquet";
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
try (
    BufferAllocator allocator = new RootAllocator();
    DatasetFactory factory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
    Dataset dataset = factory.finish();
    Scanner scanner = dataset.newScan(options)
) {

    // do something

} catch (Exception e) {
    e.printStackTrace();
}

如果用户忘记关闭它们,则可能会导致原生对象泄漏。

批量大小#

batchSize 参数 ScanOptions 是单个批次大小的限制。

例如,让我们尝试读取一个使用 gzip 压缩和 3 个行组的 Parquet 文件

# Let configure ScanOptions as:
ScanOptions options = new ScanOptions(/*batchSize*/ 32768);

$ parquet-tools meta data4_3rg_gzip.parquet
file schema: schema
age:         OPTIONAL INT64 R:0 D:1
name:        OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:4 TS:182 OFFSET:4
row group 2: RC:4 TS:190 OFFSET:420
row group 3: RC:3 TS:179 OFFSET:838

在这里,我们将 ScanOptions 中的 batchSize 设置为 32768。因为这大于下一个批次的行数,即 4 行(因为第一个行组只有 4 行),所以程序只获取了 4 行。扫描仪不会将较小的批次组合在一起以达到限制,但它会将大型批次拆分为小于限制的块。因此,如果行组超过 32768 行,它将被拆分为 32768 行或更少的块。