Arrow C 设备数据接口#

警告

Arrow C 设备数据接口应被视为实验性的

原理#

当前的 C 数据接口及其大多数实现都假设提供的所有数据缓冲区都是 CPU 缓冲区。由于 Apache Arrow 旨在成为表示表格(“柱状”)数据的通用内存格式,因此人们希望在非 CPU 硬件(例如 GPU)上利用此数据。这种案例的一个例子是 RAPIDS cuDF 库,它将 Arrow 内存格式与 CUDA 用于 NVIDIA GPU。由于将数据从主机复制到设备再复制回来开销很大,因此理想的情况是能够将数据保留在设备上尽可能长的时间,即使在运行时和库之间传递数据时也是如此。

Arrow C 设备数据接口建立在现有的 C 数据接口之上,通过向其添加一组非常小且稳定的 C 定义。 这些定义等效于 C 数据接口中的 ArrowArrayArrowArrayStream 结构,这些结构添加了成员以允许指定设备类型并传递必要的信息以与生产者同步。 对于非 C/C++ 语言和运行时,将 C 定义转换为相应的 C FFI 声明应该与当前 C 数据接口一样简单。

然后,应用程序和库可以使用非 CPU 设备上的 Arrow 模式和 Arrow 格式化内存来轻松交换数据,就像现在使用 CPU 数据一样。 这将使数据在这些设备上保留更长时间,并避免在主机和设备之间进行昂贵的来回复制,仅仅是为了利用新的库和运行时。

目标#

  • 公开一个建立在现有 C 数据接口之上的 ABI 稳定接口。

  • 使第三方项目能够轻松地实现支持,而无需大量的初始投资。

  • 允许在同一进程中运行的独立运行时和组件之间零拷贝共享 Arrow 格式化的设备内存。

  • 避免需要像 CUDA Array Interface 这样的 Python 进程的 一对一适配层来传递 CUDA 数据。

  • 无需显式依赖(编译时或运行时)Arrow 软件项目本身即可实现集成。

Arrow C 设备数据接口的目的是扩展当前 C 数据接口的范围,使其也成为 GPU 或 FPGA 等设备上柱状处理的标准底层构建块。

结构定义#

由于它建立在 C 数据接口之上,因此 C 设备数据接口使用 C 数据接口规范中定义的 ArrowSchemaArrowArray 结构。 然后,它添加以下独立定义。 与 Arrow 项目的其余部分一样,它们在 Apache License 2.0 下可用。

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
#define ARROW_C_DEVICE_DATA_INTERFACE

// Device type for the allocated memory
typedef int32_t ArrowDeviceType;

// CPU device, same as using ArrowArray directly
#define ARROW_DEVICE_CPU 1
// CUDA GPU Device
#define ARROW_DEVICE_CUDA 2
// Pinned CUDA CPU memory by cudaMallocHost
#define ARROW_DEVICE_CUDA_HOST 3
// OpenCL Device
#define ARROW_DEVICE_OPENCL 4
// Vulkan buffer for next-gen graphics
#define ARROW_DEVICE_VULKAN 7
// Metal for Apple GPU
#define ARROW_DEVICE_METAL 8
// Verilog simulator buffer
#define ARROW_DEVICE_VPI 9
// ROCm GPUs for AMD GPUs
#define ARROW_DEVICE_ROCM 10
// Pinned ROCm CPU memory allocated by hipMallocHost
#define ARROW_DEVICE_ROCM_HOST 11
// Reserved for extension
//
// used to quickly test extension devices, semantics
// can differ based on implementation
#define ARROW_DEVICE_EXT_DEV 12
// CUDA managed/unified memory allocated by cudaMallocManaged
#define ARROW_DEVICE_CUDA_MANAGED 13
// Unified shared memory allocated on a oneAPI
// non-partitioned device.
//
// A call to the oneAPI runtime is required to determine the
// device type, the USM allocation type and the sycl context
// that it is bound to.
#define ARROW_DEVICE_ONEAPI 14
// GPU support for next-gen WebGPU standard
#define ARROW_DEVICE_WEBGPU 15
// Qualcomm Hexagon DSP
#define ARROW_DEVICE_HEXAGON 16

struct ArrowDeviceArray {
  struct ArrowArray array;
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;

  // reserved bytes for future expansion
  int64_t reserved[3];
};

#endif  // ARROW_C_DEVICE_DATA_INTERFACE

注意

规范保护 ARROW_C_DEVICE_DATA_INTERFACE 旨在避免如果两个项目在其自己的头文件中复制定义,并且第三方项目从这两个项目包含,则会发生重复定义的情况。 因此,重要的是,复制这些定义时,此保护应保持完全不变。

ArrowDeviceType#

ArrowDeviceType typedef 用于指示提供的内存缓冲区是在哪种类型的设备上分配的。 这与 device_id 结合使用,应该足以引用正确的数据缓冲区。

然后,我们使用宏来定义不同设备类型的值。 提供的宏值与广泛使用的 dlpack DLDeviceType 定义值兼容,对每个值使用与 dlpack.h 中的等效 kDL<type> 枚举相同的值。 该列表将与这些等效的枚举值保持同步,以确保兼容性,而不是可能出现差异。 为了避免 Arrow 项目必须审查新硬件设备,新的添加应首先添加到 dlpack,然后再在此处添加相应的宏。

为了确保 ABI 的可预测性,我们使用宏而不是 enum,因此存储类型与编译器无关。

ARROW_DEVICE_CPU#

CPU 设备,相当于直接使用 ArrowArray 而不是使用 ArrowDeviceArray

ARROW_DEVICE_CUDA#

CUDA GPU 设备。 这可以表示使用运行时库 (cudaMalloc) 或设备驱动程序 (cuMemAlloc) 分配的数据。

ARROW_DEVICE_CUDA_HOST#

使用 cudaMallocHostcuMemAllocHost 由 CUDA 锁定并进行分页的 CPU 内存。

ARROW_DEVICE_OPENCL#

使用 OpenCL (Open Computing Language) 框架在设备上分配的数据。

ARROW_DEVICE_VULKAN#

Vulkan 框架和库分配的数据。

ARROW_DEVICE_METAL#

使用 Metal 框架和库的 Apple GPU 设备上的数据。

ARROW_DEVICE_VPI#

指示 Verilog 模拟器缓冲区的使用。

ARROW_DEVICE_ROCM#

使用 ROCm 堆栈的 AMD 设备。

ARROW_DEVICE_ROCM_HOST#

使用 hipMallocHost 由 ROCm 锁定并进行分页的 CPU 内存。

ARROW_DEVICE_EXT_DEV#

此值是设备扩展的转义通道,当前未以其他方式表示。 如果使用此设备类型,生产者将需要提供特定于设备的附加信息/上下文。 这用于快速测试扩展设备,并且语义可能因实现而异。

ARROW_DEVICE_CUDA_MANAGED#

cudaMallocManaged 分配的 CUDA 托管/统一内存。

ARROW_DEVICE_ONEAPI#

在英特尔 oneAPI 非分区设备上分配的统一共享内存。 需要调用 oneAPI 运行时来确定特定设备类型、USM 分配类型以及它绑定到的 sycl 上下文。

ARROW_DEVICE_WEBGPU#

对下一代 WebGPU 标准的 GPU 支持

ARROW_DEVICE_HEXAGON#

在 Qualcomm Hexagon DSP 设备上分配的数据。

ArrowDeviceArray 结构#

ArrowDeviceArray 结构嵌入了 C 数据 ArrowArray 结构,并添加了消费者使用数据所需的附加信息。 它具有以下字段

struct ArrowArray ArrowDeviceArray.array#

强制性。 已分配的数组数据。 void** 缓冲区中的值(以及任何子级的缓冲区)是在设备上分配的内容。 缓冲区值应该是设备指针。 结构的其余部分应该可以被 CPU 访问。

此结构的 private_datarelease 回调应包含根据其分配的设备释放数组所需的任何必要信息和结构,而不是在此处具有单独的 release 回调和 private_data 指针。

int64_t ArrowDeviceArray.device_id#

强制性。 设备 ID 用于在系统上存在多个此类设备时标识特定设备。 ID 的语义将取决于硬件,但我们使用 int64_t 来为 ID 提供面向未来的能力,因为设备会随着时间的推移而发生变化。

对于没有设备标识符内在概念的设备类型(例如,ARROW_DEVICE_CPU),建议使用 device_id -1 作为约定。

ArrowDeviceType ArrowDeviceArray.device_type#

强制性。 可以访问数组中缓冲区的设备类型。

void *ArrowDeviceArray.sync_event#

可选。 如果需要,可用于同步的类事件对象。

许多设备(例如 GPU)主要相对于 CPU 处理是异步的。 因此,为了安全地访问设备内存,通常需要一个对象来同步处理。 由于不同的设备将使用不同的类型来指定此对象,因此我们使用一个 void*,可以将其强制转换为指向设备适当类型的指针。

如果不需要同步,则可以将其设为空。 如果它不是空值,则在尝试访问缓冲区中的内存之前,必须使用它来调用设备的相应同步方法(例如 cudaStreamWaitEventhipStreamWaitEvent)。

如果提供了事件,则生产者必须确保导出的数据在触发该事件之前在设备上可用。 消费者应该在尝试访问导出的数据之前等待该事件。

另请参阅

下面的 同步事件类型 部分。

int64_t ArrowDeviceArray.reserved[3]#

随着非 CPU 开发的扩展,可能需要扩展此结构。 为了在不潜在地破坏 ABI 更改的情况下执行此操作,我们在对象的末尾保留 24 个字节。 生产者初始化后,必须将这些字节清零,以确保将来安全地发展 ABI。

同步事件类型#

下表列出了每种设备类型预期的事件类型。 如果不支持任何事件类型 (“N/A”),则 sync_event 成员应始终为空。

请记住,如果不需要同步来访问数据,则事件可以为空。

设备类型

实际事件类型

注释

ARROW_DEVICE_CPU

N/A

ARROW_DEVICE_CUDA

cudaEvent_t*

ARROW_DEVICE_CUDA_HOST

cudaEvent_t*

ARROW_DEVICE_OPENCL

cl_event*

ARROW_DEVICE_VULKAN

VkEvent*

ARROW_DEVICE_METAL

MTLEvent*

ARROW_DEVICE_VPI

N/A

ARROW_DEVICE_ROCM

hipEvent_t*

ARROW_DEVICE_ROCM_HOST

hipEvent_t*

ARROW_DEVICE_EXT_DEV

ARROW_DEVICE_CUDA_MANAGED

cudaEvent_t*

ARROW_DEVICE_ONEAPI

sycl::event*

ARROW_DEVICE_WEBGPU

N/A

ARROW_DEVICE_HEXAGON

N/A

注释

  • (1) 目前尚不清楚框架是否具有支持的事件类型。

  • (2) 扩展设备具有生产者定义的语义,因此如果扩展设备需要同步,生产者应记录该类型。

语义#

内存管理#

首先也是最重要的:在此接口的所有内容中,只有数据缓冲区本身驻留在设备内存中(即 ArrowArray 结构的 buffers 成员)。 其他所有内容都应该位于 CPU 内存中。

ArrowDeviceArray 结构包含一个 ArrowArray 对象,该对象本身具有用于释放内存的特定语义。 下面的术语“基本结构”是指生产者和使用者之间直接传递的 ArrowDeviceArray 对象 – 而不是其任何子结构。

基本结构旨在由使用者进行堆栈或堆分配。 在这种情况下,生产者 API 应采用指向使用者分配结构的指针。

但是,结构指向的任何数据必须由生产者分配和维护。 这包括 sync_event 成员(如果它不是空值)以及 ArrowArray 对象中的任何指针(与往常一样)。 数据生命周期通过 ArrowArray 成员的 release 回调来管理。

对于 ArrowDeviceArray,已释放结构的语义和回调语义与 ArrowArray 本身 的语义相同。 释放设备数据缓冲区所需的任何生产者特定上下文信息(以及任何已分配的事件)应存储在 ArrowArrayprivate_data 成员中,并由 release 回调管理。

移动数组#

使用者可以通过按位复制或浅层成员复制来移动 ArrowDeviceArray 结构。 然后,它必须通过将嵌入式 ArrowArray 结构的 release 成员设置为 NULL 来标记源结构已释放,但调用该 release 回调。 这可确保在任何给定时间只有一个活动结构的副本处于活动状态,并且生命周期已正确传达给生产者。

与往常一样,当不再需要目标结构时,将在其上调用 release 回调。

记录批次#

与 C 数据接口本身一样,记录批次可以简单地视为等效的结构数组。 在这种情况下,顶层 ArrowSchema 的元数据可用于记录批次的架构级元数据。

可变性#

生产者和使用者都应将导出的数据(即可通过嵌入式 ArrowArraybuffers 成员在设备上访问的数据)视为不可变的,因为任何一方都可能在另一方更改数据时看到不一致的数据。

同步#

如果 sync_event 成员不是 NULL,则使用者不应尝试访问或读取数据,直到他们在该事件上同步为止。 如果 sync_event 成员为 NULL,则必须可以安全地访问数据,而无需使用者进行任何必要的同步。

C 生产者示例#

导出简单的 int32 设备数组#

导出具有空元数据的不可为空的 int32 类型。 可以在 C 数据接口文档直接中看到此示例。

要导出数据本身,我们通过 release 回调将所有权转移给使用者。 此示例将使用 CUDA,但等效的调用可用于任何设备

static void release_int32_device_array(struct ArrowArray* array) {
    assert(array->n_buffers == 2);
    // destroy the event
    cudaEvent_t* ev_ptr = (cudaEvent_t*)(array->private_data);
    cudaError_t status = cudaEventDestroy(*ev_ptr);
    assert(status == cudaSuccess);
    free(ev_ptr);

    // free the buffers and the buffers array
    status = cudaFree(array->buffers[1]);
    assert(status == cudaSuccess);
    free(array->buffers);

    // mark released
    array->release = NULL;
}

void export_int32_device_array(void* cudaAllocedPtr,
                               cudaStream_t stream,
                               int64_t length,
                               struct ArrowDeviceArray* array) {
    // get device id
    int device;
    cudaError_t status;
    status = cudaGetDevice(&device);
    assert(status == cudaSuccess);

    cudaEvent_t* ev_ptr = (cudaEvent_t*)malloc(sizeof(cudaEvent_t));
    assert(ev_ptr != NULL);
    status = cudaEventCreate(ev_ptr);
    assert(status == cudaSuccess);

    // record event on the stream, assuming that the passed in
    // stream is where the work to produce the data will be processing.
    status = cudaEventRecord(*ev_ptr, stream);
    assert(status == cudaSuccess);

    memset(array, 0, sizeof(struct ArrowDeviceArray));
    // initialize fields
    *array = (struct ArrowDeviceArray) {
        .array = (struct ArrowArray) {
            .length = length,
            .null_count = 0,
            .offset = 0,
            .n_buffers = 2,
            .n_children = 0,
            .children = NULL,
            .dictionary = NULL,
            // bookkeeping
            .release = &release_int32_device_array,
            // store the event pointer as private data in the array
            // so that we can access it in the release callback.
            .private_data = (void*)(ev_ptr),
        },
        .device_id = (int64_t)(device),
        .device_type = ARROW_DEVICE_CUDA,
        // pass the event pointer to the consumer
        .sync_event = (void*)(ev_ptr),
    };

    // allocate list of buffers
    array->array.buffers = (const void**)malloc(sizeof(void*) * array->array.n_buffers);
    assert(array->array.buffers != NULL);
    array->array.buffers[0] = NULL;
    array->array.buffers[1] = cudaAllocedPtr;
}

// calling the release callback should be done using the array member
// of the device array.
static void release_device_array_helper(struct ArrowDeviceArray* arr) {
    arr->array.release(&arr->array);
}

设备流接口#

C 流接口一样,C 设备数据接口还指定了一个更高级别的结构,用于简化单个进程中流数据的通信。

语义#

一个 Arrow C 设备数据流暴露了一个数据块的流式源,每个数据块都具有相同的模式。通过调用一个阻塞式的拉取迭代函数来获取数据块。预期所有数据块都应在相同的设备类型上提供数据(但不一定是相同的设备 ID)。如果需要在多个设备类型上提供数据流,生产者应为每种设备类型提供一个单独的数据流对象。

结构定义#

C 设备数据流接口由单个 struct 定义。

#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
#define ARROW_C_DEVICE_STREAM_INTERFACE

struct ArrowDeviceArrayStream {
    // device type that all arrays will be accessible from
    ArrowDeviceType device_type;
    // callbacks
    int (*get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema*);
    int (*get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray*);
    const char* (*get_last_error)(struct ArrowDeviceArrayStream*);

    // release callback
    void (*release)(struct ArrowDeviceArrayStream*);

    // opaque producer-specific data
    void* private_data;
};

#endif  // ARROW_C_DEVICE_STREAM_INTERFACE

注意

规范的保护符 ARROW_C_DEVICE_STREAM_INTERFACE 旨在避免重复定义的情况,即如果两个项目将 C 设备数据流接口定义复制到它们自己的头文件中,并且第三方项目从这两个项目都包含了这些定义。因此,当复制这些定义时,务必保持此保护符完全不变。

ArrowDeviceArrayStream 结构#

ArrowDeviceArrayStream 提供了一种设备类型,该类型可以访问结果数据以及与 Arrow 数组的流式源交互所需的调回函数。它具有以下字段:

ArrowDeviceType device_type#

必需。 此数据流在其上生成数据的设备类型。此数据流生成的所有 ArrowDeviceArray 应该具有与此处设置的相同的设备类型。这为使用者提供了一种便利,无需检查检索到的每个数组,而是允许用于数据流的更高级别的编码结构。

int (*ArrowDeviceArrayStream.get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema *out)#

必需。 此回调允许使用者查询数据流中数据块的模式。该模式对于所有数据块都是相同的。

不得在已释放的 ArrowDeviceArrayStream 上调用此回调。

返回值: 成功时返回 0,否则返回非零的错误代码

int (*ArrowDeviceArrayStream.get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray *out)#

必需。 此回调允许使用者获取数据流中的下一个数据块。

不得在已释放的 ArrowDeviceArrayStream 上调用此回调。

下一个数据块必须可以从与 ArrowDeviceArrayStream.device_type 匹配的设备类型访问。

返回值: 成功时返回 0,否则返回非零的错误代码

成功后,使用者必须检查 ArrowDeviceArray 的嵌入式 ArrowArray 是否标记为已释放。如果嵌入的 ArrowDeviceArray.array 已释放,则表示已到达数据流的末尾。否则,ArrowDeviceArray 包含有效的数据块。

const char *(*ArrowDeviceArrayStream.get_last_error)(struct ArrowDeviceArrayStream*)#

必需。 此回调允许使用者获取上次错误的文本描述。

仅当对 ArrowDeviceArrayStream 的上次操作返回错误时,才必须调用此回调。不得在已释放的 ArrowDeviceArrayStream 上调用它。

返回值: 指向以 NULL 结尾的字符串(UTF8 编码)的指针。如果不存在详细描述,也可以返回 NULL。

仅保证返回的指针在下次调用数据流的回调函数之一之前有效。如果希望字符字符串保持更长时间,则应将其指向的字符字符串复制到使用者管理的存储中。

void (*ArrowDeviceArrayStream.release)(struct ArrowDeviceArrayStream*)#

必需。 指向生产者提供的释放回调函数的指针。

void *ArrowDeviceArrayStream.private_data#

可选。 指向生产者提供的私有数据的非透明指针。

使用者不得处理此成员。此成员的生命周期由生产者处理,尤其是由释放回调函数处理。

结果生命周期#

get_schemaget_next 回调函数返回的数据必须单独释放。它们的生命周期与 ArrowDeviceArrayStream 的生命周期无关。

数据流生命周期#

C 数据流的生命周期使用释放回调函数进行管理,其用法与C 数据接口中的用法类似。

线程安全性#

不假定数据流源是线程安全的。希望从多个线程调用 get_next 的使用者应确保这些调用是序列化的。

异步设备数据流接口#

警告

实验性:异步 C 设备数据流接口在其当前形式中是实验性的。根据反馈和使用情况,协议定义可能会更改,直到完全标准化。

C 数据流接口提供了一个同步 API,该 API 以使用者调用生产者函数来检索下一个记录批次为中心。对于生产者和使用者之间的并发通信,可以使用 ArrowAsyncDeviceStreamHandler。此接口不带主观意见,可以适应不同的并发通信模型。

语义#

异步接口不是由生产者提供一个回调函数结构供使用者调用和检索记录,而是一个由使用者分配和填充的结构。使用者分配的结构为生产者提供处理程序回调,以便在模式和数据块可用时调用。

除了 ArrowAsyncDeviceStreamHandler 之外,还有两个额外的结构用于完整的数据流:ArrowAsyncTaskArrowAsyncProducer

结构定义#

C 设备异步数据流接口由三个 struct 定义组成。

#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
#define ARROW_C_ASYNC_STREAM_INTERFACE

struct ArrowAsyncTask {
  int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);

  void* private_data;
};

struct ArrowAsyncProducer {
  ArrowDeviceType device_type;

  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);

  void (*release)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};

struct ArrowAsyncDeviceStreamHandler {
  // consumer-specific handlers
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
                   int code, const char* message, const char* metadata);

  // release callback
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

  // must be populated before calling any callbacks
  struct ArrowAsyncProducer* producer;

  // opaque handler-specific data
  void* private_data;
};

#endif  // ARROW_C_ASYNC_STREAM_INTERFACE

注意

规范的保护符 ARROW_C_ASYNC_STREAM_INTERFACE 旨在避免重复定义的情况,即如果两个项目将 C 异步数据流接口定义复制到它们自己的头文件中,并且第三方项目从这两个项目都包含了这些定义。因此,当复制这些定义时,务必保持此保护符完全不变。

ArrowAsyncDeviceStreamHandler 结构#

该结构具有以下字段:

int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*)#

必需。 用于接收流的模式的处理器。所有传入的记录都应与提供的模式匹配。如果成功,该函数应返回 0,否则应返回一个与 errno 兼容的错误代码。

如果生产者想要提供任何额外的上下文信息,它可以将 ArrowAsyncProducer.additional_metadata 设置为非 NULL 值。这与 ArrowSchema.metadata 采用相同的格式进行编码。如果此元数据不是 NULL,则其生存期应与 ArrowAsyncProducer 对象的生存期相关联。

除非调用 on_error 处理器,否则此函数始终会精确调用一次,并将是此对象上调用的第一个方法。因此,生产者必须在调用此函数之前填充 ArrowAsyncProducer 成员,以便消费者可以应用反压力并控制数据流。生产者保持 ArrowAsyncProducer 的所有权,并且必须ArrowAsyncDeviceStreamHandler 上调用释放回调之后清理它。

如果生产者在此处收到非零结果,则不得随后调用此对象上的释放回调以外的任何内容。

int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)#

必需。 当有新记录可供处理时要调用的处理器。每个记录的模式应与调用 on_schema 时使用的模式相同。如果成功处理,该函数应返回 0,否则应返回一个与 errno 兼容的错误代码。

它不是传递记录本身,而是接收一个 ArrowAsyncTask,以便更好地进行以消费者为中心的线程控制,以便接收数据。调用此函数只是表明可以通过提供的任务获得数据。

生产者通过传递 NULL 作为 ArrowAsyncTask 指针而不是有效地址来表示流的结束。此任务对象仅在此函数调用的生命周期内有效。如果消费者想要在此方法范围之外使用该任务,则必须将其内容复制或移动到新的 ArrowAsyncTask 对象。

const char* 参数的存在是为了让生产者提供他们想要的任何额外的上下文信息。这与 ArrowSchema.metadata 采用相同的格式进行编码。如果不是 NULL,则生存期仅限于调用此函数的范围。想要在此调用生命周期之外维护附加元数据的消费者必须自己复制该值。

生产者不得从多个线程并发调用此函数。

必须调用 ArrowAsyncProducer.request 回调才能开始接收对此处理器的调用。

void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler, int, const char*, const char*)#

必需。 生产者遇到错误时要调用的处理器。调用此函数后,将调用 release 回调作为此结构上的最后一个调用。这些参数是一个与 errno 兼容的错误代码和一个可选的错误消息和元数据。

如果消息和元数据不是 NULL,则它们的生存期仅在此调用的范围内有效。想要在此函数返回后保持这些值的消费者必须自己复制这些值。

如果元数据参数不是 NULL,要提供键值错误元数据,则应以与在 ArrowSchema.metadata 中编码元数据的方式相同的方式进行编码。

生产者可以在调用 ArrowAsyncProducer.request 之前或之后调用此函数是有效的。此回调不得调用 ArrowAsyncProducer 对象的任何方法。

void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)#

必需。 指向消费者提供的处理器的释放回调的指针。

生产者可以在调用 ArrowAsyncProducer.request 之前或之后调用此函数是有效的。 这不得调用 ArrowAsyncProducer 对象的任何方法。

struct ArrowAsyncProducer ArrowAsyncDeviceStreamHandler.producer#

必需。 消费者将用于请求其他数据或取消的生产者对象。

生产者必须在调用 ArrowAsyncDeviceStreamHandler.on_schema 回调之前填充此对象。生产者保持此对象的所有权,并且必须ArrowAsyncDeviceStreamHandler 上调用释放回调之后清理它。

在调用 on_schema 回调之前,消费者不能假定它是有效的。

void *ArrowAsyncDeviceStreamHandler.private_data#

可选。 指向消费者提供的私有数据的不透明指针。

生产者不得处理此成员。此成员的生存期由消费者处理,尤其是由释放回调处理。

ArrowAsyncTask 结构#

使用 Task 对象而不是直接将数组传递给 on_next 回调函数的目的是为了实现更复杂和高效的线程处理。利用 Task 对象允许生产者将“解码”逻辑与 I/O 分离,使消费者能够避免在 CPU 核心之间传输数据(例如,从一个 L1/L2 缓存到另一个)。

这种由生产者提供的结构体具有以下字段

int (*ArrowArrayTask.extract_data)(struct ArrowArrayTask*, struct ArrowDeviceArray*)#

必需的。一个回调函数,用于使用可用数据填充提供的 ArrowDeviceArray。生产者提供的 ArrowAsyncTasks 的顺序使消费者能够知道要处理的数据的顺序。如果消费者不关心此任务拥有的数据,它仍然必须调用 extract_data,以便生产者可以执行任何必需的清理。NULL 应该作为设备数组指针传递,以表明消费者不需要实际数据,从而允许任务执行必要的清理。

如果从此函数返回非零值,则应仅在生产者调用 ArrowAsyncDeviceStreamHandleron_error 回调函数之后。 由于调用此方法很可能与当前控制流分离,因此返回非零值以指示发生的错误允许当前线程决定如何相应地处理该情况,同时仍然允许所有错误日志记录和处理集中在 ArrowAsyncDeviceStreamHandler.on_error 回调函数中。

不需要单独的释放回调,任何必需的清理都应作为此回调调用的一部分执行。 Array 的所有权被赋予作为参数传入的指针,并且必须单独释放此数组。

仅调用此方法一次有效。

void *ArrowArrayTask.private_data#

可选。 指向生产者提供的私有数据的非透明指针。

消费者绝不能处理此成员。此成员的生命周期由创建此对象的生产者处理,如果需要,应在调用 ArrowArrayTask.extract_data 期间进行清理。

ArrowAsyncProducer 结构体#

此生产者提供和管理的对象具有以下字段

ArrowDeviceType ArrowAsyncProducer.device_type#

必需的。此生产者将提供数据的设备类型。此生产者生成的所有 ArrowDeviceArray 结构体应具有与此处设置的相同的设备类型。

void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, uint64_t)#

必需的。消费者必须调用此函数才能开始接收对 ArrowAsyncDeviceStreamHandler.on_next_task 的调用。从 ArrowAsyncDeviceStreamHandler.on_next_taskArrowAsyncDeviceStreamHandler.on_schema 中同步调用此函数必须有效。因此,此函数绝不能同步调用 on_next_taskon_error 以避免递归和可重入回调。

在调用 cancel 之后,对此函数的其他调用必须是 NOP,但允许。

在未取消的情况下,调用此函数会注册给定数量的其他数组/批次以供生产者生成。 生产者应仅调用相应的 on_next_task 回调,最大值为调用此方法的总和,然后再传播背压/等待。

通过调用 ArrowAsyncDeviceStreamHandleron_error 回调来传播调用 request 时遇到的任何错误。

使用 n 的值为 <= 0 调用此函数无效。 如果收到 n 的此类值,生产者应出错(例如,调用 on_error)。

void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)#

必需的。此函数向生产者发出信号,表明它最终必须停止调用 on_next_task。 对 cancel 的调用必须是幂等的和线程安全的。 首次调用后,后续调用必须是 NOP。 这绝不能调用任何消费者端处理程序,除了 on_error

不需要调用 cancel 立即影响生产者,只需要它最终停止调用 on_next_task,然后随后在异步处理程序对象上调用 release。 因此,如果仍有待处理的请求数组,即使在调用 cancel 之后,消费者必须准备好接收一次或多次对 on_next_taskon_error 的调用。

成功取消绝不能导致生产者调用 ArrowAsyncDeviceStreamHandler.on_error,而是应该完成任何剩余的任务(相应地调用 on_next_task),并最终只调用 release

在处理对 cancel 的调用期间遇到的任何错误都必须通过异步流处理程序上的 on_error 回调报告。

const char *ArrowAsyncProducer.additional_metadata#

可选的。一个额外的元数据字符串,用于向消费者提供任何额外的上下文。 这必须NULL 或以与 ArrowSchema.metadata 相同的方式编码的有效字符串。 例如,如果已知,生产者可以利用此元数据来提供流中的总行数和/或批次数。

如果不是 NULL,则对于此对象的生命周期,它必须有效。

void *ArrowAsyncProducer.private_data#

可选。 指向生产者提供的特定数据的不透明指针。

消费者不得处理此成员,其生命周期由构造此对象的生产者拥有。

错误处理#

与常规 C Stream 接口不同,Async 接口允许错误在两个方向上流动。 因此,错误处理可能会稍微复杂一些。 因此,本规范指定以下规则

  • 如果生产者在处理过程中遇到错误,它应该调用 on_error 回调,然后在它返回后调用 release

  • 如果 on_schemaon_next_task 返回非零整数值,则生产者不应调用 on_error 回调,而应最终在记录或处理错误代码之前或之后的某个时间点调用 release

结果生命周期#

传递给 on_schema 回调的 ArrowSchema 必须独立释放,对象本身需要移动到消费者拥有的 ArrowSchema 对象。 作为参数传递给回调的 ArrowSchema* 不得存储和保留。

提供给 on_next_taskArrowAsyncTask 对象由生产者拥有,并将通过调用其上的 extract_data 来清理。 如果消费者不关心数据,它应该传递 NULL 而不是有效的 ArrowDeviceArray*

传递给 on_errorconst char* 错误 messagemetadata 仅在 on_error 函数本身的作用域内有效。 如果需要在它返回后存在,则必须复制它们。

流处理器生命周期#

异步流处理器的生命周期是使用 release 回调来管理的,其用法与 C 数据接口 中类似。

ArrowAsyncProducer 生命周期#

ArrowAsyncProducer 的生命周期由生产者自身拥有,应由其管理。 它必须在调用除 release 之外的任何方法之前填充,并且必须至少在调用流处理器对象上的 release 之前保持有效。

线程安全#

ArrowAsyncDeviceStreamHandler 上的所有处理程序函数都应该以串行方式调用,但不保证每次都从同一线程调用。 生产者应等待处理程序回调返回,然后再调用下一个处理程序回调,并在调用 release 回调之前。

反压由消费者调用 ArrowAsyncProducer.request 来指示它准备好接收多少个数组来管理。

ArrowAsyncDeviceStreamHandler 对象应该能够在传递给生产者后立即处理回调,任何初始化都应该在提供之前执行。

可能的时序图#

sequenceDiagram Consumer->>+Producer: ArrowAsyncDeviceStreamHandler* Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*) Consumer->>Producer: ArrowAsyncProducer->request(n) par loop up to n times Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end and for each task Consumer-->>Producer: ArrowAsyncTask.extract_data(...) Consumer-->>Producer: ArrowAsyncProducer->request(1) end break Optionally Consumer->>-Producer: ArrowAsyncProducer->cancel() end loop possible remaining Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release()

与其他交换格式的互操作性#

其他交换 API(例如 CUDA Array Interface)包含传递要导出的数据缓冲区形状和数据类型的成员。 此信息对于解释正在共享的设备数据缓冲区中的原始字节是必要的。 用户应该利用现有的 ArrowSchema 结构来传递任何数据类型和形状信息,而不是将数据的形状/类型与 ArrowDeviceArray 一起存储。

更新本规范#

注意

由于本规范仍被认为是实验性的,因此存在(仍然很低的)可能会略有更改的可能性。 将其标记为“实验性”的原因是我们不知道我们不知道什么。已经完成了工作和研究,以确保与许多不同框架兼容的通用 ABI,但总是可能遗漏了一些东西。一旦在官方 Arrow 版本中支持此功能,并且观察到使用情况以确认没有必要的修改,则将删除“实验性”标签并冻结 ABI。

一旦本规范在官方 Arrow 版本中得到支持,C ABI 将被冻结。 这意味着 ArrowDeviceArray 结构定义不应以任何方式更改 - 包括添加新成员。

允许向后兼容的更改,例如 ArrowDeviceType 的新宏值,或者将保留的 24 个字节转换为不同的类型/成员而不更改结构的大小。

任何不兼容的更改都应该是新规范的一部分,例如 ArrowDeviceArrayV2