Arrow C 流接口#

C 流接口构建于C 数据接口定义的结构之上,并将它们组合成一个更高级别的规范,从而简化了单一进程内流式数据的通信。

语义#

Arrow C 流公开了一个数据块的流式源,每个块都具有相同的模式。数据块可以通过调用阻塞拉取式迭代函数获取。

结构定义#

C 流接口由单个struct定义定义

#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

struct ArrowArrayStream {
  // Callbacks providing stream functionality
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
  const char* (*get_last_error)(struct ArrowArrayStream*);

  // Release callback
  void (*release)(struct ArrowArrayStream*);

  // Opaque producer-specific data
  void* private_data;
};

#endif  // ARROW_C_STREAM_INTERFACE

注意

规范保护ARROW_C_STREAM_INTERFACE旨在避免如果两个项目在它们自己的头文件中复制 C 数据接口定义,而第三方项目从这两个项目中包含,则出现重复定义。因此,在复制这些定义时,保持此保护完全不变非常重要。

ArrowArrayStream 结构#

ArrowArrayStream提供了与 Arrow 数组的流式源进行交互所需的回调。它具有以下字段

int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#

必需。此回调允许使用者查询流中数据块的模式。所有数据块的模式都相同。

此回调不能在已释放的ArrowArrayStream上调用。

返回值:成功时为 0,否则为非零错误代码

int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#

必需。此回调允许使用者获取流中的下一个数据块。

此回调不能在已释放的ArrowArrayStream上调用。

返回值:成功时为 0,否则为非零错误代码

成功后,使用者必须检查ArrowArray是否被标记为已释放。如果ArrowArray已释放,则表示已到达流的末尾。否则,ArrowArray包含一个有效的数据块。

const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#

必需。此回调允许使用者获取上次错误的文本描述。

此回调仅应在上次对ArrowArrayStream的操作返回错误时调用。它不能在已释放的ArrowArrayStream上调用。

返回值:指向 NULL 终止的字符字符串(UTF8 编码)的指针。如果不可用详细描述,也可以返回 NULL。

返回的指针仅在对流的回调进行下一次调用之前有效。如果要让它存活更长时间,则应将它指向的字符字符串复制到使用者管理的存储中。

void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#

必需。指向生产者提供的释放回调的指针。

void *ArrowArrayStream.private_data#

可选。指向生产者提供的私有数据的非透明指针。

使用者不得处理此成员。此成员的生命周期由生产者处理,尤其是由释放回调处理。

错误代码#

get_schemaget_next回调可能以非零整型代码的形式返回错误。此类错误代码应像errno数字(由本地平台定义)一样解释。请注意,这些常量的符号形式在平台之间是稳定的,但它们的数值是特定于平台的。

特别是,建议识别以下值

  • EINVAL:用于参数或输入验证错误

  • ENOMEM:用于内存分配失败(内存不足)

  • EIO:用于一般输入/输出错误

结果生命周期#

get_schemaget_next回调返回的数据必须独立释放。它们的生命周期与ArrowArrayStream的生命周期无关。

流生命周期#

C 流的生命周期使用释放回调进行管理,其使用方式类似于C 数据接口中的释放回调。

线程安全性#

流源假定不是线程安全的。想要从多个线程调用get_next的使用者应确保这些调用被序列化。

C 使用者示例#

假设特定数据库提供以下 C API 来执行 SQL 查询并将结果集作为 Arrow C 流返回

void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);

然后,使用者可以使用以下代码迭代结果

static void handle_error(int errcode, struct ArrowArrayStream* stream) {
   // Print stream error
   const char* errdesc = stream->get_last_error(stream);
   if (errdesc != NULL) {
      fputs(errdesc, stderr);
   } else {
      fputs(strerror(errcode), stderr);
   }
   // Release stream and abort
   stream->release(stream),
   exit(1);
}

void run_query() {
   struct ArrowArrayStream stream;
   struct ArrowSchema schema;
   struct ArrowArray chunk;
   int errcode;

   MyDB_Query("SELECT * FROM my_table", &stream);

   // Query result set schema
   errcode = stream.get_schema(&stream, &schema);
   if (errcode != 0) {
      handle_error(errcode, &stream);
   }

   int64_t num_rows = 0;

   // Iterate over results: loop until error or end of stream
   while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
          chunk.release != NULL) {
      // Do something with chunk...
      fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
      num_rows += chunk.length;

      // Release chunk
      chunk.release(&chunk);
   }

   // Was it an error?
   if (errcode != 0) {
      handle_error(errcode, &stream);
   }

   fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);

   // Release schema and stream
   schema.release(&schema);
   stream.release(&stream);
}