内存管理#

内存模块包含 Arrow 用于分配和释放内存的所有功能。本文档分为两部分:第一部分,内存基础,提供了一个高级介绍。下一部分,Arrow 内存深入,详细介绍了这些内容。

内存基础#

本节将介绍 Java 内存管理中的主要概念

它还提供了一些在 Arrow 中使用内存的指南,并描述了如何在出现内存问题时调试它们。

入门#

Arrow 的内存管理建立在列式格式和使用堆外内存的需求之上。Arrow Java 有自己的独立实现。它不包装 C++ 实现,尽管该框架足够灵活,可以与 C++ 中分配的内存一起使用,这些内存由 Java 代码使用。

Arrow 提供了多个模块:核心接口和接口的实现。用户需要核心接口,以及其中一个实现。

  • memory-core: 提供 Arrow 库和应用程序使用的接口。

  • memory-netty: 基于 Netty 库的内存接口实现。

  • memory-unsafe: 基于 sun.misc.Unsafe 库的内存接口实现。

ArrowBuf#

ArrowBuf 表示 直接内存 的单个连续区域。它由一个地址和一个长度组成,并提供用于处理内容的低级接口,类似于 ByteBuffer。

与 (Direct)ByteBuffer 不同,它内置了引用计数,如后所述。

为什么 Arrow 使用直接内存#

  • JVM 在使用直接内存/直接缓冲区时可以优化 I/O 操作;它将尝试避免将缓冲区内容复制到/从中间缓冲区。这可以加快 Arrow 中的 IPC。

  • 由于 Arrow 始终使用直接内存,因此 JNI 模块可以直接包装本机内存地址,而不是复制数据。我们在 C 数据接口等模块中使用它。

  • 相反,在 JNI 边界处的 C++ 端,我们可以直接访问 ArrowBuf 中的内存,而无需复制数据。

BufferAllocator#

BufferAllocator 主要是一个用于缓冲区(ArrowBuf 实例)会计的竞技场或托儿所。顾名思义,它可以分配与其自身关联的新缓冲区,但它也可以处理在其他地方分配的缓冲区的会计。例如,它处理在 C++ 中分配并在使用 C-Data 接口与 Java 共享时分配的内存的 Java 端会计。在下面的代码中,它执行分配

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try(BufferAllocator bufferAllocator = new RootAllocator(8 * 1024)){
    ArrowBuf arrowBuf = bufferAllocator.buffer(4 * 1024);
    System.out.println(arrowBuf);
    arrowBuf.close();
}
ArrowBuf[2], address:140363641651200, length:4096

BufferAllocator 接口的具体实现是 RootAllocator。应用程序通常应该在程序开始时创建一个 RootAllocator,并通过 BufferAllocator 接口使用它。分配器实现 AutoCloseable,并且必须在应用程序完成使用它们后关闭;这将检查所有未完成的内存是否已释放(参见下一节)。

Arrow 为内存分配提供了一个基于树的模型。首先创建 RootAllocator,然后通过 newChildAllocator 创建更多分配器作为现有分配器的子级。在创建 RootAllocator 或子分配器时,会提供一个内存限制,并在分配内存时检查该限制。此外,当从子分配器分配内存时,这些分配也会反映在所有父分配器中。因此,RootAllocator 有效地设置了程序范围的内存限制,并充当所有内存分配的主记账员。

子分配器不是严格要求的,但可以帮助更好地组织代码。例如,可以为代码的特定部分设置较低的内存限制。当该部分完成时,可以关闭子分配器,此时它会检查该部分是否没有泄漏任何内存。子分配器也可以命名,这使得在调试期间更容易确定 ArrowBuf 来自哪里。

引用计数#

由于直接内存分配和释放成本很高,因此分配器可能会共享直接缓冲区。为了确定性地管理共享缓冲区,我们使用手动引用计数而不是垃圾收集器。这仅仅意味着每个缓冲区都有一个计数器跟踪对缓冲区的引用次数,用户负责在使用缓冲区时正确地增加/减少计数器。

在 Arrow 中,每个 ArrowBuf 都有一个关联的 ReferenceManager 来跟踪引用计数。您可以使用 ArrowBuf.getReferenceManager() 获取它。引用计数使用 ReferenceManager.release 来减少计数,使用 ReferenceManager.retain 来增加计数。

当然,这很繁琐且容易出错,因此我们通常不直接使用缓冲区,而是使用 ValueVector 等更高级别的 API。这些类通常实现 Closeable/AutoCloseable,并且会在关闭时自动减少引用计数。

分配器也实现 AutoCloseable。在这种情况下,关闭分配器将检查从分配器获得的所有缓冲区是否已关闭。如果没有,close() 方法将引发异常;这有助于跟踪来自未关闭缓冲区的内存泄漏。

引用计数需要谨慎处理。为了确保代码的独立部分已完全清理所有分配的缓冲区,请使用新的子分配器。

开发指南#

应用程序通常应该

  • 在 API 中使用 BufferAllocator 接口而不是 RootAllocator。

  • 在程序开始时创建一个 RootAllocator,并在需要时显式传递它。

  • close() 使用完分配器后(无论是子分配器还是 RootAllocator),无论是手动还是最好通过 try-with-resources 语句。

调试内存泄漏/分配#

DEBUG 模式下,分配器和支持类将记录额外的调试跟踪信息,以便更好地跟踪内存泄漏和问题。要启用 DEBUG 模式,在启动时将以下系统属性传递给 VM -Darrow.memory.debug.allocator=true

启用 DEBUG 时,将保留分配日志。配置 SLF4J 以查看这些日志(例如,通过 Logback/Apache Log4j)。考虑以下示例,以了解它如何帮助我们跟踪分配器

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try (BufferAllocator bufferAllocator = new RootAllocator(8 * 1024)) {
    ArrowBuf arrowBuf = bufferAllocator.buffer(4 * 1024);
    System.out.println(arrowBuf);
}

在没有启用调试模式的情况下,当我们关闭分配器时,我们会得到以下结果

11:56:48.944 [main] INFO  o.apache.arrow.memory.BaseAllocator - Debug mode disabled.
ArrowBuf[2], address:140508391276544, length:4096
16:28:08.847 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (4096)
Allocator(ROOT) 0/4096/4096/8192 (res/actual/peak/limit)

启用调试模式后,我们会获得更多详细信息

11:56:48.944 [main] INFO  o.apache.arrow.memory.BaseAllocator - Debug mode enabled.
ArrowBuf[2], address:140437894463488, length:4096
Exception in thread "main" java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/4096/4096/8192 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[1] allocator: ROOT), isOwning: , size: , references: 1, life: 261438177096661..0, allocatorManager: [, life: ] holds 1 buffers.
        ArrowBuf[2], address:140437894463488, length:4096
  reservations: 0

此外,在调试模式下,ArrowBuf.print() 可用于获取调试字符串。这将包括有关缓冲区分配操作的信息,以及堆栈跟踪,例如何时/何地分配了缓冲区。

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try (final BufferAllocator allocator = new RootAllocator()) {
  try (final ArrowBuf buf = allocator.buffer(1024)) {
    final StringBuilder sb = new StringBuilder();
    buf.print(sb, /*indent*/ 0);
    System.out.println(sb.toString());
  }
}
ArrowBuf[2], address:140433199984656, length:1024
 event log for: ArrowBuf[2]
   675959093395667 create()
      at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:175)
      at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:83)
      at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:96)
      at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:271)
      at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:300)
      at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
      at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
      at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
      at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
      at REPL.$JShell$14.do_it$($JShell$14.java:10)
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:566)
      at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:209)
      at jdk.jshell.execution.RemoteExecutionControl.invoke(RemoteExecutionControl.java:116)
      at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:119)
      at jdk.jshell.execution.ExecutionControlForwarder.processCommand(ExecutionControlForwarder.java:144)
      at jdk.jshell.execution.ExecutionControlForwarder.commandLoop(ExecutionControlForwarder.java:262)
      at jdk.jshell.execution.Util.forwardExecutionControl(Util.java:76)
      at jdk.jshell.execution.Util.forwardExecutionControlAndIO(Util.java:137)
      at jdk.jshell.execution.RemoteExecutionControl.main(RemoteExecutionControl.java:70)

BufferAllocator 还提供了一个 BufferAllocator.toVerboseString(),它可以在 DEBUG 模式下使用,以获取与各种分配器行为相关的广泛堆栈跟踪信息和事件。

最后,启用 TRACE 日志级别将自动在关闭分配器时提供此堆栈跟踪

// Assumes use of Logback; adjust for Log4j, etc. as appropriate
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.LoggerFactory;

// Set log level to TRACE to get tracebacks
((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE);
try (final BufferAllocator allocator = new RootAllocator()) {
  // Leak buffer
  allocator.buffer(1024);
}
|  Exception java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/1024/1024/9223372036854775807 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[1] allocator: ROOT), isOwning: , size: , references: 1, life: 712040870231544..0, allocatorManager: [, life: ] holds 1 buffers.
        ArrowBuf[2], address:139926571810832, length:1024
     event log for: ArrowBuf[2]
       712040888650134 create()
              at org.apache.arrow.memory.util.StackTrace.<init>(StackTrace.java:34)
              at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:175)
              at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:83)
              at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:96)
              at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:271)
              at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:300)
              at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
              at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
              at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
              at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
              at REPL.$JShell$18.do_it$($JShell$18.java:13)
              at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
              at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:566)
              at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:209)
              at jdk.jshell.execution.RemoteExecutionControl.invoke(RemoteExecutionControl.java:116)
              at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:119)
              at jdk.jshell.execution.ExecutionControlForwarder.processCommand(ExecutionControlForwarder.java:144)
              at jdk.jshell.execution.ExecutionControlForwarder.commandLoop(ExecutionControlForwarder.java:262)
              at jdk.jshell.execution.Util.forwardExecutionControl(Util.java:76)
              at jdk.jshell.execution.Util.forwardExecutionControlAndIO(Util.java:137)

  reservations: 0

|        at BaseAllocator.close (BaseAllocator.java:405)
|        at RootAllocator.close (RootAllocator.java:29)
|        at (#8:1)

有时,显式传递分配器很困难。例如,很难通过现有应用程序或框架代码的层传递额外的状态,例如分配器。全局或单例分配器实例在这里很有用,尽管它不应该是您的首选。

工作原理

  1. 在单例类中设置全局分配器。

  2. 提供从全局分配器创建子分配器的方法。

  3. 为子分配器提供适当的名称,以便在发生错误时更容易找出分配发生的位置。

  4. 确保资源已正确关闭。

  5. 检查全局分配器在某个合适的时间点是否为空,例如在程序关闭之前。

  6. 如果它不为空,请查看上述分配错误。

//1
private static final BufferAllocator allocator = new RootAllocator();
private static final AtomicInteger childNumber = new AtomicInteger(0);
...
//2
public static BufferAllocator getChildAllocator() {
    return allocator.newChildAllocator(nextChildName(), 0, Long.MAX_VALUE);
}
...
//3
private static String nextChildName() {
    return "Allocator-Child-" + childNumber.incrementAndGet();
}
...
//4: Business code
try (BufferAllocator allocator = GlobalAllocator.getChildAllocator()) {
    ...
}
...
//5
public static void checkGlobalCleanUpResources() {
    ...
    if (!allocator.getChildAllocators().isEmpty()) {
      throw new IllegalStateException(...);
    } else if (allocator.getAllocatedMemory() != 0) {
      throw new IllegalStateException(...);
    }
}

Arrow 内存深入#

设计原则#

Arrow 的内存模型基于以下基本概念

  • 内存分配存在上限。该上限可能是实际限制(操作系统/JVM)或本地施加的限制。

  • 分配分为两个阶段:记账和实际分配。分配可能在任一阶段失败。

  • 分配失败应该是可恢复的。在所有情况下,分配器基础设施应将内存分配失败(基于操作系统或内部限制)公开为 OutOfMemoryException

  • 任何分配器在创建时都可以保留内存。此内存应保留,以便此分配器始终能够分配该数量的内存。

  • 特定应用程序组件应努力使用本地分配器来了解本地内存使用情况并更好地调试内存泄漏。

  • 相同的物理内存可以由多个分配器共享,分配器必须为此目的提供记账范式。

保留内存#

Arrow 提供两种不同的方法来保留内存

  • BufferAllocator 记账保留:当一个新的分配器(除了 RootAllocator)被初始化时,它可以预留一些内存,并在其生命周期内保留在本地。这部分内存永远不会在分配器关闭之前释放回其父分配器。

  • AllocationReservation 通过 BufferAllocator.newReservation():允许短期预分配策略,以便特定子系统可以确保将来有足够的内存来支持特定请求。

引用计数细节#

通常,使用的 ReferenceManager 实现是 BufferLedger 的实例。BufferLedger 是一个 ReferenceManager,它还维护 AllocationManagerBufferAllocator 和一个或多个单独的 ArrowBuf 之间的关系。

与单个 BufferLedger/BufferAllocator 组合相关的所有 ArrowBufs(直接或切片)共享相同的引用计数,并且要么全部有效,要么全部无效。为了记账的简单性,我们将该内存视为由与该内存关联的 BufferAllocator 之一使用。当该分配器释放对该内存的声明时,内存所有权将转移到属于同一 AllocationManager 的另一个 BufferLedger。

分配细节#

Arrow Java 中有几种分配器类型

  • BufferAllocator - 应用程序用户应该利用的公共接口

  • BaseAllocator - 内存分配的基本实现,包含 Arrow 分配器实现的核心内容

  • RootAllocator - 根分配器。通常为 JVM 创建一个。它充当子分配器的父级/祖先

  • ChildAllocator - 从根分配器派生的子分配器

许多 BufferAllocator 可以同时引用同一块物理内存。AllocationManager 负责确保在这种情况下,所有内存都从 Root 的角度准确地记账,并确保在所有 BufferAllocator 停止使用该内存后正确释放该内存。

为了记账的简单性,我们将该内存视为由与该内存关联的 BufferAllocator 之一使用。当该分配器释放对该内存的声明时,内存所有权将转移到属于同一 AllocationManager 的另一个 BufferLedger。请注意,由于 ArrowBuf.release() 是实际导致内存所有权转移的原因,因此我们始终进行所有权转移(即使这违反了分配器限制)。拥有特定分配器的应用程序有责任经常确认分配器是否超过其内存限制 (BufferAllocator.isOverLimit()),如果是,则尝试积极释放内存以缓解这种情况。

对象层次结构#

有两种主要方法可以查看 Arrow 内存管理方案的对象层次结构。第一个是基于内存的视角,如下所示

内存视角#

+ AllocationManager
|
|-- UnsignedDirectLittleEndian (One per AllocationManager)
|
|-+ BufferLedger 1 ==> Allocator A (owning)
| ` - ArrowBuf 1
|-+ BufferLedger 2 ==> Allocator B (non-owning)
| ` - ArrowBuf 2
|-+ BufferLedger 3 ==> Allocator C (non-owning)
  | - ArrowBuf 3
  | - ArrowBuf 4
  ` - ArrowBuf 5

在这张图中,一块内存由分配器管理器拥有。分配器管理器负责这块内存,无论它与哪个分配器(s)一起工作。分配器管理器将与一块原始内存(通过其对 UnsignedDirectLittleEndian 的引用)以及对每个与其有关系的 BufferAllocator 的引用建立关系。

分配器视角#

+ RootAllocator
|-+ ChildAllocator 1
| | - ChildAllocator 1.1
| ` ...
|
|-+ ChildAllocator 2
|-+ ChildAllocator 3
| |
| |-+ BufferLedger 1 ==> AllocationManager 1 (owning) ==> UDLE
| | `- ArrowBuf 1
| `-+ BufferLedger 2 ==> AllocationManager 2 (non-owning)==> UDLE
|   `- ArrowBuf 2
|
|-+ BufferLedger 3 ==> AllocationManager 1 (non-owning)==> UDLE
| ` - ArrowBuf 3
|-+ BufferLedger 4 ==> AllocationManager 2 (owning) ==> UDLE
  | - ArrowBuf 4
  | - ArrowBuf 5
  ` - ArrowBuf 6

在这张图中,RootAllocator 拥有三个 ChildAllocator。第一个 ChildAllocator (ChildAllocator 1) 拥有一个后续的 ChildAllocator。ChildAllocator 有两个 BufferLedger/AllocationManager 引用。巧合的是,这些 AllocationManager 中的每一个也与 RootAllocator 相关联。在这种情况下,这些 AllocationManager 之一由 ChildAllocator 3 (AllocationManager 1) 拥有/记账,而另一个 AllocationManager (AllocationManager 2) 由 RootAllocator 拥有/记账。请注意,在这种情况下,ArrowBuf 1 与 ArrowBuf 3 共享底层内存。但是,该内存的子集(例如,通过切片)可能不同。还要注意,ArrowBuf 2 和 ArrowBuf 4、5 和 6 也共享相同的底层内存。还要注意,ArrowBuf 4、5 和 6 都共享相同的引用计数和命运。