驱动示例¶
秘诀来源: driver_example.cc
这里我们将展示使用ADBC驱动框架库在C++中构建ADBC驱动的结构。 ADBC使用相同的库来构建其SQLite和PostgreSQL驱动程序,并抽象出C可调用对象和目录/元数据函数的细节,这些细节可能难以实现,但对于有效地利用ADBC生态系统的其余部分至关重要。
从高层次上讲,我们将构建一个驱动程序,其“数据库”是一个目录,其中数据库中的每个“表”都是一个包含 Arrow IPC 流的文件。表可以使用批量导入功能写入,并且可以使用 SELECT * FROM (the file)
形式的简单查询读取。
安装¶
这个快速入门实际上是一个可运行的 C++ 文件。您可以克隆存储库、构建示例并跟随学习。
我们将假设您正在使用 conda-forge 获取依赖项。需要 CMake、C++17 编译器和 ADBC 库。可以按如下方式安装它们
mamba install cmake compilers libadbc-driver-manager
构建¶
我们将在此处使用 CMake。从 ADBC 存储库的源检出
mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest
使用 C++ 构建 ADBC 驱动程序¶
让我们从一些包含项开始。值得注意的是,我们将需要驱动程序框架头文件和 nanoarrow,我们将使用它来创建和使用此示例驱动程序中的 Arrow C 数据接口结构。
72#include "driver_example.h"
73
74#include <cstdio>
75#include <string>
76
77#include "driver/framework/connection.h"
78#include "driver/framework/database.h"
79#include "driver/framework/statement.h"
80
81#include "nanoarrow/nanoarrow.hpp"
82#include "nanoarrow/nanoarrow_ipc.hpp"
83
84#include "arrow-adbc/adbc.h"
接下来,我们将一些基本的框架类型引入命名空间,以减少实现的冗长性
adbc::driver::Option
: 可以在 ADBC 数据库、连接和语句上设置选项。它们可以是字符串、不透明二进制文件、双精度数或整数。Option
类抽象了如何获取、设置和解析这些值的细节。adbc::driver::Status
:Status
是 ADBC 驱动框架的错误处理机制:没有返回值的可能失败的函数返回Status
。您可以使用UNWRAP_STATUS(some_call())
作为Status status = some_call(); if (!status.ok()) return status;
的简写来简洁地传播错误。adbc::driver::Result
:Result<T>
用作函数返回值,该函数成功时返回T
类型的值,失败时使用Status
传递错误。您可以使用UNWRAP_RESULT(some_type value, some_call())
作为简写some_type value; Result<some_type> maybe_value = some_call(); if (!maybe_value.status().ok()) { return maybe_value.status(); } else { value = *maybe_value; }
113using adbc::driver::Option;
114using adbc::driver::Result;
115using adbc::driver::Status;
116
117namespace {
接下来,我们将提供数据库实现。驱动程序框架使用奇怪的递归模板模式 (CRTP)。框架会处理此问题的详细信息,但从功能上讲,这仍然只是覆盖基类中的方法,该基类处理详细信息。
在这里,我们的数据库实现将简单地记录用户传递的 uri
。我们对它的解释将是 file://
指向目录的 uri,我们的 IPC 文件应该写入该目录和/或应该读取 IPC 文件。这是数据库在 ADBC 中的作用:数据库的共享句柄,可能会在连接之间缓存一些共享状态,但仍然允许多个连接同时针对数据库执行。
134class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
135 public:
136 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
137
138 Status SetOptionImpl(std::string_view key, Option value) override {
139 // Handle and validate options implemented by this driver
140 if (key == "uri") {
141 UNWRAP_RESULT(std::string_view uri, value.AsString());
142
143 if (uri.find("file://") != 0) {
144 return adbc::driver::status::InvalidArgument(
145 "[example] uri must start with 'file://'");
146 }
147
148 uri_ = uri;
149 return adbc::driver::status::Ok();
150 }
151
152 // Defer to the base implementation to handle state managed by the base
153 // class (and error for all other options).
154 return Base::SetOptionImpl(key, value);
155 }
156
157 Result<Option> GetOption(std::string_view key) override {
158 // Return the value of options implemented by this driver
159 if (key == "uri") {
160 return Option(uri_);
161 }
162
163 // Defer to the base implementation to handle state managed by the base
164 // class (and error for all other options).
165 return Base::GetOption(key);
166 }
167
168 // This is called after zero or more calls to SetOption() on
169 Status InitImpl() override {
170 if (uri_.empty()) {
171 return adbc::driver::status::InvalidArgument(
172 "[example] Must set uri to a non-empty value");
173 }
174
175 return Base::InitImpl();
176 }
177
178 // Getters for members needed by the connection and/or statement:
179 const std::string& uri() { return uri_; }
180
181 private:
182 std::string uri_;
183};
接下来,我们实现连接。虽然数据库的角色通常是存储或缓存信息,但连接的角色是提供可能难以获得的资源句柄(例如,连接到数据库时协商身份验证)。因为我们的示例“数据库”只是一个目录,所以在连接中我们不需要做太多的资源管理,除了为子语句提供一种访问数据库 uri 的方法。
连接的另一个作用是提供有关表、列、统计信息和调用者可能希望在发出查询之前了解的其他类似目录的信息的元数据。驱动程序框架基类提供了辅助程序来实现这些函数,以便您主要可以使用 C++17 标准库来实现它们(而不是自己构建 C 级数组)。
198class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
199 public:
200 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
201
202 // Get information from the database and/or store a reference if needed.
203 Status InitImpl(void* parent) {
204 auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
205 uri_ = database.uri();
206 return Base::InitImpl(parent);
207 }
208
209 // Getters for members needed by the statement:
210 const std::string& uri() { return uri_; }
211
212 private:
213 std::string uri_;
214};
接下来,我们提供语句实现。语句是管理查询执行的地方。因为我们的数据源实际上是 Arrow 数据,所以我们不必提供管理类型或值转换的层。SQLite 和 PostgreSQL 驱动程序都投入了大量代码来有效地实现和测试这些转换。nanoarrow 库可用于实现双向转换,并且是另一篇文章的范围。
223class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
224 public:
225 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
226
227 // Get information from the connection and/or store a reference if needed.
228 Status InitImpl(void* parent) {
229 auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
230 uri_ = connection.uri();
231 return Base::InitImpl(parent);
232 }
233
234 // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
235 // using the target table as the filename.
236 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
237 std::string directory = uri_.substr(strlen("file://"));
238 std::string filename = directory + "/" + *state.target_table;
239
240 nanoarrow::ipc::UniqueOutputStream output_stream;
241 FILE* c_file = std::fopen(filename.c_str(), "wb");
242 UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
243 /*close_on_release*/ true));
244
245 nanoarrow::ipc::UniqueWriter writer;
246 UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));
247
248 ArrowError nanoarrow_error;
249 ArrowErrorInit(&nanoarrow_error);
250 UNWRAP_NANOARROW(nanoarrow_error, Internal,
251 ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
252 &nanoarrow_error));
253
254 return -1;
255 }
256
257 // Our implementation of query execution is to accept a simple query in the form
258 // SELECT * FROM (the filename).
259 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
260 std::string prefix("SELECT * FROM ");
261 if (state.query.find(prefix) != 0) {
262 return adbc::driver::status::InvalidArgument(
263 "[example] Query must be in the form 'SELECT * FROM filename'");
264 }
265
266 std::string directory = uri_.substr(strlen("file://"));
267 std::string filename = directory + "/" + state.query.substr(prefix.size());
268
269 nanoarrow::ipc::UniqueInputStream input_stream;
270 FILE* c_file = std::fopen(filename.c_str(), "rb");
271 UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
272 /*close_on_release*/ true));
273
274 UNWRAP_ERRNO(Internal,
275 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
276 return -1;
277 }
278
279 // This path is taken when the user calls Prepare() first.
280 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
281 QueryState query_state{state.query};
282 return ExecuteQueryImpl(query_state, stream);
283 }
284
285 private:
286 std::string uri_;
287};
288
289} // namespace
最后,我们创建驱动程序初始化函数,驱动程序管理器需要该函数来为构成 ADBC C API 的 Adbc**()
函数提供实现。此函数的名称很重要:此文件将被构建到名为 libdriver_example.(so|dll|dylib)
的共享库中,因此当被要求加载驱动程序 "driver_example"
时,驱动程序管理器将查找符号 AdbcDriverExampleInit()
作为默认入口点。
298extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
299 AdbcError* error) {
300 using ExampleDriver =
301 adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
302 DriverExampleStatement>;
303 return ExampleDriver::Init(version, raw_driver, error);
304}
底层测试¶
秘诀来源: driver_example_test.cc
在我们编写了驱动程序的草图后,下一步是确保它可以被驱动程序管理器加载,并且可以初始化和释放数据库、连接和语句实例。
首先,我们将包含驱动程序管理器和 googletest。
29#include "driver_example.h"
30
31#include "arrow-adbc/adbc_driver_manager.h"
32#include "gtest/gtest.h"
接下来,我们将声明一个针对基本生命周期的测试用例
36TEST(DriverExample, TestLifecycle) {
37 struct AdbcError error = ADBC_ERROR_INIT;
38
39 struct AdbcDatabase database;
40 ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
41 AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
42 ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
43 ADBC_STATUS_OK);
44 ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);
45
46 struct AdbcConnection connection;
47 ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
48 ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);
49
50 struct AdbcStatement statement;
51 ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);
52
53 ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
54 ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
55 ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);
56
57 if (error.release) {
58 error.release(&error);
59 }
60}
位于 apache/arrow-adbc 存储库中的驱动程序可以使用内置验证库,该库针对功能齐全的 SQL 数据库实现通用测试套件,并提供实用程序来测试一系列输入和输出。
高层测试¶
秘诀来源: driver_example.py
在验证了基本驱动程序功能之后,我们可以使用 adbc_driver_manager
Python 包的内置 dbapi 实现来公开一个现成的 Pythonic 数据库 API。 这对于高层测试也很有用!
首先,我们将导入 pathlib 进行一些路径计算和 adbc_driver_manager
的 dbapi
模块
26from pathlib import Path
27
28from adbc_driver_manager import dbapi
接下来,我们将定义一个 connect()
函数,该函数使用我们在上一节中使用 cmake
构建的共享库的位置来包装 dbapi.connect()
。 就我们的教程而言,这将位于 CMake build/
目录中。
35def connect(uri: str):
36 build_dir = Path(__file__).parent / "build"
37 for lib in [
38 "libdriver_example.dylib",
39 "libdriver_example.so",
40 "driver_example.dll",
41 ]:
42 driver_lib = build_dir / lib
43 if driver_lib.exists():
44 return dbapi.connect(
45 driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
46 )
47
48 # Try to find it on the dynamic loader path
49 return dbapi.connect(driver="driver_example", db_kwargs={"uri": uri})
接下来,我们可以让我们的驱动程序运行起来!我们在驱动程序中实现的两个部分是“批量导入”功能和“全选”,所以让我们看看它是否有效!
54if __name__ == "__main__":
55 import os
56
57 import pyarrow
58
59 with connect(uri=Path(__file__).parent.as_uri()) as con:
60 data = pyarrow.table({"col": [1, 2, 3]})
61 with con.cursor() as cur:
62 cur.adbc_ingest("example.arrows", data, mode="create")
63
64 with con.cursor() as cur:
65 cur.execute("SELECT * FROM example.arrows")
66 print(cur.fetchall())
67 # Output: [(1,), (2,), (3,)]
68
69 os.unlink(Path(__file__).parent / "example.arrows")
[(1,), (2,), (3,)]
高层测试也可以使用 adbcdrivermanager
包在 R 中编写。
library(adbcdrivermanager)
drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)
data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")