Arrow C 裝置資料介面#

警告

Arrow C 裝置資料介面應視為實驗性

理由#

目前的 C 資料介面,及其大多數實作,皆假設提供的所有資料緩衝區皆為 CPU 緩衝區。由於 Apache Arrow 旨在成為表示表格(「欄狀」)資料的通用記憶體格式,因此期望在非 CPU 硬體(如 GPU)上利用此資料。其中一個例子是 RAPIDS cuDF 函式庫,其搭配 NVIDIA GPU 的 CUDA 使用 Arrow 記憶體格式。由於從主機複製資料到裝置再複製回主機的成本很高,理想情況是能夠盡可能將資料保留在裝置上,即使在執行階段和函式庫之間傳遞資料時也是如此。

Arrow C 裝置資料介面建立在現有的 C 資料介面之上,方法是向其新增一組非常小且穩定的 C 定義。這些定義等同於 C 資料介面中的 ArrowArrayArrowArrayStream 結構,並新增成員以允許指定裝置類型並傳遞必要的資訊以與生產者同步。對於非 C/C++ 語言和執行階段,將 C 定義翻譯成對應的 C FFI 宣告應與目前的 C 資料介面一樣簡單。

如此一來,應用程式和函式庫便可以使用非 CPU 裝置上的 Arrow 綱要和 Arrow 格式化記憶體來交換資料,就像現在使用 CPU 資料一樣容易。這將能夠讓資料在這些裝置上保留更長時間,並避免在主機和裝置之間進行成本高昂的複製,僅僅為了利用新的函式庫和執行階段。

目標#

  • 公開建立在現有 C 資料介面之上的 ABI 穩定介面。

  • 讓協力廠商專案能夠輕鬆實作支援,且初始投入成本低。

  • 允許在同一程序中執行的獨立執行階段和元件之間零複製共用 Arrow 格式化的裝置記憶體。

  • 避免需要一對一的調整層,例如 Python 程序用於傳遞 CUDA 資料的 CUDA 陣列介面

  • 在沒有對 Arrow 軟體專案本身的明確相依性(編譯時期或執行時期)的情況下啟用整合。

Arrow C 裝置資料介面的目的是擴展目前 C 資料介面的範圍,使其也能成為 GPU 或 FPGA 等裝置上欄狀處理的標準低階建構區塊。

結構定義#

由於這是建立在 C 資料介面之上,因此 C 裝置資料介面使用 C 資料介面規格 中定義的 ArrowSchemaArrowArray 結構。然後,它新增以下獨立定義。與 Arrow 專案的其餘部分一樣,它們在 Apache License 2.0 下提供。

#ifndef ARROW_C_DEVICE_DATA_INTERFACE
#define ARROW_C_DEVICE_DATA_INTERFACE

// Device type for the allocated memory
typedef int32_t ArrowDeviceType;

// CPU device, same as using ArrowArray directly
#define ARROW_DEVICE_CPU 1
// CUDA GPU Device
#define ARROW_DEVICE_CUDA 2
// Pinned CUDA CPU memory by cudaMallocHost
#define ARROW_DEVICE_CUDA_HOST 3
// OpenCL Device
#define ARROW_DEVICE_OPENCL 4
// Vulkan buffer for next-gen graphics
#define ARROW_DEVICE_VULKAN 7
// Metal for Apple GPU
#define ARROW_DEVICE_METAL 8
// Verilog simulator buffer
#define ARROW_DEVICE_VPI 9
// ROCm GPUs for AMD GPUs
#define ARROW_DEVICE_ROCM 10
// Pinned ROCm CPU memory allocated by hipMallocHost
#define ARROW_DEVICE_ROCM_HOST 11
// Reserved for extension
//
// used to quickly test extension devices, semantics
// can differ based on implementation
#define ARROW_DEVICE_EXT_DEV 12
// CUDA managed/unified memory allocated by cudaMallocManaged
#define ARROW_DEVICE_CUDA_MANAGED 13
// Unified shared memory allocated on a oneAPI
// non-partitioned device.
//
// A call to the oneAPI runtime is required to determine the
// device type, the USM allocation type and the sycl context
// that it is bound to.
#define ARROW_DEVICE_ONEAPI 14
// GPU support for next-gen WebGPU standard
#define ARROW_DEVICE_WEBGPU 15
// Qualcomm Hexagon DSP
#define ARROW_DEVICE_HEXAGON 16

struct ArrowDeviceArray {
  struct ArrowArray array;
  int64_t device_id;
  ArrowDeviceType device_type;
  void* sync_event;

  // reserved bytes for future expansion
  int64_t reserved[3];
};

#endif  // ARROW_C_DEVICE_DATA_INTERFACE

注意

標準保護 ARROW_C_DEVICE_DATA_INTERFACE 旨在避免在兩個專案將定義複製到自己的標頭中,而協力廠商專案從這兩個專案包含標頭時,發生重複定義的情況。因此,當複製這些定義時,務必保持此保護完全不變。

ArrowDeviceType#

ArrowDeviceType typedef 用於指示所提供的記憶體緩衝區分配在何種類型的裝置上。這與 device_id 結合使用,應足以參考正確的資料緩衝區。

然後,我們使用巨集來定義不同裝置類型的值。提供的巨集值與廣泛使用的 dlpack DLDeviceType 定義值相容,針對每個等效的 kDL<type> 列舉使用與 dlpack.h 相同的值。此清單將隨著時間推移與這些等效的列舉值保持同步,以確保相容性,而不是可能產生分歧。為了避免 Arrow 專案必須審查新硬體裝置,新的新增項目應先新增到 dlpack,然後我們再在此處新增對應的巨集。

為了確保 ABI 的可預測性,我們使用巨集而不是 enum,因此儲存類型不依賴編譯器。

ARROW_DEVICE_CPU#

CPU 裝置,相當於直接使用 ArrowArray 而不是使用 ArrowDeviceArray

ARROW_DEVICE_CUDA#

CUDA GPU 裝置。這可以表示使用執行階段函式庫 (cudaMalloc) 或裝置驅動程式 (cuMemAlloc) 分配的資料。

ARROW_DEVICE_CUDA_HOST#

由 CUDA 使用 cudaMallocHostcuMemAllocHost 固定和鎖頁的 CPU 記憶體。

ARROW_DEVICE_OPENCL#

使用 OpenCL(開放運算語言)框架在裝置上分配的資料。

ARROW_DEVICE_VULKAN#

Vulkan 框架和函式庫分配的資料。

ARROW_DEVICE_METAL#

使用 Metal 框架和函式庫的 Apple GPU 裝置上的資料。

ARROW_DEVICE_VPI#

表示 Verilog 模擬器緩衝區的使用。

ARROW_DEVICE_ROCM#

使用 ROCm 堆疊的 AMD 裝置。

ARROW_DEVICE_ROCM_HOST#

由 ROCm 使用 hipMallocHost 固定和鎖頁的 CPU 記憶體。

ARROW_DEVICE_EXT_DEV#

此值是裝置的應急方案,用於擴充目前未以其他方式表示的裝置。如果生產者使用此裝置類型,則需要提供特定於裝置的其他資訊/內容。這用於快速測試擴充裝置,且語意可能會因實作而異。

ARROW_DEVICE_CUDA_MANAGED#

cudaMallocManaged 分配的 CUDA 管理/統一記憶體。

ARROW_DEVICE_ONEAPI#

在 Intel oneAPI 非分割裝置上分配的統一共享記憶體。需要呼叫 oneAPI 執行階段才能判斷特定裝置類型、USM 分配類型及其繫結的 sycl 內容。

ARROW_DEVICE_WEBGPU#

支援下一代 WebGPU 標準的 GPU

ARROW_DEVICE_HEXAGON#

在 Qualcomm Hexagon DSP 裝置上分配的資料。

ArrowDeviceArray 結構#

ArrowDeviceArray 結構嵌入 C 資料 ArrowArray 結構,並新增消費者使用資料所需的其他資訊。它具有以下欄位

struct ArrowArray ArrowDeviceArray.array#

必要。 已分配的陣列資料。void** 緩衝區(以及任何子項的緩衝區)中的值是在裝置上分配的內容。緩衝區值應為裝置指標。結構的其餘部分應可供 CPU 存取。

此結構的 private_datarelease 回呼應包含與根據資料分配所在的裝置釋放陣列相關的任何必要資訊和結構,而不是在此處具有單獨的釋放回呼和 private_data 指標。

int64_t ArrowDeviceArray.device_id#

必要。 如果系統上有此類型的多個裝置,則用於識別特定裝置的裝置 ID。ID 的語意將取決於硬體,但我們使用 int64_t 來確保 ID 的未來適用性,因為裝置會隨著時間推移而變化。

對於沒有裝置識別碼的內在概念的裝置類型(例如,ARROW_DEVICE_CPU),建議慣例上使用 device_id -1。

ArrowDeviceType ArrowDeviceArray.device_type#

必要。 可以存取陣列中緩衝區的裝置類型。

void *ArrowDeviceArray.sync_event#

選用。 如果需要,用於同步的類事件物件。

許多裝置(如 GPU)主要相對於 CPU 處理是非同步的。因此,為了安全地存取裝置記憶體,通常需要有一個物件來同步處理。由於不同的裝置將使用不同的類型來指定此物件,因此我們使用 void*,它可以強制轉換為指向任何裝置適當類型的指標。

如果不需要同步,則可以為 null。如果這不是 null,則必須使用它來呼叫裝置的適當同步方法(例如 cudaStreamWaitEventhipStreamWaitEvent),然後再嘗試存取緩衝區中的記憶體。

如果提供事件,則生產者必須確保在觸發事件之前,匯出的資料在裝置上可用。消費者應在嘗試存取匯出的資料之前等待事件。

另請參閱

下方的 同步事件類型 章節。

int64_t ArrowDeviceArray.reserved[3]#

隨著非 CPU 開發的擴展,可能需要擴展此結構。為了在不破壞 ABI 變更的情況下執行此操作,我們在物件末尾保留了 24 個位元組。生產者在初始化後必須將這些位元組歸零,以確保未來 ABI 的安全演進。

同步事件類型#

下表列出了每種裝置類型的預期事件類型。如果沒有支援的事件類型(「不適用」),則 sync_event 成員應始終為 null。

請記住,如果不需要同步即可存取資料,則事件可以為 null。

裝置類型

實際事件類型

註釋

ARROW_DEVICE_CPU

不適用

ARROW_DEVICE_CUDA

cudaEvent_t*

ARROW_DEVICE_CUDA_HOST

cudaEvent_t*

ARROW_DEVICE_OPENCL

cl_event*

ARROW_DEVICE_VULKAN

VkEvent*

ARROW_DEVICE_METAL

MTLEvent*

ARROW_DEVICE_VPI

不適用

ARROW_DEVICE_ROCM

hipEvent_t*

ARROW_DEVICE_ROCM_HOST

hipEvent_t*

ARROW_DEVICE_EXT_DEV

ARROW_DEVICE_CUDA_MANAGED

cudaEvent_t*

ARROW_DEVICE_ONEAPI

sycl::event*

ARROW_DEVICE_WEBGPU

不適用

ARROW_DEVICE_HEXAGON

不適用

註釋

  • (1) 目前尚不清楚框架是否具有要支援的事件類型。

  • (2) 擴充裝置具有生產者定義的語意,因此如果擴充裝置需要同步,生產者應記錄類型。

語意#

記憶體管理#

首先也是最重要的:在此介面中的所有內容中,只有資料緩衝區本身駐留在裝置記憶體中(即 ArrowArray 結構的 buffers 成員)。其他所有內容都應在 CPU 記憶體中。

ArrowDeviceArray 結構包含一個 ArrowArray 物件,該物件本身具有用於釋放記憶體的 特定語意。下方的「基本結構」是指在生產者和消費者之間直接傳遞的 ArrowDeviceArray 物件 – 而不是其任何子結構。

基本結構旨在由消費者進行堆疊或堆積分配。在這種情況下,生產者 API 應採用指向消費者分配的結構的指標。

但是,結構指向的任何資料都必須由生產者分配和維護。這包括 sync_event 成員(如果它不是 null)以及 ArrowArray 物件中的任何指標(與往常一樣)。資料生命週期透過 ArrowArray 成員的 release 回呼進行管理。

對於 ArrowDeviceArray,已釋放結構的語意和回呼語意與 ArrowArray 本身 的語意相同。除了任何已分配的事件之外,釋放裝置資料緩衝區所需的任何生產者特定內容資訊都應儲存在 ArrowArrayprivate_data 成員中,並由 release 回呼管理。

移動陣列#

消費者可以透過位元複製或淺層成員方式複製來移動 ArrowDeviceArray 結構。然後,它必須透過將內嵌 ArrowArray 結構的 release 成員設定為 NULL,但不呼叫該釋放回呼,來將來源結構標記為已釋放。這可確保在任何給定時間只有一個即時結構副本處於活動狀態,並且生命週期已正確傳達給生產者。

與往常一樣,當不再需要目的地結構時,將會對其呼叫釋放回呼。

記錄批次#

與 C 資料介面本身一樣,記錄批次可以簡單地視為等效的結構陣列。在這種情況下,最上層 ArrowSchema 的中繼資料可用於記錄批次的綱要層級中繼資料。

可變性#

生產者和消費者都應將匯出的資料(即,可透過內嵌 ArrowArraybuffers 成員在裝置上存取的資料)視為不可變的,因為任何一方都可能在另一方正在變更資料時看到不一致的資料。

同步#

如果 sync_event 成員不是 NULL,則消費者在同步該事件之前,不應嘗試存取或讀取資料。如果 sync_event 成員為 NULL,則必須可以安全地存取資料,而無需消費者進行任何同步。

C 生產者範例#

匯出簡單的 int32 裝置陣列#

匯出具有空中繼資料的不可為 null 的 int32 類型。此範例可以在 C 資料介面文件中直接看到。

為了匯出資料本身,我們透過釋放回呼將所有權轉移給消費者。此範例將使用 CUDA,但等效的呼叫可用於任何裝置

static void release_int32_device_array(struct ArrowArray* array) {
    assert(array->n_buffers == 2);
    // destroy the event
    cudaEvent_t* ev_ptr = (cudaEvent_t*)(array->private_data);
    cudaError_t status = cudaEventDestroy(*ev_ptr);
    assert(status == cudaSuccess);
    free(ev_ptr);

    // free the buffers and the buffers array
    status = cudaFree(array->buffers[1]);
    assert(status == cudaSuccess);
    free(array->buffers);

    // mark released
    array->release = NULL;
}

void export_int32_device_array(void* cudaAllocedPtr,
                               cudaStream_t stream,
                               int64_t length,
                               struct ArrowDeviceArray* array) {
    // get device id
    int device;
    cudaError_t status;
    status = cudaGetDevice(&device);
    assert(status == cudaSuccess);

    cudaEvent_t* ev_ptr = (cudaEvent_t*)malloc(sizeof(cudaEvent_t));
    assert(ev_ptr != NULL);
    status = cudaEventCreate(ev_ptr);
    assert(status == cudaSuccess);

    // record event on the stream, assuming that the passed in
    // stream is where the work to produce the data will be processing.
    status = cudaEventRecord(*ev_ptr, stream);
    assert(status == cudaSuccess);

    memset(array, 0, sizeof(struct ArrowDeviceArray));
    // initialize fields
    *array = (struct ArrowDeviceArray) {
        .array = (struct ArrowArray) {
            .length = length,
            .null_count = 0,
            .offset = 0,
            .n_buffers = 2,
            .n_children = 0,
            .children = NULL,
            .dictionary = NULL,
            // bookkeeping
            .release = &release_int32_device_array,
            // store the event pointer as private data in the array
            // so that we can access it in the release callback.
            .private_data = (void*)(ev_ptr),
        },
        .device_id = (int64_t)(device),
        .device_type = ARROW_DEVICE_CUDA,
        // pass the event pointer to the consumer
        .sync_event = (void*)(ev_ptr),
    };

    // allocate list of buffers
    array->array.buffers = (const void**)malloc(sizeof(void*) * array->array.n_buffers);
    assert(array->array.buffers != NULL);
    array->array.buffers[0] = NULL;
    array->array.buffers[1] = cudaAllocedPtr;
}

// calling the release callback should be done using the array member
// of the device array.
static void release_device_array_helper(struct ArrowDeviceArray* arr) {
    arr->array.release(&arr->array);
}

裝置串流介面#

C 串流介面 類似,C 裝置資料介面也指定了更高等級的結構,以簡化單一程序內串流資料的通訊。

語意#

Arrow C 裝置串流公開資料區塊的串流來源,每個區塊都具有相同的綱要。區塊透過呼叫封鎖式提取樣式迭代函式取得。預期所有區塊都應在相同的裝置類型(但不一定是相同的裝置 ID)上提供資料。如果需要提供多個裝置類型上的資料串流,生產者應為每種裝置類型提供單獨的串流物件。

結構定義#

C 裝置串流介面由單一 struct 定義定義

#ifndef ARROW_C_DEVICE_STREAM_INTERFACE
#define ARROW_C_DEVICE_STREAM_INTERFACE

struct ArrowDeviceArrayStream {
    // device type that all arrays will be accessible from
    ArrowDeviceType device_type;
    // callbacks
    int (*get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema*);
    int (*get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray*);
    const char* (*get_last_error)(struct ArrowDeviceArrayStream*);

    // release callback
    void (*release)(struct ArrowDeviceArrayStream*);

    // opaque producer-specific data
    void* private_data;
};

#endif  // ARROW_C_DEVICE_STREAM_INTERFACE

注意

標準保護 ARROW_C_DEVICE_STREAM_INTERFACE 旨在避免在兩個專案將 C 裝置串流介面定義複製到自己的標頭中,而協力廠商專案從這兩個專案包含標頭時,發生重複定義的情況。因此,當複製這些定義時,務必保持此保護完全不變。

ArrowDeviceArrayStream 結構#

ArrowDeviceArrayStream 提供可存取結果資料的裝置類型,以及與 Arrow 陣列串流來源互動所需的必要回呼。它具有以下欄位

ArrowDeviceType device_type#

必要。 此串流在其上產生資料的裝置類型。由此串流產生的所有 ArrowDeviceArray 都應具有與此處設定的裝置類型相同的裝置類型。這對於消費者來說是一種方便,不必檢查檢索到的每個陣列,而是允許用於串流的更高等級編碼結構。

int (*ArrowDeviceArrayStream.get_schema)(struct ArrowDeviceArrayStream*, struct ArrowSchema *out)#

必要。 此回呼允許消費者查詢串流中資料區塊的綱要。綱要對於所有資料區塊都是相同的。

不得在已釋放的 ArrowDeviceArrayStream 上呼叫此回呼。

傳回值: 成功時為 0,否則為非零 錯誤代碼

int (*ArrowDeviceArrayStream.get_next)(struct ArrowDeviceArrayStream*, struct ArrowDeviceArray *out)#

必要。 此回呼允許消費者取得串流中的下一個資料區塊。

不得在已釋放的 ArrowDeviceArrayStream 上呼叫此回呼。

下一個資料區塊必須可從與 ArrowDeviceArrayStream.device_type 相符的裝置類型存取。

傳回值: 成功時為 0,否則為非零 錯誤代碼

成功後,消費者必須檢查 ArrowDeviceArray 的內嵌 ArrowArray 是否標記為 已釋放。如果內嵌 ArrowDeviceArray.array 已釋放,則表示已到達串流結尾。否則,ArrowDeviceArray 包含有效的資料區塊。

const char *(*ArrowDeviceArrayStream.get_last_error)(struct ArrowDeviceArrayStream*)#

必要。 此回呼允許消費者取得最後一個錯誤的文字描述。

只有在 ArrowDeviceArrayStream 上的最後一個操作傳回錯誤時,才必須呼叫此回呼。不得在已釋放的 ArrowDeviceArrayStream 上呼叫此回呼。

傳回值: 指向以 NULL 終止的字元字串(UTF8 編碼)的指標。如果沒有詳細描述可用,也可以傳回 NULL。

傳回的指標僅保證在下次呼叫其中一個串流的回呼之前有效。如果打算讓它存在更長時間,則應將它指向的字元字串複製到消費者管理的儲存空間。

void (*ArrowDeviceArrayStream.release)(struct ArrowDeviceArrayStream*)#

必要。 指向生產者提供的釋放回呼的指標。

void *ArrowDeviceArrayStream.private_data#

選用。 指向生產者提供的私有資料的不透明指標。

消費者不得處理此成員。此成員的生命週期由生產者處理,尤其是由釋放回呼處理。

結果生命週期#

get_schemaget_next 回呼傳回的資料必須獨立釋放。它們的生命週期與 ArrowDeviceArrayStream 的生命週期無關。

串流生命週期#

C 串流的生命週期使用釋放回呼進行管理,其用法與 C 資料介面 中的用法類似。

執行緒安全#

串流來源不假設為執行緒安全。想要從多個執行緒呼叫 get_next 的消費者應確保這些呼叫已序列化。

非同步裝置串流介面#

警告

實驗性:非同步 C 裝置串流介面在其目前形式中是實驗性的。根據回饋和使用情況,協定定義可能會在完全標準化之前變更。

C 串流介面 提供以消費者呼叫生產者函式來檢索下一個記錄批次為中心的同步 API。對於生產者和消費者之間的並行通訊,可以使用 ArrowAsyncDeviceStreamHandler。此介面不帶有主觀意見,可能適用於不同的並行通訊模型。

語意#

非同步介面不是由生產者提供一組回呼結構供消費者呼叫和檢索記錄,而是一個由消費者分配和填入的結構。消費者分配的結構提供處理常式回呼,供生產者在綱要和資料區塊可用時呼叫。

除了 ArrowAsyncDeviceStreamHandler 之外,還有兩個額外的結構用於完整資料流程:ArrowAsyncTaskArrowAsyncProducer

結構定義#

C 裝置非同步串流介面包含三個 struct 定義

#ifndef ARROW_C_ASYNC_STREAM_INTERFACE
#define ARROW_C_ASYNC_STREAM_INTERFACE

struct ArrowAsyncTask {
  int (*extract_data)(struct ArrowArrayTask* self, struct ArrowDeviceArray* out);

  void* private_data;
};

struct ArrowAsyncProducer {
  ArrowDeviceType device_type;

  void (*request)(struct ArrowAsyncProducer* self, int64_t n);
  void (*cancel)(struct ArrowAsyncProducer* self);

  void (*release)(struct ArrowAsyncProducer* self);
  const char* additional_metadata;
  void* private_data;
};

struct ArrowAsyncDeviceStreamHandler {
  // consumer-specific handlers
  int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self,
                   struct ArrowSchema* stream_schema);
  int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self,
                      struct ArrowAsyncTask* task, const char* metadata);
  void (*on_error)(struct ArrowAsyncDeviceStreamHandler* self,
                   int code, const char* message, const char* metadata);

  // release callback
  void (*release)(struct ArrowAsyncDeviceStreamHandler* self);

  // must be populated before calling any callbacks
  struct ArrowAsyncProducer* producer;

  // opaque handler-specific data
  void* private_data;
};

#endif  // ARROW_C_ASYNC_STREAM_INTERFACE

注意

標準保護機制 ARROW_C_ASYNC_STREAM_INTERFACE 的目的是為了避免重複定義。當兩個專案將 C 非同步串流介面定義複製到它們自己的標頭檔中,而第三方程式庫專案又同時引入這兩個專案時,就會發生重複定義的問題。因此,當複製這些定義時,務必保持此保護機制完全不變。

ArrowAsyncDeviceStreamHandler 結構#

此結構包含以下欄位

int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*)#

必要。用於接收串流綱要的處理常式。所有傳入的記錄應符合提供的綱要。如果成功,此函式應傳回 0;否則,應傳回與 errno 相容的錯誤代碼。

如果生產者想要提供任何額外的上下文資訊,它可以將 ArrowAsyncProducer.additional_metadata 設定為非 NULL 值。這會以與 ArrowSchema.metadata 相同的格式進行編碼。如果 metadata 不是 NULL,則其生命週期應與 ArrowAsyncProducer 物件的生命週期相關聯。

除非呼叫 on_error 處理常式,否則此方法將始終只會被呼叫一次,並且是此物件上第一個被呼叫的方法。因此,生產者必須在呼叫此函式之前填入 ArrowAsyncProducer 成員,以允許消費者施加回壓並控制資料流。生產者保有 ArrowAsyncProducer 的所有權,並且必須在呼叫 ArrowAsyncDeviceStreamHandler 上的 release 回呼之後清理它。

在此處收到非零結果的生產者,後續不得呼叫此物件上的 release 回呼以外的任何其他方法。

int (*ArrowAsyncDeviceStreamHandler.on_next_task)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncTask*, const char*)#

必要。當有新的記錄可供處理時要呼叫的處理常式。每個記錄的綱要應與呼叫 on_schema 時的綱要相同。如果成功處理,此函式應傳回 0;否則,應傳回與 errno 相容的錯誤代碼。

接收的不是記錄本身,而是 ArrowAsyncTask,這樣做的目的是為了促進以消費者為中心的執行緒控制,以便更好地接收資料。呼叫此函式僅表示資料可透過提供的任務取得。

生產者透過傳遞 NULL 作為 ArrowAsyncTask 指標(而不是有效的位址)來表示串流的結束。此任務物件僅在此函式呼叫的生命週期內有效。如果消費者想要在此方法範圍之外使用此任務,則必須將其內容複製或移動到新的 ArrowAsyncTask 物件。

const char* 參數的存在是為了讓生產者提供他們想要的任何額外上下文資訊。這會以與 ArrowSchema.metadata 相同的格式進行編碼。如果不是 NULL,則生命週期僅限於呼叫此函式的範圍。想要在這次呼叫的生命週期之外維護額外 metadata 的消費者必須自行複製該值。

生產者絕不能從多個執行緒同時呼叫此方法。

必須呼叫 ArrowAsyncProducer.request 回呼,才能開始接收對此處理常式的呼叫。

void (*ArrowAsyncDeviceStreamHandler.on_error)(struct ArrowAsyncDeviceStreamHandler, int, const char*, const char*)#

必要。當生產者遇到錯誤時要呼叫的處理常式。在呼叫此方法之後,將呼叫 release 回呼作為此結構上的最後一次呼叫。參數是與 errno 相容的錯誤代碼,以及可選的錯誤訊息和 metadata。

如果訊息和 metadata 不是 NULL,則其生命週期僅在這次呼叫的範圍內有效。想要在函式傳回後仍維護這些值的消費者必須自行複製這些值。

如果 metadata 參數不是 NULL,為了提供鍵值錯誤 metadata,則應以與 ArrowSchema.metadata 中 metadata 編碼方式相同的形式進行編碼。

生產者可以在呼叫 ArrowAsyncProducer.request 之前或之後呼叫此方法。此回呼絕不能呼叫 ArrowAsyncProducer 物件的任何方法。

void (*ArrowAsyncDeviceStreamHandler.release)(struct ArrowAsyncDeviceStreamHandler*)#

必要。指向消費者提供的處理常式 release 回呼的指標。

生產者可以在呼叫 ArrowAsyncProducer.request 之前或之後呼叫此方法。此方法絕不能呼叫 ArrowAsyncProducer 物件的任何方法。

struct ArrowAsyncProducer ArrowAsyncDeviceStreamHandler.producer#

必要。消費者將用來請求額外資料或取消的生產者物件。

生產者必須在呼叫 ArrowAsyncDeviceStreamHandler.on_schema 回呼之前填入此物件。生產者保有此物件的所有權,並且必須在呼叫 ArrowAsyncDeviceStreamHandler 上的 release 回呼之後清理它。

在呼叫 on_schema 回呼之前,消費者不能假定此物件有效。

void *ArrowAsyncDeviceStreamHandler.private_data#

可選。指向消費者提供的私有資料的不透明指標。

生產者絕不能處理此成員。此成員的生命週期由消費者處理,尤其由 release 回呼處理。

ArrowAsyncTask 結構#

使用 Task 物件而不是將陣列直接傳遞給 on_next 回呼的目的,是為了實現更複雜和高效的執行緒處理。利用 Task 物件可讓生產者將「解碼」邏輯與 I/O 分離,使消費者能夠避免在 CPU 核心之間傳輸資料(例如,從一個 L1/L2 快取到另一個快取)。

此生產者提供的結構包含以下欄位

int (*ArrowArrayTask.extract_data)(struct ArrowArrayTask*, struct ArrowDeviceArray*)#

必要。用於使用可用資料填入提供的 ArrowDeviceArray 的回呼。生產者提供的 ArrowAsyncTasks 的順序,使消費者能夠知道要處理的資料順序。如果消費者不在意此任務擁有的資料,則仍必須呼叫 extract_data,以便生產者可以執行任何必要的清理。NULL 應作為裝置陣列指標傳遞,以表示消費者不想要實際資料,讓任務執行必要的清理。

如果從此方法傳回非零值,則後續應僅有生產者呼叫 ArrowAsyncDeviceStreamHandleron_error 回呼。由於呼叫此方法很可能與目前的控制流程分離,因此傳回非零值以表示發生錯誤,可讓目前的執行緒決定如何相應地處理該情況,同時仍允許所有錯誤記錄和處理集中在 ArrowAsyncDeviceStreamHandler.on_error 回呼中。

任何必要的清理都應作為此回呼調用的一部分來執行,而不是使用單獨的 release 回呼。陣列的所有權會給予作為參數傳入的指標,並且此陣列必須單獨 release。

只能呼叫此方法一次。

void *ArrowArrayTask.private_data#

選用。 指向生產者提供的私有資料的不透明指標。

消費者絕不能處理此成員。此成員的生命週期由建立此物件的生產者處理,並且應在呼叫 ArrowArrayTask.extract_data 期間進行必要的清理。

ArrowAsyncProducer 結構#

此生產者提供和管理的物件包含以下欄位

ArrowDeviceType ArrowAsyncProducer.device_type#

必要。此生產者將提供資料的裝置類型。此生產者產生的所有 ArrowDeviceArray 結構都應具有與此處設定相同的裝置類型。

void (*ArrowAsyncProducer.request)(struct ArrowAsyncProducer*, uint64_t)#

必要。消費者必須呼叫此函式才能開始接收對 ArrowAsyncDeviceStreamHandler.on_next_task 的呼叫。從 ArrowAsyncDeviceStreamHandler.on_next_taskArrowAsyncDeviceStreamHandler.on_schema 內部同步呼叫此函式必須有效。因此,此函式絕不能同步呼叫 on_next_taskon_error,以避免遞迴和可重入回呼。

在呼叫 cancel 之後,對此函式的額外呼叫必須是空操作 (NOP),但允許。

在未取消的情況下,呼叫此函式會註冊給定數量的額外陣列/批次,以供生產者產生。生產者在傳播回壓/等待之前,應僅呼叫適當的 on_next_task 回呼,最大次數為對此方法的總呼叫次數。

呼叫 request 遇到的任何錯誤都必須透過呼叫 ArrowAsyncDeviceStreamHandleron_error 回呼來傳播。

使用 <= 0 的 n 值呼叫此函式是無效的。如果收到此類 n 值,生產者應產生錯誤(例如,呼叫 on_error)。

void (*ArrowAsyncProducer.cancel)(struct ArrowAsyncProducer*)#

必要。此函式向生產者發出訊號,告知其最終必須停止呼叫 on_next_taskcancel 的呼叫必須是等冪且執行緒安全的。在呼叫一次之後,後續呼叫必須是空操作 (NOP)。除了 on_error 之外,此方法絕不能呼叫任何消費者端處理常式。

不需要呼叫 cancel 會立即影響生產者,只需要它最終必須停止呼叫 on_next_task,然後後續呼叫非同步處理常式物件上的 release。因此,即使在呼叫 cancel 之後,如果仍有待處理的請求陣列,消費者必須準備好接收對 on_next_taskon_error 的一次或多次呼叫。

成功取消絕不能導致生產者呼叫 ArrowAsyncDeviceStreamHandler.on_error,而是應該完成任何剩餘任務(相應地呼叫 on_next_task),並最終只呼叫 release

在處理對 cancel 的呼叫期間遇到的任何錯誤,都必須透過非同步串流處理常式上的 on_error 回呼回報。

const char *ArrowAsyncProducer.additional_metadata#

可選。一個額外的 metadata 字串,用於向消費者提供任何額外的上下文。此值必須NULL 或以與 ArrowSchema.metadata 相同方式編碼的有效字串。例如,如果已知,生產者可以利用此 metadata 來提供串流中的總行數和/或批次數。

如果不是 NULL,則必須至少在此物件的生命週期內有效。

void *ArrowAsyncProducer.private_data#

可選。指向生產者提供的特定資料的不透明指標。

消費者絕不能處理此成員,生命週期由建構此物件的生產者擁有。

錯誤處理#

與常規 C 串流介面不同,非同步介面允許錯誤在兩個方向上流動。因此,錯誤處理可能會稍微複雜一些。因此,此規範指定以下規則

  • 如果生產者在處理過程中遇到錯誤,則應呼叫 on_error 回呼,然後在它傳回後呼叫 release

  • 如果 on_schemaon_next_task 傳回非零整數值,生產者不應呼叫 on_error 回呼,而是應在記錄或處理錯誤代碼之前或之後的某個時間點最終呼叫 release

結果生命週期#

傳遞給 on_schema 回呼的 ArrowSchema 必須獨立 release,並且物件本身需要移動到消費者擁有的 ArrowSchema 物件。作為參數傳遞給回呼的 ArrowSchema* 絕不能被儲存和保留。

提供給 on_next_taskArrowAsyncTask 物件由生產者擁有,並將在調用其 extract_data 時清理。如果消費者不在意資料,則應傳遞 NULL 而不是有效的 ArrowDeviceArray*

傳遞給 on_errorconst char* 錯誤訊息和 metadata 僅在 on_error 函式本身的範圍內有效。如果需要在其傳回後仍然存在,則必須複製它們。

串流處理常式生命週期#

非同步串流處理常式的生命週期是使用 release 回呼來管理的,其用法與 C 資料介面 中的用法類似。

ArrowAsyncProducer 生命週期#

ArrowAsyncProducer 的生命週期由生產者本身擁有,並且應由其管理。在呼叫 release 以外的任何方法之前,必須填入它,並且必須至少保持有效,直到剛要呼叫串流處理常式物件上的 release 之前。

執行緒安全#

ArrowAsyncDeviceStreamHandler 上的所有處理常式函式都應僅以序列化的方式呼叫,但不保證每次都從同一個執行緒呼叫。生產者應等待處理常式回呼傳回,然後再呼叫下一個處理常式回呼,以及在呼叫 release 回呼之前。

回壓是由消費者呼叫 ArrowAsyncProducer.request 來管理的,以指示它準備好接收多少個陣列。

ArrowAsyncDeviceStreamHandler 物件應該能夠在傳遞給生產者後立即處理回呼,任何初始化都應在提供之前執行。

可能的序列圖#

sequenceDiagram Consumer->>+Producer: ArrowAsyncDeviceStreamHandler* Producer-->>+Consumer: on_schema(ArrowAsyncProducer*, ArrowSchema*) Consumer->>Producer: ArrowAsyncProducer->request(n) par loop up to n times Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end and for each task Consumer-->>Producer: ArrowAsyncTask.extract_data(...) Consumer-->>Producer: ArrowAsyncProducer->request(1) end break Optionally Consumer->>-Producer: ArrowAsyncProducer->cancel() end loop possible remaining Producer-->>Consumer: on_next_task(ArrowAsyncTask*) end Producer->>-Consumer: ArrowAsyncDeviceStreamHandler->release()

與其他交換格式的互操作性#

其他交換 API,例如 CUDA 陣列介面,包含用於傳遞正在匯出的資料緩衝區的形狀和資料類型的成員。此資訊對於解釋正在共享的裝置資料緩衝區中的原始位元組是必要的。使用者應利用現有的 ArrowSchema 結構來傳遞任何資料類型和形狀資訊,而不是將資料的形狀/類型與 ArrowDeviceArray 一起儲存。

更新此規範#

注意

由於此規範仍被視為實驗性的,因此存在(仍然非常低)可能會稍微更改的可能性。將其標記為「實驗性」的原因是因為我們不知道我們不知道什麼。已完成工作和研究以確保通用 ABI 與許多不同的框架相容,但始終有可能遺漏某些內容。一旦官方 Arrow 版本支援此規範,並且觀察到使用情況以確認不需要任何修改,則將移除「實驗性」標籤並凍結 ABI。

一旦官方 Arrow 版本支援此規範,C ABI 就會凍結。這表示 ArrowDeviceArray 結構定義不應以任何方式更改,包括新增成員。

允許向後相容的變更,例如 ArrowDeviceType 的新巨集值,或將保留的 24 個位元組轉換為不同的類型/成員,而不會更改結構的大小。

任何不相容的變更都應成為新規範的一部分,例如 ArrowDeviceArrayV2