驱动程序示例¶
方案源码: driver_example.cc
在此,我们将展示如何使用 ADBC 驱动程序框架库在 C++ 中构建 ADBC 驱动程序。该库与 ADBC 用于构建其 SQLite 和 PostgreSQL 驱动程序的库相同。它抽象了 C 可调用对象及目录/元数据函数的实现细节,这些功能实现起来可能比较困难,但对于有效利用 ADBC 生态系统的其余部分至关重要。
概括而言,我们将构建一个驱动程序,其“数据库”是一个目录,数据库中的每个“表”都是一个包含 Arrow IPC 流的文件。可以使用批量导入功能写入表,并使用 SELECT * FROM (文件) 形式的简单查询来读取表。
安装¶
本快速入门实际上是一个可读的 C++ 文件。您可以克隆仓库,构建示例并跟随操作。
我们假设您使用 conda-forge 来管理依赖项。需要 CMake、C++17 编译器和 ADBC 库。安装方式如下:
mamba install cmake compilers libadbc-driver-manager
构建¶
我们将在此处使用 CMake。从 ADBC 仓库的源码检出目录开始
mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest
使用 C++ 构建 ADBC 驱动程序¶
让我们从一些头文件包含开始。特别需要注意的是,我们将需要驱动程序框架头文件和 nanoarrow,在该示例驱动程序中,我们将使用它来创建和使用 Arrow C 数据接口结构。
72#include "driver_example.h"
73
74#include <cstdio>
75#include <string>
76
77#include "driver/framework/connection.h"
78#include "driver/framework/database.h"
79#include "driver/framework/statement.h"
80
81#include "nanoarrow/nanoarrow.hpp"
82#include "nanoarrow/nanoarrow_ipc.hpp"
83
84#include "arrow-adbc/adbc.h"
接下来,我们将一些基本的框架类型引入命名空间,以减少实现的冗长性
adbc::driver::Option:选项可以在 ADBC 数据库、连接和语句上设置。它们可以是字符串、不透明二进制数据、双精度浮点数或整数。Option类抽象了获取、设置和解析这些值的细节。adbc::driver::Status:Status是 ADBC 驱动程序框架的错误处理机制:没有返回值且可能失败的函数会返回Status。您可以使用UNWRAP_STATUS(some_call())作为Status status = some_call(); if (!status.ok()) return status;的简写,以简洁地传播错误。adbc::driver::Result:Result<T>用作函数的返回值,该函数在成功时返回类型为T的值,在失败时通过Status传达其错误。您可以使用UNWRAP_RESULT(value, some_call())作为简写。some_type value; Result<some_type> maybe_value = some_call(); if (!maybe_value.status().ok()) { return maybe_value.status(); } else { value = *maybe_value; }
113using adbc::driver::Option;
114using adbc::driver::Result;
115using adbc::driver::Status;
116
117namespace {
接下来,我们将提供数据库实现。驱动程序框架使用了奇异递归模板模式(CRTP)。其细节由框架处理,但从功能上讲,这仍然只是重写处理细节的基类中的方法。
在此,我们的数据库实现将简单地记录用户传入的 uri。我们将其解读为一个 file:// URI,指向一个目录,我们的 IPC 文件应在此处写入和/或从中读取。这就是 ADBC 中数据库的角色:一个共享的数据库句柄,它可能在连接之间缓存一些共享状态,但仍然允许多个连接并发地针对数据库执行操作。
134class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
135 public:
136 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
137
138 Status SetOptionImpl(std::string_view key, Option value) override {
139 // Handle and validate options implemented by this driver
140 if (key == "uri") {
141 std::string_view uri;
142 UNWRAP_RESULT(uri, value.AsString());
143
144 if (uri.find("file://") != 0) {
145 return adbc::driver::status::InvalidArgument(
146 "[example] uri must start with 'file://'");
147 }
148
149 uri_ = uri;
150 return adbc::driver::status::Ok();
151 }
152
153 // Defer to the base implementation to handle state managed by the base
154 // class (and error for all other options).
155 return Base::SetOptionImpl(key, value);
156 }
157
158 Result<Option> GetOption(std::string_view key) override {
159 // Return the value of options implemented by this driver
160 if (key == "uri") {
161 return Option(uri_);
162 }
163
164 // Defer to the base implementation to handle state managed by the base
165 // class (and error for all other options).
166 return Base::GetOption(key);
167 }
168
169 // This is called after zero or more calls to SetOption() on
170 Status InitImpl() override {
171 if (uri_.empty()) {
172 return adbc::driver::status::InvalidArgument(
173 "[example] Must set uri to a non-empty value");
174 }
175
176 return Base::InitImpl();
177 }
178
179 // Getters for members needed by the connection and/or statement:
180 const std::string& uri() { return uri_; }
181
182 private:
183 std::string uri_;
184};
接下来,我们实现连接。虽然数据库的角色通常是存储或缓存信息,但连接的角色是提供获取成本可能较高的资源句柄(例如,连接数据库时协商身份验证)。因为我们的示例“数据库”只是一个目录,所以除了为子语句提供访问数据库 URI 的方法外,我们无需在连接方面进行太多的资源管理。
连接的另一个角色是提供有关表、列、统计信息以及调用者在发出查询前可能想了解的其他目录类信息的元数据。驱动程序框架基类提供了帮助程序来实现这些功能,这样您基本上可以使用 C++17 标准库来实现它们(而不是自己构建 C 级别的数组)。
199class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
200 public:
201 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
202
203 // Get information from the database and/or store a reference if needed.
204 Status InitImpl(void* parent) {
205 auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
206 uri_ = database.uri();
207 return Base::InitImpl(parent);
208 }
209
210 // Getters for members needed by the statement:
211 const std::string& uri() { return uri_; }
212
213 private:
214 std::string uri_;
215};
接下来,我们提供语句实现。语句是管理查询执行的地方。由于我们的数据源实际上就是 Arrow 数据,因此我们无需提供管理类型或值转换的层。SQLite 和 PostgreSQL 驱动程序都花费了大量代码行来高效地实现和测试这些转换。nanoarrow 库可用于实现双向转换,这是另一篇文章的主题。
224class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
225 public:
226 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
227
228 // Get information from the connection and/or store a reference if needed.
229 Status InitImpl(void* parent) {
230 auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
231 uri_ = connection.uri();
232 return Base::InitImpl(parent);
233 }
234
235 // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
236 // using the target table as the filename.
237 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
238 std::string directory = uri_.substr(strlen("file://"));
239 std::string filename = directory + "/" + *state.target_table;
240
241 nanoarrow::ipc::UniqueOutputStream output_stream;
242 FILE* c_file = std::fopen(filename.c_str(), "wb");
243 UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
244 /*close_on_release*/ true));
245
246 nanoarrow::ipc::UniqueWriter writer;
247 UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));
248
249 ArrowError nanoarrow_error;
250 ArrowErrorInit(&nanoarrow_error);
251 UNWRAP_NANOARROW(nanoarrow_error, Internal,
252 ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
253 &nanoarrow_error));
254
255 return -1;
256 }
257
258 // Our implementation of query execution is to accept a simple query in the form
259 // SELECT * FROM (the filename).
260 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
261 std::string prefix("SELECT * FROM ");
262 if (state.query.find(prefix) != 0) {
263 return adbc::driver::status::InvalidArgument(
264 "[example] Query must be in the form 'SELECT * FROM filename'");
265 }
266
267 std::string directory = uri_.substr(strlen("file://"));
268 std::string filename = directory + "/" + state.query.substr(prefix.size());
269
270 nanoarrow::ipc::UniqueInputStream input_stream;
271 FILE* c_file = std::fopen(filename.c_str(), "rb");
272 UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
273 /*close_on_release*/ true));
274
275 UNWRAP_ERRNO(Internal,
276 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
277 return -1;
278 }
279
280 // This path is taken when the user calls Prepare() first.
281 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
282 QueryState query_state{state.query};
283 return ExecuteQueryImpl(query_state, stream);
284 }
285
286 private:
287 std::string uri_;
288};
289
290} // namespace
最后,我们创建驱动程序初始化函数,这是驱动程序管理器提供 Adbc**() 函数实现所必需的,这些函数构成了 ADBC C API。此函数的名称非常重要:该文件将被编译为名为 libdriver_example.(so|dll|dylib) 的共享库,因此当被要求加载驱动程序 "driver_example" 时,驱动程序管理器将查找符号 AdbcDriverExampleInit() 作为默认入口点。
299extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
300 AdbcError* error) {
301 using ExampleDriver =
302 adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
303 DriverExampleStatement>;
304 return ExampleDriver::Init(version, raw_driver, error);
305}
底层测试¶
方案源码: driver_example_test.cc
在我们编写了驱动程序草图之后,下一步是确保它能被驱动程序管理器加载,并且数据库、连接和语句实例可以被初始化和释放。
首先,我们将包含驱动程序管理器和 googletest。
29#include "driver_example.h"
30
31#include "arrow-adbc/adbc_driver_manager.h"
32#include "gtest/gtest.h"
接下来,我们将为基本生命周期声明一个测试用例。
36TEST(DriverExample, TestLifecycle) {
37 struct AdbcError error = ADBC_ERROR_INIT;
38
39 struct AdbcDatabase database;
40 ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
41 AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
42 ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
43 ADBC_STATUS_OK);
44 ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);
45
46 struct AdbcConnection connection;
47 ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
48 ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);
49
50 struct AdbcStatement statement;
51 ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);
52
53 ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
54 ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
55 ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);
56
57 if (error.release) {
58 error.release(&error);
59 }
60}
存在于 apache/arrow-adbc 仓库中的驱动程序可以使用内置的验证库,该库针对全功能 SQL 数据库实现了一个通用测试套件,并提供了测试一系列输入和输出的实用工具。
高层测试¶
方案源码: driver_example.py
在验证了基本驱动程序功能后,我们可以使用 adbc_driver_manager Python 包的内置 dbapi 实现来公开一个现成的 Python 风格数据库 API。这对于高层测试也非常有用!
首先,我们将导入 pathlib 进行路径计算,并导入 adbc_driver_manager 的 dbapi 模块。
26from pathlib import Path
27
28from adbc_driver_manager import dbapi
接下来,我们将定义一个 connect() 函数,该函数使用我们在前一节中使用 cmake 构建的共享库位置来包装 dbapi.connect()。对于本教程,它将位于 CMake build/ 目录中。
35def connect(uri: str):
36 build_dir = Path(__file__).parent / "build"
37 for lib in [
38 "libdriver_example.dylib",
39 "libdriver_example.so",
40 "driver_example.dll",
41 ]:
42 driver_lib = build_dir / lib
43 if driver_lib.exists():
44 return dbapi.connect(
45 driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
46 )
47
48 # Try to find it on the dynamic loader path
49 return dbapi.connect(driver="driver_example", db_kwargs={"uri": uri})
接下来,我们可以尝试运行我们的驱动程序了!我们在驱动程序中实现的两个部分是“批量导入”功能和“全选”功能,让我们看看它是否有效!
54if __name__ == "__main__":
55 import os
56
57 import pyarrow
58
59 with connect(uri=Path(__file__).parent.as_uri()) as con:
60 data = pyarrow.table({"col": [1, 2, 3]})
61 with con.cursor() as cur:
62 cur.adbc_ingest("example.arrows", data, mode="create")
63
64 with con.cursor() as cur:
65 cur.execute("SELECT * FROM example.arrows")
66 print(cur.fetchall())
67 # Output: [(1,), (2,), (3,)]
68
69 os.unlink(Path(__file__).parent / "example.arrows")
[(1,), (2,), (3,)]
高层测试也可以使用 adbcdrivermanager 包在 R 中编写。
library(adbcdrivermanager)
drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)
data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")
驱动程序清单¶
方案源码: driver_example_manifest.py
在验证了基本驱动程序功能后,我们可以使用 adbc_driver_manager Python 包的内置 dbapi 实现来公开一个现成的 Python 风格数据库 API。这对于高层测试也非常有用!
首先,我们将导入 pathlib 进行路径计算,并导入 adbc_driver_manager 的 dbapi 模块。
26from pathlib import Path
27
28from adbc_driver_manager import dbapi
接下来,我们将定义一个 connect() 函数,该函数使用指向我们在前一节中使用 cmake 构建的共享库的 .toml 清单文件位置来包装 dbapi.connect()。在本教程中,它将位于当前目录中。
35def connect(uri: str):
36 # we can point to the manifest file directly
37 manifest_file = Path(".") / "driver_example.toml"
38 if manifest_file.exists():
39 return dbapi.connect(
40 driver=str(manifest_file.resolve()), db_kwargs={"uri": uri}
41 )
42
43 # alternatively, it can look for the manifest file in the user's config
44 # directory ($HOME/.config/adbc/drivers/driver_example.toml) or the system's
45 # config directory (/etc/adbc/drivers/driver_example.toml)
46 return dbapi.connect(driver="driver_example", db_kwargs={"uri": uri})
接下来,我们可以尝试运行我们的驱动程序了!我们在驱动程序中实现的两个部分是“批量导入”功能和“全选”功能,让我们看看它是否有效!
51if __name__ == "__main__":
52 import pyarrow
53
54 with connect(uri=Path(__file__).parent.as_uri()) as con:
55 data = pyarrow.table({"col": [1, 2, 3]})
56 with con.cursor() as cur:
57 cur.adbc_ingest("example.arrows", data, mode="create")
58
59 with con.cursor() as cur:
60 cur.execute("SELECT * FROM example.arrows")
61 print(cur.fetchall())
62 # Output: [(1,), (2,), (3,)]
63
64 (Path(__file__).parent / "example.arrows").unlink()
[(1,), (2,), (3,)]
驱动程序清单可以通过 TOML 文件提供一种安装和管理 ADBC 驱动程序的更简便方法,这些文件描述了一些元数据以及驱动程序共享库的路径。驱动程序管理器可以读取这些清单以定位并动态加载驱动程序。