Arrow C 流接口#
C 流接口构建于C 数据接口定义的结构之上,并将它们组合成一个更高级别的规范,从而简化了单一进程内流式数据的通信。
语义#
Arrow C 流公开了一个数据块的流式源,每个块都具有相同的模式。数据块可以通过调用阻塞拉取式迭代函数获取。
结构定义#
C 流接口由单个struct
定义定义
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE
struct ArrowArrayStream {
// Callbacks providing stream functionality
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
const char* (*get_last_error)(struct ArrowArrayStream*);
// Release callback
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};
#endif // ARROW_C_STREAM_INTERFACE
注意
规范保护ARROW_C_STREAM_INTERFACE
旨在避免如果两个项目在它们自己的头文件中复制 C 数据接口定义,而第三方项目从这两个项目中包含,则出现重复定义。因此,在复制这些定义时,保持此保护完全不变非常重要。
ArrowArrayStream 结构#
ArrowArrayStream
提供了与 Arrow 数组的流式源进行交互所需的回调。它具有以下字段
-
int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#
必需。此回调允许使用者查询流中数据块的模式。所有数据块的模式都相同。
此回调不能在已释放的
ArrowArrayStream
上调用。返回值:成功时为 0,否则为非零错误代码。
-
int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#
必需。此回调允许使用者获取流中的下一个数据块。
此回调不能在已释放的
ArrowArrayStream
上调用。返回值:成功时为 0,否则为非零错误代码。
成功后,使用者必须检查
ArrowArray
是否被标记为已释放。如果ArrowArray
已释放,则表示已到达流的末尾。否则,ArrowArray
包含一个有效的数据块。
-
const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#
必需。此回调允许使用者获取上次错误的文本描述。
此回调仅应在上次对
ArrowArrayStream
的操作返回错误时调用。它不能在已释放的ArrowArrayStream
上调用。返回值:指向 NULL 终止的字符字符串(UTF8 编码)的指针。如果不可用详细描述,也可以返回 NULL。
返回的指针仅在对流的回调进行下一次调用之前有效。如果要让它存活更长时间,则应将它指向的字符字符串复制到使用者管理的存储中。
-
void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#
必需。指向生产者提供的释放回调的指针。
-
void *ArrowArrayStream.private_data#
可选。指向生产者提供的私有数据的非透明指针。
使用者不得处理此成员。此成员的生命周期由生产者处理,尤其是由释放回调处理。
错误代码#
get_schema
和get_next
回调可能以非零整型代码的形式返回错误。此类错误代码应像errno
数字(由本地平台定义)一样解释。请注意,这些常量的符号形式在平台之间是稳定的,但它们的数值是特定于平台的。
特别是,建议识别以下值
EINVAL
:用于参数或输入验证错误ENOMEM
:用于内存分配失败(内存不足)EIO
:用于一般输入/输出错误
结果生命周期#
get_schema
和get_next
回调返回的数据必须独立释放。它们的生命周期与ArrowArrayStream
的生命周期无关。
流生命周期#
C 流的生命周期使用释放回调进行管理,其使用方式类似于C 数据接口中的释放回调。
线程安全性#
流源假定不是线程安全的。想要从多个线程调用get_next
的使用者应确保这些调用被序列化。
C 使用者示例#
假设特定数据库提供以下 C API 来执行 SQL 查询并将结果集作为 Arrow C 流返回
void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
然后,使用者可以使用以下代码迭代结果
static void handle_error(int errcode, struct ArrowArrayStream* stream) {
// Print stream error
const char* errdesc = stream->get_last_error(stream);
if (errdesc != NULL) {
fputs(errdesc, stderr);
} else {
fputs(strerror(errcode), stderr);
}
// Release stream and abort
stream->release(stream),
exit(1);
}
void run_query() {
struct ArrowArrayStream stream;
struct ArrowSchema schema;
struct ArrowArray chunk;
int errcode;
MyDB_Query("SELECT * FROM my_table", &stream);
// Query result set schema
errcode = stream.get_schema(&stream, &schema);
if (errcode != 0) {
handle_error(errcode, &stream);
}
int64_t num_rows = 0;
// Iterate over results: loop until error or end of stream
while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
chunk.release != NULL) {
// Do something with chunk...
fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
num_rows += chunk.length;
// Release chunk
chunk.release(&chunk);
}
// Was it an error?
if (errcode != 0) {
handle_error(errcode, &stream);
}
fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
// Release schema and stream
schema.release(&schema);
stream.release(&stream);
}