Arrow C 设备数据接口#

警告

Arrow C 设备数据接口应被视为实验性功能

原理#

当前的C 数据接口,以及其大多数实现,都假设所有提供的数据缓冲区都是 CPU 缓冲区。由于 Apache Arrow 被设计为用于表示表格(“列式”)数据的通用内存格式,因此将需要利用非 CPU 硬件(如 GPU)上的数据。一个这样的例子是 RAPIDS cuDF 库,它使用 Arrow 内存格式和 CUDA 处理 NVIDIA GPU。由于在主机和设备之间复制数据代价高昂,因此理想情况是能够尽可能长时间地将数据保留在设备上,即使在运行时和库之间传递数据时也是如此。

Arrow C 设备数据接口在现有 C 数据接口的基础上,添加了一组非常小、稳定的 C 定义。这些定义等同于 C 数据接口中的 ArrowArrayArrowArrayStream 结构,并增加了成员以允许指定设备类型和传递必要信息以与生产者同步。对于非 C/C++ 语言和运行时,将 C 定义转换为相应的 C FFI 声明应该像当前的 C 数据接口一样简单。

应用程序和库可以像现在使用 CPU 数据一样,在非 CPU 设备上使用 Arrow 模式和 Arrow 格式的内存来交换数据。这将使数据在这些设备上停留更长时间,并避免在主机和设备之间进行昂贵的来回复制,以利用新的库和运行时。

目标#

  • 公开一个基于现有 C 数据接口构建的 ABI 稳定接口。

  • 使第三方项目易于实现支持,初始投入较少。

  • 允许在同一进程中运行的独立运行时和组件之间零拷贝共享 Arrow 格式的设备内存。

  • 避免了一对一适配层的需求,例如用于 Python 进程传递 CUDA 数据的 CUDA 数组接口

  • 在不显式依赖(编译时或运行时)Arrow 软件项目本身的情况下实现集成。

Arrow C 设备数据接口旨在扩展当前 C 数据接口的覆盖范围,使其也能成为 GPU 或 FPGA 等设备上列式处理的标准低级构建块。

结构定义#

因为它是基于 C 数据接口构建的,所以 C 设备数据接口使用 C 数据接口规范 中定义的 ArrowSchemaArrowArray 结构。然后它添加了以下独立定义。与 Arrow 项目的其余部分一样,它们根据 Apache License 2.0 提供。

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
#define ARROW_C_DEVICE_DATA_INTERFACE

// Device type for the allocated memory
typedef int32_t ArrowDeviceType;

// CPU device, same as using ArrowArray directly
#define ARROW_DEVICE_CPU 1
// CUDA GPU Device
#define ARROW_DEVICE_CUDA 2
// Pinned CUDA CPU memory by cudaMallocHost
#define ARROW_DEVICE_CUDA_HOST 3
// OpenCL Device
#define ARROW_DEVICE_OPENCL 4
// Vulkan buffer for next-gen graphics
#define ARROW_DEVICE_VULKAN 7
// Metal for Apple GPU
#define ARROW_DEVICE_METAL 8
// Verilog simulator buffer
#define ARROW_DEVICE_VPI 9
// ROCm GPUs for AMD GPUs
#define ARROW_DEVICE_ROCM 10
// Pinned ROCm CPU memory allocated by hipMallocHost
#define ARROW_DEVICE_ROCM_HOST 11
// Reserved for extension
//
// used to quickly test extension devices, semantics
// can differ based on implementation
#define ARROW_DEVICE_EXT_DEV 12
// CUDA managed/unified memory allocated by cudaMallocManaged
#define ARROW_DEVICE_CUDA_MANAGED 13
// Unified shared memory allocated on a oneAPI
// non-partitioned device.
//
// A call to the oneAPI runtime is required to determine the
// device type, the USM allocation type and the sycl context
// that it is bound to.
#define ARROW_DEVICE_ONEAPI 14
// GPU support for next-gen WebGPU standard
#define ARROW_DEVICE_WEBGPU 15
// Qualcomm Hexagon DSP
#define ARROW_DEVICE_HEXAGON 16

struct ArrowDeviceArray {
  struct ArrowArray array;
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;

  // reserved bytes for future expansion
  int64_t reserved[3];
};

#endif  // ARROW_C_DEVICE_DATA_INTERFACE

注意

规范的保护宏 ARROW_C_DEVICE_DATA_INTERFACE 旨在避免当两个项目在其自己的头文件中复制定义,而第三个项目又包含这两个项目时,出现重复定义。因此,在复制这些定义时,保持此保护宏完全不变非常重要。

ArrowDeviceType#

ArrowDeviceType typedef 用于指示提供的内存缓冲区是在哪种设备类型上分配的。这与 device_id 结合,应该足以引用正确的​​数据缓冲区。

然后我们使用宏来定义不同设备类型的值。提供的宏值与广泛使用的 dlpack DLDeviceType 定义值兼容,每个值都与 dlpack.h 中的等效 kDL<type> 枚举使用相同的值。该列表将随着时间的推移与这些等效枚举值保持同步,以确保兼容性,而不是可能出现分歧。为了避免 Arrow 项目必须验证新硬件设备,新添加的设备应首先添加到 dlpack,然后我们再在此处添加相应的宏。

为了确保 ABI 的可预测性,我们使用宏而不是 enum,这样存储类型就不依赖于编译器。

ARROW_DEVICE_CPU#

CPU 设备,等同于直接使用 ArrowArray 而不是使用 ArrowDeviceArray

ARROW_DEVICE_CUDA#

一个 CUDA GPU 设备。这可以表示使用运行时库 (cudaMalloc) 或设备驱动程序 (cuMemAlloc) 分配的数据。

ARROW_DEVICE_CUDA_HOST#

通过使用 cudaMallocHostcuMemAllocHost 经 CUDA 锁定页面并固定的 CPU 内存。

ARROW_DEVICE_OPENCL#

通过使用 OpenCL (Open Computing Language) 框架在设备上分配的数据。

ARROW_DEVICE_VULKAN#

通过 Vulkan 框架和库分配的数据。

ARROW_DEVICE_METAL#

使用 Metal 框架和库的 Apple GPU 设备上的数据。

ARROW_DEVICE_VPI#

表示使用 Verilog 模拟器缓冲区。

ARROW_DEVICE_ROCM#

使用 ROCm 栈的 AMD 设备。

ARROW_DEVICE_ROCM_HOST#

通过使用 hipMallocHost 经 ROCm 锁定页面并固定的 CPU 内存。

ARROW_DEVICE_EXT_DEV#

此值是设备扩展的应急措施,以涵盖当前未以其他方式表示的设备。如果使用此设备类型,生产者需要提供特定于设备的额外信息/上下文。这用于快速测试扩展设备,并且语义可能因实现而异。

ARROW_DEVICE_CUDA_MANAGED#

通过 cudaMallocManaged 分配的 CUDA 管理/统一内存。

ARROW_DEVICE_ONEAPI#

在 Intel oneAPI 非分区设备上分配的统一共享内存。需要调用 oneAPI 运行时以确定特定的设备类型、USM 分配类型以及其绑定的 sycl 上下文。

ARROW_DEVICE_WEBGPU#

对下一代 WebGPU 标准的 GPU 支持

ARROW_DEVICE_HEXAGON#

在高通 Hexagon DSP 设备上分配的数据。

ArrowDeviceArray 结构#

ArrowDeviceArray 结构嵌入了 C 数据 ArrowArray 结构,并添加了消费者使用数据所需的额外信息。它包含以下字段:

struct ArrowArray ArrowDeviceArray.array#

必需。 分配的数组数据。void** 缓冲区中的值(以及任何子缓冲区的缓冲区)是设备上分配的内容。缓冲区值应为设备指针。结构的其余部分应可由 CPU 访问。

此结构的 private_datarelease 回调应包含根据其分配设备释放数组的任何必要信息和结构,而不是在此处拥有单独的释放回调和 private_data 指针。

int64_t ArrowDeviceArray.device_id#

强制。 设备 ID,用于在系统上有多个此类设备时标识特定设备。ID 的语义将取决于硬件,但我们使用 int64_t 以确保 ID 随设备变化而具有未来兼容性。

对于没有固有设备标识符概念的设备类型(例如,ARROW_DEVICE_CPU),建议约定使用 -1 作为 device_id

ArrowDeviceType ArrowDeviceArray.device_type#

强制。 可以访问数组中缓冲区的设备类型。

void *ArrowDeviceArray.sync_event#

可选。 如果需要,用于同步的事件类对象。

许多设备,例如 GPU,相对于 CPU 处理而言,主要是异步的。因此,为了安全地访问设备内存,通常需要一个对象来同步处理。由于不同的设备会使用不同的类型来指定这一点,我们使用 void*,它可以被强制转换为设备适当类型的指针。

如果不需要同步,可以将其设置为 null。如果它不是 null,则在尝试访问缓冲区中的内存之前,必须使用它调用设备的相应同步方法(例如 cudaStreamWaitEventhipStreamWaitEvent)。

如果提供了事件,则生产者必须确保在事件触发之前,导出的数据在设备上可用。消费者在尝试访问导出的数据之前应等待事件。

另请参阅

下面的 同步事件类型 部分。

int64_t ArrowDeviceArray.reserved[3]#

随着非 CPU 开发的扩展,可能需要扩展此结构。为了在不潜在地破坏 ABI 更改的情况下做到这一点,我们在对象的末尾保留了 24 字节。生产者在初始化后必须将这些字节归零,以确保 ABI 在未来安全演进。

同步事件类型#

下表列出了每种设备类型预期支持的事件类型。如果不支持任何事件类型(“N/A”),则 sync_event 成员应始终为 null。

请记住,如果不需要同步来访问数据,事件 可以 为 null。

设备类型

实际事件类型

备注

ARROW_DEVICE_CPU

N/A

ARROW_DEVICE_CUDA

cudaEvent_t*

ARROW_DEVICE_CUDA_HOST

cudaEvent_t*

ARROW_DEVICE_OPENCL

cl_event*

ARROW_DEVICE_VULKAN

VkEvent*

ARROW_DEVICE_METAL

MTLEvent*

ARROW_DEVICE_VPI

N/A

ARROW_DEVICE_ROCM

hipEvent_t*

ARROW_DEVICE_ROCM_HOST

hipEvent_t*

ARROW_DEVICE_EXT_DEV

ARROW_DEVICE_CUDA_MANAGED

cudaEvent_t*

ARROW_DEVICE_ONEAPI

sycl::event*

ARROW_DEVICE_WEBGPU

N/A

ARROW_DEVICE_HEXAGON

N/A

备注

  • (1) 目前尚不清楚框架是否支持事件类型。

  • (2) 扩展设备具有生产者定义的语义,因此如果扩展设备需要同步,生产者应记录类型。

语义#

内存管理#

首先也是最重要的是:在此接口的所有内容中,只有 数据缓冲区本身驻留在设备内存中(即 ArrowArray 结构中的 buffers 成员)。所有其他内容都应在 CPU 内存中。

ArrowDeviceArray 结构包含一个 ArrowArray 对象,该对象本身具有特定的语义来释放内存。下面的术语“基本结构”指的是在生产者和消费者之间直接传递的 ArrowDeviceArray 对象——而不是其任何子结构。

基本结构旨在由消费者进行堆栈或堆分配。在这种情况下,生产者 API 应接受指向消费者分配结构的指针。

但是,结构指向的任何数据都必须由生产者分配和维护。这包括 sync_event 成员(如果非空),以及 ArrowArray 对象中通常的任何指针。数据生命周期通过 ArrowArray 成员的 release 回调进行管理。

对于 ArrowDeviceArray,已释放结构的语义和回调语义与 ArrowArray 本身 的语义相同。释放设备数据缓冲区所需的任何生产者特定上下文信息,以及任何已分配的事件,都应存储在 ArrowArrayprivate_data 成员中,并由 release 回调管理。

移动数组#

消费者可以通过按位复制或浅层成员复制来移动 ArrowDeviceArray 结构。然后,它必须通过将嵌入的 ArrowArray 结构中的 release 成员设置为 NULL 来标记源结构已释放,但调用该释放回调。这确保了在任何给定时间只有一个活跃的结构副本,并且生命周期正确地传达给生产者。

与往常一样,当不再需要时,将对目标结构调用释放回调。

记录批次#

与 C 数据接口本身一样,记录批次可以简单地视为等效的结构数组。在这种情况下,顶级 ArrowSchema 的元数据可用于记录批次的模式级元数据。

可变性#

生产者和消费者都应将导出的数据(即通过嵌入的 ArrowArraybuffers 成员在设备上可访问的数据)视为不可变的,否则任一方在另一方修改数据时可能会看到不一致的数据。

同步#

如果 sync_event 成员非 NULL,消费者在同步该事件之前不应尝试访问或读取数据。如果 sync_event 成员为 NULL,则消费者无需任何同步即可安全访问数据。

C 生产者示例#

导出简单的 int32 设备数组#

导出一个不可为空的 int32 类型,带空元数据。这方面的示例可以直接在 C 数据接口文档 中查看。

要导出数据本身,我们通过释放回调将所有权转移给消费者。此示例将使用 CUDA,但等效调用可用于任何设备

static void release_int32_device_array(struct ArrowArray* array) {
    assert(array->n_buffers == 2);
    // destroy the event
    cudaEvent_t* ev_ptr = (cudaEvent_t*)(array->private_data);
    cudaError_t status = cudaEventDestroy(*ev_ptr);
    assert(status == cudaSuccess);
    free(ev_ptr);

    // free the buffers and the buffers array
    status = cudaFree(array->buffers[1]);
    assert(status == cudaSuccess);
    free(array->buffers);

    // mark released
    array->release = NULL;
}

void export_int32_device_array(void* cudaAllocedPtr,
                               cudaStream_t stream,
                               int64_t length,
                               struct ArrowDeviceArray* array) {
    // get device id
    int device;
    cudaError_t status;
    status = cudaGetDevice(&device);
    assert(status == cudaSuccess);

    cudaEvent_t* ev_ptr = (cudaEvent_t*)malloc(sizeof(cudaEvent_t));
    assert(ev_ptr != NULL);
    status = cudaEventCreate(ev_ptr);
    assert(status == cudaSuccess);

    // record event on the stream, assuming that the passed in
    // stream is where the work to produce the data will be processing.
    status = cudaEventRecord(*ev_ptr, stream);
    assert(status == cudaSuccess);

    memset(array, 0, sizeof(struct ArrowDeviceArray));
    // initialize fields
    *array = (struct ArrowDeviceArray) {
        .array = (struct ArrowArray) {
            .length = length,
            .null_count = 0,
            .offset = 0,
            .n_buffers = 2,
            .n_children = 0,
            .children = NULL,
            .dictionary = NULL,
            // bookkeeping
            .release = &release_int32_device_array,
            // store the event pointer as private data in the array
            // so that we can access it in the release callback.
            .private_data = (void*)(ev_ptr),
        },
        .device_id = (int64_t)(device),
        .device_type = ARROW_DEVICE_CUDA,
        // pass the event pointer to the consumer
        .sync_event = (void*)(ev_ptr),
    };

    // allocate list of buffers
    array->array.buffers = (const void**)malloc(sizeof(void*) * array->array.n_buffers);
    assert(array->array.buffers != NULL);
    array->array.buffers[0] = NULL;
    array->array.buffers[1] = cudaAllocedPtr;
}

// calling the release callback should be done using the array member
// of the device array.
static void release_device_array_helper(struct ArrowDeviceArray* arr) {
    arr->array.release(&arr->array);
}

设备流接口#

C 流接口 类似,C 设备数据接口还指定了一个更高级别的结构,用于简化单个进程内流式数据的通信。

语义#

Arrow C 设备流公开了一个流式数据块源,每个数据块都具有相同的模式。数据块通过调用阻塞式拉取迭代函数获取。预计所有数据块都应在相同设备类型(但不一定相同设备 ID)上提供数据。如果需要提供多个设备类型的数据流,生产者应为每种设备类型提供单独的流对象。

结构定义#

C 设备流接口由单个 struct 定义

#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
#define ARROW_C_DEVICE_STREAM_INTERFACE

struct ArrowDeviceArrayStream {
    // device type that all arrays will be accessible from
    ArrowDeviceType device_type;
    // callbacks
    int (*get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema*);
    int (*get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray*);
    const char* (*get_last_error)(struct ArrowDeviceArrayStream*);

    // release callback
    void (*release)(struct ArrowDeviceArrayStream*);

    // opaque producer-specific data
    void* private_data;
};

#endif  // ARROW_C_DEVICE_STREAM_INTERFACE

注意

规范保护宏 ARROW_C_DEVICE_STREAM_INTERFACE 旨在避免当两个项目将 C 设备流接口定义复制到其自己的头文件中,并且第三个项目包含这两个项目时,出现重复定义。因此,在复制这些定义时,保持此保护宏完全不变非常重要。

ArrowDeviceArrayStream 结构#

ArrowDeviceArrayStream 提供了一种可以访问结果数据的设备类型,以及与流式 Arrow 数组源交互所需的必要回调。它具有以下字段:

ArrowDeviceType device_type#

强制。 此流生成数据的设备类型。此流生成的所有 ArrowDeviceArray 都应具有与此处设置的设备类型相同。这是为了方便消费者不必检查检索到的每个数组,而是允许更高级别的流编码构造。

int (*ArrowDeviceArrayStream.get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema *out)#

强制。 此回调允许消费者查询流中数据块的模式。所有数据块的模式都相同。

此回调不得在已释放的 ArrowDeviceArrayStream 上调用。

返回值: 成功返回 0,否则返回非零错误代码

int (*ArrowDeviceArrayStream.get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray *out)#

强制。 此回调允许消费者获取流中的下一个数据块。

此回调不得在已释放的 ArrowDeviceArrayStream 上调用。

下一个数据块必须可从与 ArrowDeviceArrayStream.device_type 匹配的设备类型访问。

返回值: 成功返回 0,否则返回非零错误代码

成功时,消费者必须检查 ArrowDeviceArray 嵌入的 ArrowArray 是否标记为已释放。如果嵌入的 ArrowDeviceArray.array 已释放,则表示已达到流的末尾。否则,ArrowDeviceArray 包含一个有效的数据块。

const char *(*ArrowDeviceArrayStream.get_last_error)(struct ArrowDeviceArrayStream*)#

强制。 此回调允许消费者获取上次错误的文本描述。

此回调ArrowDeviceArrayStream 上的上次操作返回错误时才能调用。它不得在已释放的 ArrowDeviceArrayStream 上调用。

返回值: 指向以 NULL 结尾的字符串(UTF8 编码)的指针。如果详细描述不可用,也可以返回 NULL。

返回的指针仅在流的下一个回调被调用之前保证有效。如果需要让它存活更长时间,应将其指向的字符串复制到消费者管理的存储中。

void (*ArrowDeviceArrayStream.release)(struct ArrowDeviceArrayStream*)#

强制。 指向生产者提供的释放回调的指针。

void *ArrowDeviceArrayStream.private_data#

可选。 一个不透明指针,指向生产者提供的私有数据。

消费者不得处理此成员。此成员的生命周期由生产者处理,特别是通过释放回调处理。

结果生命周期#

get_schemaget_next 回调返回的数据必须独立释放。它们的生命周期不与 ArrowDeviceArrayStream 的生命周期绑定。

流生命周期#

C 流的生命周期使用与C 数据接口中相似的释放回调进行管理。

线程安全#

流源不假定是线程安全的。想要从多个线程调用 get_next 的消费者应确保这些调用是序列化的。

异步设备流接口#

警告

实验性:异步 C 设备流接口目前仍处于实验阶段。根据反馈和使用情况,协议定义可能会发生变化,直到其完全标准化。

C 流接口 提供了一个同步 API,其核心是消费者调用生产者函数来检索下一个记录批次。为了在生产者和消费者之间进行并发通信,可以使用 ArrowAsyncDeviceStreamHandler。此接口是非主观的,可能适用于不同的并发通信模型。

语义#

异步接口是一个由消费者分配和填充的结构,而不是生产者提供一个回调结构供消费者调用和检索记录。消费者分配的结构提供了处理程序回调,供生产者在模式和数据块可用时调用。

除了 ArrowAsyncDeviceStreamHandler,还有两个额外的结构用于完整的数据流:ArrowAsyncTaskArrowAsyncProducer

结构定义#

C 设备异步流接口由三个 struct 定义组成。

#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
#define ARROW_C_ASYNC_STREAM_INTERFACE

struct ArrowAsyncTask {
  int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);

  void* private_data;
};

struct ArrowAsyncProducer {
  ArrowDeviceType device_type;

  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);

  void (*release)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};

struct ArrowAsyncDeviceStreamHandler {
  // consumer-specific handlers
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
                   int code, const char* message, const char* metadata);

  // release callback
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

  // must be populated before calling any callbacks
  struct ArrowAsyncProducer* producer;

  // opaque handler-specific data
  void* private_data;
};

#endif  // ARROW_C_ASYNC_STREAM_INTERFACE

注意

规范的保护宏 ARROW_C_ASYNC_STREAM_INTERFACE 旨在避免当两个项目将 C 异步流接口定义复制到其自己的头文件中,而第三个项目又包含这两个项目时,出现重复定义。因此,在复制这些定义时,保持此保护宏完全不变非常重要。

ArrowAsyncDeviceStreamHandler 结构#

该结构有以下字段

int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*)#

强制。 用于接收流模式的处理程序。所有传入记录都应与提供的模式匹配。如果成功,函数应返回 0,否则应返回与 errno 兼容的错误代码。

如果生产者希望提供任何额外的上下文信息,它可以将 ArrowAsyncProducer.additional_metadata 设置为非 NULL 值。这以与 ArrowSchema.metadata 相同的格式编码。如果此元数据非 NULL,其生命周期应与 ArrowAsyncProducer 对象的生命周期绑定。

除非调用了 on_error 处理程序,否则此函数将始终且仅被调用一次,并且将是此对象上调用的第一个方法。因此,生产者必须在调用此函数之前填充 ArrowAsyncProducer 成员,以允许消费者施加反压并控制数据流。生产者保留 ArrowAsyncProducer 的所有权,并且必须在调用 ArrowAsyncDeviceStreamHandler 上的释放回调之后进行清理。

如果生产者在此处收到非零结果,则不得随后调用此对象上的除释放回调之外的任何内容。

int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)#

强制。 当有新的记录可供处理时调用的处理程序。每个记录的模式应与调用 on_schema 时使用的模式相同。如果成功处理,函数应返回 0,否则应返回与 errno 兼容的错误代码。

它不是直接传递记录本身,而是接收一个 ArrowAsyncTask,以促进更好的以消费者为中心的线程控制来接收数据。调用此函数仅表示数据可通过提供的任务获得。

生产者通过为 ArrowAsyncTask 指针传递 NULL 而不是有效地址来表示流的结束。此任务对象仅在此函数调用期间有效。如果消费者想在此方法范围之外使用该任务,它必须将其内容复制或移动到新的 ArrowAsyncTask 对象。

const char* 参数的存在是为了让生产者提供任何他们想要的额外上下文信息。这以与 ArrowSchema.metadata 相同的格式编码。如果非 NULL,其生命周期仅限于此函数调用的范围。想要在此调用生命周期之外维护额外元数据的消费者必须自行复制该值。

生产者不得从多个线程并发调用此函数。

必须调用 ArrowAsyncProducer.request 回调才能开始接收对此处理程序的调用。

void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler, int, const char*, const char*)#

强制。 当生产者遇到错误时调用的处理程序。调用此函数后,release 回调将作为此结构上的最后一次调用。参数是与 errno 兼容的错误代码以及可选的错误消息和元数据。

如果消息和元数据非 NULL,则它们的生命周期仅在此调用范围内有效。希望在函数返回后仍保留这些值的消费者必须自行复制这些值。

如果元数据参数非 NULL,以提供键值错误元数据,则其编码方式应与 ArrowSchema.metadata 中元数据的编码方式相同。

此回调可以在调用 ArrowAsyncProducer.request 之前或之后由生产者调用。此回调不得调用 ArrowAsyncProducer 对象的任何方法。

void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)#

强制。 指向消费者为处理程序提供的释放回调的指针。

此回调可以在调用 ArrowAsyncProducer.request 之前或之后由生产者调用。此回调不得调用 ArrowAsyncProducer 对象的任何方法。

struct ArrowAsyncProducer ArrowAsyncDeviceStreamHandler.producer#

强制。 生产者对象,消费者将使用它来请求额外数据或取消。

生产者必须在调用 ArrowAsyncDeviceStreamHandler.on_schema 回调之前填充此对象。生产者保留此对象的所有权,并且必须在调用 ArrowAsyncDeviceStreamHandler 上的释放回调之后清理它。

消费者不能假定此对象在调用 on_schema 回调之前是有效的。

void *ArrowAsyncDeviceStreamHandler.private_data#

可选。 一个不透明指针,指向消费者提供的私有数据。

生产者不得处理此成员。此成员的生命周期由消费者处理,特别是通过释放回调处理。

ArrowAsyncTask 结构#

使用 Task 对象而不是直接将数组传递给 on_next 回调的目的是为了实现更复杂和高效的线程处理。利用 Task 对象可以使生产者将“解码”逻辑与 I/O 分离,从而使消费者避免在 CPU 核心之间传输数据(例如,从一个 L1/L2 缓存到另一个)。

此生产者提供的结构具有以下字段

int (*ArrowArrayTask.extract_data)(struct ArrowArrayTask*, struct ArrowDeviceArray*)#

强制。 一个回调函数,用于使用可用数据填充提供的 ArrowDeviceArray。生产者提供的 ArrowAsyncTask 的顺序使消费者能够知道要处理的数据顺序。如果消费者不关心此任务拥有的数据,它仍然必须调用 extract_data,以便生产者可以执行任何必要的清理工作。NULL 应作为设备数组指针传递,以指示消费者不希望实际数据,从而让任务执行必要的清理工作。

如果此函数返回非零值,则随后应仅由生产者调用 ArrowAsyncDeviceStreamHandleron_error 回调。由于调用此方法可能与当前控制流分离,因此返回非零值以表示发生错误允许当前线程决定相应地处理此情况,同时仍允许所有错误日志记录和处理集中在 ArrowAsyncDeviceStreamHandler.on_error 回调中。

无需单独的释放回调,任何所需的清理都应作为此回调调用的一部分执行。数组的所有权被赋予作为参数传入的指针,并且此数组必须单独释放。

此方法仅可调用一次,且仅一次。

void *ArrowArrayTask.private_data#

可选。 一个不透明指针,指向生产者提供的私有数据。

消费者不得处理此成员。此成员的生命周期由创建此对象的生产者拥有,并且在调用 ArrowArrayTask.extract_data 期间如有必要应进行清理。

ArrowAsyncProducer 结构#

此生产者提供和管理的对象具有以下字段:

ArrowDeviceType ArrowAsyncProducer.device_type#

强制。 此生产者将提供数据的设备类型。此生产者生成的所有 ArrowDeviceArray 结构应具有与此处设置的设备类型相同。

void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, uint64_t)#

强制。 消费者必须调用此函数才能开始接收 ArrowAsyncDeviceStreamHandler.on_next_task 的调用。在 ArrowAsyncDeviceStreamHandler.on_next_taskArrowAsyncDeviceStreamHandler.on_schema 内部同步调用此函数必须是有效的。因此,此函数不得同步调用 on_next_taskon_error 以避免递归和重入回调。

调用 cancel 后,对该函数的额外调用必须是空操作 (NOP),但允许。

在未取消的情况下,调用此函数将注册生产者要生成的指定数量的额外数组/批次。生产者应仅在调用此方法的总和达到最大值之前调用相应的 on_next_task 回调,然后传播反压/等待。

调用 request 遇到的任何错误都必须通过调用 ArrowAsyncDeviceStreamHandleron_error 回调来传播。

使用 n<= 0 调用此函数是无效的。如果接收到此类 n 值,生产者应报错(例如调用 on_error)。

void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)#

强制。 此函数向生产者发出信号,表明它最终必须停止调用 on_next_taskcancel 的调用必须是幂等且线程安全的。一旦调用一次,后续调用必须是空操作(NOP)。此函数不得调用除 on_error 之外的任何消费者端处理程序。

调用 cancel 不要求立即影响生产者,只需它最终必须停止调用 on_next_task,然后随后调用异步处理程序对象上的 release。因此,即使在调用 cancel 之后,如果仍有待请求的数组,消费者必须准备接收一个或多个对 on_next_taskon_error 的调用。

成功取消不得导致生产者调用 ArrowAsyncDeviceStreamHandler.on_error,而是应完成任何剩余任务(相应地调用 on_next_task),并最终只调用 release

在处理取消调用期间遇到的任何错误都必须通过异步流处理程序上的 on_error 回调报告。

const char *ArrowAsyncProducer.additional_metadata#

可选。 一个额外的元数据字符串,用于向消费者提供任何额外的上下文。此字符串必须NULL 或以与 ArrowSchema.metadata 相同的方式编码的有效字符串。例如,生产者可以利用此元数据来提供流中已知行的总数和/或批次的总数。

如果非 NULL,则其必须至少在此对象的生命周期内有效。

void *ArrowAsyncProducer.private_data#

可选。 一个不透明指针,指向生产者提供的特定数据。

消费者不得处理此成员,其生命周期由构造此对象的生产者拥有。

错误处理#

与常规 C 流接口不同,异步接口允许错误双向流动。因此,错误处理可能会稍微复杂一些。因此,本规范规定了以下规则:

  • 如果生产者在处理过程中遇到错误,它应该调用 on_error 回调,并在其返回后调用 release

  • 如果 on_schemaon_next_task 返回非零整数值,生产者不应调用 on_error 回调,而应在错误代码的任何日志记录或处理之前或之后某个时间点调用 release

结果生命周期#

传递给 on_schema 回调的 ArrowSchema 必须独立释放,对象本身需要移动到消费者拥有的 ArrowSchema 对象中。作为参数传递给回调的 ArrowSchema* 不得存储和保留。

提供给 on_next_taskArrowAsyncTask 对象由生产者拥有,并将在调用其 extract_data 期间进行清理。如果消费者不关心数据,它应该传递 NULL 而不是有效的 ArrowDeviceArray*

传递给 on_errorconst char* 错误 messagemetadata 仅在 on_error 函数本身的范围内有效。如果需要在其返回后仍然存在,则必须复制它们。

流处理程序生命周期#

异步流处理程序的生命周期通过与C 数据接口中相似的释放回调进行管理。

ArrowAsyncProducer 生命周期#

ArrowAsyncProducer 的生命周期由生产者自己拥有并管理。它必须在调用除 release 之外的任何方法之前填充,并且必须至少在调用流处理程序对象上的 release 之前保持有效。

线程安全#

ArrowAsyncDeviceStreamHandler 上的所有处理程序函数都应以串行方式调用,但不保证每次都从同一线程调用。生产者应等待处理程序回调返回,然后再调用下一个处理程序回调,以及在调用 release 回调之前。

反压由消费者通过调用 ArrowAsyncProducer.request 来管理,以指示它准备接收多少个数组。

ArrowAsyncDeviceStreamHandler 对象应在传递给生产者后立即能够处理回调,任何初始化都应在其提供之前执行。

可能的序列图#

sequenceDiagram Consumer->>+Producer: ArrowAsyncDeviceStreamHandler* Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*) Consumer->>Producer: ArrowAsyncProducer->request(n) par loop up to n times Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end and for each task Consumer-->>Producer: ArrowAsyncTask.extract_data(...) Consumer-->>Producer: ArrowAsyncProducer->request(1) end break Optionally Consumer->>-Producer: ArrowAsyncProducer->cancel() end loop possible remaining Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release()

与其他交换格式的互操作性#

其他交换 API,例如 CUDA 数组接口,包含传递正在导出数据缓冲区的形状和数据类型的成员。这些信息对于解释正在共享的设备数据缓冲区中的原始字节是必要的。用户不应将数据的形状/类型与 ArrowDeviceArray 一起存储,而应利用现有的 ArrowSchema 结构来传递任何数据类型和形状信息。

更新此规范#

注意

由于此规范仍被视为实验性,因此其可能会略有变化的可能性仍然很低。将其标记为“实验性”的原因是我们不知道我们不知道什么。已经进行了工作和研究以确保与许多不同框架兼容的通用 ABI,但总有可能遗漏了什么。一旦在官方 Arrow 版本中支持此功能并观察到使用情况以确认无需任何修改,将删除“实验性”标签并冻结 ABI。

一旦此规范在官方 Arrow 版本中得到支持,C ABI 就被冻结了。这意味着 ArrowDeviceArray 结构定义不应以任何方式更改——包括添加新成员。

允许向后兼容的更改,例如 ArrowDeviceType 的新宏值,或将保留的 24 字节转换为不同的类型/成员,而不改变结构的大小。

任何不兼容的更改都应作为新规范的一部分,例如 ArrowDeviceArrayV2