驅動程式範例

食譜來源:driver_example.cc

在此,我們將展示使用 ADBC 驅動程式框架函式庫在 C++ 中建構 ADBC 驅動程式的結構。這與 ADBC 用於建構其 SQLite 和 PostgreSQL 驅動程式的函式庫相同,並抽象化了 C 可調用物件和目錄/元數據函式的細節,這些細節可能難以實作,但對於有效利用 ADBC 生態系統的其餘部分至關重要。

在高階層次上,我們將建構一個驅動程式,其「資料庫」是一個目錄,其中資料庫中的每個「表格」都是一個包含 Arrow IPC 串流的檔案。表格可以使用批量擷取功能寫入,表格可以使用 SELECT * FROM (檔案) 形式的簡單查詢讀取。

安裝

此快速入門實際上是一個可讀的 C++ 檔案。您可以複製儲存庫、建置範例並跟隨操作。

我們假設您使用 conda-forge 來處理依賴項。需要 CMake、C++17 編譯器和 ADBC 函式庫。它們可以按如下方式安裝

mamba install cmake compilers libadbc-driver-manager

建置

我們在此使用 CMake。從 ADBC 儲存庫的來源簽出

mkdir build
cd build
cmake ../docs/source/cpp/recipe_driver -DADBC_DRIVER_EXAMPLE_BUILD_TESTS=ON
cmake --build .
ctest

使用 C++ 建置 ADBC 驅動程式

讓我們先從一些包含項目開始。值得注意的是,我們需要驅動程式框架標頭檔和 nanoarrow,我們將使用它來建立和使用此範例驅動程式中的 Arrow C 資料介面結構。

72#include "driver_example.h"
73
74#include <cstdio>
75#include <string>
76
77#include "driver/framework/connection.h"
78#include "driver/framework/database.h"
79#include "driver/framework/statement.h"
80
81#include "nanoarrow/nanoarrow.hpp"
82#include "nanoarrow/nanoarrow_ipc.hpp"
83
84#include "arrow-adbc/adbc.h"

接下來,我們將一些必要的框架類型帶入命名空間,以減少實作的冗長性

  • adbc::driver::Option:選項可以在 ADBC 資料庫、連線和陳述式上設定。它們可以是字串、不透明二進位檔、雙精度浮點數或整數。Option 類別抽象化了如何取得、設定和剖析這些值的細節。

  • adbc::driver::StatusStatus 是 ADBC 驅動程式框架的錯誤處理機制:沒有傳回值但可能會失敗的函式會傳回 Status。您可以使用 UNWRAP_STATUS(some_call()) 作為 Status status = some_call(); if (!status.ok()) return status; 的簡寫,以簡潔地傳播錯誤。

  • adbc::driver::ResultResult<T> 用作函式的傳回值,這些函式在成功時傳回類型為 T 的值,而在失敗時使用 Status 傳達其錯誤。您可以使用 UNWRAP_RESULT(some_type value, some_call()) 作為簡寫,表示

    some_type value;
    Result<some_type> maybe_value = some_call();
    if (!maybe_value.status().ok()) {
      return maybe_value.status();
    } else {
      value = *maybe_value;
    }
    
113using adbc::driver::Option;
114using adbc::driver::Result;
115using adbc::driver::Status;
116
117namespace {

接下來,我們將提供資料庫實作。驅動程式框架使用奇異遞迴模板模式 (CRTP)。框架會處理此模式的細節,但實際上這仍然只是覆寫處理細節的基底類別中的方法。

在此,我們的資料庫實作將只記錄使用者傳遞的 uri。我們對此的解釋將是 file:// uri 到一個目錄,我們的 IPC 檔案應寫入和/或應從中讀取 IPC 檔案。這是 ADBC 中資料庫的角色:資料庫的共用句柄,可能會在連線之間快取一些共用狀態,但仍然允許多個連線同時對資料庫執行操作。

134class DriverExampleDatabase : public adbc::driver::Database<DriverExampleDatabase> {
135 public:
136  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
137
138  Status SetOptionImpl(std::string_view key, Option value) override {
139    // Handle and validate options implemented by this driver
140    if (key == "uri") {
141      UNWRAP_RESULT(std::string_view uri, value.AsString());
142
143      if (uri.find("file://") != 0) {
144        return adbc::driver::status::InvalidArgument(
145            "[example] uri must start with 'file://'");
146      }
147
148      uri_ = uri;
149      return adbc::driver::status::Ok();
150    }
151
152    // Defer to the base implementation to handle state managed by the base
153    // class (and error for all other options).
154    return Base::SetOptionImpl(key, value);
155  }
156
157  Result<Option> GetOption(std::string_view key) override {
158    // Return the value of options implemented by this driver
159    if (key == "uri") {
160      return Option(uri_);
161    }
162
163    // Defer to the base implementation to handle state managed by the base
164    // class (and error for all other options).
165    return Base::GetOption(key);
166  }
167
168  // This is called after zero or more calls to SetOption() on
169  Status InitImpl() override {
170    if (uri_.empty()) {
171      return adbc::driver::status::InvalidArgument(
172          "[example] Must set uri to a non-empty value");
173    }
174
175    return Base::InitImpl();
176  }
177
178  // Getters for members needed by the connection and/or statement:
179  const std::string& uri() { return uri_; }
180
181 private:
182  std::string uri_;
183};

接下來,我們實作連線。雖然資料庫的角色通常是儲存或快取資訊,但連線的角色是提供可能難以取得的資源句柄(例如,連線到資料庫時協商身份驗證)。因為我們的範例「資料庫」只是一個目錄,所以我們不需要在連線中執行太多資源管理,除了提供一種讓子陳述式存取資料庫 uri 的方法。

連線的另一個角色是提供關於表格、欄位、統計資訊和其他目錄類資訊的元數據,呼叫者可能想在發出查詢之前知道這些資訊。驅動程式框架基底類別提供協助程式來實作這些函式,以便您主要可以使用 C++17 標準函式庫來實作它們(而不是自己建構 C 級陣列)。

198class DriverExampleConnection : public adbc::driver::Connection<DriverExampleConnection> {
199 public:
200  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
201
202  // Get information from the database and/or store a reference if needed.
203  Status InitImpl(void* parent) {
204    auto& database = *reinterpret_cast<DriverExampleDatabase*>(parent);
205    uri_ = database.uri();
206    return Base::InitImpl(parent);
207  }
208
209  // Getters for members needed by the statement:
210  const std::string& uri() { return uri_; }
211
212 private:
213  std::string uri_;
214};

接下來,我們提供陳述式實作。陳述式是管理查詢執行的地方。因為我們的資料來源實際上是 Arrow 資料,所以我們不必提供管理類型或值轉換的層。SQLite 和 PostgreSQL 驅動程式都投入了許多程式碼行來有效率地實作和測試這些轉換。nanoarrow 函式庫可用於在兩個方向上實作轉換,並且是另一篇文章的範圍。

223class DriverExampleStatement : public adbc::driver::Statement<DriverExampleStatement> {
224 public:
225  [[maybe_unused]] constexpr static std::string_view kErrorPrefix = "[example]";
226
227  // Get information from the connection and/or store a reference if needed.
228  Status InitImpl(void* parent) {
229    auto& connection = *reinterpret_cast<DriverExampleConnection*>(parent);
230    uri_ = connection.uri();
231    return Base::InitImpl(parent);
232  }
233
234  // Our implementation of a bulk ingestion is to write an Arrow IPC stream as a file
235  // using the target table as the filename.
236  Result<int64_t> ExecuteIngestImpl(IngestState& state) {
237    std::string directory = uri_.substr(strlen("file://"));
238    std::string filename = directory + "/" + *state.target_table;
239
240    nanoarrow::ipc::UniqueOutputStream output_stream;
241    FILE* c_file = std::fopen(filename.c_str(), "wb");
242    UNWRAP_ERRNO(Internal, ArrowIpcOutputStreamInitFile(output_stream.get(), c_file,
243                                                        /*close_on_release*/ true));
244
245    nanoarrow::ipc::UniqueWriter writer;
246    UNWRAP_ERRNO(Internal, ArrowIpcWriterInit(writer.get(), output_stream.get()));
247
248    ArrowError nanoarrow_error;
249    ArrowErrorInit(&nanoarrow_error);
250    UNWRAP_NANOARROW(nanoarrow_error, Internal,
251                     ArrowIpcWriterWriteArrayStream(writer.get(), &bind_parameters_,
252                                                    &nanoarrow_error));
253
254    return -1;
255  }
256
257  // Our implementation of query execution is to accept a simple query in the form
258  // SELECT * FROM (the filename).
259  Result<int64_t> ExecuteQueryImpl(QueryState& state, ArrowArrayStream* stream) {
260    std::string prefix("SELECT * FROM ");
261    if (state.query.find(prefix) != 0) {
262      return adbc::driver::status::InvalidArgument(
263          "[example] Query must be in the form 'SELECT * FROM filename'");
264    }
265
266    std::string directory = uri_.substr(strlen("file://"));
267    std::string filename = directory + "/" + state.query.substr(prefix.size());
268
269    nanoarrow::ipc::UniqueInputStream input_stream;
270    FILE* c_file = std::fopen(filename.c_str(), "rb");
271    UNWRAP_ERRNO(Internal, ArrowIpcInputStreamInitFile(input_stream.get(), c_file,
272                                                       /*close_on_release*/ true));
273
274    UNWRAP_ERRNO(Internal,
275                 ArrowIpcArrayStreamReaderInit(stream, input_stream.get(), nullptr));
276    return -1;
277  }
278
279  // This path is taken when the user calls Prepare() first.
280  Result<int64_t> ExecuteQueryImpl(PreparedState& state, ArrowArrayStream* stream) {
281    QueryState query_state{state.query};
282    return ExecuteQueryImpl(query_state, stream);
283  }
284
285 private:
286  std::string uri_;
287};
288
289}  // namespace

最後,我們建立驅動程式初始化函式,驅動程式管理器需要此函式來為構成 ADBC C API 的 Adbc**() 函式提供實作。此函式的名稱很重要:此檔案將建置到名為 libdriver_example.(so|dll|dylib) 的共用函式庫中,因此驅動程式管理器將尋找符號 AdbcDriverExampleInit() 作為預設進入點,當被要求載入驅動程式 "driver_example" 時。

298extern "C" AdbcStatusCode AdbcDriverExampleInit(int version, void* raw_driver,
299                                                AdbcError* error) {
300  using ExampleDriver =
301      adbc::driver::Driver<DriverExampleDatabase, DriverExampleConnection,
302                           DriverExampleStatement>;
303  return ExampleDriver::Init(version, raw_driver, error);
304}

低階測試

食譜來源:driver_example_test.cc

在我們撰寫驅動程式的草圖後,下一步是確保驅動程式管理器可以載入它,並且可以初始化和釋放資料庫、連線和陳述式實例。

首先,我們將包含驅動程式管理器和 googletest

29#include "driver_example.h"
30
31#include "arrow-adbc/adbc_driver_manager.h"
32#include "gtest/gtest.h"

接下來,我們將為基本生命週期宣告一個測試案例

36TEST(DriverExample, TestLifecycle) {
37  struct AdbcError error = ADBC_ERROR_INIT;
38
39  struct AdbcDatabase database;
40  ASSERT_EQ(AdbcDatabaseNew(&database, &error), ADBC_STATUS_OK);
41  AdbcDriverManagerDatabaseSetInitFunc(&database, &AdbcDriverExampleInit, &error);
42  ASSERT_EQ(AdbcDatabaseSetOption(&database, "uri", "file://foofy", &error),
43            ADBC_STATUS_OK);
44  ASSERT_EQ(AdbcDatabaseInit(&database, &error), ADBC_STATUS_OK);
45
46  struct AdbcConnection connection;
47  ASSERT_EQ(AdbcConnectionNew(&connection, &error), ADBC_STATUS_OK);
48  ASSERT_EQ(AdbcConnectionInit(&connection, &database, &error), ADBC_STATUS_OK);
49
50  struct AdbcStatement statement;
51  ASSERT_EQ(AdbcStatementNew(&connection, &statement, &error), ADBC_STATUS_OK);
52
53  ASSERT_EQ(AdbcStatementRelease(&statement, &error), ADBC_STATUS_OK);
54  ASSERT_EQ(AdbcConnectionRelease(&connection, &error), ADBC_STATUS_OK);
55  ASSERT_EQ(AdbcDatabaseRelease(&database, &error), ADBC_STATUS_OK);
56
57  if (error.release) {
58    error.release(&error);
59  }
60}

位於 apache/arrow-adbc 儲存庫中的驅動程式可以使用內建驗證函式庫,該函式庫針對功能齊全的 SQL 資料庫實作通用測試套件,並提供實用程式來測試一系列輸入和輸出。

高階測試

食譜來源:driver_example.py

在驗證基本驅動程式功能後,我們可以使用 adbc_driver_manager Python 套件的內建 dbapi 實作來公開隨時可用的 Pythonic 資料庫 API。這也適用於高階測試!

首先,我們將匯入 pathlib 以進行一些路徑計算,以及 adbc_driver_managerdbapi 模組

26from pathlib import Path
27
28from adbc_driver_manager import dbapi

接下來,我們將定義一個 connect() 函式,該函式使用我們在上一節中使用 cmake 建置的共用函式庫的位置來包裝 dbapi.connect()。為了我們的教學目的,這將位於 CMake build/ 目錄中。

35def connect(uri: str):
36    build_dir = Path(__file__).parent / "build"
37    for lib in [
38        "libdriver_example.dylib",
39        "libdriver_example.so",
40        "driver_example.dll",
41    ]:
42        driver_lib = build_dir / lib
43        if driver_lib.exists():
44            return dbapi.connect(
45                driver=str(driver_lib.resolve()), db_kwargs={"uri": uri}
46            )
47
48    raise RuntimeError("Can't find driver shared object")

接下來,我們可以讓我們的驅動程式試試看!我們在驅動程式中實作的兩個部分是「批量擷取」功能和「從中選取全部」,所以讓我們看看它是否有效!

53if __name__ == "__main__":
54    import os
55
56    import pyarrow
57
58    with connect(uri=Path(__file__).parent.as_uri()) as con:
59        data = pyarrow.table({"col": [1, 2, 3]})
60        with con.cursor() as cur:
61            cur.adbc_ingest("example.arrows", data, mode="create")
62
63        with con.cursor() as cur:
64            cur.execute("SELECT * FROM example.arrows")
65            print(cur.fetchall())
66
67        os.unlink(Path(__file__).parent / "example.arrows")

高階測試也可以使用 adbcdrivermanager 套件在 R 中撰寫。

library(adbcdrivermanager)

drv <- adbc_driver("build/libdriver_example.dylib")
db <- adbc_database_init(drv, uri = paste0("file://", getwd()))
con <- adbc_connection_init(db)

data.frame(col = 1:3) |> write_adbc(con, "example.arrows")
con |> read_adbc("SELECT * FROM example.arrows") |> as.data.frame()
unlink("example.arrows")