Arrow C 设备数据接口#

警告

Arrow C 设备数据接口应被视为实验性的

原理#

当前的 C 数据接口 及其大多数实现都假设提供的所有数据缓冲区都是 CPU 缓冲区。由于 Apache Arrow 旨在成为一种通用的内存格式,用于表示表格(“列式”)数据,因此需要在非 CPU 硬件(如 GPU)上利用这些数据。一个这样的例子是 RAPIDS cuDF 库,它将 Arrow 内存格式与 CUDA 用于 NVIDIA GPU。由于将数据从主机复制到设备以及从设备复制回主机成本高昂,理想情况下,应尽可能长时间地将数据保留在设备上,即使在运行时和库之间传递数据时也是如此。

Arrow C 设备数据接口建立在现有 C 数据接口的基础之上,向其添加了一小组稳定的 C 定义。这些定义等效于 C 数据接口中的 ArrowArrayArrowArrayStream 结构,它们添加了成员以允许指定设备类型并传递必要的信息以与生产者同步。对于非 C/C++ 语言和运行时,将 C 定义转换为相应的 C FFI 声明应该与当前的 C 数据接口一样简单。

然后,应用程序和库可以使用非 CPU 设备上的 Arrow 模式和 Arrow 格式的内存来交换数据,就像现在使用 CPU 数据一样容易。这将能够更长时间地将数据保留在这些设备上,并避免为了利用新的库和运行时而在主机和设备之间来回复制数据的高昂成本。

目标#

  • 公开一个建立在现有 C 数据接口之上的 ABI 稳定接口。

  • 使第三方项目能够轻松实现支持,只需很少的初始投资。

  • 允许在同一进程中运行的独立运行时和组件之间零拷贝共享 Arrow 格式的设备内存。

  • 避免需要一对一的适配层,例如 Python 进程传递 CUDA 数据的 CUDA 数组接口

  • 无需对 Arrow 软件项目本身进行显式依赖(在编译时或运行时)即可实现集成。

Arrow C 设备数据接口的目的是扩展当前 C 数据接口的范围,使其也成为 GPU 或 FPGA 等设备上列式处理的标准低级构建块。

结构定义#

由于它建立在 C 数据接口之上,因此 C 设备数据接口使用 C 数据接口规范 中定义的 ArrowSchemaArrowArray 结构。然后,它添加了以下独立定义。与 Arrow 项目的其余部分一样,它们在 Apache 许可证 2.0 下可用。

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
#define ARROW_C_DEVICE_DATA_INTERFACE

// Device type for the allocated memory
typedef int32_t ArrowDeviceType;

// CPU device, same as using ArrowArray directly
#define ARROW_DEVICE_CPU 1
// CUDA GPU Device
#define ARROW_DEVICE_CUDA 2
// Pinned CUDA CPU memory by cudaMallocHost
#define ARROW_DEVICE_CUDA_HOST 3
// OpenCL Device
#define ARROW_DEVICE_OPENCL 4
// Vulkan buffer for next-gen graphics
#define ARROW_DEVICE_VULKAN 7
// Metal for Apple GPU
#define ARROW_DEVICE_METAL 8
// Verilog simulator buffer
#define ARROW_DEVICE_VPI 9
// ROCm GPUs for AMD GPUs
#define ARROW_DEVICE_ROCM 10
// Pinned ROCm CPU memory allocated by hipMallocHost
#define ARROW_DEVICE_ROCM_HOST 11
// Reserved for extension
//
// used to quickly test extension devices, semantics
// can differ based on implementation
#define ARROW_DEVICE_EXT_DEV 12
// CUDA managed/unified memory allocated by cudaMallocManaged
#define ARROW_DEVICE_CUDA_MANAGED 13
// Unified shared memory allocated on a oneAPI
// non-partitioned device.
//
// A call to the oneAPI runtime is required to determine the
// device type, the USM allocation type and the sycl context
// that it is bound to.
#define ARROW_DEVICE_ONEAPI 14
// GPU support for next-gen WebGPU standard
#define ARROW_DEVICE_WEBGPU 15
// Qualcomm Hexagon DSP
#define ARROW_DEVICE_HEXAGON 16

struct ArrowDeviceArray {
  struct ArrowArray array;
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;

  // reserved bytes for future expansion
  int64_t reserved[3];
};

#endif  // ARROW_C_DEVICE_DATA_INTERFACE

注意

规范保护 ARROW_C_DEVICE_DATA_INTERFACE 旨在避免如果两个项目在其自己的头文件中复制定义,而第三方项目包含这两个项目,则会出现重复定义。因此,复制这些定义时,必须完全按原样保留此保护。

ArrowDeviceType#

ArrowDeviceType typedef 用于指示提供的内存缓冲区是在哪种类型的设备上分配的。这与 device_id 一起应该足以引用正确的数据缓冲区。

然后,我们使用宏为不同的设备类型定义值。提供的宏值与广泛使用的 dlpack DLDeviceType 定义值兼容,每个值都与 dlpack.h 中等效的 kDL<type> 枚举值相同。该列表将随着时间的推移与这些等效的枚举值保持同步,以确保兼容性,而不是潜在地发生分歧。为了避免 Arrow 项目必须审查新硬件设备,应首先将新添加内容添加到 dlpack,然后再在此处添加相应的宏。

为了确保 ABI 的可预测性,我们使用宏而不是 enum,因此存储类型不依赖于编译器。

ARROW_DEVICE_CPU#

CPU 设备,等效于直接使用 ArrowArray 而不是使用 ArrowDeviceArray

ARROW_DEVICE_CUDA#

CUDA GPU 设备。这可以表示使用运行时库(cudaMalloc)或设备驱动程序(cuMemAlloc)分配的数据。

ARROW_DEVICE_CUDA_HOST#

通过使用 cudaMallocHostcuMemAllocHost 由 CUDA 锁定和页面锁定的 CPU 内存。

ARROW_DEVICE_OPENCL#

使用 OpenCL(开放计算语言) 框架在设备上分配的数据。

ARROW_DEVICE_VULKAN#

Vulkan 框架和库分配的数据。

ARROW_DEVICE_METAL#

使用 Metal 框架和库在 Apple GPU 设备上的数据。

ARROW_DEVICE_VPI#

指示使用 Verilog 模拟器缓冲区。

ARROW_DEVICE_ROCM#

使用 ROCm 堆栈的 AMD 设备。

ARROW_DEVICE_ROCM_HOST#

通过使用 hipMallocHost 由 ROCm 锁定和页面锁定的 CPU 内存。

ARROW_DEVICE_EXT_DEV#

此值是设备扩展的应急出口,这些设备当前没有其他表示方式。如果使用此设备类型,生产者需要提供特定于设备的附加信息/上下文。这用于快速测试扩展设备,语义可能因实现而异。

ARROW_DEVICE_CUDA_MANAGED#

cudaMallocManaged 分配的 CUDA 托管/统一内存。

ARROW_DEVICE_ONEAPI#

在 Intel oneAPI 非分区设备上分配的统一共享内存。需要调用 oneAPI 运行时来确定具体的设备类型、USM 分配类型及其绑定的 sycl 上下文。

ARROW_DEVICE_WEBGPU#

下一代 WebGPU 标准的 GPU 支持

ARROW_DEVICE_HEXAGON#

在 Qualcomm Hexagon DSP 设备上分配的数据。

ArrowDeviceArray 结构#

ArrowDeviceArray 结构嵌入了 C 数据 ArrowArray 结构,并添加了消费者使用数据所需的附加信息。它具有以下字段

struct ArrowArray ArrowDeviceArray.array#

*必填。* 已分配的数组数据。void** 缓冲区(以及任何子缓冲区)中的值是在设备上分配的值。缓冲区值应该是设备指针。结构的其余部分应该可以被 CPU 访问。

此结构的 private_datarelease 回调应该包含根据分配它的设备释放数组所需的任何必要信息和结构,而不是在此处具有单独的 release 回调和 private_data 指针。

int64_t ArrowDeviceArray.device_id#

必需。 如果系统上有多个此类型的设备,则使用设备 ID 来标识特定设备。ID 的语义将取决于硬件,但我们使用 int64_t 来确保 ID 随着时间的推移设备发生变化也能适用。

对于没有设备标识符固有概念的设备类型(例如,ARROW_DEVICE_CPU),建议使用 -1 作为 device_id 的约定。

ArrowDeviceType ArrowDeviceArray.device_type#

必需。 可以访问数组中缓冲区的设备类型。

void *ArrowDeviceArray.sync_event#

可选。 如果需要,用于同步的类事件对象。

许多设备(如 GPU)主要与 CPU 处理异步。因此,为了安全地访问设备内存,通常需要一个对象来与处理同步。由于不同的设备将使用不同的类型来指定这一点,我们使用 void*,它可以被强制转换为指向任何设备适当类型的指针。

如果不需要同步,则可以为空。如果它不为空,则在尝试访问缓冲区中的内存之前,必须使用它来调用设备的适当同步方法(例如 cudaStreamWaitEventhipStreamWaitEvent)。

如果提供了事件,则生产者必须确保在触发事件之前设备上可用导出的数据。使用者应该在尝试访问导出的数据之前等待事件。

另请参阅

下面的同步事件类型部分。

int64_t ArrowDeviceArray.reserved[3]#

随着非 CPU 开发的扩展,可能需要扩展此结构。为了在不破坏 ABI 更改的情况下进行扩展,我们在对象的末尾保留了 24 个字节。这些字节必须在生产者初始化后清零,以确保将来 ABI 的安全演进。

同步事件类型#

下表列出了每种设备类型的预期事件类型。如果不支持事件类型(“N/A”),则 sync_event 成员应始终为空。

请记住,如果访问数据不需要同步,则事件*可以*为空。

设备类型

实际事件类型

备注

ARROW_DEVICE_CPU

N/A

ARROW_DEVICE_CUDA

cudaEvent_t*

ARROW_DEVICE_CUDA_HOST

cudaEvent_t*

ARROW_DEVICE_OPENCL

cl_event*

ARROW_DEVICE_VULKAN

VkEvent*

ARROW_DEVICE_METAL

MTLEvent*

ARROW_DEVICE_VPI

N/A

ARROW_DEVICE_ROCM

hipEvent_t*

ARROW_DEVICE_ROCM_HOST

hipEvent_t*

ARROW_DEVICE_EXT_DEV

ARROW_DEVICE_CUDA_MANAGED

cudaEvent_t*

ARROW_DEVICE_ONEAPI

sycl::event*

ARROW_DEVICE_WEBGPU

N/A

ARROW_DEVICE_HEXAGON

N/A

备注

  • (1) 目前未知框架是否有支持的事件类型。

  • (2) 扩展设备具有生产者定义的语义,因此如果需要同步扩展设备,生产者应记录该类型。

语义#

内存管理#

首先也是最重要的:在这个接口中,*只有*数据缓冲区本身驻留在设备内存中(即 ArrowArray 结构的 buffers 成员)。其他所有内容都应位于 CPU 内存中。

ArrowDeviceArray 结构包含一个 ArrowArray 对象,该对象本身具有特定语义用于释放内存。下面的术语*“基础结构”*指的是在生产者和消费者之间直接传递的 ArrowDeviceArray 对象,而不是其任何子结构。

基础结构旨在由*消费者*在堆栈或堆上分配。在这种情况下,生产者 API 应接受指向消费者分配结构的指针。

但是,结构指向的任何数据都必须由生产者分配和维护。这包括 sync_event 成员(如果不为空),以及 ArrowArray 对象中的任何指针(照常)。数据生命周期通过 ArrowArray 成员的 release 回调进行管理。

对于 ArrowDeviceArray,已释放结构的语义和回调语义与ArrowArray 本身的语义相同。除了任何已分配的事件之外,释放设备数据缓冲区所需的任何特定于生产者的上下文信息都应存储在 ArrowArrayprivate_data 成员中,并由 release 回调进行管理。

移动数组#

消费者可以通过按位复制或浅成员复制来*移动* ArrowDeviceArray 结构。然后,它必须通过将嵌入式 ArrowArray 结构的 release 成员设置为 NULL 来将源结构标记为已释放,但*不*调用该释放回调。这可确保在任何给定时间只有一个活动的结构副本,并且生命周期信息能正确地传达给生产者。

照常,当不再需要目标结构时,将调用其上的释放回调。

记录批次#

与 C 数据接口本身一样,记录批次可以简单地视为等效的结构数组。在这种情况下,顶级 ArrowSchema 的元数据可以用作记录批次的架构级元数据。

可变性#

生产者和消费者都应将导出的数据(即通过嵌入式 ArrowArraybuffers 成员在设备上可访问的数据)视为不可变的,因为否则任何一方都可能在另一方对其进行突变时看到不一致的数据。

同步#

如果 sync_event 成员不为 NULL,则消费者在同步该事件之前不应尝试访问或读取数据。如果 sync_event 成员为 NULL,则必须能够安全地访问数据,而消费者无需进行任何同步。

C 生产者示例#

导出简单的 int32 设备数组#

导出具有空元数据的非空 int32 类型。可以在C 数据接口文档中直接看到一个示例。

要导出数据本身,我们通过释放回调将所有权转移给消费者。此示例将使用 CUDA,但可以使用等效的调用来处理任何设备

static void release_int32_device_array(struct ArrowArray* array) {
    assert(array->n_buffers == 2);
    // destroy the event
    cudaEvent_t* ev_ptr = (cudaEvent_t*)(array->private_data);
    cudaError_t status = cudaEventDestroy(*ev_ptr);
    assert(status == cudaSuccess);
    free(ev_ptr);

    // free the buffers and the buffers array
    status = cudaFree(array->buffers[1]);
    assert(status == cudaSuccess);
    free(array->buffers);

    // mark released
    array->release = NULL;
}

void export_int32_device_array(void* cudaAllocedPtr,
                               cudaStream_t stream,
                               int64_t length,
                               struct ArrowDeviceArray* array) {
    // get device id
    int device;
    cudaError_t status;
    status = cudaGetDevice(&device);
    assert(status == cudaSuccess);

    cudaEvent_t* ev_ptr = (cudaEvent_t*)malloc(sizeof(cudaEvent_t));
    assert(ev_ptr != NULL);
    status = cudaEventCreate(ev_ptr);
    assert(status == cudaSuccess);

    // record event on the stream, assuming that the passed in
    // stream is where the work to produce the data will be processing.
    status = cudaEventRecord(*ev_ptr, stream);
    assert(status == cudaSuccess);

    memset(array, 0, sizeof(struct ArrowDeviceArray));
    // initialize fields
    *array = (struct ArrowDeviceArray) {
        .array = (struct ArrowArray) {
            .length = length,
            .null_count = 0,
            .offset = 0,
            .n_buffers = 2,
            .n_children = 0,
            .children = NULL,
            .dictionary = NULL,
            // bookkeeping
            .release = &release_int32_device_array,
            // store the event pointer as private data in the array
            // so that we can access it in the release callback.
            .private_data = (void*)(ev_ptr),
        },
        .device_id = (int64_t)(device),
        .device_type = ARROW_DEVICE_CUDA,
        // pass the event pointer to the consumer
        .sync_event = (void*)(ev_ptr),
    };

    // allocate list of buffers
    array->array.buffers = (const void**)malloc(sizeof(void*) * array->array.n_buffers);
    assert(array->array.buffers != NULL);
    array->array.buffers[0] = NULL;
    array->array.buffers[1] = cudaAllocedPtr;
}

// calling the release callback should be done using the array member
// of the device array.
static void release_device_array_helper(struct ArrowDeviceArray* arr) {
    arr->array.release(&arr->array);
}

设备流接口#

C 流接口一样,C 设备数据接口也指定了一个更高级别的结构,用于简化单个进程内流数据的通信。

语义#

Arrow C 设备流公开数据块的流式源,每个数据块具有相同的架构。通过调用阻塞拉式迭代函数获取块。预计所有块都应在同一设备类型(但不一定是同一设备 ID)上提供数据。如果需要在多个设备类型上提供数据流,则生产者应为每种设备类型提供单独的流对象。

结构定义#

C 设备流接口由单个 struct 定义定义

#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
#define ARROW_C_DEVICE_STREAM_INTERFACE

struct ArrowDeviceArrayStream {
    // device type that all arrays will be accessible from
    ArrowDeviceType device_type;
    // callbacks
    int (*get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema*);
    int (*get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray*);
    const char* (*get_last_error)(struct ArrowDeviceArrayStream*);

    // release callback
    void (*release)(struct ArrowDeviceArrayStream*);

    // opaque producer-specific data
    void* private_data;
};

#endif  // ARROW_C_DEVICE_STREAM_INTERFACE

注意

规范保护 ARROW_C_DEVICE_STREAM_INTERFACE 旨在避免重复定义,如果两个项目将 C 设备流接口定义复制到它们自己的头文件中,而第三方项目包含来自这两个项目的内容。因此,复制这些定义时,必须严格按照原样保留此保护。

ArrowDeviceArrayStream 结构#

ArrowDeviceArrayStream 提供了一种可以访问结果数据的设备类型,以及与 Arrow 数组的流式源进行交互所需的回调。它具有以下字段

ArrowDeviceType device_type#

必需。 此流生成数据的设备类型。此流生成的所有 ArrowDeviceArray 都应具有与此处设置的相同的设备类型。这为消费者提供了便利,使其不必检查检索到的每个数组,而是允许对流进行更高级别的编码构造。

int (*ArrowDeviceArrayStream.get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema *out)#

必需。 此回调允许消费者查询流中数据块的模式。所有数据块的模式都相同。

此回调不得在已释放的 ArrowDeviceArrayStream 上调用。

返回值: 成功时返回 0,否则返回非零 错误代码

int (*ArrowDeviceArrayStream.get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray *out)#

必需。 此回调允许消费者获取流中的下一个数据块。

此回调不得在已释放的 ArrowDeviceArrayStream 上调用。

下一个数据块必须可从与 ArrowDeviceArrayStream.device_type 匹配的设备类型访问。

返回值: 成功时返回 0,否则返回非零 错误代码

成功后,消费者必须检查 ArrowDeviceArray 的嵌入式 ArrowArray 是否标记为 已释放。如果嵌入式 ArrowDeviceArray.array 已释放,则已到达流的末尾。否则,ArrowDeviceArray 包含有效的数据块。

const char *(*ArrowDeviceArrayStream.get_last_error)(struct ArrowDeviceArrayStream*)#

必需。 此回调允许消费者获取最后一个错误的文本描述。

仅当 ArrowDeviceArrayStream 上的最后一个操作返回错误时,才必须调用此回调。不得在已释放的 ArrowDeviceArrayStream 上调用它。

返回值: 指向以 NULL 结尾的字符串(UTF8 编码)的指针。如果没有可用的详细描述,也可以返回 NULL。

返回的指针仅保证在下一次调用流的某个回调之前有效。如果要使其存活更长时间,则应将其指向的字符串复制到消费者管理的存储中。

void (*ArrowDeviceArrayStream.release)(struct ArrowDeviceArrayStream*)#

必需。 指向生产者提供的释放回调的指针。

void *ArrowDeviceArrayStream.private_data#

*可选。* 指向生产者提供的私有数据的不透明指针。

消费者不得处理此成员。此成员的生命周期由生产者处理,尤其由释放回调处理。

结果生命周期#

get_schemaget_next 回调返回的数据必须单独释放。它们的生命周期与 ArrowDeviceArrayStream 的生命周期无关。

流生命周期#

C 流的生命周期使用释放回调进行管理,其用法与 C 数据接口 中的类似。

线程安全#

不假定流源是线程安全的。希望从多个线程调用 get_next 的消费者应确保这些调用已序列化。

异步设备流接口#

警告

实验性:异步 C 设备流接口目前处于实验阶段。根据反馈和使用情况,协议定义可能会更改,直到完全标准化为止。

C 流接口 提供了一个同步 API,其核心是消费者调用生产者函数来检索下一个记录批次。为了在生产者和消费者之间进行并发通信,可以使用 ArrowAsyncDeviceStreamHandler。此接口不带倾向性,可以适应不同的并发通信模型。

语义#

异步接口不是由生产者提供回调结构供消费者调用和检索记录,而是一个由消费者分配和填充的结构。消费者分配的结构提供了处理程序回调,供生产者在模式和数据块可用时调用。

除了 ArrowAsyncDeviceStreamHandler 之外,还有两个额外的结构用于完整的数据流:ArrowAsyncTaskArrowAsyncProducer

结构定义#

C 设备异步流接口包含三个 struct 定义

#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
#define ARROW_C_ASYNC_STREAM_INTERFACE

struct ArrowAsyncTask {
  int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);

  void* private_data;
};

struct ArrowAsyncProducer {
  ArrowDeviceType device_type;

  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);

  void (*release)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};

struct ArrowAsyncDeviceStreamHandler {
  // consumer-specific handlers
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
                   int code, const char* message, const char* metadata);

  // release callback
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

  // must be populated before calling any callbacks
  struct ArrowAsyncProducer* producer;

  // opaque handler-specific data
  void* private_data;
};

#endif  // ARROW_C_ASYNC_STREAM_INTERFACE

注意

规范的保护 ARROW_C_ASYNC_STREAM_INTERFACE 旨在避免重复定义,如果两个项目将 C 异步流接口定义复制到它们自己的头文件中,而第三方项目包含来自这两个项目的定义。因此,复制这些定义时,必须保持此保护的原样。

ArrowAsyncDeviceStreamHandler 结构#

该结构具有以下字段

int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*)#

必需。 用于接收流模式的处理程序。所有传入记录都应与提供的模式匹配。如果成功,该函数应返回 0,否则应返回与 errno 兼容的错误代码。

如果生产者想要提供任何额外的上下文信息,可以将 ArrowAsyncProducer.additional_metadata 设置为非 NULL 值。这使用与 ArrowSchema.metadata 相同的格式进行编码。此元数据(如果不是 NULL)的生命周期应与 ArrowAsyncProducer 对象的生命周期绑定。

除非调用了 on_error 处理程序,否则此函数将始终被调用一次,并且是对此对象调用的第一个方法。因此,生产者*必须*在调用此函数之前填充 ArrowAsyncProducer 成员,以允许消费者应用背压并控制数据流。生产者维护 ArrowAsyncProducer 的所有权,并且必须在对 ArrowAsyncDeviceStreamHandler 调用 release 回调之后清理它。

在此处收到非零结果的生产者不得随后调用此对象上的任何内容,但 release 回调除外。

int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)#

必需。 当有新的记录可供处理时要调用的处理程序。每条记录的模式应与调用 on_schema 时使用的模式相同。如果处理成功,该函数应返回 0,否则应返回与 errno 兼容的错误代码。

它接收的是 ArrowAsyncTask 而不是记录本身,以便在接收数据方面更好地促进以消费者为中心的线程控制。调用此函数只是表明数据可通过提供的任务获得。

生产者通过为 ArrowAsyncTask 指针传递 NULL 而不是有效地址来指示流的结束。此任务对象仅在此函数调用期间有效。如果消费者希望在此方法范围之外使用该任务,则必须将其内容复制或移动到新的 ArrowAsyncTask 对象。

const char* 参数的存在是为了让生产者提供他们想要的任何额外的上下文信息。这以与 ArrowSchema.metadata 相同的格式进行编码。如果不是 NULL,则生命周期仅为此函数调用的范围。希望在此调用生命周期之外维护附加元数据的消费者 *必须* 自己复制该值。

生产者*不得*从多个线程并发调用此函数。

必须调用 ArrowAsyncProducer.request 回调函数才能开始接收对此处理程序的调用。

void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler, int, const char*, const char*)#

必需。 当生产者遇到错误时要调用的处理程序。调用此函数后,将调用 release 回调函数作为对此结构的最后一次调用。参数是一个与 errno 兼容的错误代码和一个可选的错误消息和元数据。

如果消息和元数据不是 NULL,则它们的生命周期仅在此调用期间有效。希望在该函数返回后保留这些值的消费者*必须*自己复制这些值。

如果元数据参数不是 NULL,要提供键值错误元数据,则其编码方式应与 ArrowSchema.metadata 中元数据的编码方式相同。

生产者可以在调用或不调用 ArrowAsyncProducer.request 的情况下调用此函数,这是有效的。此回调函数*不得*调用 ArrowAsyncProducer 对象的任何方法。

void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)#

必需。 指向消费者提供的处理程序释放回调函数的指针。

生产者可以在调用或不调用 ArrowAsyncProducer.request 的情况下调用此函数,这是有效的。这不得调用 ArrowAsyncProducer 对象的任何方法。

struct ArrowAsyncProducer ArrowAsyncDeviceStreamHandler.producer#

必需。 消费者将用来请求更多数据或取消的生产者对象。

生产者*必须*在调用 ArrowAsyncDeviceStreamHandler.on_schema 回调函数之前填充此对象。生产者维护此对象的所有权,并且必须在调用 ArrowAsyncDeviceStreamHandler 上的 release 回调函数*之后*清理它。

消费者*不能*假设在调用 on_schema 回调函数之前这是有效的。

void *ArrowAsyncDeviceStreamHandler.private_data#

*可选。* 指向消费者提供的私有数据的不透明指针。

生产者*不得*处理此成员。此成员的生命周期由消费者处理,尤其是由 release 回调函数处理。

ArrowAsyncTask 结构#

使用 Task 对象而不是将数组直接传递给 on_next 回调函数的目的是允许更复杂和高效的线程处理。利用 Task 对象允许生产者将“解码”逻辑与 I/O 分离,使消费者避免在 CPU 内核之间传输数据(例如,从一个 L1/L2 缓存到另一个)。

此生产者提供的结构具有以下字段

int (*ArrowArrayTask.extract_data)(struct ArrowArrayTask*, struct ArrowDeviceArray*)#

必需。 使用可用数据填充提供的 ArrowDeviceArray 的回调函数。生产者提供的 ArrowAsyncTasks 的顺序使消费者能够知道要处理的数据的顺序。如果消费者不关心此任务拥有的数据,则仍然必须调用 extract_data,以便生产者可以执行任何所需的清理。应将 NULL 作为设备数组指针传递,以指示消费者不需要实际数据,从而让任务执行必要的清理。

如果从此返回非零值,则仅应在生产者调用 ArrowAsyncDeviceStreamHandleron_error 回调函数之后进行。由于调用此方法可能与当前控制流分开,因此返回非零值以指示发生的错误允许当前线程决定相应地处理这种情况,同时仍然允许所有错误记录和处理集中在 ArrowAsyncDeviceStreamHandler.on_error 回调函数中。

不必使用单独的 release 回调函数,任何所需的清理都应作为调用此回调函数的一部分来执行。数组的所有权赋予作为参数传入的指针,并且必须单独释放此数组。

只允许调用此方法一次

void *ArrowArrayTask.private_data#

*可选。* 指向生产者提供的私有数据的不透明指针。

消费者*不得*处理此成员。此成员的生命周期由创建此对象的生产者处理,并且如果需要,应在调用 ArrowArrayTask.extract_data 期间清理。

ArrowAsyncProducer 结构#

此生产者提供和管理的对象具有以下字段

ArrowDeviceType ArrowAsyncProducer.device_type#

必需。 此生产者将提供数据的设备类型。此生产者生成的所有 ArrowDeviceArray 结构都应具有与此处设置的设备类型相同的设备类型。

void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, uint64_t)#

强制。使用者必须调用此函数才能开始接收对 ArrowAsyncDeviceStreamHandler.on_next_task 的调用。从 ArrowAsyncDeviceStreamHandler.on_next_taskArrowAsyncDeviceStreamHandler.on_schema 内部同步调用此函数*必须*有效。因此,此函数*不得*同步调用 on_next_taskon_error,以避免递归和重入回调。

调用 cancel 后,对此函数的额外调用必须是空操作,但允许。

在未取消的情况下,调用此函数会注册生产者要生成的额外数组/批次的给定数量。生产者应仅调用相应的 on_next_task 回调,最多不超过此方法调用总数,然后再传播反压/等待。

调用 request 遇到的任何错误都必须通过调用 ArrowAsyncDeviceStreamHandleron_error 回调来传播。

使用 <= 0n 值调用此函数无效。如果收到 n 的此类值,生产者应出错(例如,调用 on_error)。

void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)#

强制。此函数向生产者发出信号,指示它必须*最终*停止调用 on_next_task。对 cancel 的调用必须是幂等的且线程安全的。调用一次后,后续调用*必须*是空操作。这*不得*调用除 on_error 之外的任何使用者端处理程序。

调用 cancel 不需要*立即*影响生产者,只需要它最终停止调用 on_next_task,然后随后在异步处理程序对象上调用 release。因此,如果仍有请求的数组挂起,则使用者*必须*准备好接收对 on_next_taskon_error 的一次或多次调用,即使在调用 cancel 之后也是如此。

成功取消*不得*导致生产者调用 ArrowAsyncDeviceStreamHandler.on_error,而应完成任何剩余任务(相应地调用 on_next_task),并最终只调用 release

处理对 cancel 的调用期间遇到的任何错误都必须通过异步流处理程序上的 on_error 回调进行报告。

const char *ArrowAsyncProducer.additional_metadata#

*可选。* 提供任何额外上下文给使用者的附加元数据字符串。这*必须*是 NULL 或以与 ArrowSchema.metadata 相同的方式编码的有效字符串。例如,如果知道流中的总行数和/或批次数,生产者可以利用此元数据来提供这些信息。

如果不是 NULL,它*必须*至少在此对象的生命周期内有效。

void *ArrowAsyncProducer.private_data#

可选。 指向生产者提供的特定数据的不透明指针。

使用者*不得*处理此成员,其生命周期由构造此对象的生产者拥有。

错误处理#

与常规 C 流接口不同,异步接口允许错误在两个方向流动。因此,错误处理可能会稍微复杂一些。因此,本规范指定以下规则

  • 如果生产者在处理过程中遇到错误,它应该调用 on_error 回调,然后在它返回后调用 release

  • 如果 on_schemaon_next_task 返回非零整数值,则生产者*不应*调用 on_error 回调,而应最终在错误代码的任何记录或处理之前或之后调用 release

结果生命周期#

传递给 on_schema 回调的 ArrowSchema 必须单独释放,对象本身需要移动到使用者拥有的 ArrowSchema 对象。作为回调参数传递的 ArrowSchema* *不得*存储和保留。

提供给 on_next_taskArrowAsyncTask 对象由生产者拥有,并将在调用其上的 extract_data 期间清理。如果使用者不关心数据,则应传递 NULL 而不是有效的 ArrowDeviceArray*

传递给 on_errorconst char* 错误 messagemetadata 仅在 on_error 函数本身的范围内有效。如果需要它们在返回后存在,则必须复制它们。

流处理程序生命周期#

异步流处理程序的生命周期使用 release 回调进行管理,其用法与 C 数据接口 中的类似。

ArrowAsyncProducer 生命周期#

ArrowAsyncProducer 的生命周期由生产者 selbst 拥有,并应由其管理。在调用除 release 之外的任何方法之前,它*必须*填充,并且*必须*至少在流处理程序对象上调用 release 之前保持有效。

线程安全#

ArrowAsyncDeviceStreamHandler 上的所有处理程序函数都应以序列化方式调用,但不保证每次都从同一线程调用。生产者应等待处理程序回调返回,然后再调用下一个处理程序回调,以及在调用 release 回调之前。

反压由使用者通过调用 ArrowAsyncProducer.request 来管理,以指示它准备接收多少个数组。

ArrowAsyncDeviceStreamHandler 对象应该能够在传递给生产者后立即处理回调,任何初始化都应该在提供之前执行。

可能的序列图#

sequenceDiagram Consumer->>+Producer: ArrowAsyncDeviceStreamHandler* Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*) Consumer->>Producer: ArrowAsyncProducer->request(n) par loop up to n times Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end and for each task Consumer-->>Producer: ArrowAsyncTask.extract_data(...) Consumer-->>Producer: ArrowAsyncProducer->request(1) end break Optionally Consumer->>-Producer: ArrowAsyncProducer->cancel() end loop possible remaining Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release()

与其他交换格式的互操作性#

其他交换 API,例如 CUDA 数组接口,包含用于传递所导出数据缓冲区的形状和数据类型的成员。此信息对于解释正在共享的设备数据缓冲区中的原始字节是必需的。用户应该利用现有的 ArrowSchema 结构来传递任何数据类型和形状信息,而不是将数据的形状/类型与 ArrowDeviceArray 一起存储。

更新此规范#

注意

由于此规范仍被视为实验性的,因此它有可能(可能性仍然很小)略有变化。将其标记为“实验性”的原因是因为我们不知道我们不知道的事情。我们进行了工作和研究,以确保与许多不同框架兼容的通用 ABI,但始终有可能遗漏某些内容。一旦在官方 Arrow 版本中支持此功能并观察到使用情况以确认不需要任何修改,则将删除“实验性”标签并冻结 ABI。

一旦此规范在官方 Arrow 版本中得到支持,C ABI 将被冻结。这意味着 ArrowDeviceArray 结构定义不应以任何方式更改——包括添加新成员。

允许向后兼容的更改,例如 ArrowDeviceType 的新宏值,或将保留的 24 个字节转换为不同的类型/成员,而不更改结构的大小。

任何不兼容的更改都应作为新规范的一部分,例如 ArrowDeviceArrayV2