ADBC
Arrow 資料庫連線能力
載入中...
搜尋中...
沒有符合項目
statement.h
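// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.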
17
18#pragma once
19
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>

#include "driver/framework/base_driver.h"
#include "driver/framework/utility.h"
31
32namespace adbc::driver {
33
35template <typename Derived>
36class Statement : public BaseStatement<Derived> {
37 public
39
41 enum class TableDoesNotExist {
42 kCreate,
43 kFail,
44 };
45
47 enum class TableExists {
48 kAppend,
49 kFail,
50 kReplace,
51 };
52
54 struct EmptyState {};
56 struct IngestState {
57 std::optional<std::string> target_catalog;
58 std::optional<std::string> target_schema;
59 std::optional<std::string> target_table;
60 bool temporary = false;
61 TableDoesNotExist table_does_not_exist_ = TableDoesNotExist::kCreate;
62 TableExists table_exists_ = TableExists::kFail;
63 };
66 std::string query;
67 };
69 struct QueryState {
70 std::string query;
71 };
73 using State = std::variant<EmptyState, IngestState, PreparedState, QueryState>;
74
75 Statement() : BaseStatement<Derived>() {
76 std::memset(&bind_parameters_, 0, sizeof(bind_parameters_));
77 }
78 ~Statement() = default;
79
80 AdbcStatusCode Bind(ArrowArray* values, ArrowSchema* schema, AdbcError* error) {
81 if (!values || !values->release) {
82 return status::InvalidArgument(Derived::kErrorPrefix,
83 " Bind: must provide non-NULL array")
84 .ToAdbc(error);
85 } else if (!schema || !schema->release) {
86 return status::InvalidArgument(Derived::kErrorPrefix,
87 " Bind: must provide non-NULL stream")
88 .ToAdbc(error);
89 }
90 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
91 MakeArrayStream(schema, values, &bind_parameters_);
92 return ADBC_STATUS_OK;
93 }
94
95 AdbcStatusCode BindStream(ArrowArrayStream* stream, AdbcError* error) {
96 if (!stream || !stream->release) {
97 return status::InvalidArgument(Derived::kErrorPrefix,
98 " BindStream: must provide non-NULL stream")
99 .ToAdbc(error);
100 }
101 if (bind_parameters_.release) bind_parameters_.release(&bind_parameters_);
102 // 移動串流
103 bind_parameters_ = *stream;
104 std::memset(stream, 0, sizeof(*stream));
105 return ADBC_STATUS_OK;
106 }
107
108 AdbcStatusCode Cancel(AdbcError* error) { return ADBC_STATUS_NOT_IMPLEMENTED; }
109
110 AdbcStatusCode ExecutePartitions(struct ArrowSchema* schema,
111 struct AdbcPartitions* partitions,
112 int64_t* rows_affected, AdbcError* error) {
114 }
115
116 AdbcStatusCode ExecuteQuery(ArrowArrayStream* stream, int64_t* rows_affected,
117 AdbcError* error) {
118 return std::visit(
119 [&](auto&& state) -> AdbcStatusCode {
120 using T = std::decay_t<decltype(state)>;
121 if constexpr (std::is_same_v<T, EmptyState>) {
122 return status::InvalidState(Derived::kErrorPrefix,
123 " Cannot ExecuteQuery without setting the query")
124 .ToAdbc(error);
125 } else if constexpr (std::is_same_v<T, IngestState>) {
126 if (stream) {
127 return status::InvalidState(Derived::kErrorPrefix,
128 " Cannot ingest with result set")
129 .ToAdbc(error);
130 }
131 RAISE_RESULT(error, int64_t rows, impl().ExecuteIngestImpl(state));
132 if (rows_affected) {
133 *rows_affected = rows;
134 }
135 return ADBC_STATUS_OK;
136 } else if constexpr (std::is_same_v<T, PreparedState> ||
137 std::is_same_v<T, QueryState>) {
138 int64_t rows = 0;
139 if (stream) {
140 RAISE_RESULT(error, rows, impl().ExecuteQueryImpl(state, stream));
141 } else {
142 RAISE_RESULT(error, rows, impl().ExecuteUpdateImpl(state));
143 }
144 if (rows_affected) {
145 *rows_affected = rows;
146 }
147 return ADBC_STATUS_OK;
148 } else {
149 static_assert(!sizeof(T), "case not implemented");
150 }
151 },
152 state_);
153 }
154
155 AdbcStatusCode ExecuteSchema(ArrowSchema* schema, AdbcError* error) {
157 }
158
159 AdbcStatusCode GetParameterSchema(struct ArrowSchema* schema, struct AdbcError* error) {
160 return std::visit(
161 [&](auto&& state) -> AdbcStatusCode {
162 using T = std::decay_t<decltype(state)>;
163 if constexpr (std::is_same_v<T, EmptyState>) {
164 return status::InvalidState(
165 Derived::kErrorPrefix,
166 " Cannot GetParameterSchema without setting the query")
167 .ToAdbc(error);
168 } else if constexpr (std::is_same_v<T, IngestState>) {
169 return status::InvalidState(Derived::kErrorPrefix,
170 " Cannot GetParameterSchema in bulk ingestion")
171 .ToAdbc(error);
172 } else if constexpr (std::is_same_v<T, PreparedState>) {
173 return impl().GetParameterSchemaImpl(state, schema).ToAdbc(error);
174 } else if constexpr (std::is_same_v<T, QueryState>) {
175 return status::InvalidState(
176 Derived::kErrorPrefix,
177 " Cannot GetParameterSchema without calling Prepare")
178 .ToAdbc(error);
179 } else {
180 static_assert(!sizeof(T), "case not implemented");
181 }
182 },
183 state_);
184 }
185
186 AdbcStatusCode Init(void* parent, AdbcError* error) {
187 this->lifecycle_state_ = LifecycleState::kInitialized;
188 if (auto status = impl().InitImpl(parent); !status.ok()) {
189 return status.ToAdbc(error);
190 }
191 return ObjectBase::Init(parent, error);
192 }
193
194 AdbcStatusCode Prepare(AdbcError* error) {
195 RAISE_STATUS(error, std::visit(
196 [&](auto&& state) -> Status {
197 using T = std::decay_t<decltype(state)>;
198 if constexpr (std::is_same_v<T, EmptyState>) {
199 return status::InvalidState(
200 Derived::kErrorPrefix,
201 " Cannot Prepare without setting the query");
202 } else if constexpr (std::is_same_v<T, IngestState>) {
203 return status::InvalidState(
204 Derived::kErrorPrefix,
205 " Cannot Prepare without setting the query");
206 } else if constexpr (std::is_same_v<T, PreparedState>) {
207 // 無操作
208 return status::Ok();
209 } else if constexpr (std::is_same_v<T, QueryState>) {
210 UNWRAP_STATUS(impl().PrepareImpl(state));
211 state_ = PreparedState{std::move(state.query)};
212 return status::Ok();
213 } else {
214 static_assert(!sizeof(T), "case not implemented");
215 }
216 },
217 state_));
218 return ADBC_STATUS_OK;
219 }
220
222 if (bind_parameters_.release) {
223 bind_parameters_.release(&bind_parameters_);
224 bind_parameters_.release = nullptr;
225 }
226 return impl().ReleaseImpl().ToAdbc(error);
227 }
228
229 AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError* error) {
230 auto ensure_ingest = [&]() -> IngestState& {
231 if (!std::holds_alternative<IngestState>(state_)) {
232 state_ = IngestState{};
233 }
234 return std::get<IngestState>(state_);
235 };
236 if (key == ADBC_INGEST_OPTION_MODE) {
237 RAISE_RESULT(error, auto mode, value.AsString());
238 if (mode == ADBC_INGEST_OPTION_MODE_APPEND) {
239 auto& state = ensure_ingest();
240 state.table_does_not_exist_ = TableDoesNotExist::kFail;
241 state.table_exists_ = TableExists::kAppend;
242 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE) {
243 auto& state = ensure_ingest();
244 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
245 state.table_exists_ = TableExists::kFail;
246 } else if (mode == ADBC_INGEST_OPTION_MODE_CREATE_APPEND) {
247 auto& state = ensure_ingest();
248 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
249 state.table_exists_ = TableExists::kAppend;
250 } else if (mode == ADBC_INGEST_OPTION_MODE_REPLACE) {
251 auto& state = ensure_ingest();
252 state.table_does_not_exist_ = TableDoesNotExist::kCreate;
253 state.table_exists_ = TableExists::kReplace;
254 } else {
255 return status::InvalidArgument(Derived::kErrorPrefix, " Invalid ingest mode '",
256 key, "': ", value.Format())
257 .ToAdbc(error);
258 }
259 return ADBC_STATUS_OK;
260 } else if (key == ADBC_INGEST_OPTION_TARGET_CATALOG) {
261 if (value.has_value()) {
262 RAISE_RESULT(error, auto catalog, value.AsString());
263 ensure_ingest().target_catalog = catalog;
264 } else {
265 ensure_ingest().target_catalog = std::nullopt;
266 }
267 return ADBC_STATUS_OK;
268 } else if (key == ADBC_INGEST_OPTION_TARGET_DB_SCHEMA) {
269 if (value.has_value()) {
270 RAISE_RESULT(error, auto schema, value.AsString());
271 ensure_ingest().target_schema = schema;
272 } else {
273 ensure_ingest().target_schema = std::nullopt;
274 }
275 return ADBC_STATUS_OK;
276 } else if (key == ADBC_INGEST_OPTION_TARGET_TABLE) {
277 RAISE_RESULT(error, auto table, value.AsString());
278 ensure_ingest().target_table = table;
279 return ADBC_STATUS_OK;
280 } else if (key == ADBC_INGEST_OPTION_TEMPORARY) {
281 RAISE_RESULT(error, auto temporary, value.AsBool());
282 ensure_ingest().temporary = temporary;
283 return ADBC_STATUS_OK;
284 }
285 return impl().SetOptionImpl(key, value).ToAdbc(error);
286 }
287
288 AdbcStatusCode SetSqlQuery(const char* query, AdbcError* error) {
289 RAISE_STATUS(error, std::visit(
290 [&](auto&& state) -> Status {
291 using T = std::decay_t<decltype(state)>;
292 if constexpr (std::is_same_v<T, EmptyState>) {
293 state_ = QueryState{
294 std::string(query),
295 };
296 return status::Ok();
297 } else if constexpr (std::is_same_v<T, IngestState>) {
298 state_ = QueryState{
299 std::string(query),
300 };
301 return status::Ok();
302 } else if constexpr (std::is_same_v<T, PreparedState>) {
303 state_ = QueryState{
304 std::string(query),
305 };
306 return status::Ok();
307 } else if constexpr (std::is_same_v<T, QueryState>) {
308 state.query = std::string(query);
309 return status::Ok();
310 } else {
311 static_assert(!sizeof(T),
312 "info value type not implemented");
313 }
314 },
315 state_));
316 return ADBC_STATUS_OK;
317 }
318
319 AdbcStatusCode SetSubstraitPlan(const uint8_t* plan, size_t length, AdbcError* error) {
321 }
322
323 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
324 return status::NotImplemented(Derived::kErrorPrefix,
325 " Bulk ingest is not implemented");
326 }
327
328 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
329 return status::NotImplemented(Derived::kErrorPrefix,
330 " ExecuteQuery is not implemented");
331 }
332
333 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
334 return status::NotImplemented(Derived::kErrorPrefix,
335 " ExecuteQuery is not implemented");
336 }
337
338 Result<int64_t> ExecuteUpdateImpl(PreparedState& state) {
339 return status::NotImplemented(Derived::kErrorPrefix,
340 " ExecuteQuery (update) is not implemented");
341 }
342
343 Result<int64_t> ExecuteUpdateImpl(QueryState& state) {
344 return status::NotImplemented(Derived::kErrorPrefix,
345 " ExecuteQuery (update) is not implemented");
346 }
347
348 Status GetParameterSchemaImpl(PreparedState& state, ArrowSchema* schema) {
349 return status::NotImplemented(Derived::kErrorPrefix,
350 " GetParameterSchema is not implemented");
351 }
352
353 Status InitImpl(void* parent) { return status::Ok(); }
354
355 Status PrepareImpl(QueryState& state) {
356 return status::NotImplemented(Derived::kErrorPrefix, " Prepare is not implemented");
357 }
358
359 Status ReleaseImpl() { return status::Ok(); }
360
361 Status SetOptionImpl(std::string_view key, Option value) {
362 return status::NotImplemented(Derived::kErrorPrefix, " Unknown statement option ",
363 key, "=", value.Format());
364 }
365
366 protected
367 ArrowArrayStream bind_parameters_;
368
369 private
370 State state_ = State(EmptyState{});
371 Derived& impl() { return static_cast<Derived&>(*this); }
372};
373
374} // namespace adbc::driver
Definition base_driver.h:1022
virtual AdbcStatusCode Init(void *parent, AdbcError *error)
初始化物件。
Definition base_driver.h:274
一個型別化的選項值包裝器。目前不嘗試轉換(即,取得雙精度浮點數選項...
Definition base_driver.h:59
Result< std::string_view > AsString() const
如果值是字串,則取得該值。
Definition base_driver.h:126
std::string Format() const
提供值的易讀摘要。
Definition base_driver.h:139
Result< bool > AsBool() const
嘗試將字串值解析為布林值。
Definition base_driver.h:83
bool has_value() const
檢查是否已設定此選項。
Definition base_driver.h:80
statement 的基本實作。
Definition statement.h:36
AdbcStatusCode Release(AdbcError *error)
終結物件。
Definition statement.h:221
AdbcStatusCode Init(void *parent, AdbcError *error)
初始化物件。
Definition statement.h:186
TableExists
當表格已存在時,在資料載入中要執行的動作。
Definition statement.h:47
TableDoesNotExist
當表格不存在時,在資料載入中要執行的動作。
Definition statement.h:41
Status SetOptionImpl(std::string_view key, Option value)
設定選項。可能在 InitImpl 之前呼叫。
Definition statement.h:361
AdbcStatusCode SetOption(std::string_view key, Option value, AdbcError *error)
設定選項值。
Definition statement.h:229
std::variant< EmptyState, IngestState, PreparedState, QueryState > State
Statement 狀態:以上其中之一。
Definition statement.h:73
AdbcStatusCode + AdbcError 的包裝器。
Definition status.h:43
bool ok() const
檢查這是否為錯誤。
Definition status.h:64
#define ADBC_STATUS_NOT_IMPLEMENTED
操作未實作或不支援。
Definition adbc.h:187
uint8_t AdbcStatusCode
可能會失敗之操作的錯誤代碼。
Definition adbc.h:176
#define ADBC_STATUS_OK
沒有錯誤。
Definition adbc.h:179
操作的詳細錯誤訊息。
Definition adbc.h:269
void MakeArrayStream(ArrowSchema *schema, ArrowArray *array, ArrowArrayStream *out)
從給定的 ArrowSchema 和 ArrowArray 建立 ArrowArrayStream。
#define ADBC_INGEST_OPTION_MODE_CREATE
建立表格並插入資料;如果表格已存在則會發生錯誤。
Definition adbc.h:761
#define ADBC_INGEST_OPTION_TARGET_CATALOG
用於大量插入之表格的目錄。
Definition adbc.h:778
#define ADBC_INGEST_OPTION_TEMPORARY
使用臨時表格進行資料載入。
Definition adbc.h:792
#define ADBC_INGEST_OPTION_TARGET_DB_SCHEMA
用於大量插入之表格的結構描述。
Definition adbc.h:782
#define ADBC_INGEST_OPTION_MODE_CREATE_APPEND
插入資料;如果表格不存在則建立表格,如果表格存在但結構描述不相符則會發生錯誤...
Definition adbc.h:774
#define ADBC_INGEST_OPTION_MODE
是否建立(預設)或附加。
Definition adbc.h:759
#define ADBC_INGEST_OPTION_MODE_REPLACE
建立表格並插入資料;如果原始表格已存在則捨棄它。
Definition adbc.h:769
#define ADBC_INGEST_OPTION_MODE_APPEND
不要建立表格,並插入資料;如果表格不存在(ADBC_STATUS_NOT_FOUND)或...則會發生錯誤
Definition adbc.h:765
#define ADBC_INGEST_OPTION_TARGET_TABLE
大量插入之目標表格的名稱。
Definition adbc.h:755
分散式/分割結果集的分割區。
Definition adbc.h:897
#define RAISE_RESULT(ERROR, LHS, RHS)
在傳回 AdbcStatusCode 的函式中,用於解包 Result 的輔助程式。
Definition status.h:277
#define UNWRAP_STATUS(rhs)
在傳回 Result/Status 的函式中,用於解包 Status 的輔助程式。
Definition status.h:286
#define RAISE_STATUS(ERROR, RHS)
在傳回 AdbcStatusCode 的函式中,用於解包 Status 的輔助程式。
Definition status.h:280
Statement 狀態:已初始化,但未設定查詢。
Definition statement.h:54
Statement 狀態:大量資料載入。
Definition statement.h:56
Statement 狀態:預備陳述式。
Definition statement.h:65
Statement 狀態:隨選查詢。
Definition statement.h:69