Arrow C stream 介面#

C stream 介面建立於 C 資料介面 中定義的結構之上,並將它們結合成更高階的規格,以便於在單一程序內傳輸串流資料。

語意#

Arrow C stream 公開一個資料區塊的串流來源,每個區塊都具有相同的綱要。區塊透過呼叫阻塞式的提取式迭代函式取得。

結構定義#

C stream 介面由單一 struct 定義所定義

#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

struct ArrowArrayStream {
  // Callbacks providing stream functionality
  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
  const char* (*get_last_error)(struct ArrowArrayStream*);

  // Release callback
  void (*release)(struct ArrowArrayStream*);

  // Opaque producer-specific data
  void* private_data;
};

#endif  // ARROW_C_STREAM_INTERFACE

注意

標準保護 ARROW_C_STREAM_INTERFACE 旨在避免重複定義,如果兩個專案在其各自的標頭中複製 C 資料介面定義,並且第三方專案從這兩個專案中包含。因此,當複製這些定義時,保持此保護完全不變非常重要。

ArrowArrayStream 結構#

ArrowArrayStream 提供與 Arrow 陣列的串流來源互動所需的回呼。它具有以下欄位

int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#

必要項。 此回呼允許消費者查詢串流中資料區塊的綱要。綱要對於所有資料區塊都是相同的。

不得在已釋放的 ArrowArrayStream 上呼叫此回呼。

傳回值: 成功時為 0,否則為非零的 錯誤代碼

int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#

必要項。 此回呼允許消費者取得串流中的下一個資料區塊。

不得在已釋放的 ArrowArrayStream 上呼叫此回呼。

傳回值: 成功時為 0,否則為非零的 錯誤代碼

成功時,消費者必須檢查 ArrowArray 是否標記為 已釋放。如果 ArrowArray 已釋放,則表示已到達串流結尾。否則,ArrowArray 包含有效的資料區塊。

const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#

必要項。 此回呼允許消費者取得最後一個錯誤的文字描述。

只有在 ArrowArrayStream 上的最後一個操作傳回錯誤時,才必須呼叫此回呼。不得在已釋放的 ArrowArrayStream 上呼叫它。

傳回值: 指向以 NULL 終止的字元字串(UTF8 編碼)的指標。如果沒有詳細描述可用,也可以傳回 NULL。

傳回的指標僅保證在下次呼叫串流的回呼之一之前有效。如果希望字元字串指向的內容存活更長時間,則應將其複製到消費者管理的儲存空間。

void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#

必要項。 指向生產者提供的釋放回呼的指標。

void *ArrowArrayStream.private_data#

選用項。 指向生產者提供的私有資料的不透明指標。

消費者不得處理此成員。此成員的生命週期由生產者處理,尤其是由釋放回呼處理。

錯誤代碼#

get_schemaget_next 回呼可能會以非零整數代碼的形式傳回錯誤。此類錯誤代碼應像 errno 數字(由本機平台定義)一樣解釋。請注意,這些常數的符號形式在不同平台之間是穩定的,但它們的數值是平台特定的。

特別是,建議辨識以下值

  • EINVAL:用於參數或輸入驗證錯誤

  • ENOMEM:用於記憶體配置失敗(記憶體不足)

  • EIO:用於一般輸入/輸出錯誤

結果生命週期#

get_schemaget_next 回呼傳回的資料必須獨立釋放。它們的生命週期與 ArrowArrayStream 的生命週期無關。

串流生命週期#

C 串流的生命週期使用釋放回呼來管理,其使用方式與 C 資料介面 中類似。

執行緒安全#

串流來源不假設為執行緒安全。希望從多個執行緒呼叫 get_next 的消費者應確保這些呼叫是序列化的。

C 消費者範例#

假設特定資料庫提供以下 C API 來執行 SQL 查詢,並將結果集以 Arrow C stream 的形式傳回

void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);

然後,消費者可以使用以下程式碼來迭代結果

static void handle_error(int errcode, struct ArrowArrayStream* stream) {
   // Print stream error
   const char* errdesc = stream->get_last_error(stream);
   if (errdesc != NULL) {
      fputs(errdesc, stderr);
   } else {
      fputs(strerror(errcode), stderr);
   }
   // Release stream and abort
   stream->release(stream),
   exit(1);
}

void run_query() {
   struct ArrowArrayStream stream;
   struct ArrowSchema schema;
   struct ArrowArray chunk;
   int errcode;

   MyDB_Query("SELECT * FROM my_table", &stream);

   // Query result set schema
   errcode = stream.get_schema(&stream, &schema);
   if (errcode != 0) {
      handle_error(errcode, &stream);
   }

   int64_t num_rows = 0;

   // Iterate over results: loop until error or end of stream
   while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
          chunk.release != NULL) {
      // Do something with chunk...
      fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
      num_rows += chunk.length;

      // Release chunk
      chunk.release(&chunk);
   }

   // Was it an error?
   if (errcode != 0) {
      handle_error(errcode, &stream);
   }

   fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);

   // Release schema and stream
   schema.release(&schema);
   stream.release(&stream);
}