Arrow C 流接口#
C 流接口构建于 C 数据接口 中定义的结构之上,并将它们组合成一个更高级别的规范,以便简化在单个进程内流式数据的通信。
语义#
Arrow C 流暴露了一个数据块的流式来源,每个数据块具有相同的 Schema。数据块通过调用一个阻塞的拉取式迭代函数来获取。
结构定义#
C 流接口由单个 struct
定义
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE
struct ArrowArrayStream {
// Callbacks providing stream functionality
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
const char* (*get_last_error)(struct ArrowArrayStream*);
// Release callback
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};
#endif // ARROW_C_STREAM_INTERFACE
注意
规范的宏定义保护 ARROW_C_STREAM_INTERFACE
旨在避免重复定义,例如当两个项目在自己的头文件中复制了 C 数据接口的定义,而第三方项目包含了这两个项目时。因此,在复制这些定义时,保持此宏定义保护完全不变非常重要。
ArrowArrayStream 结构#
ArrowArrayStream
提供了与 Arrow 数组流式源交互所需的 Callback 函数。它具有以下字段
-
int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#
强制。此 Callback 函数允许消费者查询流中数据块的 Schema。所有数据块的 Schema 相同。
此 Callback 函数不得在已释放的
ArrowArrayStream
上调用。返回值:成功时为 0,否则为非零的 错误码。
-
int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#
强制。此 Callback 函数允许消费者获取流中的下一个数据块。
此 Callback 函数不得在已释放的
ArrowArrayStream
上调用。返回值:成功时为 0,否则为非零的 错误码。
成功时,消费者必须检查
ArrowArray
是否被标记为 已释放。如果ArrowArray
已释放,则表示已到达流的末尾。否则,ArrowArray
包含一个有效的数据块。
-
const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#
强制。此 Callback 函数允许消费者获取最后一次错误的文本描述。
此 Callback 函数只能在
ArrowArrayStream
上的最后一次操作返回错误时调用。不得在已释放的ArrowArrayStream
上调用。返回值:指向一个 NULL 结尾的字符串(UTF8 编码)的指针。如果没有详细描述,也可以返回 NULL。
返回的指针仅保证在下一次调用流的 Callback 函数之前有效。如果希望字符串存活更长时间,应将其复制到消费者管理的存储空间。
-
void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#
强制。指向生产者提供的 release callback 函数的指针。
-
void *ArrowArrayStream.private_data#
可选。指向生产者提供的私有数据的不透明指针。
消费者绝不能处理此成员。此成员的生命周期由生产者管理,特别是通过 release callback 函数管理。
错误码#
get_schema
和 get_next
Callback 函数可能以非零整数码的形式返回错误。这些错误码应像 errno
数字(由本地平台定义)一样解释。请注意,这些常量的符号形式在不同平台之间是稳定的,但其数值是平台相关的。
特别建议识别以下值
EINVAL
: 参数或输入验证错误ENOMEM
: 内存分配失败(内存不足)EIO
: 通用输入/输出错误
结果生命周期#
get_schema
和 get_next
Callback 函数返回的数据必须独立释放。它们的生命周期与 ArrowArrayStream
的生命周期无关。
流生命周期#
C 流的生命周期使用 release callback 函数管理,其用法与 C 数据接口 中的类似。
线程安全#
不假设流源是线程安全的。希望从多个线程调用 get_next
的消费者应确保这些调用是串行化的。
C 消费者示例#
假设某个数据库提供了以下 C API 来执行 SQL 查询并将结果集作为 Arrow C 流返回
void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
然后消费者可以使用以下代码迭代结果
static void handle_error(int errcode, struct ArrowArrayStream* stream) {
// Print stream error
const char* errdesc = stream->get_last_error(stream);
if (errdesc != NULL) {
fputs(errdesc, stderr);
} else {
fputs(strerror(errcode), stderr);
}
// Release stream and abort
stream->release(stream),
exit(1);
}
void run_query() {
struct ArrowArrayStream stream;
struct ArrowSchema schema;
struct ArrowArray chunk;
int errcode;
MyDB_Query("SELECT * FROM my_table", &stream);
// Query result set schema
errcode = stream.get_schema(&stream, &schema);
if (errcode != 0) {
handle_error(errcode, &stream);
}
int64_t num_rows = 0;
// Iterate over results: loop until error or end of stream
while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
chunk.release != NULL) {
// Do something with chunk...
fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
num_rows += chunk.length;
// Release chunk
chunk.release(&chunk);
}
// Was it an error?
if (errcode != 0) {
handle_error(errcode, &stream);
}
fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
// Release schema and stream
schema.release(&schema);
stream.release(&stream);
}