内存管理#

内存模块包含 Arrow 用于分配和释放内存的所有功能。本文档分为两部分:第一部分,内存基础,提供了一个高级介绍。接下来的部分,Arrow 内存深入,详细说明了细节。

内存基础#

本节将向您介绍 Java 内存管理中的主要概念

它还提供了一些在 Arrow 中使用内存的指南,并描述了在出现内存问题时如何调试这些问题。

入门#

Arrow 的内存管理建立在列式格式和使用堆外内存的需求之上。Arrow Java 有其自己的独立实现。它不包装 C++ 实现,尽管该框架足够灵活,可以与 C++ 中分配的内存一起使用,这些内存由 Java 代码使用。

Arrow 提供多个模块:核心接口和接口的实现。用户需要核心接口,以及正好一个实现。

  • memory-core:提供 Arrow 库和应用程序使用的接口。

  • memory-netty:基于 Netty 库的内存接口实现。

  • memory-unsafe:基于 sun.misc.Unsafe 库的内存接口实现。

ArrowBuf#

ArrowBuf 表示一个连续的 直接内存区域。它由一个地址和一个长度组成,并提供用于处理内容的低级接口,类似于 ByteBuffer。

与 (Direct)ByteBuffer 不同,它内置了引用计数,如下所述。

为什么 Arrow 使用直接内存#

  • JVM 在使用直接内存/直接缓冲区时可以优化 I/O 操作;它会尝试避免将缓冲区内容复制到/从中间缓冲区。这可以加快 Arrow 中的 IPC 速度。

  • 由于 Arrow 始终使用直接内存,因此 JNI 模块可以直接包装本机内存地址,而不是复制数据。我们在 C 数据接口等模块中使用此功能。

  • 相反,在 JNI 边界的 C++ 侧,我们可以直接访问 ArrowBuf 中的内存,而无需复制数据。

BufferAllocator#

BufferAllocator 主要是一个用于缓冲区 (ArrowBuf 实例) 会计的区域或育苗场。顾名思义,它可以分配与自身关联的新缓冲区,但它也可以处理在其他地方分配的缓冲区的会计。例如,它处理 C++ 中分配并在使用 C 数据接口与 Java 共享的内存的 Java 端会计。在下面的代码中,它执行分配

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try(BufferAllocator bufferAllocator = new RootAllocator(8 * 1024)){
    ArrowBuf arrowBuf = bufferAllocator.buffer(4 * 1024);
    System.out.println(arrowBuf);
    arrowBuf.close();
}
ArrowBuf[2], address:140363641651200, length:4096

BufferAllocator 接口的具体实现是 RootAllocator。应用程序通常应在程序开始时创建一个 RootAllocator,并通过 BufferAllocator 接口使用它。分配器实现 AutoCloseable 并且必须在应用程序完成后关闭;这将检查所有未完成的内存是否已释放(参见下一节)。

Arrow 为内存分配提供了一个基于树的模型。首先创建 RootAllocator,然后通过 newChildAllocator创建更多分配器作为现有分配器的子节点。创建 RootAllocator 或子分配器时,会提供内存限制,并且在分配内存时会检查该限制。此外,当从子分配器分配内存时,这些分配也会反映在所有父分配器中。因此,RootAllocator 有效地设置了程序范围的内存限制,并充当所有内存分配的主记账员。

子分配器不是严格要求的,但可以帮助更好地组织代码。例如,可以为特定代码段设置较低的内存限制。该代码段完成后可以关闭子分配器,此时它会检查该代码段是否未泄漏任何内存。子分配器也可以命名,这使得在调试期间更容易了解 ArrowBuf 的来源。

引用计数#

由于直接内存的分配和释放成本很高,因此分配器可能会共享直接缓冲区。为了确定性地管理共享缓冲区,我们使用手动引用计数而不是垃圾收集器。这仅仅意味着每个缓冲区都有一个计数器跟踪对该缓冲区的引用次数,并且用户负责在使用缓冲区时正确增加/减少计数器。

在 Arrow 中,每个 ArrowBuf 都有一个关联的 ReferenceManager 来跟踪引用计数。您可以使用 ArrowBuf.getReferenceManager() 获取它。引用计数使用 ReferenceManager.release 来递减计数,以及使用 ReferenceManager.retain 来递增计数。

当然,这很繁琐且容易出错,因此,我们通常不直接使用缓冲区,而是使用 ValueVector 等更高级别的 API。此类类通常实现 Closeable/AutoCloseable,并在关闭时自动递减引用计数。

分配器也实现 AutoCloseable。在这种情况下,关闭分配器将检查从分配器获得的所有缓冲区是否都已关闭。如果没有,close() 方法将引发异常;这有助于跟踪未关闭缓冲区导致的内存泄漏。

引用计数需要谨慎处理。为了确保代码的独立部分已完全清理所有分配的缓冲区,请使用新的子分配器。

开发指南#

应用程序通常应

  • 在 API 中使用 BufferAllocator 接口而不是 RootAllocator。

  • 在程序开始时创建一个 RootAllocator,并在需要时显式传递它。

  • 在使用后关闭分配器(无论是子分配器还是 RootAllocator),无论是手动还是最好通过 try-with-resources 语句。

调试内存泄漏/分配#

DEBUG 模式下,分配器和支持类将记录其他调试跟踪信息,以更好地跟踪内存泄漏和问题。要启用 DEBUG 模式,在启动时向 VM 传递以下系统属性 -Darrow.memory.debug.allocator=true

启用 DEBUG 后,将保留分配的日志。配置 SLF4J 以查看这些日志(例如,通过 Logback/Apache Log4j)。考虑以下示例以了解它如何帮助我们跟踪分配器

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try (BufferAllocator bufferAllocator = new RootAllocator(8 * 1024)) {
    ArrowBuf arrowBuf = bufferAllocator.buffer(4 * 1024);
    System.out.println(arrowBuf);
}

在未启用调试模式的情况下,当我们关闭分配器时,我们会得到以下内容

11:56:48.944 [main] INFO  o.apache.arrow.memory.BaseAllocator - Debug mode disabled.
ArrowBuf[2], address:140508391276544, length:4096
16:28:08.847 [main] ERROR o.apache.arrow.memory.BaseAllocator - Memory was leaked by query. Memory leaked: (4096)
Allocator(ROOT) 0/4096/4096/8192 (res/actual/peak/limit)

启用调试模式,我们可以获得更多详细信息

11:56:48.944 [main] INFO  o.apache.arrow.memory.BaseAllocator - Debug mode enabled.
ArrowBuf[2], address:140437894463488, length:4096
Exception in thread "main" java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/4096/4096/8192 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[1] allocator: ROOT), isOwning: , size: , references: 1, life: 261438177096661..0, allocatorManager: [, life: ] holds 1 buffers.
        ArrowBuf[2], address:140437894463488, length:4096
  reservations: 0

此外,在调试模式下,ArrowBuf.print() 可用于获取调试字符串。这将包括有关缓冲区上分配操作的信息以及堆栈跟踪,例如缓冲区何时/何处分配。

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

try (final BufferAllocator allocator = new RootAllocator()) {
  try (final ArrowBuf buf = allocator.buffer(1024)) {
    final StringBuilder sb = new StringBuilder();
    buf.print(sb, /*indent*/ 0);
    System.out.println(sb.toString());
  }
}
ArrowBuf[2], address:140433199984656, length:1024
 event log for: ArrowBuf[2]
   675959093395667 create()
      at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:175)
      at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:83)
      at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:96)
      at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:271)
      at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:300)
      at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
      at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
      at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
      at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
      at REPL.$JShell$14.do_it$($JShell$14.java:10)
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
      at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:566)
      at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:209)
      at jdk.jshell.execution.RemoteExecutionControl.invoke(RemoteExecutionControl.java:116)
      at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:119)
      at jdk.jshell.execution.ExecutionControlForwarder.processCommand(ExecutionControlForwarder.java:144)
      at jdk.jshell.execution.ExecutionControlForwarder.commandLoop(ExecutionControlForwarder.java:262)
      at jdk.jshell.execution.Util.forwardExecutionControl(Util.java:76)
      at jdk.jshell.execution.Util.forwardExecutionControlAndIO(Util.java:137)
      at jdk.jshell.execution.RemoteExecutionControl.main(RemoteExecutionControl.java:70)

BufferAllocator 还提供了一个 BufferAllocator.toVerboseString(),它可以在 DEBUG 模式下使用以获取与各种分配器行为相关的广泛堆栈跟踪信息和事件。

最后,启用 TRACE 日志级别将自动在关闭分配器时提供此堆栈跟踪

// Assumes use of Logback; adjust for Log4j, etc. as appropriate
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.slf4j.LoggerFactory;

// Set log level to TRACE to get tracebacks
((Logger) LoggerFactory.getLogger("org.apache.arrow")).setLevel(Level.TRACE);
try (final BufferAllocator allocator = new RootAllocator()) {
  // Leak buffer
  allocator.buffer(1024);
}
|  Exception java.lang.IllegalStateException: Allocator[ROOT] closed with outstanding buffers allocated (1).
Allocator(ROOT) 0/1024/1024/9223372036854775807 (res/actual/peak/limit)
  child allocators: 0
  ledgers: 1
    ledger[1] allocator: ROOT), isOwning: , size: , references: 1, life: 712040870231544..0, allocatorManager: [, life: ] holds 1 buffers.
        ArrowBuf[2], address:139926571810832, length:1024
     event log for: ArrowBuf[2]
       712040888650134 create()
              at org.apache.arrow.memory.util.StackTrace.<init>(StackTrace.java:34)
              at org.apache.arrow.memory.util.HistoricalLog$Event.<init>(HistoricalLog.java:175)
              at org.apache.arrow.memory.util.HistoricalLog.recordEvent(HistoricalLog.java:83)
              at org.apache.arrow.memory.ArrowBuf.<init>(ArrowBuf.java:96)
              at org.apache.arrow.memory.BufferLedger.newArrowBuf(BufferLedger.java:271)
              at org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:300)
              at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:276)
              at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
              at org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:240)
              at org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
              at REPL.$JShell$18.do_it$($JShell$18.java:13)
              at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(NativeMethodAccessorImpl.java:-2)
              at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
              at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
              at java.lang.reflect.Method.invoke(Method.java:566)
              at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:209)
              at jdk.jshell.execution.RemoteExecutionControl.invoke(RemoteExecutionControl.java:116)
              at jdk.jshell.execution.DirectExecutionControl.invoke(DirectExecutionControl.java:119)
              at jdk.jshell.execution.ExecutionControlForwarder.processCommand(ExecutionControlForwarder.java:144)
              at jdk.jshell.execution.ExecutionControlForwarder.commandLoop(ExecutionControlForwarder.java:262)
              at jdk.jshell.execution.Util.forwardExecutionControl(Util.java:76)
              at jdk.jshell.execution.Util.forwardExecutionControlAndIO(Util.java:137)

  reservations: 0

|        at BaseAllocator.close (BaseAllocator.java:405)
|        at RootAllocator.close (RootAllocator.java:29)
|        at (#8:1)

有时,显式传递分配器很困难。例如,很难通过现有应用程序或框架代码的各层传递额外的状态,例如分配器。全局或单例分配器实例在这里很有用,尽管它不应是您的首选。

工作原理

  1. 在单例类中设置全局分配器。

  2. 提供从全局分配器创建子分配器的方法。

  3. 为子分配器提供合适的名称,以便在发生错误时更容易找出分配发生的位置。

  4. 确保正确关闭资源。

  5. 检查全局分配器在某个合适的时间点(例如,在程序关闭之前)是否为空。

  6. 如果它不为空,请查看上述分配错误。

//1
private static final BufferAllocator allocator = new RootAllocator();
private static final AtomicInteger childNumber = new AtomicInteger(0);
...
//2
public static BufferAllocator getChildAllocator() {
    return allocator.newChildAllocator(nextChildName(), 0, Long.MAX_VALUE);
}
...
//3
private static String nextChildName() {
    return "Allocator-Child-" + childNumber.incrementAndGet();
}
...
//4: Business code
try (BufferAllocator allocator = GlobalAllocator.getChildAllocator()) {
    ...
}
...
//5
public static void checkGlobalCleanUpResources() {
    ...
    if (!allocator.getChildAllocators().isEmpty()) {
      throw new IllegalStateException(...);
    } else if (allocator.getAllocatedMemory() != 0) {
      throw new IllegalStateException(...);
    }
}

Arrow 内存深入#

设计原则#

Arrow 的内存模型基于以下基本概念

  • 内存可以分配到某个限制。该限制可能是实际限制(操作系统/JVM)或本地施加的限制。

  • 分配分两个阶段进行:会计然后实际分配。分配在任一阶段都可能失败。

  • 分配失败应该是可恢复的。在所有情况下,分配器基础设施都应将内存分配失败(操作系统或内部限制)公开为 OutOfMemoryException

  • 任何分配器都可以在创建时预留内存。此内存应保留,以便此分配器始终能够分配该数量的内存。

  • 特定的应用程序组件应努力使用本地分配器来了解本地内存使用情况并更好地调试内存泄漏。

  • 相同的物理内存可以由多个分配器共享,并且分配器必须为此目的提供会计范例。

预留内存#

Arrow 提供两种不同的预留内存方式

  • BufferAllocator 会计预留:当新的分配器(RootAllocator 除外)初始化时,它可以留出它将在其生命周期内本地保留的内存。这是永远不会释放回其父分配器的内存,直到分配器关闭。

  • AllocationReservation 通过 BufferAllocator.newReservation():允许短期预分配策略,以便特定子系统可以确保将来有足够的内存来支持特定请求。

引用计数细节#

通常,使用的 ReferenceManager 实现是 BufferLedger 的实例。BufferLedger 是一个 ReferenceManager,它还维护 AllocationManagerBufferAllocator 和一个或多个单独的 ArrowBuf 之间的关系

与单个 BufferLedger/BufferAllocator 组合相关的全部 ArrowBuf(直接或切片)共享相同的引用计数,并且所有缓冲区要么都有效,要么都无效。为了简化会计,我们将该内存视为由与该内存关联的 BufferAllocator 之一使用。当该分配器释放对该内存的索赔时,内存所有权将移至属于相同 AllocationManager 的另一个 BufferLedger。

分配细节#

Arrow Java 中有几种分配器类型

  • BufferAllocator - 应用程序用户应该使用的公共接口

  • BaseAllocator - 内存分配的基本实现,包含 Arrow 分配器实现的核心逻辑

  • RootAllocator - 根分配器。通常在 JVM 中只创建一个。它充当子分配器的父级/祖先

  • ChildAllocator - 从根分配器派生的子分配器

多个 BufferAllocator 可以同时引用同一块物理内存。AllocationManager 负责确保在这种情况下,从 Root 的角度来看所有内存都被准确地记录,并确保在所有 BufferAllocator 停止使用该内存后正确释放内存。

为了简化记账,我们将该内存视为由与该内存关联的 BufferAllocator 之一使用。当该分配器释放其对该内存的声明时,内存所有权将转移到属于同一 AllocationManager 的另一个 BufferLedger。请注意,由于 ArrowBuf.release() 是实际导致内存所有权转移的原因,因此我们始终继续进行所有权转移(即使这违反了分配器限制)。拥有特定分配器的应用程序有责任频繁确认分配器是否超过其内存限制 (BufferAllocator.isOverLimit()),如果是,则尝试积极释放内存以缓解这种情况。

对象层次结构#

有两种主要方法可以查看 Arrow 内存管理方案的对象层次结构。第一种是基于内存的视角,如下所示

内存视角#

+ AllocationManager
|
|-- UnsignedDirectLittleEndian (One per AllocationManager)
|
|-+ BufferLedger 1 ==> Allocator A (owning)
| ` - ArrowBuf 1
|-+ BufferLedger 2 ==> Allocator B (non-owning)
| ` - ArrowBuf 2
|-+ BufferLedger 3 ==> Allocator C (non-owning)
  | - ArrowBuf 3
  | - ArrowBuf 4
  ` - ArrowBuf 5

在此图中,一块内存由分配器管理器拥有。分配器管理器负责该内存块,无论它与哪个分配器(或分配器)一起工作。分配器管理器将与一块原始内存(通过其对 UnsignedDirectLittleEndian 的引用)以及与其相关的每个 BufferAllocator 的引用建立关系。

分配器视角#

+ RootAllocator
|-+ ChildAllocator 1
| | - ChildAllocator 1.1
| ` ...
|
|-+ ChildAllocator 2
|-+ ChildAllocator 3
| |
| |-+ BufferLedger 1 ==> AllocationManager 1 (owning) ==> UDLE
| | `- ArrowBuf 1
| `-+ BufferLedger 2 ==> AllocationManager 2 (non-owning)==> UDLE
|   `- ArrowBuf 2
|
|-+ BufferLedger 3 ==> AllocationManager 1 (non-owning)==> UDLE
| ` - ArrowBuf 3
|-+ BufferLedger 4 ==> AllocationManager 2 (owning) ==> UDLE
  | - ArrowBuf 4
  | - ArrowBuf 5
  ` - ArrowBuf 6

在此图中,RootAllocator 拥有三个 ChildAllocator。第一个 ChildAllocator (ChildAllocator 1) 拥有一个后续的 ChildAllocator。ChildAllocator 有两个 BufferLedger/AllocationManager 引用。巧合的是,这些 AllocationManager 中的每一个也与 RootAllocator 相关联。在这种情况下,这些 AllocationManager 之一由 ChildAllocator 3 (AllocationManager 1) 拥有,而另一个 AllocationManager (AllocationManager 2) 由 RootAllocator 拥有/记录。请注意,在这种情况下,ArrowBuf 1 与 ArrowBuf 3 共享底层内存。但是,该内存的子集(例如通过切片)可能不同。另请注意,ArrowBuf 2 和 ArrowBuf 4、5 和 6 也共享相同的底层内存。另请注意,ArrowBuf 4、5 和 6 都共享相同的引用计数和命运。