驱动程序示例¶
配方来源: driver_example.cc
在这里我们将展示使用 ADBC 驱动程序框架库在 C++ 中构建 ADBC 驱动程序的结构。这与 ADBC 用于构建其 SQLite 和 PostgreSQL 驱动程序的库相同,并抽象了 C 可调用和目录/元数据函数的细节,这些函数可能难以实现,但对于有效利用 ADBC 生态系统的其余部分至关重要。
从高层次来看,我们将构建一个驱动程序,其“数据库”是一个目录,其中数据库中的每个“表”都是一个包含 Arrow IPC 流的文件。可以使用批量导入功能写入表,并且可以使用SELECT * FROM (the file)
形式的简单查询读取表。
安装¶
这个快速入门实际上是一个文学 C++ 文件。您可以克隆存储库,构建示例,然后继续操作。
我们假设您使用的是 conda-forge 来管理依赖项。CMake、C++17 编译器和 ADBC 库是必需的。可以按如下方式安装它们
mamba install cmake compilers libadbc-driver-manager
构建¶
我们将在此处使用 CMake。从 ADBC 存储库的源代码签出中
mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest
使用 C++ 构建 ADBC 驱动程序¶
让我们从一些包含开始。值得注意的是,我们将需要驱动程序框架头文件和 nanoarrow,我们将使用它在本示例驱动程序中创建和使用 Arrow C 数据接口结构。
72#include "driver_example.h"
73
74#include <cstdio>
75#include <string>
76
77#include "driver/framework/connection.h"
78#include "driver/framework/database.h"
79#include "driver/framework/statement.h"
80
81#include "nanoarrow/nanoarrow.hpp"
82#include "nanoarrow/nanoarrow_ipc.hpp"
83
84#include "arrow-adbc/adbc.h"
接下来,我们将把一些基本框架类型引入命名空间,以减少实现的冗长性
adbc::driver::Option
: 选项可以在 ADBC 数据库、连接和语句上设置。它们可以是字符串、不透明二进制、双精度浮点数或整数。Option
类抽象了获取、设置和解析这些值的细节。adbc::driver::Status
:Status
是 ADBC 驱动程序框架的错误处理机制:没有返回值且可能失败的函数将返回一个Status
。您可以使用UNWRAP_STATUS(some_call())
作为Status status = some_call(); if (!status.ok()) return status;
的简写,以简洁地传播错误。adbc::driver::Result
:Result<T>
用于作为函数的返回值,这些函数在成功时返回类型为T
的值,并在失败时使用Status
传达其错误。您可以使用UNWRAP_RESULT(some_type value, some_call())
作为简写some_type value; Result<some_type> maybe_value = some_call(); if (!maybe_value.status().ok()) { return maybe_value.status(); } else { value = *maybe_value; }
113using adbc::driver::Option;
114using adbc::driver::Result;
115using adbc::driver::Status;
116
117namespace {
接下来,我们将提供数据库实现。驱动程序框架使用好奇的递归模板模式 (CRTP)。该模式的细节由框架处理,但在功能上这仍然只是覆盖来自基类的那些方法,而基类处理细节。
在这里,我们的数据库实现将只记录用户传入的 uri
。我们将对此的解释是一个 file://
uri,指向应写入我们的 IPC 文件或应读取 IPC 文件的目录。这是数据库在 ADBC 中的角色:对数据库的共享句柄,该句柄可能在连接之间缓存一些共享状态,但仍然允许多个连接并发地对数据库执行操作。
134class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
135 public:
136 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
137
138 Status SetOptionImpl(std::string_view key, Option value) override {
139 // Handle and validate options implemented by this driver
140 if (key == "uri") {
141 UNWRAP_RESULT(std::string_view uri, value.AsString());
142
143 if (uri.find("file://") != 0) {
144 return adbc::driver::status::InvalidArgument(
145 "[example] uri must start with 'file://'");
146 }
147
148 uri_ = uri;
149 return adbc::driver::status::Ok();
150 }
151
152 // Defer to the base implementation to handle state managed by the base
153 // class (and error for all other options).
154 return Base::SetOptionImpl(key, value);
155 }
156
157 Result<Option> GetOption(std::string_view key) override {
158 // Return the value of options implemented by this driver
159 if (key == "uri") {
160 return Option(uri_);
161 }
162
163 // Defer to the base implementation to handle state managed by the base
164 // class (and error for all other options).
165 return Base::GetOption(key);
166 }
167
168 // This is called after zero or more calls to SetOption() on
169 Status InitImpl() override {
170 if (uri_.empty()) {
171 return adbc::driver::status::InvalidArgument(
172 "[example] Must set uri to a non-empty value");
173 }
174
175 return Base::InitImpl();
176 }
177
178 // Getters for members needed by the connection and/or statement:
179 const std::string& uri() { return uri_; }
180
181 private:
182 std::string uri_;
183};
接下来,我们实现了连接。虽然数据库的作用通常是存储或缓存信息,但连接的作用是提供可能代价高昂的资源句柄(例如,在连接到数据库时协商身份验证)。因为我们的示例“数据库”只是一个目录,所以我们在连接中不需要做太多关于资源管理的事情,除了提供一种方法让子语句访问数据库的 uri。
连接的另一个作用是提供有关表、列、统计信息和其他目录类信息的元数据,调用者可能希望在发出查询之前了解这些信息。驱动程序框架基类提供了一些帮助程序来实现这些函数,这样您就可以主要在 C++17 标准库(而不是自己构建 C 级数组)方面实现它们。
198class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
199 public:
200 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
201
202 // Get information from the database and/or store a reference if needed.
203 Status InitImpl(void* parent) {
204 auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
205 uri_ = database.uri();
206 return Base::InitImpl(parent);
207 }
208
209 // Getters for members needed by the statement:
210 const std::string& uri() { return uri_; }
211
212 private:
213 std::string uri_;
214};
接下来,我们提供语句实现。语句是管理查询执行的地方。因为我们的数据源实际上就是 Arrow 数据,所以我们不需要提供一个管理类型或值转换的层。SQLite 和 PostgreSQL 驱动程序都为有效地实现和测试这些转换分配了许多代码行。nanoarrow 库可用于实现两个方向上的转换,并且是另一篇文章的范围。
223class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
224 public:
225 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
226
227 // Get information from the connection and/or store a reference if needed.
228 Status InitImpl(void* parent) {
229 auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
230 uri_ = connection.uri();
231 return Base::InitImpl(parent);
232 }
233
234 // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
235 // using the target table as the filename.
236 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
237 std::string directory = uri_.substr(strlen("file://"));
238 std::string filename = directory + "/" + *state.target_table;
239
240 nanoarrow::ipc::UniqueOutputStream output_stream;
241 FILE* c_file = std::fopen(filename.c_str(), "wb");
242 UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
243 /*close_on_release*/ true));
244
245 nanoarrow::ipc::UniqueWriter writer;
246 UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));
247
248 ArrowError nanoarrow_error;
249 ArrowErrorInit(&nanoarrow_error);
250 UNWRAP_NANOARROW(nanoarrow_error, Internal,
251 ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
252 &nanoarrow_error));
253
254 return -1;
255 }
256
257 // Our implementation of query execution is to accept a simple query in the form
258 // SELECT * FROM (the filename).
259 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
260 std::string prefix("SELECT * FROM ");
261 if (state.query.find(prefix) != 0) {
262 return adbc::driver::status::InvalidArgument(
263 "[example] Query must be in the form 'SELECT * FROM filename'");
264 }
265
266 std::string directory = uri_.substr(strlen("file://"));
267 std::string filename = directory + "/" + state.query.substr(prefix.size());
268
269 nanoarrow::ipc::UniqueInputStream input_stream;
270 FILE* c_file = std::fopen(filename.c_str(), "rb");
271 UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
272 /*close_on_release*/ true));
273
274 UNWRAP_ERRNO(Internal,
275 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
276 return -1;
277 }
278
279 // This path is taken when the user calls Prepare() first.
280 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
281 QueryState query_state{state.query};
282 return ExecuteQueryImpl(query_state, stream);
283 }
284
285 private:
286 std::string uri_;
287};
288
289} // namespace
最后,我们创建驱动程序初始化函数,这是驱动程序管理器需要提供的实现,用于组成 ADBC C API 的 Adbc**()
函数。此函数的名称很重要:此文件将被构建到一个名为 libdriver_example.(so|dll|dylib)
的共享库中,因此驱动程序管理器将查找符号 AdbcDriverExampleInit()
作为默认入口点,当被要求加载驱动程序 "driver_example"
时。
298extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
299 AdbcError* error) {
300 using ExampleDriver =
301 adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
302 DriverExampleStatement>;
303 return ExampleDriver::Init(version, raw_driver, error);
304}
低级别测试¶
配方来源: driver_example_test.cc
在编写驱动程序草稿之后,下一步是确保驱动程序管理器可以加载它,并且可以初始化和释放数据库、连接和语句实例。
首先,我们将包含驱动程序管理器和 googletest。
29#include "driver_example.h"
30
31#include "arrow-adbc/adbc_driver_manager.h"
32#include "gtest/gtest.h"
接下来,我们将声明一个用于基本生命周期的测试用例
36TEST(DriverExample, TestLifecycle) {
37 struct AdbcError error = ADBC_ERROR_INIT;
38
39 struct AdbcDatabase database;
40 ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
41 AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
42 ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
43 ADBC_STATUS_OK);
44 ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);
45
46 struct AdbcConnection connection;
47 ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
48 ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);
49
50 struct AdbcStatement statement;
51 ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);
52
53 ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
54 ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
55 ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);
56
57 if (error.release) {
58 error.release(&error);
59 }
60}
位于 apache/arrow-adbc 存储库中的驱动程序可以使用内置的验证库,该库实现了针对功能齐全的 SQL 数据库的通用测试套件,并提供了实用程序来测试一系列输入和输出。
高级测试¶
配方来源: driver_example.py
在验证了基本驱动程序功能之后,我们可以使用 adbc_driver_manager
Python 包的内置 dbapi 实现来公开一个现成的 Pythonic 数据库 API。这对高级测试也很有用!
首先,我们将导入 pathlib 用于一些路径计算,以及 adbc_driver_manager
的 dbapi
模块
26from pathlib import Path
27
28from adbc_driver_manager import dbapi
接下来,我们将定义一个 connect()
函数,它使用我们在上一节中使用 cmake
构建的共享库的位置来包装 dbapi.connect()
。为了我们教程的目的,这将在 CMake build/
目录中。
35def connect(uri: str):
36 build_dir = Path(__file__).parent / "build"
37 for lib in [
38 "libdriver_example.dylib",
39 "libdriver_example.so",
40 "driver_example.dll",
41 ]:
42 driver_lib = build_dir / lib
43 if driver_lib.exists():
44 return dbapi.connect(
45 driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
46 )
47
48 raise RuntimeError("Can't find driver shared object")
接下来,我们可以试用我们的驱动程序!我们在驱动程序中实现的两个部分是“批量导入”功能和“从所有选择”,所以让我们看看它是否有效!
53if __name__ == "__main__":
54 import os
55
56 import pyarrow
57
58 with connect(uri=Path(__file__).parent.as_uri()) as con:
59 data = pyarrow.table({"col": [1, 2, 3]})
60 with con.cursor() as cur:
61 cur.adbc_ingest("example.arrows", data, mode="create")
62
63 with con.cursor() as cur:
64 cur.execute("SELECT * FROM example.arrows")
65 print(cur.fetchall())
66
67 os.unlink(Path(__file__).parent / "example.arrows")
高级测试也可以使用 adbcdrivermanager
包在 R 中编写。
library(adbcdrivermanager)
drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)
data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")