内存管理#

缓冲区#

为了避免传递具有不同且不明确生命周期规则的原始数据指针,Arrow 提供了一个名为 arrow::Buffer 的通用抽象。缓冲区封装了指针和数据大小,通常还将其生命周期绑定到底层提供者(换句话说,缓冲区应始终指向有效内存直到其销毁)。缓冲区是无类型的:它们只是表示物理内存区域,而不考虑其预期含义或解释。

缓冲区可以由 Arrow 本身分配,也可以由第三方例程分配。例如,可以将 Python 字节串的数据作为 Arrow 缓冲区传递,并根据需要保持 Python 对象存活。

此外,缓冲区有各种类型:可变的或不可变的,可调整大小的或不可调整大小的。通常,在构建数据块时,您将拥有一个可变缓冲区,然后它将被冻结为一个不可变容器,例如 数组

注意

某些缓冲区可能指向非 CPU 内存,例如由 CUDA 上下文提供的基于 GPU 的内存。如果您正在编写一个支持 GPU 的应用程序,则需要注意不要将 GPU 内存指针解释为 CPU 可访问的指针,反之亦然。

访问缓冲区内存#

缓冲区使用 size()data() 访问器(或 mutable_data() 用于对可变缓冲区的可写访问)提供对底层内存的快速访问。

切片#

可以对缓冲区进行零拷贝切片,以获取一个引用底层数据某个连续子集的缓冲区。这可以通过调用 arrow::SliceBuffer()arrow::SliceMutableBuffer() 函数来完成。

分配缓冲区#

您可以通过调用 arrow::AllocateBuffer()arrow::AllocateResizableBuffer() 的重载之一自己分配缓冲区。

arrow::Result<std::unique_ptr<Buffer>> maybe_buffer = arrow::AllocateBuffer(4096);
if (!maybe_buffer.ok()) {
   // ... handle allocation error
}

std::shared_ptr<arrow::Buffer> buffer = *std::move(maybe_buffer);
uint8_t* buffer_data = buffer->mutable_data();
memcpy(buffer_data, "hello world", 11);

以这种方式分配缓冲区可确保它为 64 字节对齐,并根据 Arrow 内存规范 的建议进行填充。

构建缓冲区#

您还可以使用 arrow::BufferBuilder API 来分配和构建缓冲区。

BufferBuilder builder;
builder.Resize(11);  // reserve enough space for 11 bytes
builder.Append("hello ", 6);
builder.Append("world", 5);

auto maybe_buffer = builder.Finish();
if (!maybe_buffer.ok()) {
   // ... handle buffer allocation error
}
std::shared_ptr<arrow::Buffer> buffer = *maybe_buffer;

如果缓冲区旨在包含给定固定宽度类型的值(例如 List 数组的 32 位偏移量),则使用模板 arrow::TypedBufferBuilder API 可能更方便。

TypedBufferBuilder<int32_t> builder;
builder.Reserve(2);  // reserve enough space for two int32_t values
builder.Append(0x12345678);
builder.Append(-0x765643210);

auto maybe_buffer = builder.Finish();
if (!maybe_buffer.ok()) {
   // ... handle buffer allocation error
}
std::shared_ptr<arrow::Buffer> buffer = *maybe_buffer;

内存池#

使用 Arrow C++ API 分配缓冲区时,缓冲区的底层内存由 arrow::MemoryPool 实例分配。通常,这将是进程范围内的 *默认内存池*,但许多 Arrow API 允许您为其内部分配传递另一个 MemoryPool 实例。

内存池用于大型长期数据,例如数组缓冲区。其他数据,例如小型 C++ 对象和临时工作区,通常会通过常规的 C++ 分配器进行分配。

默认内存池#

默认内存池取决于 Arrow C++ 的编译方式。

  • 如果在编译时启用,则为 mimalloc 堆;

  • 否则,如果在编译时启用,则为 jemalloc 堆;

  • 否则,为 C 库 malloc 堆。

覆盖默认内存池#

可以通过设置 ARROW_DEFAULT_MEMORY_POOL 环境变量来覆盖上述选择算法。

STL 集成#

如果您希望使用 Arrow 内存池来分配 STL 容器的数据,可以使用 arrow::stl::allocator 包装器。

相反,您也可以使用 STL 分配器来分配 Arrow 内存,使用 arrow::stl::STLMemoryPool 类。但是,这可能效率较低,因为 STL 分配器不提供调整大小操作。

设备#

许多 Arrow 应用程序只访问主机(CPU)内存。但是,在某些情况下,希望处理设备内存(例如 GPU 上的板载内存)以及主机内存。

Arrow 使用 arrow::Device 抽象来表示 CPU 和其他设备。相关类 arrow::MemoryManager 指定了如何在给定设备上分配内存。每个设备都有一个默认的内存管理器,但可以构造其他实例(例如,包装 CPU 上的自定义 arrow::MemoryPool)。arrow::MemoryManager 实例,它们指定了如何在给定设备上分配内存(例如,使用 CPU 上的特定 arrow::MemoryPool)。

设备无关编程#

如果您从第三方代码接收缓冲区,可以通过调用其 is_cpu() 方法来查询它是否可由 CPU 读取。

您还可以以通用方式查看给定设备上的缓冲区,方法是调用 arrow::Buffer::View()arrow::Buffer::ViewOrCopy()。如果源设备和目标设备相同,这将是一个空操作。否则,一个与设备相关的机制将尝试为目标设备构造一个内存地址,该地址可以访问缓冲区内容。实际的设备到设备传输可能会延迟发生,当读取缓冲区内容时。

类似地,如果您想在缓冲区上进行 I/O 而无需假设 CPU 可读缓冲区,则可以调用 arrow::Buffer::GetReader()arrow::Buffer::GetWriter()

例如,要获取任意缓冲区的 CPU 视图或副本,您只需执行以下操作:

std::shared_ptr<arrow::Buffer> arbitrary_buffer = ... ;
std::shared_ptr<arrow::Buffer> cpu_buffer = arrow::Buffer::ViewOrCopy(
   arbitrary_buffer, arrow::default_cpu_memory_manager());

内存分析#

在 Linux 上,可以使用 perf record 生成内存分配的详细配置文件,而无需修改二进制文件。这些配置文件除了分配大小之外还可以显示回溯。这需要调试符号,可以来自调试构建或带有调试符号构建的发布版本。

注意

如果您正在其他平台上分析 Arrow 的测试,可以使用 Archery 运行以下 Docker 容器以访问 Linux 环境

archery docker run ubuntu-cpp bash
# Inside the Docker container...
/arrow/ci/scripts/cpp_build.sh /arrow /build
cd build/cpp/debug
./arrow-array-test # Run a test
apt-get update
apt-get install -y linux-tools-generic
alias perf=/usr/lib/linux-tools/<version-path>/perf

要跟踪分配,请在使用的每个分配器方法上创建探测点。收集 $params 允许我们记录请求的分配大小,而收集 $retval 允许我们记录已记录的分配的地址,因此我们可以将它们与对 free/de-allocate 的调用相关联。

perf probe -x libarrow.so je_arrow_mallocx '$params'
perf probe -x libarrow.so je_arrow_mallocx%return '$retval'
perf probe -x libarrow.so je_arrow_rallocx '$params'
perf probe -x libarrow.so je_arrow_rallocx%return '$retval'
perf probe -x libarrow.so je_arrow_dallocx '$params'
PROBE_ARGS="-e probe_libarrow:je_arrow_mallocx \
   -e probe_libarrow:je_arrow_mallocx__return \
   -e probe_libarrow:je_arrow_rallocx \
   -e probe_libarrow:je_arrow_rallocx__return \
   -e probe_libarrow:je_arrow_dallocx"
perf probe -x libarrow.so mi_malloc_aligned '$params'
perf probe -x libarrow.so mi_malloc_aligned%return '$retval'
perf probe -x libarrow.so mi_realloc_aligned '$params'
perf probe -x libarrow.so mi_realloc_aligned%return '$retval'
perf probe -x libarrow.so mi_free '$params'
PROBE_ARGS="-e probe_libarrow:mi_malloc_aligned \
   -e probe_libarrow:mi_malloc_aligned__return \
   -e probe_libarrow:mi_realloc_aligned \
   -e probe_libarrow:mi_realloc_aligned__return \
   -e probe_libarrow:mi_free"

设置探测点后,可以使用 perf record 记录与回溯相关的调用。在本示例中,我们正在运行 Arrow 中的 StructArray 单元测试

perf record -g --call-graph dwarf \
  $PROBE_ARGS \
  ./arrow-array-test --gtest_filter=StructArray*

如果您想分析正在运行的进程,可以运行 perf record -p <PID>,它将一直记录到您使用 CTRL+C 中断为止。或者,可以执行 perf record -P <PID> sleep 10 来记录 10 秒。

可以使用标准工具处理生成的数据来处理 perf,或者可以使用 perf script 将数据的文本格式管道传输到自定义脚本。以下脚本解析 perf script 输出,并将输出以换行符分隔的 JSON 格式打印出来,以便更容易处理。

process_perf_events.py#
import sys
import re
import json

# Example non-traceback line
# arrow-array-tes 14344 [003]  7501.073802: probe_libarrow:je_arrow_mallocx: (7fbcd20bb640) size=0x80 flags=6

current = {}
current_traceback = ''

def new_row():
    global current_traceback
    current['traceback'] = current_traceback
    print(json.dumps(current))
    current_traceback = ''

for line in sys.stdin:
    if line == '\n':
        continue
    elif line[0] == '\t':
        # traceback line
        current_traceback += line.strip("\t")
    else:
        line = line.rstrip('\n')
        if not len(current) == 0:
            new_row()
        parts = re.sub(' +', ' ', line).split(' ')

        parts.reverse()
        parts.pop() # file
        parts.pop() # "14344"
        parts.pop() # "[003]"

        current['time'] = float(parts.pop().rstrip(":"))
        current['event'] = parts.pop().rstrip(":")

        parts.pop() # (7fbcd20bddf0)
        if parts[-1] == "<-":
            parts.pop()
            parts.pop()

        params = {}

        for pair in parts:
            key, value = pair.split("=")
            params[key] = value

        current['params'] = params

以下是该脚本的示例调用,以及输出数据的预览

$ perf script | python3 /arrow/process_perf_events.py > processed_events.jsonl
$ head processed_events.jsonl | cut -c -120
{"time": 14814.954378, "event": "probe_libarrow:je_arrow_mallocx", "params": {"flags": "6", "size": "0x80"}, "traceback"
{"time": 14814.95443, "event": "probe_libarrow:je_arrow_mallocx__return", "params": {"arg1": "0x7f4a97e09000"}, "traceba
{"time": 14814.95448, "event": "probe_libarrow:je_arrow_mallocx", "params": {"flags": "6", "size": "0x40"}, "traceback":
{"time": 14814.954486, "event": "probe_libarrow:je_arrow_mallocx__return", "params": {"arg1": "0x7f4a97e0a000"}, "traceb
{"time": 14814.954502, "event": "probe_libarrow:je_arrow_rallocx", "params": {"flags": "6", "size": "0x40", "ptr": "0x7f
{"time": 14814.954507, "event": "probe_libarrow:je_arrow_rallocx__return", "params": {"arg1": "0x7f4a97e0a040"}, "traceb
{"time": 14814.954796, "event": "probe_libarrow:je_arrow_mallocx", "params": {"flags": "6", "size": "0x40"}, "traceback"
{"time": 14814.954805, "event": "probe_libarrow:je_arrow_mallocx__return", "params": {"arg1": "0x7f4a97e0a080"}, "traceb
{"time": 14814.954817, "event": "probe_libarrow:je_arrow_mallocx", "params": {"flags": "6", "size": "0x40"}, "traceback"
{"time": 14814.95482, "event": "probe_libarrow:je_arrow_mallocx__return", "params": {"arg1": "0x7f4a97e0a0c0"}, "traceba

从那里,可以回答许多问题。例如,以下脚本将查找从未释放的分配,并打印相关的回溯以及悬挂分配的数量

count_tracebacks.py#
'''Find tracebacks of allocations with no corresponding free'''
import sys
import json
from collections import defaultdict

allocated = dict()

for line in sys.stdin:
    line = line.rstrip('\n')
    data = json.loads(line)

    if data['event'] == "probe_libarrow:je_arrow_mallocx__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:je_arrow_rallocx":
        address = data['params']['ptr']
        del allocated[address]
    elif data['event'] == "probe_libarrow:je_arrow_rallocx__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:je_arrow_dallocx":
        address = data['params']['ptr']
        if address in allocated:
            del allocated[address]
    elif data['event'] == "probe_libarrow:mi_malloc_aligned__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:mi_realloc_aligned":
        address = data['params']['p']
        del allocated[address]
    elif data['event'] == "probe_libarrow:mi_realloc_aligned__return":
        address = data['params']['arg1']
        allocated[address] = data['traceback']
    elif data['event'] == "probe_libarrow:mi_free":
        address = data['params']['p']
        if address in allocated:
            del allocated[address]

traceback_counts = defaultdict(int)

for traceback in allocated.values():
    traceback_counts[traceback] += 1

for traceback, count in sorted(traceback_counts.items(), key=lambda x: -x[1]):
    print("Num of dangling allocations:", count)
    print(traceback)

该脚本可以像这样调用

$ cat processed_events.jsonl | python3 /arrow/count_tracebacks.py
Num of dangling allocations: 1
 7fc945e5cfd2 arrow::(anonymous namespace)::JemallocAllocator::ReallocateAligned+0x13b (/build/cpp/debug/libarrow.so.700.0.0)
 7fc945e5fe4f arrow::BaseMemoryPoolImpl<arrow::(anonymous namespace)::JemallocAllocator>::Reallocate+0x93 (/build/cpp/debug/libarrow.so.700.0.0)
 7fc945e618f7 arrow::PoolBuffer::Resize+0xed (/build/cpp/debug/libarrow.so.700.0.0)
 55a38b163859 arrow::BufferBuilder::Resize+0x12d (/build/cpp/debug/arrow-array-test)
 55a38b163bbe arrow::BufferBuilder::Finish+0x48 (/build/cpp/debug/arrow-array-test)
 55a38b163e3a arrow::BufferBuilder::Finish+0x50 (/build/cpp/debug/arrow-array-test)
 55a38b163f90 arrow::BufferBuilder::FinishWithLength+0x4e (/build/cpp/debug/arrow-array-test)
 55a38b2c8fa7 arrow::TypedBufferBuilder<int, void>::FinishWithLength+0x4f (/build/cpp/debug/arrow-array-test)
 55a38b2bcce7 arrow::NumericBuilder<arrow::Int32Type>::FinishInternal+0x107 (/build/cpp/debug/arrow-array-test)
 7fc945c065ae arrow::ArrayBuilder::Finish+0x5a (/build/cpp/debug/libarrow.so.700.0.0)
 7fc94736ed41 arrow::ipc::internal::json::(anonymous namespace)::Converter::Finish+0x123 (/build/cpp/debug/libarrow.so.700.0.0)
 7fc94737426e arrow::ipc::internal::json::ArrayFromJSON+0x299 (/build/cpp/debug/libarrow.so.700.0.0)
 7fc948e98858 arrow::ArrayFromJSON+0x64 (/build/cpp/debug/libarrow_testing.so.700.0.0)
 55a38b6773f3 arrow::StructArray_FlattenOfSlice_Test::TestBody+0x79 (/build/cpp/debug/arrow-array-test)
 7fc944689633 testing::internal::HandleSehExceptionsInMethodIfSupported<testing::Test, void>+0x68 (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc94468132a testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>+0x5d (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc9446555eb testing::Test::Run+0xf1 (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc94465602d testing::TestInfo::Run+0x13f (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc944656947 testing::TestSuite::Run+0x14b (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc9446663f5 testing::internal::UnitTestImpl::RunAllTests+0x433 (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc94468ab61 testing::internal::HandleSehExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>+0x68 (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc944682568 testing::internal::HandleExceptionsInMethodIfSupported<testing::internal::UnitTestImpl, bool>+0x5d (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc944664b0c testing::UnitTest::Run+0xcc (/build/cpp/googletest_ep-prefix/lib/libgtestd.so.1.11.0)
 7fc9446d0299 RUN_ALL_TESTS+0x14 (/build/cpp/googletest_ep-prefix/lib/libgtest_maind.so.1.11.0)
 7fc9446d021b main+0x42 (/build/cpp/googletest_ep-prefix/lib/libgtest_maind.so.1.11.0)
 7fc9441e70b2 __libc_start_main+0xf2 (/usr/lib/x86_64-linux-gnu/libc-2.31.so)
 55a38b10a50d _start+0x2d (/build/cpp/debug/arrow-array-test)