ADBC
Arrow 数据库连接
正在加载...
正在搜索...
未找到匹配项
connection.h
1// 根据 Apache 软件基金会 (ASF) 授权的条款
2// 或多项贡献者许可协议。请参阅 NOTICE 文件
3// 此工作附带的关于版权归属的附加信息
4// 。ASF 根据 Apache 许可证 2.0 版(
5// “许可证”)向您许可此文件,除非您遵守
6// 许可证,否则您不得使用此文件。您可以在以下位置获取许可证的副本
7//
8//
9// http://apache.org/licenses/LICENSE-2.0
10//
11// 除非适用法律要求或书面同意,否则
12// 根据许可证分发的软件按“原样”分发
13// 不作任何明示或暗示的保证或条件
14//。请参阅许可证,以了解有关权限和限制的
15// 特定语言
16// 根据许可证。
17
18#pragma once
19
20#include <cstdint>
21#include <memory>
22#include <string>
23#include <string_view>
24#include <utility>
25#include <vector>
26
27#include <arrow-adbc/adbc.h>
28
29#include "driver/framework/base_driver.h"
30#include "driver/framework/objects.h"
31#include "driver/framework/utility.h"
32
33namespace adbc::driver {
41template <typename Derived>
42class Connection : public ObjectBase {
43 public
45
47 enum class AutocommitState {
48 kAutocommit,
49 kTransaction,
50 };
51
52 Connection() : ObjectBase() {}
53 ~Connection() = default;
54
56 AdbcStatusCode Init(void* parent, AdbcError* error) override {
57 if (auto status = impl().InitImpl(parent); !status.ok()) {
58 return status.ToAdbc(error);
59 }
60 return ObjectBase::Init(parent, error);
61 }
62
65
67 AdbcStatusCode Commit(AdbcError* error) {
68 switch (autocommit_) {
69 case AutocommitState::kAutocommit
70 return status::InvalidState(Derived::kErrorPrefix,
71 " 没有活动的事务,无法提交")
72 .ToAdbc(error);
73 case AutocommitState::kTransaction
74 return impl().CommitImpl().ToAdbc(error);
75 }
76 assert(false);
78 }
79
81 AdbcStatusCode GetInfo(const uint32_t* info_codes, size_t info_codes_length,
82 ArrowArrayStream* out, AdbcError* error) {
83 if (!out) {
84 RAISE_STATUS(error, status::InvalidArgument("out 不能为空"));
85 }
86
87 std::vector<uint32_t> codes(info_codes, info_codes + info_codes_length);
88 RAISE_RESULT(error, auto infos, impl().InfoImpl(codes));
89 RAISE_STATUS(error, MakeGetInfoStream(infos, out));
90 return ADBC_STATUS_OK;
91 }
92
94 AdbcStatusCode GetObjects(int c_depth, const char* catalog, const char* db_schema,
95 const char* table_name, const char** table_type,
96 const char* column_name, ArrowArrayStream* out,
97 AdbcError* error) {
98 const auto catalog_filter =
99 catalog ? std::make_optional(std::string_view(catalog)) : std::nullopt;
100 const auto schema_filter =
101 db_schema ? std::make_optional(std::string_view(db_schema)) : std::nullopt;
102 const auto table_filter =
103 table_name ? std::make_optional(std::string_view(table_name)) : std::nullopt;
104 const auto column_filter =
105 column_name ? std::make_optional(std::string_view(column_name)) : std::nullopt;
106 std::vector<std::string_view> table_type_filter;
107 while (table_type && *table_type) {
108 if (*table_type) {
109 table_type_filter.push_back(std::string_view(*table_type));
110 }
111 table_type++;
112 }
113
114 GetObjectsDepth depth = GetObjectsDepth::kColumns;
115 switch (c_depth) {
117 depth = GetObjectsDepth::kCatalogs;
118 break;
120 depth = GetObjectsDepth::kColumns;
121 break;
123 depth = GetObjectsDepth::kSchemas;
124 break;
126 depth = GetObjectsDepth::kTables;
127 break;
128 default
129 return status::InvalidArgument(Derived::kErrorPrefix,
130 " GetObjects: invalid depth ", c_depth)
131 .ToAdbc(error);
132 }
133
134 RAISE_RESULT(error, auto helper, impl().GetObjectsImpl());
135 auto status = BuildGetObjects(helper.get(), depth, catalog_filter, schema_filter,
136 table_filter, column_filter, table_type_filter, out);
138 RAISE_STATUS(error, status);
139 return ADBC_STATUS_OK;
140 }
141
143 Result<Option> GetOption(std::string_view key) override {
145 switch (autocommit_) {
146 case AutocommitState::kAutocommit:
148 case AutocommitState::kTransaction:
150 }
151 } else if (key == ADBC_CONNECTION_OPTION_CURRENT_CATALOG) {
152 UNWRAP_RESULT(auto catalog, impl().GetCurrentCatalogImpl());
153 if (catalog) {
154 return driver::Option(std::move(*catalog));
155 }
156 } return driver::Option();
158 UNWRAP_RESULT(auto schema, impl().GetCurrentSchemaImpl());
159 if (schema) {
160 return driver::Option(std::move(*schema));
161 }
162 } return driver::Option();
163 }
164 return Base::GetOption(key);
165 }
166
168 AdbcStatusCode GetStatistics(const char* catalog, const char* db_schema,
169 const char* table_name, char approximate,
170 ArrowArrayStream* out, AdbcError* error) {
172 }
173
175 AdbcStatusCode GetStatisticNames(ArrowArrayStream* out, AdbcError* error) {
177 }
178
180 AdbcStatusCode GetTableSchema(const char* catalog, const char* db_schema,
181 const char* table_name, ArrowSchema* schema,
182 AdbcError* error) {
183 if (!table_name) {
184 return status::InvalidArgument(Derived::kErrorPrefix,
185 " GetTableSchema: must provide table_name")
186 .ToAdbc(error);
187 }
188 std::memset(schema, 0, sizeof(*schema));
189 std::optional<std::string_view> catalog_param =
190 catalog ? std::make_optional(std::string_view(catalog)) : std::nullopt;
191 std::optional<std::string_view> db_schema_param =
192 db_schema ? std::make_optional(std::string_view(db_schema)) : std::nullopt;
193 std::string_view table_name_param = table_name;
194
195 return impl()
196 .GetTableSchemaImpl(catalog_param, db_schema_param, table_name_param, schema)
197 .ToAdbc(error);
198 }
199
201 AdbcStatusCode GetTableTypes(ArrowArrayStream* out, AdbcError* error) {
202 if (!out) {
203 RAISE_STATUS(error, status::InvalidArgument("out must be non-null"));
204 }
205
206 RAISE_RESULT(error, std::vector<std::string> table_types, impl().GetTableTypesImpl());
207 RAISE_STATUS(error, MakeTableTypesStream(table_types, out));
208 return ADBC_STATUS_OK;
209 }
210
212 AdbcStatusCode ReadPartition(const uint8_t* serialized_partition,
213 size_t serialized_length, ArrowArrayStream* out,
214 AdbcError* error) {
216 }
217
220 return impl().ReleaseImpl().ToAdbc(error);
221 }
222
224 AdbcStatusCode Rollback(AdbcError* error) {
225 switch (autocommit_) {
226 case AutocommitState::kAutocommit:
227 return status::InvalidState(Derived::kErrorPrefix,
228 " 没有活动的事务,无法回滚")
229 .ToAdbc(error);
230 case AutocommitState::kTransaction:
231 return impl().RollbackImpl().ToAdbc(error);
232 }
233 assert(false);
235 }
236
238 AdbcStatusCode SetOption(std::string_view key, Option value,
239 AdbcError* error) override {
240 return impl().SetOptionImpl(key, value).ToAdbc(error);
241 }
242
246 Status CommitImpl() { return status::NotImplemented("提交"); }
247
248 Result<std::optional<std::string>> GetCurrentCatalogImpl() { return std::nullopt; }
249
250 Result<std::optional<std::string>> GetCurrentSchemaImpl() { return std::nullopt; }
251
259 return std::make_unique<GetObjectsHelper>();
260 }
261
262 Status GetTableSchemaImpl(std::optional<std::string_view> catalog,
263 std::optional<std::string_view> db_schema,
264 std::string_view table_name, ArrowSchema* schema) {
265 return status::NotImplemented("获取表模式");
266 }
267
268 Result<std::vector<std::string>> GetTableTypesImpl() {
269 return std::vector<std::string>();
270 }
271
272 Result<std::vector<InfoValue>> InfoImpl(const std::vector<uint32_t>& codes) {
273 return std::vector<InfoValue>{};
274 }
275
276 Status InitImpl(void* parent) { return status::Ok(); }
277
278 Status ReleaseImpl() { return status::Ok(); }
279
280 Status RollbackImpl() { return status::NotImplemented("回滚"); }
281
282 Status SetOptionImpl(std::string_view key, Option value) {
284 UNWRAP_RESULT(auto enabled, value.AsBool());
285 switch (autocommit_) {
286 case AutocommitState::kAutocommit: {
287 if (!enabled) {
288 UNWRAP_STATUS(impl().ToggleAutocommitImpl(false));
289 autocommit_ = AutocommitState::kTransaction;
290 }
291 break;
292 }
293 case AutocommitState::kTransaction: {
294 if (enabled) {
295 UNWRAP_STATUS(impl().ToggleAutocommitImpl(true));
296 autocommit_ = AutocommitState::kAutocommit;
297 }
298 break;
299 }
300 }
301 return status::Ok();
302 }
303 return status::NotImplemented(Derived::kErrorPrefix, " 未知的连接选项 ",
304 key, "=", value.Format());
305 }
306
307 Status ToggleAutocommitImpl(bool enable_autocommit) {
308 return status::NotImplemented(Derived::kErrorPrefix, " 无法更改自动提交");
309 }
310
311 protected
312 AutocommitState autocommit_ = AutocommitState::kAutocommit;
313
314 private
315 Derived& impl() { return static_cast<Derived&>(*this); }
316};
317
318} // namespace adbc::driver
AdbcConnection 的 CRTP 基类实现。
定义 connection.h:42
Status CommitImpl()
提交当前事务并开始一个新事务。
定义 connection.h:246
AdbcStatusCode Release(AdbcError *error) override
完成对象的最终化操作。
定义 connection.h:219
Result< std::unique_ptr< GetObjectsHelper > > GetObjectsImpl()
查询数据库目录。
定义 connection.h:258
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error) override
设置一个选项值。
定义 connection.h:238
AdbcStatusCode Init(void *parent, AdbcError *error) override
初始化对象。
定义 connection.h:56
Result< Option > GetOption(std::string_view key) override
获取一个选项值。
定义 connection.h:143
AutocommitState
是否启用自动提交(默认:启用)。
定义 connection.h:47
AdbcDatabase、AdbcConnection 和 AdbcStatement 的 private_data 的基类。
定义 base_driver.h:254
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
初始化对象。
定义 base_driver.h:274
一个类型化的选项值包装器。 它当前不尝试转换(即,获取双精度选项...
定义 base_driver.h:59
一个围绕值或错误的包装器。
定义 status.h:203
AdbcStatusCode + AdbcError 的包装器。
定义 status.h:43
bool ok() const
检查这是否是一个错误。
定义 status.h:64
#define ADBC_OBJECT_DEPTH_TABLES
返回关于目录、模式和表的元数据。
定义 adbc.h:518
#define ADBC_CONNECTION_OPTION_CURRENT_DB_SCHEMA
当前模式的规范选项的名称。
定义 adbc.h:604
#define ADBC_OBJECT_DEPTH_CATALOGS
仅返回关于目录的元数据。
定义 adbc.h:510
#define ADBC_CONNECTION_OPTION_CURRENT_CATALOG
当前目录的规范选项的名称。
定义 adbc.h:595
#define ADBC_CONNECTION_OPTION_AUTOCOMMIT
是否启用自动提交的规范选项的名称。
定义 adbc.h:578
#define ADBC_OBJECT_DEPTH_DB_SCHEMAS
返回关于目录和模式的元数据。
定义 adbc.h:514
#define ADBC_OPTION_VALUE_DISABLED
用于禁用选项的规范选项值。
定义 adbc.h:418
#define ADBC_OPTION_VALUE_ENABLED
用于启用选项的规范选项值。
定义 adbc.h:414
#define ADBC_OBJECT_DEPTH_COLUMNS
返回关于目录、模式、表和列的元数据。
定义 adbc.h:522
#define ADBC_STATUS_NOT_IMPLEMENTED
操作未实现或不支持。
定义 adbc.h:187
uint8_t AdbcStatusCode
可能失败的操作的错误代码。
定义 adbc.h:176
#define ADBC_STATUS_OK
无错误。
定义 adbc.h:179
#define ADBC_STATUS_INTERNAL
驱动程序或数据库内部发生错误。
定义 adbc.h:227
操作的详细错误消息。
定义 adbc.h:269
Status MakeTableTypesStream(const std::vector< std::string > &table_types, ArrowArrayStream *out)
创建一个表类型向量的 ArrowArrayStream 表示形式。
GetObjectsDepth
GetObjects 级别。
定义 objects.h:39
Status BuildGetObjects(GetObjectsHelper *helper, GetObjectsDepth depth, std::optional< std::string_view > catalog_filter, std::optional< std::string_view > schema_filter, std::optional< std::string_view > table_filter, std::optional< std::string_view > column_filter, const std::vector< std::string_view > &table_types, ArrowArrayStream *out)
一个实现 GetObjects 的辅助函数。 out/helper 的生命周期由调用者管理。
Status MakeGetInfoStream(const std::vector< InfoValue > &infos, ArrowArrayStream *out)
创建一个 ArrowArrayStream,用于从 AdbcConnectionGetInfo() 返回。
#define RAISE_RESULT(ERROR, LHS, RHS)
一个辅助宏,用于在返回 AdbcStatusCode 的函数中解包 Result。
定义 status.h:277
#define UNWRAP_STATUS(rhs)
一个辅助宏,用于在返回 Result/Status 的函数中解包 Status。
定义 status.h:286
#define RAISE_STATUS(ERROR, RHS)
一个辅助宏,用于在返回 AdbcStatusCode 的函数中解包 Status。
定义 status.h:280
#define UNWRAP_RESULT(lhs, rhs)
一个辅助宏,用于在返回 Result/Status 的函数中解包 Result。
定义 status.h:283