Arrow C 流接口#
C 流接口构建于C 数据接口中定义的结构之上,并将它们组合成更高级别的规范,以便简化单个进程内流数据的通信。
语义#
Arrow C 流公开了一个数据块的流式源,每个数据块具有相同的模式。数据块是通过调用阻塞式拉取迭代函数获得的。
结构定义#
C 流接口由单个 struct
定义
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE
struct ArrowArrayStream {
// Callbacks providing stream functionality
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
const char* (*get_last_error)(struct ArrowArrayStream*);
// Release callback
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};
#endif // ARROW_C_STREAM_INTERFACE
注意
规范保护 ARROW_C_STREAM_INTERFACE
旨在避免重复定义,如果两个项目在其自己的头文件中复制 C 数据接口定义,而第三方项目包含来自这两个项目的定义。因此,复制这些定义时,必须完全按原样保留此保护。
ArrowArrayStream 结构#
ArrowArrayStream
提供了与 Arrow 数组流式源交互所需的回调。它具有以下字段
-
int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#
强制性。 此回调允许使用者查询流中数据块的模式。所有数据块的模式都相同。
不得在已释放的
ArrowArrayStream
上调用此回调。返回值: 成功时返回 0,否则返回非零 错误代码。
-
int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#
强制性。 此回调允许使用者获取流中的下一个数据块。
不得在已释放的
ArrowArrayStream
上调用此回调。返回值: 成功时返回 0,否则返回非零 错误代码。
成功后,使用者必须检查
ArrowArray
是否标记为已释放。如果ArrowArray
已释放,则已到达流的末尾。否则,ArrowArray
包含有效的数据块。
-
const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#
强制性。 此回调允许使用者获取最后一个错误的文本描述。
仅当
ArrowArrayStream
上的最后一个操作返回错误时,才必须调用此回调。不得在已释放的ArrowArrayStream
上调用它。返回值: 指向以 NULL 结尾的字符串(UTF8 编码)的指针。如果没有可用的详细描述,也可以返回 NULL。
返回的指针仅保证在下次调用流的某个回调之前有效。如果要使其存活更长时间,则应将其指向的字符串复制到使用者管理的存储中。
-
void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#
强制性。 指向生产者提供的释放回调的指针。
-
void *ArrowArrayStream.private_data#
*可选。* 指向生产者提供的私有数据的不透明指针。
使用者不得处理此成员。此成员的生命周期由生产者处理,尤其由释放回调处理。
错误代码#
get_schema
和 get_next
回调可能会以非零整数代码的形式返回错误。此类错误代码应像 errno
数字(由本地平台定义)一样解释。请注意,这些常量的符号形式在不同平台上是稳定的,但它们的数值是特定于平台的。
特别建议识别以下值
EINVAL
:参数或输入验证错误ENOMEM
:内存分配失败(内存不足)EIO
:通用输入/输出错误
结果生命周期#
get_schema
和 get_next
回调返回的数据必须单独释放。它们的生命周期与 ArrowArrayStream
的生命周期无关。
流生命周期#
C 流的生命周期使用释放回调进行管理,其用法与C 数据接口中的类似。
线程安全#
流源不假定是线程安全的。想要从多个线程调用 get_next
的使用者应确保这些调用是串行化的。
C 使用者示例#
假设某个数据库提供以下 C API 来执行 SQL 查询并将结果集作为 Arrow C 流返回
void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
然后,使用者可以使用以下代码迭代结果
static void handle_error(int errcode, struct ArrowArrayStream* stream) {
// Print stream error
const char* errdesc = stream->get_last_error(stream);
if (errdesc != NULL) {
fputs(errdesc, stderr);
} else {
fputs(strerror(errcode), stderr);
}
// Release stream and abort
stream->release(stream),
exit(1);
}
void run_query() {
struct ArrowArrayStream stream;
struct ArrowSchema schema;
struct ArrowArray chunk;
int errcode;
MyDB_Query("SELECT * FROM my_table", &stream);
// Query result set schema
errcode = stream.get_schema(&stream, &schema);
if (errcode != 0) {
handle_error(errcode, &stream);
}
int64_t num_rows = 0;
// Iterate over results: loop until error or end of stream
while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
chunk.release != NULL) {
// Do something with chunk...
fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
num_rows += chunk.length;
// Release chunk
chunk.release(&chunk);
}
// Was it an error?
if (errcode != 0) {
handle_error(errcode, &stream);
}
fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
// Release schema and stream
schema.release(&schema);
stream.release(&stream);
}