ADBC
Arrow 数据库连接
正在加载...
正在搜索...
无匹配项
statement.h
1// 根据 Apache 软件基金会 (ASF) 的一份或多份贡献者许可协议获得许可。
2// 请查阅随本工作分发的 NOTICE 文件,了解有关版权所有权的附加信息。
3// ASF 根据 Apache 许可证 2.0 版(以下简称
4// “许可证”)向您许可此文件;除非遵守
5// 许可证,否则您不得使用此文件。您可以从以下网址获取许可证副本:
6// 许可证。您可以从以下网址获取许可证副本:
7// 许可证。您可以从以下网址获取许可证副本:
8//
9// http://apache.org/licenses/LICENSE-2.0
10//
11// 除非适用法律要求或书面同意,否则
12// 根据许可证分发的软件按“原样”分发,
13// 不附带任何明示或暗示的担保或条件。
14// 请参阅许可证,了解有关权限和限制的特定语言。
15// 请参阅许可证,了解有关权限和限制的特定语言。
16// 根据许可证。
17
18#pragma once
19
20#include <cstdint>
21#include <memory>
22#include <optional>
23#include <string>
24#include <utility>
25#include <variant>
26#include <vector>
27
28#include "driver/framework/base_driver.h"
30#include "driver/framework/utility.h"
31
32namespace adbc::driver {
33
35template <typename Derived>
36class Statement : public BaseStatement<Derived> {
37 public
39
41 enum class TableDoesNotExist {
42 kCreate,
43 kFail,
44 };
45
47 enum class TableExists {
48 kAppend,
49 kFail,
50 kReplace,
51 };
52
54 struct EmptyState {};
56 struct IngestState {
57 std::optional<std::string> target_catalog;
58 std::optional<std::string> target_schema;
59 std::optional<std::string> target_table;
60 bool temporary = false;
61 TableDoesNotExist table_does_not_exist_ = TableDoesNotExist::kCreate;
62 TableExists table_exists_ = TableExists::kFail;
63 };
66 std::string query;
67 };
69 struct QueryState {
70 std::string query;
71 };
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
74
75 Statement() : BaseStatement<Derived>() {
76 std::memset(&bind_parameters_, 0, sizeof(bind_parameters_));
77 }
78 ~Statement() = default;
79
80 AdbcStatusCode Bind(ArrowArray* values, ArrowSchema* schema, AdbcError* error) {
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind:必须提供非 NULL 数组")
84 .ToAdbc(error);
85 } else if (!schema || !schema->release) {
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind:必须提供非 NULL 流")
88 .ToAdbc(error);
89 }
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
91 MakeArrayStream(schema, values, &bind_parameters_);
92 return ADBC_STATUS_OK;
93 }
94
95 AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) {
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream:必须提供非 NULL 流")
99 .ToAdbc(error);
100 }
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
102 // 移动流
103 bind_parameters_ = *stream;
104 std::memset(stream, 0, sizeof(*stream));
105 return ADBC_STATUS_OK;
106 }
107
108 AdbcStatusCode Cancel(AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; }
109
110 AdbcStatusCode ExecutePartitions(struct ArrowSchema* schema,
111 struct AdbcPartitions* partitions,
112 int64_t* rows_affected, AdbcError* error) {
114 }
115
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
117 AdbcError* error) {
118 return std::visit(
119 [&](auto&& state) -> AdbcStatusCode {
120 using T = std::decay_t<decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " 未设置查询无法执行 ExecuteQuery")
124 .ToAdbc(error);
125 } else if constexpr (std::is_same_v<T, IngestState>) {
126 if (stream) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " 无法使用结果集进行数据提取")
129 .ToAdbc(error);
130 }
131 RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
132 if (rows_affected) {
133 *rows_affected = rows;
134 }
135 return ADBC_STATUS_OK;
136 } else if constexpr (std::is_same_v<T, PreparedState> ||
137 std::is_same_v<T, QueryState>) {
138 int64_t rows = 0;
139 if (stream) {
140 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
141 } else {
142 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
143 }
144 if (rows_affected) {
145 *rows_affected = rows;
146 }
147 return ADBC_STATUS_OK;
148 } else {
149 static_assert(!sizeof(T), "case not implemented");
150 }
151 },
153 }
154
155 AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) {
157 }
158
159 AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error) {
160 return std::visit(
161 [&](auto&& state) -> AdbcStatusCode {
162 using T = std::decay_t<decltype(state)>;
163 if constexpr (std::is_same_v<T, EmptyState>) {
164 return status::InvalidState(
165 Derived::kErrorPrefix,
166 " Cannot GetParameterSchema without setting the query")
167 .ToAdbc(error);
168 } else if constexpr (std::is_same_v<T, IngestState>) {
169 return status::InvalidState(Derived::kErrorPrefix,
170 " Cannot GetParameterSchema in bulk ingestion")
171 .ToAdbc(error);
172 } else if constexpr (std::is_same_v<T, PreparedState>) {
173 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
174 } else if constexpr (std::is_same_v<T, QueryState>) {
175 return status::InvalidState(
176 Derived::kErrorPrefix,
177 " Cannot GetParameterSchema without calling Prepare")
178 .ToAdbc(error);
179 } else {
180 static_assert(!sizeof(T), "case not implemented");
181 }
182 },
183 state_);
184 }
185
186 AdbcStatusCode Init(void* parent, AdbcError* error) {
187 this->lifecycle_state_ = LifecycleState::kInitialized;
188 if (auto status = impl().InitImpl(parent); !status.ok()) {
189 return status.ToAdbc(error);
190 }
191 return ObjectBase::Init(parent, error);
192 }
193
194 AdbcStatusCode Prepare(AdbcError* error) {
195 RAISE_STATUS(error, std::visit(
196 [&](auto&& state) -> Status {
197 using T = std::decay_t<decltype(state)>;
198 if constexpr (std::is_same_v<T, EmptyState>) {
199 return status::InvalidState(
200 Derived::kErrorPrefix,
201 " Cannot Prepare without setting the query");
202 } else if constexpr (std::is_same_v<T, IngestState>) {
203 return status::InvalidState(
204 Derived::kErrorPrefix,
205 " Cannot Prepare without setting the query");
206 } else if constexpr (std::is_same_v<T, PreparedState>) {
207 // No-op
208 return status::Ok();
209 } else if constexpr (std::is_same_v<T, QueryState>) {
210 UNWRAP_STATUS(impl().PrepareImpl(state));
211 state_ = PreparedState{std::move(state.query)};
212 return status::Ok();
213 } else {
214 static_assert(!sizeof(T), "case not implemented");
215 }
216 },
217 state_));
218 return ADBC_STATUS_OK;
219 }
220
222 if (bind_parameters_.release) {
223 bind_parameters_.release(&bind_parameters_);
224 bind_parameters_.release = nullptr;
225 }
226 return impl().ReleaseImpl().ToAdbc(error);
227 }
228
229 AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError* error) {
230 auto ensure_ingest = [&]() -> IngestState& {
231 if (!std::holds_alternative<IngestState>(state_)) {
232 state_ = IngestState{};
233 }
234 return std::get<IngestState>(state_);
235 };
236 if (key == ADBC_INGEST_OPTION_MODE) {
237 RAISE_RESULT(error, auto mode, value.AsString());
238 if (mode == ADBC_INGEST_OPTION_MODE_APPEND) {
239 auto& state = ensure_ingest();
240 state.table_does_not_exist_ = TableDoesNotExist::kFail;
241 state.table_exists_ = TableExists::kAppend;
242 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE) {
243 auto& state = ensure_ingest();
244 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
245 state.table_exists_ = TableExists::kFail;
246 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE_APPEND) {
247 auto& state = ensure_ingest();
248 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
249 state.table_exists_ = TableExists::kAppend;
250 } else if (mode == ADBC_INGEST_OPTION_MODE_REPLACE) {
251 auto& state = ensure_ingest();
252 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
253 state.table_exists_ = TableExists::kReplace;
254 } else {
255 return status::InvalidArgument(Derived::kErrorPrefix, " 无效的 ingest 模式 '",
256 key, "': ", value.Format())
257 .ToAdbc(error);
258 }
259 return ADBC_STATUS_OK;
260 } else if (key == ADBC_INGEST_OPTION_TARGET_CATALOG) {
261 if (value.has_value()) {
262 RAISE_RESULT(error, auto catalog, value.AsString());
263 ensure_ingest().target_catalog = catalog;
264 } else {
265 ensure_ingest().target_catalog = std::nullopt;
266 }
267 return ADBC_STATUS_OK;
268 } else if (key == ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) {
269 if (value.has_value()) {
270 RAISE_RESULT(error, auto schema, value.AsString());
271 ensure_ingest().target_schema = schema;
272 } else {
273 ensure_ingest().target_schema = std::nullopt;
274 }
275 return ADBC_STATUS_OK;
276 } else if (key == ADBC_INGEST_OPTION_TARGET_TABLE) {
277 RAISE_RESULT(error, auto table, value.AsString());
278 ensure_ingest().target_table = table;
279 return ADBC_STATUS_OK;
280 } else if (key == ADBC_INGEST_OPTION_TEMPORARY) {
281 RAISE_RESULT(error, auto temporary, value.AsBool());
282 ensure_ingest().temporary = temporary;
283 return ADBC_STATUS_OK;
284 }
285 return impl().SetOptionImpl(key, value).ToAdbc(error);
286 }
287
288 AdbcStatusCode SetSqlQuery(const char* query, AdbcError* error) {
289 RAISE_STATUS(error, std::visit(
290 [&](auto&& state) -> Status {
291 using T = std::decay_t<decltype(state)>;
292 if constexpr (std::is_same_v<T, EmptyState>) {
293 state_ = QueryState{
294 std::string(query),
295 };
296 return status::Ok();
297 } else if constexpr (std::is_same_v<T, IngestState>) {
298 state_ = QueryState{
299 std::string(query),
300 };
301 return status::Ok();
302 } else if constexpr (std::is_same_v<T, PreparedState>) {
303 state_ = QueryState{
304 std::string(query),
305 };
306 return status::Ok();
307 } else if constexpr (std::is_same_v<T, QueryState>) {
308 state.query = std::string(query);
309 return status::Ok();
310 } else {
311 static_assert(!sizeof(T),
312 "info value type not implemented");
313 }
314 },
315 state_));
316 return ADBC_STATUS_OK;
317 }
318
319 AdbcStatusCode SetSubstraitPlan(const uint8_t* plan, size_t length, AdbcError* error) {
321 }
322
323 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
324 return status::NotImplemented(Derived::kErrorPrefix,
325 " 批量摄取未实现");
326 }
327
328 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
329 return status::NotImplemented(Derived::kErrorPrefix,
330 " ExecuteQuery 未实现");
331 }
332
333 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
334 return status::NotImplemented(Derived::kErrorPrefix,
335 " ExecuteQuery 未实现");
336 }
337
338 Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
339 return status::NotImplemented(Derived::kErrorPrefix,
340 " ExecuteQuery (update) 未实现");
341 }
342
343 Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
344 return status::NotImplemented(Derived::kErrorPrefix,
345 " ExecuteQuery (update) 未实现");
346 }
347
348 Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
349 return status::NotImplemented(Derived::kErrorPrefix,
350 " GetParameterSchema 未实现");
351 }
352
353 Status InitImpl(void* parent) { return status::Ok(); }
354
355 Status PrepareImpl(QueryState& state) {
356 return status::NotImplemented(Derived::kErrorPrefix, " Prepare 未实现");
357 }
358
359 Status ReleaseImpl() { return status::Ok(); }
360
361 Status SetOptionImpl(std::string_view key, Option value) {
362 return status::NotImplemented(Derived::kErrorPrefix, " 未知的语句选项 ",
363 key, "=", value.Format());
364 }
365
366 protected
367 ArrowArrayStream bind_parameters_;
368
369 private
370 State state_ = State(EmptyState{});
371 Derived& impl() { return static_cast<Derived&>(*this); }
372};
373
374} // namespace adbc::driver
定义 base_driver.h:1022
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
初始化对象。
定义 base_driver.h:274
一个类型化的选项值包装器。它目前不尝试转换(例如,获取双精度选项...)。
定义 base_driver.h:59
Result< std::string_view > AsString() const
如果值是字符串,则获取该值。
定义 base_driver.h:126
std::string Format() const
提供该值的易于理解的摘要。
定义 base_driver.h:139
Result< bool > AsBool() const
尝试将字符串值解析为布尔值。
定义 base_driver.h:83
bool has_value() const
检查是否设置了此选项。
定义 base_driver.h:80
语句的基本实现。
定义 statement.h:36
AdbcStatusCode Release(AdbcError *error)
最终化对象。
定义 statement.h:221
AdbcStatusCode Init(void *parent, AdbcError *error)
初始化对象。
定义 statement.h:186
TableExists
当表已经存在时,在摄取过程中要执行的操作。
定义 statement.h:47
TableDoesNotExist
当表不存在时,在摄取过程中要执行的操作。
定义 statement.h:41
Status SetOptionImpl(std::string_view key, Option value)
设置一个选项。可能在 InitImpl 之前调用。
定义 statement.h:361
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error)
设置选项值。
定义 statement.h:229
std::variant< EmptyState, IngestState, PreparedState, QueryState > State
语句状态:以上之一。
定义 statement.h:73
AdbcStatusCode + AdbcError 的包装器。
定义 status.h:43
bool ok() const
检查这是否是错误。
定义 status.h:64
#define ADBC_STATUS_NOT_IMPLEMENTED
该操作未实现或不支持。
定义 adbc.h:187
uint8_t AdbcStatusCode
可能失败的操作的错误代码。
定义 adbc.h:176
#define ADBC_STATUS_OK
没有错误。
定义 adbc.h:179
操作的详细错误消息。
定义 adbc.h:269
void MakeArrayStream(ArrowSchema *schema, ArrowArray *array, ArrowArrayStream *out)
从给定的 ArrowSchema 和 ArrowArray 创建 ArrowArrayStream。
#define ADBC_INGEST_OPTION_MODE_CREATE
创建表并插入数据;如果表已存在则报错。
定义 adbc.h:761
#define ADBC_INGEST_OPTION_TARGET_CATALOG
用于批量插入的表的目录。
定义 adbc.h:778
#define ADBC_INGEST_OPTION_TEMPORARY
使用临时表进行摄取。
定义 adbc.h:792
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
用于批量插入的表的架构。
定义 adbc.h:782
#define ADBC_INGEST_OPTION_MODE_CREATE_APPEND
插入数据;如果表不存在则创建表,或者如果表存在但架构不匹配则报错。
定义 adbc.h:774
#define ADBC_INGEST_OPTION_MODE
是创建(默认)还是追加。
定义 adbc.h:759
#define ADBC_INGEST_OPTION_MODE_REPLACE
创建表并插入数据;如果原始表已经存在,则删除原始表。
定义 adbc.h:769
#define ADBC_INGEST_OPTION_MODE_APPEND
不创建表,并插入数据;如果表不存在 (ADBC_STATUS_NOT_FOUND) 或者...
定义 adbc.h:765
#define ADBC_INGEST_OPTION_TARGET_TABLE
批量插入的目标表的名称。
定义 adbc.h:755
分布式/分区结果集的分区。
定义 adbc.h:897
#define RAISE_RESULT(ERROR, LHS, RHS)
一个辅助函数,用于在返回 AdbcStatusCode 的函数中解包 Result。
定义 status.h:277
#define UNWRAP_STATUS(rhs)
一个辅助函数,用于在返回 Result/Status 的函数中解包 Status。
定义 status.h:286
#define RAISE_STATUS(ERROR, RHS)
一个辅助函数,用于在返回 AdbcStatusCode 的函数中解包 Status。
定义 status.h:280
语句状态:已初始化,没有设置查询。
定义 statement.h:54
语句状态:批量摄取。
定义 statement.h:56
语句状态:预处理语句。
定义 statement.h:65
语句状态:即席查询。
定义 statement.h:69