ADBC
Arrow Database Connectivity
Loading...
Searching...
No Matches
statement.h
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#pragma once
19
20#include <cstdint>
21#include <memory>
22#include <optional>
23#include <string>
24#include <utility>
25#include <variant>
26#include <vector>
27
28#include "driver/framework/base_driver.h"
30#include "driver/framework/utility.h"
31
32namespace adbc::driver {
33
35template <typename Derived>
36class Statement : public BaseStatement<Derived> {
37 public:
39
/// \brief What to do in ingestion when the table does not exist.
enum class TableDoesNotExist {
  kCreate,  ///< Selected by the create, create-append and replace ingest modes.
  kFail,    ///< Selected by the append ingest mode.
};
45
/// \brief What to do in ingestion when the table already exists.
enum class TableExists {
  kAppend,   ///< Selected by the append and create-append ingest modes.
  kFail,     ///< Selected by the create ingest mode.
  kReplace,  ///< Selected by the replace ingest mode.
};
52
/// \brief Statement state: initialized with no set query.
struct EmptyState {
};
56 struct IngestState {
57 std::optional<std::string> target_catalog;
58 std::optional<std::string> target_schema;
59 std::optional<std::string> target_table;
60 bool temporary = false;
61 TableDoesNotExist table_does_not_exist_ = TableDoesNotExist::kCreate;
62 TableExists table_exists_ = TableExists::kFail;
63 };
/// \brief Statement state: prepared statement.
///
/// Prepare() builds this state by moving the query text out of a QueryState.
struct PreparedState {
  /// The text of the query that was prepared.
  std::string query;
};
/// \brief Statement state: ad-hoc query.
struct QueryState {
  /// The query text most recently passed to SetSqlQuery.
  std::string query;
};
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
74
75 Statement() : BaseStatement<Derived>() {
76 std::memset(&bind_parameters_, 0, sizeof(bind_parameters_));
77 }
78 ~Statement() = default;
79
80 AdbcStatusCode Bind(ArrowArray* values, ArrowSchema* schema, AdbcError* error) {
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind: must provide non-NULL array")
84 .ToAdbc(error);
85 } else if (!schema || !schema->release) {
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind: must provide non-NULL stream")
88 .ToAdbc(error);
89 }
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
91 MakeArrayStream(schema, values, &bind_parameters_);
92 return ADBC_STATUS_OK;
93 }
94
95 AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) {
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream: must provide non-NULL stream")
99 .ToAdbc(error);
100 }
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
102 // Move stream
103 bind_parameters_ = *stream;
104 std::memset(stream, 0, sizeof(*stream));
105 return ADBC_STATUS_OK;
106 }
107
108 AdbcStatusCode Cancel(AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; }
109
110 AdbcStatusCode ExecutePartitions(struct ArrowSchema* schema,
111 struct AdbcPartitions* partitions,
112 int64_t* rows_affected, AdbcError* error) {
114 }
115
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
117 AdbcError* error) {
118 return std::visit(
119 [&](auto&& state) -> AdbcStatusCode {
120 using T = std::decay_t<decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " Cannot ExecuteQuery without setting the query")
124 .ToAdbc(error);
125 } else if constexpr (std::is_same_v<T, IngestState>) {
126 if (stream) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " Cannot ingest with result set")
129 .ToAdbc(error);
130 }
131 RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
132 if (rows_affected) {
133 *rows_affected = rows;
134 }
135 return ADBC_STATUS_OK;
136 } else if constexpr (std::is_same_v<T, PreparedState> ||
137 std::is_same_v<T, QueryState>) {
138 int64_t rows = 0;
139 if (stream) {
140 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
141 } else {
142 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
143 }
144 if (rows_affected) {
145 *rows_affected = rows;
146 }
147 return ADBC_STATUS_OK;
148 } else {
149 static_assert(!sizeof(T), "case not implemented");
150 }
151 },
152 state_);
153 }
154
155 AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) {
157 }
158
159 AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error) {
160 return std::visit(
161 [&](auto&& state) -> AdbcStatusCode {
162 using T = std::decay_t<decltype(state)>;
163 if constexpr (std::is_same_v<T, EmptyState>) {
164 return status::InvalidState(
165 Derived::kErrorPrefix,
166 " Cannot GetParameterSchema without setting the query")
167 .ToAdbc(error);
168 } else if constexpr (std::is_same_v<T, IngestState>) {
169 return status::InvalidState(Derived::kErrorPrefix,
170 " Cannot GetParameterSchema in bulk ingestion")
171 .ToAdbc(error);
172 } else if constexpr (std::is_same_v<T, PreparedState>) {
173 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
174 } else if constexpr (std::is_same_v<T, QueryState>) {
175 return status::InvalidState(
176 Derived::kErrorPrefix,
177 " Cannot GetParameterSchema without calling Prepare")
178 .ToAdbc(error);
179 } else {
180 static_assert(!sizeof(T), "case not implemented");
181 }
182 },
183 state_);
184 }
185
186 AdbcStatusCode Init(void* parent, AdbcError* error) {
187 this->lifecycle_state_ = LifecycleState::kInitialized;
188 if (auto status = impl().InitImpl(parent); !status.ok()) {
189 return status.ToAdbc(error);
190 }
191 return ObjectBase::Init(parent, error);
192 }
193
194 AdbcStatusCode Prepare(AdbcError* error) {
195 RAISE_STATUS(error, std::visit(
196 [&](auto&& state) -> Status {
197 using T = std::decay_t<decltype(state)>;
198 if constexpr (std::is_same_v<T, EmptyState>) {
199 return status::InvalidState(
200 Derived::kErrorPrefix,
201 " Cannot Prepare without setting the query");
202 } else if constexpr (std::is_same_v<T, IngestState>) {
203 return status::InvalidState(
204 Derived::kErrorPrefix,
205 " Cannot Prepare without setting the query");
206 } else if constexpr (std::is_same_v<T, PreparedState>) {
207 // No-op
208 return status::Ok();
209 } else if constexpr (std::is_same_v<T, QueryState>) {
210 UNWRAP_STATUS(impl().PrepareImpl(state));
211 state_ = PreparedState{std::move(state.query)};
212 return status::Ok();
213 } else {
214 static_assert(!sizeof(T), "case not implemented");
215 }
216 },
217 state_));
218 return ADBC_STATUS_OK;
219 }
220
222 if (bind_parameters_.release) {
223 bind_parameters_.release(&bind_parameters_);
224 bind_parameters_.release = nullptr;
225 }
226 return impl().ReleaseImpl().ToAdbc(error);
227 }
228
229 AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError* error) {
230 auto ensure_ingest = [&]() -> IngestState& {
231 if (!std::holds_alternative<IngestState>(state_)) {
232 state_ = IngestState{};
233 }
234 return std::get<IngestState>(state_);
235 };
236 if (key == ADBC_INGEST_OPTION_MODE) {
237 RAISE_RESULT(error, auto mode, value.AsString());
238 if (mode == ADBC_INGEST_OPTION_MODE_APPEND) {
239 auto& state = ensure_ingest();
240 state.table_does_not_exist_ = TableDoesNotExist::kFail;
241 state.table_exists_ = TableExists::kAppend;
242 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE) {
243 auto& state = ensure_ingest();
244 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
245 state.table_exists_ = TableExists::kFail;
246 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE_APPEND) {
247 auto& state = ensure_ingest();
248 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
249 state.table_exists_ = TableExists::kAppend;
250 } else if (mode == ADBC_INGEST_OPTION_MODE_REPLACE) {
251 auto& state = ensure_ingest();
252 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
253 state.table_exists_ = TableExists::kReplace;
254 } else {
255 return status::InvalidArgument(Derived::kErrorPrefix, " Invalid ingest mode '",
256 key, "': ", value.Format())
257 .ToAdbc(error);
258 }
259 return ADBC_STATUS_OK;
260 } else if (key == ADBC_INGEST_OPTION_TARGET_CATALOG) {
261 if (value.has_value()) {
262 RAISE_RESULT(error, auto catalog, value.AsString());
263 ensure_ingest().target_catalog = catalog;
264 } else {
265 ensure_ingest().target_catalog = std::nullopt;
266 }
267 return ADBC_STATUS_OK;
268 } else if (key == ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) {
269 if (value.has_value()) {
270 RAISE_RESULT(error, auto schema, value.AsString());
271 ensure_ingest().target_schema = schema;
272 } else {
273 ensure_ingest().target_schema = std::nullopt;
274 }
275 return ADBC_STATUS_OK;
276 } else if (key == ADBC_INGEST_OPTION_TARGET_TABLE) {
277 RAISE_RESULT(error, auto table, value.AsString());
278 ensure_ingest().target_table = table;
279 return ADBC_STATUS_OK;
280 } else if (key == ADBC_INGEST_OPTION_TEMPORARY) {
281 RAISE_RESULT(error, auto temporary, value.AsBool());
282 ensure_ingest().temporary = temporary;
283 return ADBC_STATUS_OK;
284 }
285 return impl().SetOptionImpl(key, value).ToAdbc(error);
286 }
287
288 AdbcStatusCode SetSqlQuery(const char* query, AdbcError* error) {
289 RAISE_STATUS(error, std::visit(
290 [&](auto&& state) -> Status {
291 using T = std::decay_t<decltype(state)>;
292 if constexpr (std::is_same_v<T, EmptyState>) {
293 state_ = QueryState{
294 std::string(query),
295 };
296 return status::Ok();
297 } else if constexpr (std::is_same_v<T, IngestState>) {
298 state_ = QueryState{
299 std::string(query),
300 };
301 return status::Ok();
302 } else if constexpr (std::is_same_v<T, PreparedState>) {
303 state_ = QueryState{
304 std::string(query),
305 };
306 return status::Ok();
307 } else if constexpr (std::is_same_v<T, QueryState>) {
308 state.query = std::string(query);
309 return status::Ok();
310 } else {
311 static_assert(!sizeof(T),
312 "info value type not implemented");
313 }
314 },
315 state_));
316 return ADBC_STATUS_OK;
317 }
318
319 AdbcStatusCode SetSubstraitPlan(const uint8_t* plan, size_t length, AdbcError* error) {
321 }
322
323 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
324 return status::NotImplemented(Derived::kErrorPrefix,
325 " Bulk ingest is not implemented");
326 }
327
328 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
329 return status::NotImplemented(Derived::kErrorPrefix,
330 " ExecuteQuery is not implemented");
331 }
332
333 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
334 return status::NotImplemented(Derived::kErrorPrefix,
335 " ExecuteQuery is not implemented");
336 }
337
338 Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
339 return status::NotImplemented(Derived::kErrorPrefix,
340 " ExecuteQuery (update) is not implemented");
341 }
342
343 Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
344 return status::NotImplemented(Derived::kErrorPrefix,
345 " ExecuteQuery (update) is not implemented");
346 }
347
348 Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
349 return status::NotImplemented(Derived::kErrorPrefix,
350 " GetParameterSchema is not implemented");
351 }
352
353 Status InitImpl(void* parent) { return status::Ok(); }
354
355 Status PrepareImpl(QueryState& state) {
356 return status::NotImplemented(Derived::kErrorPrefix, " Prepare is not implemented");
357 }
358
359 Status ReleaseImpl() { return status::Ok(); }
360
361 Status SetOptionImpl(std::string_view key, Option value) {
362 return status::NotImplemented(Derived::kErrorPrefix, " Unknown statement option ",
363 key, "=", value.Format());
364 }
365
366 protected:
367 ArrowArrayStream bind_parameters_;
368
369 private:
370 State state_ = State(EmptyState{});
371 Derived& impl() { return static_cast<Derived&>(*this); }
372};
373
374} // namespace adbc::driver
Definition base_driver.h:1022
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition base_driver.h:274
A typed option value wrapper. It currently does not attempt conversion (i.e., getting a double option...
Definition base_driver.h:59
Result< std::string_view > AsString() const
Get the value if it is a string.
Definition base_driver.h:126
std::string Format() const
Provide a human-readable summary of the value.
Definition base_driver.h:139
Result< bool > AsBool() const
Try to parse a string value as a boolean.
Definition base_driver.h:83
bool has_value() const
Check whether this option is set.
Definition base_driver.h:80
A base implementation of a statement.
Definition statement.h:36
AdbcStatusCode Release(AdbcError *error)
Finalize the object.
Definition statement.h:221
AdbcStatusCode Init(void *parent, AdbcError *error)
Initialize the object.
Definition statement.h:186
TableExists
What to do in ingestion when the table already exists.
Definition statement.h:47
TableDoesNotExist
What to do in ingestion when the table does not exist.
Definition statement.h:41
Status SetOptionImpl(std::string_view key, Option value)
Set an option. May be called prior to InitImpl.
Definition statement.h:361
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error)
Set an option value.
Definition statement.h:229
std::variant< EmptyState, IngestState, PreparedState, QueryState > State
Statement state: one of the above.
Definition statement.h:73
A wrapper around AdbcStatusCode + AdbcError.
Definition status.h:43
bool ok() const
Check if this is an error or not.
Definition status.h:64
#define ADBC_STATUS_NOT_IMPLEMENTED
The operation is not implemented or supported.
Definition adbc.h:187
uint8_t AdbcStatusCode
Error codes for operations that may fail.
Definition adbc.h:176
#define ADBC_STATUS_OK
No error.
Definition adbc.h:179
A detailed error message for an operation.
Definition adbc.h:269
void MakeArrayStream(ArrowSchema *schema, ArrowArray *array, ArrowArrayStream *out)
Create an ArrowArrayStream from a given ArrowSchema and ArrowArray.
#define ADBC_INGEST_OPTION_MODE_CREATE
Create the table and insert data; error if the table exists.
Definition adbc.h:761
#define ADBC_INGEST_OPTION_TARGET_CATALOG
The catalog of the table for bulk insert.
Definition adbc.h:778
#define ADBC_INGEST_OPTION_TEMPORARY
Use a temporary table for ingestion.
Definition adbc.h:792
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
The schema of the table for bulk insert.
Definition adbc.h:782
#define ADBC_INGEST_OPTION_MODE_CREATE_APPEND
Insert data; create the table if it does not exist, or error if the table exists, but the schema does...
Definition adbc.h:774
#define ADBC_INGEST_OPTION_MODE
Whether to create (the default) or append.
Definition adbc.h:759
#define ADBC_INGEST_OPTION_MODE_REPLACE
Create the table and insert data; drop the original table if it already exists.
Definition adbc.h:769
#define ADBC_INGEST_OPTION_MODE_APPEND
Do not create the table, and insert data; error if the table does not exist (ADBC_STATUS_NOT_FOUND) o...
Definition adbc.h:765
#define ADBC_INGEST_OPTION_TARGET_TABLE
The name of the target table for a bulk insert.
Definition adbc.h:755
The partitions of a distributed/partitioned result set.
Definition adbc.h:897
#define RAISE_RESULT(ERROR, LHS, RHS)
A helper to unwrap a Result in functions returning AdbcStatusCode.
Definition status.h:277
#define UNWRAP_STATUS(rhs)
A helper to unwrap a Status in functions returning Result/Status.
Definition status.h:286
#define RAISE_STATUS(ERROR, RHS)
A helper to unwrap a Status in functions returning AdbcStatusCode.
Definition status.h:280
Statement state: initialized with no set query.
Definition statement.h:54
Statement state: bulk ingestion.
Definition statement.h:56
Statement state: prepared statement.
Definition statement.h:65
Statement state: ad-hoc query.
Definition statement.h:69