Arrow C 流接口#
C 流接口建立在C 数据接口中定义的结构之上,并将它们组合成更高级别的规范,以便简化单个进程中流数据的通信。
语义#
Arrow C 流暴露了一个具有相同模式的数据块流源。通过调用阻塞式拉取式迭代函数来获取数据块。
结构定义#
C 流接口由单个 struct 定义
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE
struct ArrowArrayStream {
// Callbacks providing stream functionality
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
const char* (*get_last_error)(struct ArrowArrayStream*);
// Release callback
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};
#endif // ARROW_C_STREAM_INTERFACE
注意
规范的保护宏 ARROW_C_STREAM_INTERFACE 旨在避免重复定义,如果两个项目在它们自己的头文件中复制了 C 数据接口定义,而第三方项目又包含了这两个项目。因此,当复制这些定义时,保持此保护宏完全不变非常重要。
ArrowArrayStream 结构#
ArrowArrayStream 提供了与 Arrow 数组流源交互所需的콜백。它具有以下字段
-
int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#
必需。此콜백允许消费者查询流中数据块的模式。所有数据块的模式都是相同的。
此콜백不得在已释放的
ArrowArrayStream上调用。返回值:成功时为 0,否则为非零错误代码。
-
int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#
必需。此콜백允许消费者获取流中的下一个数据块。
此콜백不得在已释放的
ArrowArrayStream上调用。返回值:成功时为 0,否则为非零错误代码。
成功时,消费者必须检查
ArrowArray是否标记为已释放。如果ArrowArray已释放,则表示已到达流的末尾。否则,ArrowArray包含一个有效的数据块。
-
const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#
必需。此콜백允许消费者获取上次错误的文本描述。
此콜백只能在
ArrowArrayStream上的上次操作返回错误时调用。不得在已释放的ArrowArrayStream上调用。返回值:指向以 NULL 结尾的字符串(UTF8 编码)的指针。如果没有详细描述,也可以返回 NULL。
返回的指针仅保证在下一次调用流的콜백之前有效。如果它打算存活更长时间,则应将其指向的字符串复制到消费者管理的存储中。
-
void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#
必需。指向生产者提供的释放콜백的指针。
-
void *ArrowArrayStream.private_data#
可选。指向生产者提供的私有数据的透明指针。
消费者不得处理此成员。此成员的生命周期由生产者处理,尤其是由释放콜백处理。
错误代码#
get_schema 和 get_next 콜백可能会以非零整数代码的形式返回错误。此类错误代码应像 errno 数字(由本地平台定义)一样解释。请注意,这些常量的符号形式在不同平台之间是稳定的,但其数字值是平台特定的。
特别建议识别以下值
EINVAL:用于参数或输入验证错误ENOMEM:用于内存分配失败(内存不足)EIO:用于通用输入/输出错误
结果生命周期#
get_schema 和 get_next 콜백返回的数据必须独立释放。它们的生命周期不与 ArrowArrayStream 的生命周期相关联。
流生命周期#
C 流的生命周期通过使用与C 数据接口中相似的释放콜백进行管理。
线程安全#
流源不被认为是线程安全的。希望从多个线程调用 get_next 的消费者应确保这些调用是序列化的。
C 消费者示例#
假设某个数据库提供以下 C API 来执行 SQL 查询并将结果集作为 Arrow C 流返回
void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
然后消费者可以使用以下代码遍历结果
static void handle_error(int errcode, struct ArrowArrayStream* stream) {
// Print stream error
const char* errdesc = stream->get_last_error(stream);
if (errdesc != NULL) {
fputs(errdesc, stderr);
} else {
fputs(strerror(errcode), stderr);
}
// Release stream and abort
stream->release(stream),
exit(1);
}
void run_query() {
struct ArrowArrayStream stream;
struct ArrowSchema schema;
struct ArrowArray chunk;
int errcode;
MyDB_Query("SELECT * FROM my_table", &stream);
// Query result set schema
errcode = stream.get_schema(&stream, &schema);
if (errcode != 0) {
handle_error(errcode, &stream);
}
int64_t num_rows = 0;
// Iterate over results: loop until error or end of stream
while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
chunk.release != NULL) {
// Do something with chunk...
fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
num_rows += chunk.length;
// Release chunk
chunk.release(&chunk);
}
// Was it an error?
if (errcode != 0) {
handle_error(errcode, &stream);
}
fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
// Release schema and stream
schema.release(&schema);
stream.release(&stream);
}