驅動程式範例¶
食譜來源:driver_example.cc
在此,我們將展示使用 ADBC 驅動程式框架函式庫在 C++ 中建構 ADBC 驅動程式的結構。這與 ADBC 用於建構其 SQLite 和 PostgreSQL 驅動程式的函式庫相同,並抽象化了 C 可調用物件和目錄/元數據函式的細節,這些細節可能難以實作,但對於有效利用 ADBC 生態系統的其餘部分至關重要。
在高階層次上,我們將建構一個驅動程式,其「資料庫」是一個目錄,其中資料庫中的每個「表格」都是一個包含 Arrow IPC 串流的檔案。表格可以使用批量擷取功能寫入,表格可以使用 SELECT * FROM (檔案)
形式的簡單查詢讀取。
安裝¶
此快速入門實際上是一個可讀的 C++ 檔案。您可以複製儲存庫、建置範例並跟隨操作。
我們假設您使用 conda-forge 來處理依賴項。需要 CMake、C++17 編譯器和 ADBC 函式庫。它們可以按如下方式安裝
mamba install cmake compilers libadbc-driver-manager
建置¶
我們在此使用 CMake。從 ADBC 儲存庫的來源簽出
mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest
使用 C++ 建置 ADBC 驅動程式¶
讓我們先從一些包含項目開始。值得注意的是,我們需要驅動程式框架標頭檔和 nanoarrow,我們將使用它來建立和使用此範例驅動程式中的 Arrow C 資料介面結構。
72#include "driver_example.h"
73
74#include <cstdio>
75#include <string>
76
77#include "driver/framework/connection.h"
78#include "driver/framework/database.h"
79#include "driver/framework/statement.h"
80
81#include "nanoarrow/nanoarrow.hpp"
82#include "nanoarrow/nanoarrow_ipc.hpp"
83
84#include "arrow-adbc/adbc.h"
接下來,我們將一些必要的框架類型帶入命名空間,以減少實作的冗長性
adbc::driver::Option
:選項可以在 ADBC 資料庫、連線和陳述式上設定。它們可以是字串、不透明二進位檔、雙精度浮點數或整數。Option
類別抽象化了如何取得、設定和剖析這些值的細節。adbc::driver::Status
:Status
是 ADBC 驅動程式框架的錯誤處理機制:沒有傳回值但可能會失敗的函式會傳回Status
。您可以使用UNWRAP_STATUS(some_call())
作為Status status = some_call(); if (!status.ok()) return status;
的簡寫,以簡潔地傳播錯誤。adbc::driver::Result
:Result<T>
用作函式的傳回值,這些函式在成功時傳回類型為T
的值,而在失敗時使用Status
傳達其錯誤。您可以使用UNWRAP_RESULT(some_type value, some_call())
作為簡寫,表示some_type value; Result<some_type> maybe_value = some_call(); if (!maybe_value.status().ok()) { return maybe_value.status(); } else { value = *maybe_value; }
113using adbc::driver::Option;
114using adbc::driver::Result;
115using adbc::driver::Status;
116
117namespace {
接下來,我們將提供資料庫實作。驅動程式框架使用奇異遞迴模板模式 (CRTP)。框架會處理此模式的細節,但實際上這仍然只是覆寫處理細節的基底類別中的方法。
在此,我們的資料庫實作將只記錄使用者傳遞的 uri
。我們對此的解釋將是 file://
uri 到一個目錄,我們的 IPC 檔案應寫入和/或應從中讀取 IPC 檔案。這是 ADBC 中資料庫的角色:資料庫的共用句柄,可能會在連線之間快取一些共用狀態,但仍然允許多個連線同時對資料庫執行操作。
134class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
135 public:
136 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
137
138 Status SetOptionImpl(std::string_view key, Option value) override {
139 // Handle and validate options implemented by this driver
140 if (key == "uri") {
141 UNWRAP_RESULT(std::string_view uri, value.AsString());
142
143 if (uri.find("file://") != 0) {
144 return adbc::driver::status::InvalidArgument(
145 "[example] uri must start with 'file://'");
146 }
147
148 uri_ = uri;
149 return adbc::driver::status::Ok();
150 }
151
152 // Defer to the base implementation to handle state managed by the base
153 // class (and error for all other options).
154 return Base::SetOptionImpl(key, value);
155 }
156
157 Result<Option> GetOption(std::string_view key) override {
158 // Return the value of options implemented by this driver
159 if (key == "uri") {
160 return Option(uri_);
161 }
162
163 // Defer to the base implementation to handle state managed by the base
164 // class (and error for all other options).
165 return Base::GetOption(key);
166 }
167
168 // This is called after zero or more calls to SetOption() on
169 Status InitImpl() override {
170 if (uri_.empty()) {
171 return adbc::driver::status::InvalidArgument(
172 "[example] Must set uri to a non-empty value");
173 }
174
175 return Base::InitImpl();
176 }
177
178 // Getters for members needed by the connection and/or statement:
179 const std::string& uri() { return uri_; }
180
181 private:
182 std::string uri_;
183};
接下來,我們實作連線。雖然資料庫的角色通常是儲存或快取資訊,但連線的角色是提供可能難以取得的資源句柄(例如,連線到資料庫時協商身份驗證)。因為我們的範例「資料庫」只是一個目錄,所以我們不需要在連線中執行太多資源管理,除了提供一種讓子陳述式存取資料庫 uri 的方法。
連線的另一個角色是提供關於表格、欄位、統計資訊和其他目錄類資訊的元數據,呼叫者可能想在發出查詢之前知道這些資訊。驅動程式框架基底類別提供協助程式來實作這些函式,以便您主要可以使用 C++17 標準函式庫來實作它們(而不是自己建構 C 級陣列)。
198class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
199 public:
200 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
201
202 // Get information from the database and/or store a reference if needed.
203 Status InitImpl(void* parent) {
204 auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
205 uri_ = database.uri();
206 return Base::InitImpl(parent);
207 }
208
209 // Getters for members needed by the statement:
210 const std::string& uri() { return uri_; }
211
212 private:
213 std::string uri_;
214};
接下來,我們提供陳述式實作。陳述式是管理查詢執行的地方。因為我們的資料來源實際上是 Arrow 資料,所以我們不必提供管理類型或值轉換的層。SQLite 和 PostgreSQL 驅動程式都投入了許多程式碼行來有效率地實作和測試這些轉換。nanoarrow 函式庫可用於在兩個方向上實作轉換,並且是另一篇文章的範圍。
223class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
224 public:
225 [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
226
227 // Get information from the connection and/or store a reference if needed.
228 Status InitImpl(void* parent) {
229 auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
230 uri_ = connection.uri();
231 return Base::InitImpl(parent);
232 }
233
234 // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
235 // using the target table as the filename.
236 Result<int64_t> ExecuteIngestImpl(IngestState& state) {
237 std::string directory = uri_.substr(strlen("file://"));
238 std::string filename = directory + "/" + *state.target_table;
239
240 nanoarrow::ipc::UniqueOutputStream output_stream;
241 FILE* c_file = std::fopen(filename.c_str(), "wb");
242 UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
243 /*close_on_release*/ true));
244
245 nanoarrow::ipc::UniqueWriter writer;
246 UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));
247
248 ArrowError nanoarrow_error;
249 ArrowErrorInit(&nanoarrow_error);
250 UNWRAP_NANOARROW(nanoarrow_error, Internal,
251 ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
252 &nanoarrow_error));
253
254 return -1;
255 }
256
257 // Our implementation of query execution is to accept a simple query in the form
258 // SELECT * FROM (the filename).
259 Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
260 std::string prefix("SELECT * FROM ");
261 if (state.query.find(prefix) != 0) {
262 return adbc::driver::status::InvalidArgument(
263 "[example] Query must be in the form 'SELECT * FROM filename'");
264 }
265
266 std::string directory = uri_.substr(strlen("file://"));
267 std::string filename = directory + "/" + state.query.substr(prefix.size());
268
269 nanoarrow::ipc::UniqueInputStream input_stream;
270 FILE* c_file = std::fopen(filename.c_str(), "rb");
271 UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
272 /*close_on_release*/ true));
273
274 UNWRAP_ERRNO(Internal,
275 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
276 return -1;
277 }
278
279 // This path is taken when the user calls Prepare() first.
280 Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
281 QueryState query_state{state.query};
282 return ExecuteQueryImpl(query_state, stream);
283 }
284
285 private:
286 std::string uri_;
287};
288
289} // namespace
最後,我們建立驅動程式初始化函式,驅動程式管理器需要此函式來為構成 ADBC C API 的 Adbc**()
函式提供實作。此函式的名稱很重要:此檔案將建置到名為 libdriver_example.(so|dll|dylib)
的共用函式庫中,因此驅動程式管理器將尋找符號 AdbcDriverExampleInit()
作為預設進入點,當被要求載入驅動程式 "driver_example"
時。
298extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
299 AdbcError* error) {
300 using ExampleDriver =
301 adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
302 DriverExampleStatement>;
303 return ExampleDriver::Init(version, raw_driver, error);
304}
低階測試¶
在我們撰寫驅動程式的草圖後,下一步是確保驅動程式管理器可以載入它,並且可以初始化和釋放資料庫、連線和陳述式實例。
首先,我們將包含驅動程式管理器和 googletest。
29#include "driver_example.h"
30
31#include "arrow-adbc/adbc_driver_manager.h"
32#include "gtest/gtest.h"
接下來,我們將為基本生命週期宣告一個測試案例
36TEST(DriverExample, TestLifecycle) {
37 struct AdbcError error = ADBC_ERROR_INIT;
38
39 struct AdbcDatabase database;
40 ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
41 AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
42 ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
43 ADBC_STATUS_OK);
44 ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);
45
46 struct AdbcConnection connection;
47 ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
48 ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);
49
50 struct AdbcStatement statement;
51 ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);
52
53 ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
54 ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
55 ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);
56
57 if (error.release) {
58 error.release(&error);
59 }
60}
位於 apache/arrow-adbc 儲存庫中的驅動程式可以使用內建驗證函式庫,該函式庫針對功能齊全的 SQL 資料庫實作通用測試套件,並提供實用程式來測試一系列輸入和輸出。
高階測試¶
食譜來源:driver_example.py
在驗證基本驅動程式功能後,我們可以使用 adbc_driver_manager
Python 套件的內建 dbapi 實作來公開隨時可用的 Pythonic 資料庫 API。這也適用於高階測試!
首先,我們將匯入 pathlib 以進行一些路徑計算,以及 adbc_driver_manager
的 dbapi
模組
26from pathlib import Path
27
28from adbc_driver_manager import dbapi
接下來,我們將定義一個 connect()
函式,該函式使用我們在上一節中使用 cmake
建置的共用函式庫的位置來包裝 dbapi.connect()
。為了我們的教學目的,這將位於 CMake build/
目錄中。
35def connect(uri: str):
36 build_dir = Path(__file__).parent / "build"
37 for lib in [
38 "libdriver_example.dylib",
39 "libdriver_example.so",
40 "driver_example.dll",
41 ]:
42 driver_lib = build_dir / lib
43 if driver_lib.exists():
44 return dbapi.connect(
45 driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
46 )
47
48 raise RuntimeError("Can't find driver shared object")
接下來,我們可以讓我們的驅動程式試試看!我們在驅動程式中實作的兩個部分是「批量擷取」功能和「從中選取全部」,所以讓我們看看它是否有效!
53if __name__ == "__main__":
54 import os
55
56 import pyarrow
57
58 with connect(uri=Path(__file__).parent.as_uri()) as con:
59 data = pyarrow.table({"col": [1, 2, 3]})
60 with con.cursor() as cur:
61 cur.adbc_ingest("example.arrows", data, mode="create")
62
63 with con.cursor() as cur:
64 cur.execute("SELECT * FROM example.arrows")
65 print(cur.fetchall())
66
67 os.unlink(Path(__file__).parent / "example.arrows")
高階測試也可以使用 adbcdrivermanager
套件在 R 中撰寫。
library(adbcdrivermanager)
drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)
data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")