内存管理

内存模块包含了 Arrow 用于分配和释放内存的所有功能。本文档分为两部分:第一部分“内存基础”提供了高层概述。接下来的部分“深入了解 Arrow 内存”则补充了详细信息。

内存基础

本节将向您介绍 Java 内存管理中的主要概念:

此外,本节还提供了在 Arrow 中使用内存的一些指南,并描述了在出现内存问题时如何进行调试。

入门指南

Arrow 的内存管理围绕列式格式的需求和堆外内存的使用而构建。Arrow Java 拥有自己独立的实现。它没有封装 C++ 实现,尽管该框架足够灵活,可以与 Java 代码使用的 C++ 分配的内存一起使用。

Arrow 提供了多个模块:核心接口和接口的实现。用户需要核心接口以及其中一个实现。

  • memory-core:提供 Arrow 库和应用程序使用的接口。

  • memory-netty:基于 Netty 库的内存接口实现。

  • memory-unsafe:基于 sun.misc.Unsafe 库的内存接口实现。

ArrowBuf

ArrowBuf 代表一块连续的直接内存(Direct Memory)。它由地址和长度组成,并提供用于处理内容的底层接口,类似于 ByteBuffer。

与(Direct)ByteBuffer 不同的是,它内置了引用计数功能(详见后文)。

为什么 Arrow 使用直接内存

  • 当使用直接内存/直接缓冲区时,JVM 可以优化 I/O 操作;它会尝试避免将缓冲区内容在中间缓冲区之间进行复制。这可以加速 Arrow 中的 IPC(进程间通信)。

  • 由于 Arrow 始终使用直接内存,JNI 模块可以直接封装原生内存地址,而无需复制数据。我们在 C Data Interface 等模块中使用了这一点。

  • 反之,在 JNI 边界的 C++ 端,我们也可以在不复制数据的情况下直接访问 ArrowBuf 中的内存。

BufferAllocator

BufferAllocator 主要是一个用于管理缓冲区(ArrowBuf 实例)记账的区域或容器。顾名思义,它既可以分配与其关联的新缓冲区,也可以处理在其他地方分配的缓冲区的记账。例如,它处理 Java 端对于在 C++ 中分配并通过 C-Data Interface 与 Java 共享的内存的记账。在下方的代码中,它执行了一次分配。

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try(BufferAllocator bufferAllocator = new RootAllocator(8 * 1024)){
    ArrowBuf arrowBuf = bufferAllocator.buffer(4 * 1024);
    System.out.println(arrowBuf);
    arrowBuf.close();
}
ArrowBuf[2], address:140363641651200, length:4096

BufferAllocator 接口的具体实现是 RootAllocator。应用程序通常应在程序启动时创建一个 RootAllocator,并通过 BufferAllocator 接口使用它。分配器实现了 AutoCloseable,必须在应用程序使用完毕后关闭;这将检查所有未释放的内存是否已释放(参见下一节)。

Arrow 提供了一种基于树的内存分配模型。首先创建 RootAllocator,然后通过 newChildAllocator 将更多分配器作为现有分配器的子项创建。创建 RootAllocator 或子分配器时,会提供一个内存限制,并在分配内存时检查该限制。此外,当从子分配器分配内存时,这些分配也会反映在所有父分配器中。因此,RootAllocator 有效地设置了程序范围内的内存限制,并充当所有内存分配的总记账员。

子分配器不是必须的,但有助于更好地组织代码。例如,可以为特定的代码段设置较低的内存限制。当该段代码完成时,可以关闭子分配器,此时它会检查该段代码是否泄露了内存。子分配器也可以命名,这使得在调试期间更容易确定 ArrowBuf 的来源。

引用计数

由于直接内存的分配和释放开销较大,分配器可能会共享直接缓冲区。为了确定性地管理共享缓冲区,我们使用手动引用计数而不是垃圾回收器。这意味着每个缓冲区都有一个计数器来跟踪对该缓冲区的引用次数,用户负责在缓冲区使用时适当地增加/减少计数器。

在 Arrow 中,每个 ArrowBuf 都有一个关联的 ReferenceManager 来跟踪引用计数。您可以通过 ArrowBuf.getReferenceManager() 获取它。引用计数通过 ReferenceManager.release 减少,通过 ReferenceManager.retain 增加。

当然,这很繁琐且容易出错,因此我们通常使用更高层次的 API(如 ValueVector)而不是直接操作缓冲区。这些类通常实现了 Closeable/AutoCloseable,并在关闭时自动减少引用计数。

分配器也实现了 AutoCloseable。在这种情况下,关闭分配器将检查从该分配器获得的所有缓冲区是否都已关闭。如果没有,close() 方法将抛出异常;这有助于跟踪来自未关闭缓冲区的内存泄露。

引用计数需要小心处理。为确保独立的代码段已完全清理所有分配的缓冲区,请使用新的子分配器。

开发指南

应用程序通常应该:

  • 在 API 中使用 BufferAllocator 接口,而不是 RootAllocator。

  • 在程序启动时创建一个 RootAllocator,并在需要时显式传递它。

  • 使用后 close() 分配器(无论是子分配器还是 RootAllocator),可以通过手动方式,或者最好通过 try-with-resources 语句。

调试内存泄露/分配

DEBUG 模式下,分配器和支持类将记录额外的调试跟踪信息,以更好地查明内存泄露和问题。要启用 DEBUG 模式,请在启动 VM 时传入以下系统属性:-Darrow.memory.debug.allocator=true

当启用 DEBUG 时,将保存一份分配日志。配置 SLF4J 以查看这些日志(例如通过 Logback/Apache Log4j)。考虑以下示例,了解它如何帮助我们跟踪分配器。

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try (BufferAllocator bufferAllocator = new RootAllocator(8 * 1024)) {
    ArrowBuf arrowBuf = bufferAllocator.buffer(4 * 1024);
    System.out.println(arrowBuf);
}

未启用调试模式时,关闭分配器会得到:

11:56:48.944 [main] INFO  o.apache.arrow.memory.BaseAllocator - Debug mode disabled.
ArrowBuf[2], address:140508391276544, length:4096
16:28:08.847 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (4096)
Allocator(ROOT) 0/4096/4096/8192 (res/actual/peak/limit)

启用调试模式后,我们可以得到更多详细信息:

11:56:48.944 [main] INFO  o.apache.arrow.memory.BaseAllocator - Debug mode enabled.
ArrowBuf[2], address:140437894463488, length:4096
Exception in thread "main" java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/4096/4096/8192 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[1] allocator: ROOT), isOwning: , size: , references: 1, life: 261438177096661..0, allocatorManager: [, life: ] holds 1 buffers.
        ArrowBuf[2], address:140437894463488, length:4096
  reservations: 0

此外,在调试模式下,可以使用 ArrowBuf.print() 获取调试字符串。这将包含有关缓冲区上分配操作的堆栈跟踪信息,例如缓冲区是在何时何地分配的。

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try (final BufferAllocator allocator = new RootAllocator()) {
  try (final ArrowBuf buf = allocator.buffer(1024)) {
    final StringBuilder sb = new StringBuilder();
    buf.print(sb, /*indent*/ 0);
    System.out.println(sb.toString());
  }
}
ArrowBuf[2], address:140433199984656, length:1024
 event log for: ArrowBuf[2]
   675959093395667 create()
      at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:175)
      at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:83)
      at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:96)
      at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:271)
      at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:300)
      at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
      at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
      at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
      at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
      at REPL.$JShell$14.do_it$($JShell$14.java:10)
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:566)
      at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:209)
      at jdk.jshell.execution.RemoteExecutionControl.invoke(RemoteExecutionControl.java:116)
      at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:119)
      at jdk.jshell.execution.ExecutionControlForwarder.processCommand(ExecutionControlForwarder.java:144)
      at jdk.jshell.execution.ExecutionControlForwarder.commandLoop(ExecutionControlForwarder.java:262)
      at jdk.jshell.execution.Util.forwardExecutionControl(Util.java:76)
      at jdk.jshell.execution.Util.forwardExecutionControlAndIO(Util.java:137)
      at jdk.jshell.execution.RemoteExecutionControl.main(RemoteExecutionControl.java:70)

BufferAllocator 还提供了一个 BufferAllocator.toVerboseString(),它可以在 DEBUG 模式下使用,以获取与各种分配器行为相关的广泛堆栈跟踪信息和事件。

最后,启用 TRACE 日志级别将在分配器关闭时自动提供此堆栈跟踪。

// Assumes use of Logback; adjust for Log4j, etc. as appropriate
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.LoggerFactory;

// Set log level to TRACE to get tracebacks
((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE);
try (final BufferAllocator allocator = new RootAllocator()) {
  // Leak buffer
  allocator.buffer(1024);
}
|  Exception java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/1024/1024/9223372036854775807 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[1] allocator: ROOT), isOwning: , size: , references: 1, life: 712040870231544..0, allocatorManager: [, life: ] holds 1 buffers.
        ArrowBuf[2], address:139926571810832, length:1024
     event log for: ArrowBuf[2]
       712040888650134 create()
              at org.apache.arrow.memory.util.StackTrace.<init>(StackTrace.java:34)
              at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:175)
              at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:83)
              at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:96)
              at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:271)
              at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:300)
              at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
              at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
              at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
              at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
              at REPL.$JShell$18.do_it$($JShell$18.java:13)
              at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
              at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:566)
              at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:209)
              at jdk.jshell.execution.RemoteExecutionControl.invoke(RemoteExecutionControl.java:116)
              at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:119)
              at jdk.jshell.execution.ExecutionControlForwarder.processCommand(ExecutionControlForwarder.java:144)
              at jdk.jshell.execution.ExecutionControlForwarder.commandLoop(ExecutionControlForwarder.java:262)
              at jdk.jshell.execution.Util.forwardExecutionControl(Util.java:76)
              at jdk.jshell.execution.Util.forwardExecutionControlAndIO(Util.java:137)

  reservations: 0

|        at BaseAllocator.close (BaseAllocator.java:405)
|        at RootAllocator.close (RootAllocator.java:29)
|        at (#8:1)

有时,显式传递分配器比较困难。例如,很难在现有的应用程序或框架代码层中传递额外的状态(如分配器)。在这种情况下,全局或单例分配器实例可能很有用,尽管这不应是您的首选。

如何实现:

  1. 在单例类中设置一个全局分配器。

  2. 提供从全局分配器创建子分配器的方法。

  3. 为子分配器命名,以便在出现错误时更容易弄清楚分配发生在哪里。

  4. 确保资源已正确关闭。

  5. 在合适的时机(例如程序关闭前)检查全局分配器是否为空。

  6. 如果它不为空,请检查上述的分配错误。

//1
private static final BufferAllocator allocator = new RootAllocator();
private static final AtomicInteger childNumber = new AtomicInteger(0);
...
//2
public static BufferAllocator getChildAllocator() {
    return allocator.newChildAllocator(nextChildName(), 0, Long.MAX_VALUE);
}
...
//3
private static String nextChildName() {
    return "Allocator-Child-" + childNumber.incrementAndGet();
}
...
//4: Business code
try (BufferAllocator allocator = GlobalAllocator.getChildAllocator()) {
    ...
}
...
//5
public static void checkGlobalCleanUpResources() {
    ...
    if (!allocator.getChildAllocators().isEmpty()) {
      throw new IllegalStateException(...);
    } else if (allocator.getAllocatedMemory() != 0) {
      throw new IllegalStateException(...);
    }
}

深入了解 Arrow 内存

设计原则

Arrow 的内存模型基于以下基本概念:

  • 内存分配有上限。该限制可以是物理限制(操作系统/JVM)或本地强加的限制。

  • 分配分为两个阶段:记账和实际分配。分配可能在任何一点失败。

  • 分配失败应该是可恢复的。在任何情况下,分配器基础设施都应将内存分配失败(OS 或基于内部限制的失败)作为 OutOfMemoryException 暴露出来。

  • 任何分配器在创建时都可以保留内存。这些内存应被持有,以便分配器始终能够分配该数量的内存。

  • 特定的应用程序组件应尽量使用本地分配器,以了解本地内存使用情况并更好地调试内存泄露。

  • 同一物理内存可以由多个分配器共享,分配器必须为此提供一种记账范式。

预留内存

Arrow 提供了两种不同的预留内存方式:

  • BufferAllocator 记账预留:当新分配器(RootAllocator 除外)初始化时,它可以预留在其生命周期内本地持有的内存。在分配器关闭之前,这些内存永远不会释放回其父分配器。

  • 通过 BufferAllocator.newReservation() 进行 AllocationReservation:允许短期预分配策略,以便特定的子系统确保有未来的可用内存来支持特定的请求。

引用计数详情

通常,使用的 ReferenceManager 实现是 BufferLedger 的一个实例。BufferLedger 是一种 ReferenceManager,它还维护 AllocationManagerBufferAllocator 以及一个或多个单独的 ArrowBuf 之间的关系。

所有与单个 BufferLedger/BufferAllocator 组合相关的 ArrowBuf(直接或切片)共享相同的引用计数,要么全部有效,要么全部无效。为了简化记账,我们将该内存视为由与该内存关联的其中一个 BufferAllocator 使用。当该分配器释放对该内存的声明时,内存所有权将转移到属于同一 AllocationManager 的另一个 BufferLedger。

分配详情

Arrow Java 中有几种分配器类型:

  • BufferAllocator - 应用程序用户应利用的公共接口。

  • BaseAllocator - 内存分配的基础实现,包含了 Arrow 分配器实现的核心。

  • RootAllocator - 根分配器。通常每个 JVM 只创建一个。它是子分配器的父级/祖先。

  • ChildAllocator - 派生自根分配器的子分配器。

许多 BufferAllocator 可以同时引用同一块物理内存。AllocationManager 有责任确保在这种情况下,从根的角度准确核算所有内存,并确保一旦所有 BufferAllocator 停止使用该内存,该内存即被正确释放。

为了简化记账,我们将该内存视为由与该内存关联的其中一个 BufferAllocator 使用。当该分配器释放其对内存的声明时,内存所有权将转移到属于同一 AllocationManager 的另一个 BufferLedger。请注意,因为 ArrowBuf.release() 实际上导致了内存所有权转移,所以我们总是继续进行所有权转移(即使这违反了分配器限制)。拥有特定分配器的应用程序有责任经常确认该分配器是否超过了其内存限制 (BufferAllocator.isOverLimit()),如果是,则尝试积极释放内存以改善情况。

对象层次结构

人们有两种主要方式来看待 Arrow 内存管理方案的对象层次结构。第一种是基于内存的视角,如下所示:

内存视角

+ AllocationManager
|
|-- UnsignedDirectLittleEndian (One per AllocationManager)
|
|-+ BufferLedger 1 ==> Allocator A (owning)
| ` - ArrowBuf 1
|-+ BufferLedger 2 ==> Allocator B (non-owning)
| ` - ArrowBuf 2
|-+ BufferLedger 3 ==> Allocator C (non-owning)
  | - ArrowBuf 3
  | - ArrowBuf 4
  ` - ArrowBuf 5

在这张图中,一块内存由分配器管理器所有。无论它与哪个(哪些)分配器一起工作,分配器管理器都负责那块内存。分配器管理器将与原始内存块建立关系(通过其对 UnsignedDirectLittleEndian 的引用),以及与它有关联的每个 BufferAllocator 建立引用。

分配器视角

+ RootAllocator
|-+ ChildAllocator 1
| | - ChildAllocator 1.1
| ` ...
|
|-+ ChildAllocator 2
|-+ ChildAllocator 3
| |
| |-+ BufferLedger 1 ==> AllocationManager 1 (owning) ==> UDLE
| | `- ArrowBuf 1
| `-+ BufferLedger 2 ==> AllocationManager 2 (non-owning)==> UDLE
|   `- ArrowBuf 2
|
|-+ BufferLedger 3 ==> AllocationManager 1 (non-owning)==> UDLE
| ` - ArrowBuf 3
|-+ BufferLedger 4 ==> AllocationManager 2 (owning) ==> UDLE
  | - ArrowBuf 4
  | - ArrowBuf 5
  ` - ArrowBuf 6

在这张图中,一个 RootAllocator 拥有三个 ChildAllocator。第一个 ChildAllocator 拥有一个后续的 ChildAllocator。ChildAllocator 有两个 BufferLedger/AllocationManager 引用。巧合的是,这些 AllocationManager 中的每一个也与 RootAllocator 相关联。在这种情况下,这些 AllocationManager 之一由 ChildAllocator 3 (AllocationManager 1) 拥有,而另一个 AllocationManager (AllocationManager 2) 则由 RootAllocator 拥有/核算。请注意,在此场景中,ArrowBuf 1 与 ArrowBuf 3 共享底层内存。但是,该内存的子集(例如通过切片)可能不同。还要注意 ArrowBuf 2 和 ArrowBuf 4、5、6 也共享相同的底层内存。还要注意 ArrowBuf 4、5 和 6 都共享相同的引用计数和命运。