// Optional catalog name; the option-handling code assigns it through
// ensure_ingest() or clears it back to std::nullopt.
57 std::optional<std::string> target_catalog;
// Optional schema name; assigned or cleared to std::nullopt the same way.
58 std::optional<std::string> target_schema;
// Optional table name; assigned through ensure_ingest().target_table.
59 std::optional<std::string> target_table;
// Flag assigned through ensure_ingest().temporary; defaults to false.
60 bool temporary =
false;
// A statement is in exactly one of these four states at a time. The generic
// lambdas in this class branch on the active alternative with `if constexpr`.
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
// Zero-fill the bound-parameter stream so that the `bind_parameters_.release`
// checks elsewhere in this class see a zeroed callback until a stream is bound.
76 std::memset(&bind_parameters_, 0,
sizeof(bind_parameters_));
// Reject a null array pointer or an array whose release callback is null.
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind:必须提供非 NULL 数组")
85 }
// Apply the same null-pointer / null-release check to the schema.
else if (!schema || !schema->release) {
86 return status::InvalidArgument(Derived::kErrorPrefix,
// Release any previously bound parameter stream (only if its release callback
// is set).
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
// Reject a null stream pointer or a stream whose release callback is null.
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream:必须提供非 NULL 流")
// Release any previously bound parameter stream before overwriting it.
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
// Copy the caller's stream struct into bind_parameters_, then zero the
// caller's struct (including its release callback).
103 bind_parameters_ = *stream;
104 std::memset(stream, 0,
sizeof(*stream));
112 int64_t* rows_affected,
AdbcError* error) {
// Execute the statement, dispatching on the type of the active state:
//   - EmptyState: InvalidState error.
//   - IngestState: may return InvalidState; otherwise runs
//     impl().ExecuteIngestImpl(state) and stores the row count in
//     *rows_affected.
//   - PreparedState / QueryState: runs impl().ExecuteQueryImpl(state, stream)
//     or impl().ExecuteUpdateImpl(state) and stores the row count.
// NOTE(review): the conditions guarding the IngestState error and the choice
// between the query and update paths are not shown here; confirm.
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
120 using T = std::decay_t<
decltype(state)>;
// No query has been set.
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " 未设置查询无法执行 ExecuteQuery")
125 }
// Bulk-ingest path.
else if constexpr (std::is_same_v<T, IngestState>) {
127 return status::InvalidState(Derived::kErrorPrefix,
131 RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
133 *rows_affected = rows;
136 }
// Query path, shared by prepared and unprepared queries.
else if constexpr (std::is_same_v<T, PreparedState> ||
137 std::is_same_v<T, QueryState>) {
140 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
142 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
145 *rows_affected = rows;
// `!sizeof(T)` is always false, so instantiating this branch for an
// unhandled state type fails the build.
149 static_assert(!
sizeof(T),
"case not implemented");
// Dispatch on the type of the active state. Only PreparedState forwards to
// impl().GetParameterSchemaImpl; every other handled state returns
// InvalidState with a state-specific message.
162 using T = std::decay_t<
decltype(state)>;
163 if constexpr (std::is_same_v<T, EmptyState>) {
164 return status::InvalidState(
165 Derived::kErrorPrefix,
166 " Cannot GetParameterSchema without setting the query")
168 }
else if constexpr (std::is_same_v<T, IngestState>) {
169 return status::InvalidState(Derived::kErrorPrefix,
170 " Cannot GetParameterSchema in bulk ingestion")
172 }
// The only state that reaches the derived implementation.
else if constexpr (std::is_same_v<T, PreparedState>) {
173 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
174 }
// A query that has been set but not prepared.
else if constexpr (std::is_same_v<T, QueryState>) {
175 return status::InvalidState(
176 Derived::kErrorPrefix,
177 " Cannot GetParameterSchema without calling Prepare")
// `!sizeof(T)` is always false, so instantiating this branch for an
// unhandled state type fails the build.
180 static_assert(!
sizeof(T),
"case not implemented");
// Mark the object initialized first, then run the derived class's InitImpl.
// A non-ok status is converted with ToAdbc(error) and returned.
187 this->lifecycle_state_ = LifecycleState::kInitialized;
188 if (
auto status = impl().InitImpl(parent); !status.
ok()) {
189 return status.ToAdbc(error);
// State handler for Prepare. EmptyState and IngestState are rejected with
// InvalidState; QueryState is replaced by a PreparedState built by moving the
// query string out of the old state.
196 [&](
auto&& state) ->
Status {
197 using T = std::decay_t<
decltype(state)>;
198 if constexpr (std::is_same_v<T, EmptyState>) {
199 return status::InvalidState(
200 Derived::kErrorPrefix,
201 " Cannot Prepare without setting the query");
202 }
// NOTE(review): this branch reuses the EmptyState message verbatim; confirm
// that wording is intended for the ingest case.
else if constexpr (std::is_same_v<T, IngestState>) {
203 return status::InvalidState(
204 Derived::kErrorPrefix,
205 " Cannot Prepare without setting the query");
206 }
else if constexpr (std::is_same_v<T, PreparedState>) {
209 }
// Promote the query to a prepared statement.
else if constexpr (std::is_same_v<T, QueryState>) {
211 state_ = PreparedState{std::move(state.query)};
// `!sizeof(T)` is always false, so instantiating this branch for an
// unhandled state type fails the build.
214 static_assert(!
sizeof(T),
"case not implemented");
// Release the bound parameter stream if one is held, and reset its release
// callback to nullptr so the guard above is false afterwards.
222 if (bind_parameters_.release) {
223 bind_parameters_.release(&bind_parameters_);
224 bind_parameters_.release =
nullptr;
// Hand the rest of the teardown to the derived class.
226 return impl().ReleaseImpl().ToAdbc(error);
// When state_ holds some other alternative, the guarded code runs first.
// NOTE(review): presumably it switches state_ to an IngestState so that the
// std::get below cannot throw; confirm.
231 if (!std::holds_alternative<IngestState>(state_)) {
// Return a reference to the IngestState alternative held by state_.
234 return std::get<IngestState>(state_);
// Ingest-mode handling: each case below sets one pair of policies on the
// ingest state (what to do if the table is missing / if it already exists).
// Missing table -> kFail, existing table -> kAppend.
239 auto& state = ensure_ingest();
240 state.table_does_not_exist_ = TableDoesNotExist::kFail;
241 state.table_exists_ = TableExists::kAppend;
// Missing table -> kCreate, existing table -> kFail.
243 auto& state = ensure_ingest();
244 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
245 state.table_exists_ = TableExists::kFail;
// Missing table -> kCreate, existing table -> kAppend.
247 auto& state = ensure_ingest();
248 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
249 state.table_exists_ = TableExists::kAppend;
// Missing table -> kCreate, existing table -> kReplace.
251 auto& state = ensure_ingest();
252 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
253 state.table_exists_ = TableExists::kReplace;
// Any other mode value is rejected; the message includes the key and the
// formatted value.
255 return status::InvalidArgument(Derived::kErrorPrefix,
" 无效的 ingest 模式 '",
256 key,
"': ", value.
Format())
// Target catalog: set to the given value, or cleared.
263 ensure_ingest().target_catalog = catalog;
265 ensure_ingest().target_catalog = std::nullopt;
// Target schema: set to the given value, or cleared.
271 ensure_ingest().target_schema = schema;
273 ensure_ingest().target_schema = std::nullopt;
// Target table.
278 ensure_ingest().target_table = table;
// Temporary flag.
282 ensure_ingest().temporary = temporary;
// Forward the key/value pair to the derived class's SetOptionImpl.
285 return impl().SetOptionImpl(key, value).ToAdbc(error);
// State handler that stores a new query. Only the QueryState branch body is
// shown: it overwrites state.query with a std::string copy of `query`.
290 [&](
auto&& state) ->
Status {
291 using T = std::decay_t<
decltype(state)>;
292 if constexpr (std::is_same_v<T, EmptyState>) {
297 }
else if constexpr (std::is_same_v<T, IngestState>) {
302 }
else if constexpr (std::is_same_v<T, PreparedState>) {
307 }
else if constexpr (std::is_same_v<T, QueryState>) {
308 state.query = std::string(query);
// `!sizeof(T)` is always false, so an unhandled state type fails the build.
// NOTE(review): the message mentions "info value type" although this guards
// the state-type dispatch; it looks copy-pasted. Confirm and align with the
// "case not implemented" wording used by the other dispatchers.
311 static_assert(!
sizeof(T),
312 "info value type not implemented");
// Default bulk-ingest hook: returns a NotImplemented status.
// NOTE(review): presumably a derived class supplies its own version, reached
// through impl(); confirm.
323 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
324 return status::NotImplemented(Derived::kErrorPrefix,
// Default query hook for a prepared statement: returns a NotImplemented
// status. Neither `state` nor `stream` is used here.
328 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
329 return status::NotImplemented(Derived::kErrorPrefix,
330 " ExecuteQuery 未实现");
// Default query hook for an unprepared query: returns a NotImplemented
// status. Neither `state` nor `stream` is used here.
333 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
334 return status::NotImplemented(Derived::kErrorPrefix,
335 " ExecuteQuery 未实现");
// Default update hook for a prepared statement: returns a NotImplemented
// status. `state` is not used here.
338 Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
339 return status::NotImplemented(Derived::kErrorPrefix,
340 " ExecuteQuery (update) 未实现");
// Default update hook for an unprepared query: returns a NotImplemented
// status. `state` is not used here.
343 Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
344 return status::NotImplemented(Derived::kErrorPrefix,
345 " ExecuteQuery (update) 未实现");
// Default parameter-schema hook: returns a NotImplemented status. Neither
// `state` nor `schema` is used here.
348 Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
349 return status::NotImplemented(Derived::kErrorPrefix,
350 " GetParameterSchema 未实现");
353 Status InitImpl(
void* parent) {
return status::Ok(); }
// Default prepare hook: returns a NotImplemented status. `state` is not used
// here.
355 Status PrepareImpl(QueryState& state) {
356 return status::NotImplemented(Derived::kErrorPrefix,
" Prepare 未实现");
359 Status ReleaseImpl() {
return status::Ok(); }
// Returns a NotImplemented status whose message (an "unknown statement
// option" text) includes the key and the formatted value.
362 return status::NotImplemented(Derived::kErrorPrefix,
" 未知的语句选项 ",
363 key,
"=", value.
Format());
// Currently bound parameter stream. Code in this class tests
// `bind_parameters_.release` before invoking it.
367 ArrowArrayStream bind_parameters_;
371 Derived& impl() {
return static_cast<Derived&
>(*this); }