驱动程序示例

配方来源: driver_example.cc

在这里我们将展示使用 ADBC 驱动程序框架库在 C++ 中构建 ADBC 驱动程序的结构。这与 ADBC 用于构建其 SQLite 和 PostgreSQL 驱动程序的库相同,并抽象了 C 可调用和目录/元数据函数的细节,这些函数可能难以实现,但对于有效利用 ADBC 生态系统的其余部分至关重要。

从高层次来看,我们将构建一个驱动程序,其“数据库”是一个目录,其中数据库中的每个“表”都是一个包含 Arrow IPC 流的文件。可以使用批量导入功能写入表,并且可以使用SELECT * FROM (the file)形式的简单查询读取表。

安装

这个快速入门实际上是一个文学 C++ 文件。您可以克隆存储库,构建示例,然后继续操作。

我们假设您使用的是 conda-forge 来管理依赖项。CMake、C++17 编译器和 ADBC 库是必需的。可以按如下方式安装它们

mamba install cmake compilers libadbc-driver-manager

构建

我们将在此处使用 CMake。从 ADBC 存储库的源代码签出中

mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest

使用 C++ 构建 ADBC 驱动程序

让我们从一些包含开始。值得注意的是,我们将需要驱动程序框架头文件和 nanoarrow,我们将使用它在本示例驱动程序中创建和使用 Arrow C 数据接口结构。

72#include "driver_example.h"
73
74#include <cstdio>
75#include <string>
76
77#include "driver/framework/connection.h"
78#include "driver/framework/database.h"
79#include "driver/framework/statement.h"
80
81#include "nanoarrow/nanoarrow.hpp"
82#include "nanoarrow/nanoarrow_ipc.hpp"
83
84#include "arrow-adbc/adbc.h"

接下来,我们将把一些基本框架类型引入命名空间,以减少实现的冗长性

  • adbc::driver::Option: 选项可以在 ADBC 数据库、连接和语句上设置。它们可以是字符串、不透明二进制、双精度浮点数或整数。Option 类抽象了获取、设置和解析这些值的细节。

  • adbc::driver::Status: Status 是 ADBC 驱动程序框架的错误处理机制:没有返回值且可能失败的函数将返回一个 Status。您可以使用 UNWRAP_STATUS(some_call()) 作为 Status status = some_call(); if (!status.ok()) return status; 的简写,以简洁地传播错误。

  • adbc::driver::Result: Result<T> 用于作为函数的返回值,这些函数在成功时返回类型为 T 的值,并在失败时使用 Status 传达其错误。您可以使用 UNWRAP_RESULT(some_type value, some_call()) 作为简写

    some_type value;
    Result<some_type> maybe_value = some_call();
    if (!maybe_value.status().ok()) {
      return maybe_value.status();
    } else {
      value = *maybe_value;
    }
    
113using adbc::driver::Option;
114using adbc::driver::Result;
115using adbc::driver::Status;
116
117namespace {

接下来,我们将提供数据库实现。驱动程序框架使用好奇的递归模板模式 (CRTP)。该模式的细节由框架处理,但在功能上这仍然只是覆盖来自基类的那些方法,而基类处理细节。

在这里,我们的数据库实现将只记录用户传入的 uri。我们将对此的解释是一个 file:// uri,指向应写入我们的 IPC 文件或应读取 IPC 文件的目录。这是数据库在 ADBC 中的角色:对数据库的共享句柄,该句柄可能在连接之间缓存一些共享状态,但仍然允许多个连接并发地对数据库执行操作。

134class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
135 public:
136  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
137
138  Status SetOptionImpl(std::string_view key, Option value) override {
139    // Handle and validate options implemented by this driver
140    if (key == "uri") {
141      UNWRAP_RESULT(std::string_view uri, value.AsString());
142
143      if (uri.find("file://") != 0) {
144        return adbc::driver::status::InvalidArgument(
145            "[example] uri must start with 'file://'");
146      }
147
148      uri_ = uri;
149      return adbc::driver::status::Ok();
150    }
151
152    // Defer to the base implementation to handle state managed by the base
153    // class (and error for all other options).
154    return Base::SetOptionImpl(key, value);
155  }
156
157  Result<Option> GetOption(std::string_view key) override {
158    // Return the value of options implemented by this driver
159    if (key == "uri") {
160      return Option(uri_);
161    }
162
163    // Defer to the base implementation to handle state managed by the base
164    // class (and error for all other options).
165    return Base::GetOption(key);
166  }
167
168  // This is called after zero or more calls to SetOption() on
169  Status InitImpl() override {
170    if (uri_.empty()) {
171      return adbc::driver::status::InvalidArgument(
172          "[example] Must set uri to a non-empty value");
173    }
174
175    return Base::InitImpl();
176  }
177
178  // Getters for members needed by the connection and/or statement:
179  const std::string& uri() { return uri_; }
180
181 private:
182  std::string uri_;
183};

接下来,我们实现了连接。虽然数据库的作用通常是存储或缓存信息,但连接的作用是提供可能代价高昂的资源句柄(例如,在连接到数据库时协商身份验证)。因为我们的示例“数据库”只是一个目录,所以我们在连接中不需要做太多关于资源管理的事情,除了提供一种方法让子语句访问数据库的 uri。

连接的另一个作用是提供有关表、列、统计信息和其他目录类信息的元数据,调用者可能希望在发出查询之前了解这些信息。驱动程序框架基类提供了一些帮助程序来实现这些函数,这样您就可以主要在 C++17 标准库(而不是自己构建 C 级数组)方面实现它们。

198class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
199 public:
200  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
201
202  // Get information from the database and/or store a reference if needed.
203  Status InitImpl(void* parent) {
204    auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
205    uri_ = database.uri();
206    return Base::InitImpl(parent);
207  }
208
209  // Getters for members needed by the statement:
210  const std::string& uri() { return uri_; }
211
212 private:
213  std::string uri_;
214};

接下来,我们提供语句实现。语句是管理查询执行的地方。因为我们的数据源实际上就是 Arrow 数据,所以我们不需要提供一个管理类型或值转换的层。SQLite 和 PostgreSQL 驱动程序都为有效地实现和测试这些转换分配了许多代码行。nanoarrow 库可用于实现两个方向上的转换,并且是另一篇文章的范围。

223class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
224 public:
225  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
226
227  // Get information from the connection and/or store a reference if needed.
228  Status InitImpl(void* parent) {
229    auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
230    uri_ = connection.uri();
231    return Base::InitImpl(parent);
232  }
233
234  // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
235  // using the target table as the filename.
236  Result<int64_t> ExecuteIngestImpl(IngestState& state) {
237    std::string directory = uri_.substr(strlen("file://"));
238    std::string filename = directory + "/" + *state.target_table;
239
240    nanoarrow::ipc::UniqueOutputStream output_stream;
241    FILE* c_file = std::fopen(filename.c_str(), "wb");
242    UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
243                                                        /*close_on_release*/ true));
244
245    nanoarrow::ipc::UniqueWriter writer;
246    UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));
247
248    ArrowError nanoarrow_error;
249    ArrowErrorInit(&nanoarrow_error);
250    UNWRAP_NANOARROW(nanoarrow_error, Internal,
251                     ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
252                                                    &nanoarrow_error));
253
254    return -1;
255  }
256
257  // Our implementation of query execution is to accept a simple query in the form
258  // SELECT * FROM (the filename).
259  Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
260    std::string prefix("SELECT * FROM ");
261    if (state.query.find(prefix) != 0) {
262      return adbc::driver::status::InvalidArgument(
263          "[example] Query must be in the form 'SELECT * FROM filename'");
264    }
265
266    std::string directory = uri_.substr(strlen("file://"));
267    std::string filename = directory + "/" + state.query.substr(prefix.size());
268
269    nanoarrow::ipc::UniqueInputStream input_stream;
270    FILE* c_file = std::fopen(filename.c_str(), "rb");
271    UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
272                                                       /*close_on_release*/ true));
273
274    UNWRAP_ERRNO(Internal,
275                 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
276    return -1;
277  }
278
279  // This path is taken when the user calls Prepare() first.
280  Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
281    QueryState query_state{state.query};
282    return ExecuteQueryImpl(query_state, stream);
283  }
284
285 private:
286  std::string uri_;
287};
288
289}  // namespace

最后,我们创建驱动程序初始化函数,这是驱动程序管理器需要提供的实现,用于组成 ADBC C API 的 Adbc**() 函数。此函数的名称很重要:此文件将被构建到一个名为 libdriver_example.(so|dll|dylib) 的共享库中,因此驱动程序管理器将查找符号 AdbcDriverExampleInit() 作为默认入口点,当被要求加载驱动程序 "driver_example" 时。

298extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
299                                                AdbcError* error) {
300  using ExampleDriver =
301      adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
302                           DriverExampleStatement>;
303  return ExampleDriver::Init(version, raw_driver, error);
304}

低级别测试

配方来源: driver_example_test.cc

在编写驱动程序草稿之后,下一步是确保驱动程序管理器可以加载它,并且可以初始化和释放数据库、连接和语句实例。

首先,我们将包含驱动程序管理器和 googletest

29#include "driver_example.h"
30
31#include "arrow-adbc/adbc_driver_manager.h"
32#include "gtest/gtest.h"

接下来,我们将声明一个用于基本生命周期的测试用例

36TEST(DriverExample, TestLifecycle) {
37  struct AdbcError error = ADBC_ERROR_INIT;
38
39  struct AdbcDatabase database;
40  ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
41  AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
42  ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
43            ADBC_STATUS_OK);
44  ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);
45
46  struct AdbcConnection connection;
47  ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
48  ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);
49
50  struct AdbcStatement statement;
51  ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);
52
53  ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
54  ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
55  ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);
56
57  if (error.release) {
58    error.release(&error);
59  }
60}

位于 apache/arrow-adbc 存储库中的驱动程序可以使用内置的验证库,该库实现了针对功能齐全的 SQL 数据库的通用测试套件,并提供了实用程序来测试一系列输入和输出。

高级测试

配方来源: driver_example.py

在验证了基本驱动程序功能之后,我们可以使用 adbc_driver_manager Python 包的内置 dbapi 实现来公开一个现成的 Pythonic 数据库 API。这对高级测试也很有用!

首先,我们将导入 pathlib 用于一些路径计算,以及 adbc_driver_managerdbapi 模块

26from pathlib import Path
27
28from adbc_driver_manager import dbapi

接下来,我们将定义一个 connect() 函数,它使用我们在上一节中使用 cmake 构建的共享库的位置来包装 dbapi.connect()。为了我们教程的目的,这将在 CMake build/ 目录中。

35def connect(uri: str):
36    build_dir = Path(__file__).parent / "build"
37    for lib in [
38        "libdriver_example.dylib",
39        "libdriver_example.so",
40        "driver_example.dll",
41    ]:
42        driver_lib = build_dir / lib
43        if driver_lib.exists():
44            return dbapi.connect(
45                driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
46            )
47
48    raise RuntimeError("Can't find driver shared object")

接下来,我们可以试用我们的驱动程序!我们在驱动程序中实现的两个部分是“批量导入”功能和“从所有选择”,所以让我们看看它是否有效!

53if __name__ == "__main__":
54    import os
55
56    import pyarrow
57
58    with connect(uri=Path(__file__).parent.as_uri()) as con:
59        data = pyarrow.table({"col": [1, 2, 3]})
60        with con.cursor() as cur:
61            cur.adbc_ingest("example.arrows", data, mode="create")
62
63        with con.cursor() as cur:
64            cur.execute("SELECT * FROM example.arrows")
65            print(cur.fetchall())
66
67        os.unlink(Path(__file__).parent / "example.arrows")

高级测试也可以使用 adbcdrivermanager 包在 R 中编写。

library(adbcdrivermanager)

drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)

data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")