Arrow C stream 介面#
C stream 介面建立於 C 資料介面 中定義的結構之上,並將它們結合成更高階的規格,以便於在單一程序內傳輸串流資料。
語意#
Arrow C stream 公開一個資料區塊的串流來源,每個區塊都具有相同的綱要。區塊透過呼叫阻塞式的提取式迭代函式取得。
結構定義#
C stream 介面由單一 struct
定義所定義
#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE
struct ArrowArrayStream {
// Callbacks providing stream functionality
int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
const char* (*get_last_error)(struct ArrowArrayStream*);
// Release callback
void (*release)(struct ArrowArrayStream*);
// Opaque producer-specific data
void* private_data;
};
#endif // ARROW_C_STREAM_INTERFACE
注意
標準保護 ARROW_C_STREAM_INTERFACE
旨在避免重複定義,如果兩個專案在其各自的標頭中複製 C 資料介面定義,並且第三方專案從這兩個專案中包含。因此,當複製這些定義時,保持此保護完全不變非常重要。
ArrowArrayStream 結構#
ArrowArrayStream
提供與 Arrow 陣列的串流來源互動所需的回呼。它具有以下欄位
-
int (*ArrowArrayStream.get_schema)(struct ArrowArrayStream*, struct ArrowSchema *out)#
必要項。 此回呼允許消費者查詢串流中資料區塊的綱要。綱要對於所有資料區塊都是相同的。
不得在已釋放的
ArrowArrayStream
上呼叫此回呼。傳回值: 成功時為 0,否則為非零的 錯誤代碼。
-
int (*ArrowArrayStream.get_next)(struct ArrowArrayStream*, struct ArrowArray *out)#
必要項。 此回呼允許消費者取得串流中的下一個資料區塊。
不得在已釋放的
ArrowArrayStream
上呼叫此回呼。傳回值: 成功時為 0,否則為非零的 錯誤代碼。
成功時,消費者必須檢查
ArrowArray
是否標記為 已釋放。如果ArrowArray
已釋放,則表示已到達串流結尾。否則,ArrowArray
包含有效的資料區塊。
-
const char *(*ArrowArrayStream.get_last_error)(struct ArrowArrayStream*)#
必要項。 此回呼允許消費者取得最後一個錯誤的文字描述。
只有在
ArrowArrayStream
上的最後一個操作傳回錯誤時,才必須呼叫此回呼。不得在已釋放的ArrowArrayStream
上呼叫它。傳回值: 指向以 NULL 終止的字元字串(UTF8 編碼)的指標。如果沒有詳細描述可用,也可以傳回 NULL。
傳回的指標僅保證在下次呼叫串流的回呼之一之前有效。如果希望字元字串指向的內容存活更長時間,則應將其複製到消費者管理的儲存空間。
-
void (*ArrowArrayStream.release)(struct ArrowArrayStream*)#
必要項。 指向生產者提供的釋放回呼的指標。
-
void *ArrowArrayStream.private_data#
選用項。 指向生產者提供的私有資料的不透明指標。
消費者不得處理此成員。此成員的生命週期由生產者處理,尤其是由釋放回呼處理。
錯誤代碼#
get_schema
和 get_next
回呼可能會以非零整數代碼的形式傳回錯誤。此類錯誤代碼應像 errno
數字(由本機平台定義)一樣解釋。請注意,這些常數的符號形式在不同平台之間是穩定的,但它們的數值是平台特定的。
特別是,建議辨識以下值
EINVAL
:用於參數或輸入驗證錯誤ENOMEM
:用於記憶體配置失敗(記憶體不足)EIO
:用於一般輸入/輸出錯誤
結果生命週期#
由 get_schema
和 get_next
回呼傳回的資料必須獨立釋放。它們的生命週期與 ArrowArrayStream
的生命週期無關。
串流生命週期#
C 串流的生命週期使用釋放回呼來管理,其使用方式與 C 資料介面 中類似。
執行緒安全#
串流來源不假設為執行緒安全。希望從多個執行緒呼叫 get_next
的消費者應確保這些呼叫是序列化的。
C 消費者範例#
假設特定資料庫提供以下 C API 來執行 SQL 查詢,並將結果集以 Arrow C stream 的形式傳回
void MyDB_Query(const char* query, struct ArrowArrayStream* result_set);
然後,消費者可以使用以下程式碼來迭代結果
static void handle_error(int errcode, struct ArrowArrayStream* stream) {
// Print stream error
const char* errdesc = stream->get_last_error(stream);
if (errdesc != NULL) {
fputs(errdesc, stderr);
} else {
fputs(strerror(errcode), stderr);
}
// Release stream and abort
stream->release(stream),
exit(1);
}
void run_query() {
struct ArrowArrayStream stream;
struct ArrowSchema schema;
struct ArrowArray chunk;
int errcode;
MyDB_Query("SELECT * FROM my_table", &stream);
// Query result set schema
errcode = stream.get_schema(&stream, &schema);
if (errcode != 0) {
handle_error(errcode, &stream);
}
int64_t num_rows = 0;
// Iterate over results: loop until error or end of stream
while ((errcode = stream.get_next(&stream, &chunk) == 0) &&
chunk.release != NULL) {
// Do something with chunk...
fprintf(stderr, "Result chunk: got %lld rows\n", chunk.length);
num_rows += chunk.length;
// Release chunk
chunk.release(&chunk);
}
// Was it an error?
if (errcode != 0) {
handle_error(errcode, &stream);
}
fprintf(stderr, "Result stream ended: total %lld rows\n", num_rows);
// Release schema and stream
schema.release(&schema);
stream.release(&stream);
}