分離式 IPC 協定#

警告

實驗性:分離式 IPC 協定在其目前形式中為實驗性。根據回饋和使用情況,協定定義可能會變更,直到完全標準化為止。

原理#

Arrow IPC 格式 描述了一種將 Arrow 資料作為記錄批次串流傳輸的協定。此協定預期一個連續的位元組串流,該串流分為離散訊息(使用長度前綴和接續指示符)。每個離散訊息包含兩個部分

  • Flatbuffers 標頭訊息

  • 一系列位元組,由扁平化和封裝的本體緩衝區組成(某些訊息類型,例如結構描述訊息,沒有此部分)- 在 IPC 格式規範中,這被稱為訊息本體

對於大多數情況,目前現有的 IPC 格式已足夠有效率

  • 以 IPC 格式接收資料允許零複製利用本體緩衝區位元組,無需反序列化即可形成 Arrow 陣列

  • IPC 檔案格式可以進行記憶體對應,因為它是位置無關的,並且檔案的位元組與記憶體中預期的完全相同。

但是,有些使用案例是此格式無法處理的

  • 建構 IPC 記錄批次訊息需要分配一個連續的位元組區塊,並將所有資料緩衝區複製到其中,背對背地封裝在一起。這降低了將現有的、直接可消耗的資料包裝到 IPC 訊息中的常見情況的效率。

  • 即使 Arrow 資料位於跨程序邊界或傳輸(例如 UCX)可存取的記憶體中,也沒有標準方法可以向可以利用它的消費者指定該共享位置。

  • 位於非 CPU 裝置(例如 GPU)上的 Arrow 資料無法使用 Arrow IPC 發送,而無需將資料複製回主機裝置或將 Flatbuffers 元數據位元組複製到裝置記憶體中。

    • 同樣地,將 IPC 訊息接收到裝置記憶體中將需要將 Flatbuffers 元數據複製回主機 CPU 裝置。這是因為 IPC 串流在單一串流中交錯了資料和元數據。

此協定嘗試以有效率的方式解決這些使用案例。

目標#

  • 定義一個通用的協定,用於傳遞 Arrow IPC 資料,不綁定任何特定傳輸,同時也允許利用非 CPU 裝置記憶體、共享記憶體和更新的「高效能」傳輸,例如 UCXlibfabric

    • 這允許本體中的資料保留在非 CPU 裝置(如 GPU)上,而無需昂貴的裝置到主機複製。

  • 允許僅使用 Flight RPC 進行控制流程,方法是將 IPC 元數據串流與 IPC 本體位元組分離

定義#

IPC 元數據

包含 Arrow IPC 訊息標頭的 Flatbuffers 訊息位元組

標籤

一個小端序 uint64 值,用於流量控制,並用於決定如何解譯訊息的本體。可以遮罩特定的位元,以允許僅透過標籤的一部分來識別訊息,並將其餘位元用於控制流程或其他訊息元數據。某些傳輸(例如 UCX)內建支援此類標籤值,並且無論訊息本體是否可能駐留在非 CPU 裝置上,都會在 CPU 記憶體中提供這些標籤值。

序列號碼

一個小端序、4 位元組的無號整數,對於串流從 0 開始,指示訊息的序列順序。它也用於識別特定訊息,以將 IPC 元數據標頭與其對應的本體連結,因為元數據和本體可以透過分離的管道/串流/傳輸發送。

如果序列號碼達到 UINT32_MAX,則應允許其循環,因為不太可能會有足夠多的未處理訊息等待處理,而導致序列號碼重疊。

序列號碼有兩個用途:識別對應的元數據和標籤本體資料訊息,並確保我們不依賴訊息必須按順序到達。客戶端應使用序列號碼來正確排序到達的訊息以進行處理。

協定#

使用 libcudfUCX 的參考範例實作可以在 arrow-experiments repo 中找到。

需求#

實作此協定的傳輸必須提供兩個功能

  • 訊息傳送

    • 定界訊息(如 gRPC),而不是非定界串流(如沒有進一步框架的純 TCP)。

    • 或者,可以使用用於 IPC 協定的 封裝訊息格式 之類的框架機制,同時省略本體位元組。

  • 標籤訊息傳送

    • 傳送具有附加的小端序、無號 64 位元整數標籤的訊息,用於控制流程。這樣的標籤允許控制流程對本體位於非 CPU 裝置上的訊息進行操作,而無需將訊息本身複製出裝置。

URI 規範#

當提供 URI 給消費者以與此協定一起使用時(例如透過 Flight 的 Location URI),URI 應指定一個容易識別的方案,例如 ucx:fabric:。此外,URI 應編碼以下 URI 查詢參數

注意

隨著此協定的成熟,本文檔將更新為常用於此協定的傳輸方案。

  • want_data - 必要 - uint64 整數值

    • 此值應用於標記發送到伺服器的初始訊息,以啟動資料傳輸。啟動訊息的本體應為所請求的資料串流的不透明二進位識別符(如 Flight RPC 協定中的 Ticket

  • free_data - 選用 - uint64 整數值

    • 如果伺服器可能會發送使用偏移量/位址進行遠端記憶體存取或共享記憶體位置的訊息,則 URI 應包含此參數。此值用於標記從客戶端發送到資料伺服器的訊息,其中包含客戶端不再需要的特定偏移量/位址(即,任何直接引用這些記憶體位置的操作,例如將遠端資料複製到本地記憶體,都已完成)。

  • remote_handle - 選用 - base64 編碼字串

    • 當使用共享記憶體或遠端記憶體時,此值指示存取記憶體所需的任何必要句柄或識別符。

      • 使用 UCX,這將是一個 rkey

      • 使用 CUDA IPC,這將是基礎 GPU 指標或記憶體句柄的值,後續位址將是從此基礎指標的偏移量。

背壓處理#

目前,此提案未指定任何管理訊息背壓的方法,以因記憶體和頻寬原因進行節流。目前,這將是由傳輸定義的,而不是鎖定到次優方案。

隨著不同傳輸和程式庫之間的使用量增長,將會出現常見模式,這將允許以通用但有效率的方式處理不同使用案例中的背壓。

注意

雖然協定本身與傳輸無關,但到目前為止,目前的使用情況和範例僅在使用 UCX 和 libfabric 傳輸的情況下進行了測試,但僅此而已。

協定描述#

可能發生兩種情況

  1. 元數據和本體資料的串流在分離的連線上發送

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. sequenceDiagram participant D as Data Stream participant C as Client participant M as Metadata Stream activate C C-->>+M: TaggedMessage(server.want_data, bytes=ID_of_desired_data) C-->>+D: TaggedMessage(server.want_data, bytes=ID_of_desired_data) M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata) loop each batch par M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata) and alt D-->>C: TaggedMessage((bytes[0] << 55) | le_bytes(sequence_number),<br/>bytes=batch_data) else D-->>C: TaggedMessage((bytes[1] << 55) | le_bytes(sequence_number),<br/>bytes=uint64_pairs) end end end M-->>C: Message(bytes([0]) + le_bytes(sequence_number)) deactivate M loop C-->>D: TaggedMessage(server.free_data, bytes=uint64_list) end deactivate D deactivate C

  1. 元數據和本體資料的串流同時在同一個連線上發送

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. sequenceDiagram participant C as Client participant S as Server activate C C-->>+S: TaggedMessage(server.want_data, bytes=ID_of_desired_data) S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata) par loop each chunk S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata) end S-->>C: Message(bytes([0]) + le_bytes(sequence_number)) and loop each chunk alt S-->>C: TaggedMessage((bytes[0] << 55) | le_bytes(sequence_number),<br/>bytes=batch_data) else S-->>C: TaggedMessage((bytes[1] << 55) | le_bytes(sequence_number),<br/>bytes=uint64_pairs) end end end loop C-->>S: TaggedMessage(server.free_data, bytes=uint64_list) end deactivate S deactivate C

伺服器序列#

可以有一個伺服器處理 IPC 元數據串流和本體資料串流,也可以有分離的伺服器處理 IPC 元數據和本體資料。如果需要,這允許跨單一傳輸管道或兩個管道串流資料。

元數據串流序列#

伺服器的常駐狀態是等待具有特定 <want_data> 標籤值的帶標籤訊息以啟動傳輸。此 <want_data> 值由伺服器定義,並透過提供給任何客戶端的 URI 傳播給他們。此協定未規定任何特定值,以便它不會干擾任何其他依賴標籤值的現有協定。該訊息的本體將包含一個不透明的二進位識別符,以指示要發送的特定資料集/資料串流。

注意

例如,與 FlightInfo 訊息一起傳遞的 ticket 將是此訊息的本體。因為它是不透明的,所以它可以是伺服器想要使用的任何東西。URI 和識別符不需要透過 Flight RPC 提供給客戶端,但可以來自任何所需的傳輸或協定。

在接收到 <want_data> 請求後,伺服器透過發送由以下內容組成的訊息串流來回應

block-beta columns 8 block:P["\n\n\n\nPrefix"]:5 T["Message type\nByte 0"] S["Sequence number\nBytes 1-4"] end H["Flatbuffer bytes\nRest of the message"]:3

  • 一個 5 位元組的前綴

    • 訊息的第一個位元組指示訊息的類型,目前僅允許兩種訊息類型(未來可能會新增更多類型)

      1. 串流結束

      2. Flatbuffers IPC 元數據訊息

    • 接下來的 4 位元組是一個小端序、無號 32 位元整數,指示訊息的序列號碼。串流中的第一個訊息(必須始終是結構描述訊息)必須具有序列號碼 0。每個後續訊息必須將號碼遞增 1

  • Arrow IPC 標頭的完整 Flatbuffers 位元組

如 Arrow IPC 格式中所定義,每個元數據訊息可以表示資料區塊或字典,供資料串流使用。

在發送最後一個元數據訊息後,伺服器必須透過發送由正好 5 位元組組成的訊息來指示串流結束

  • 第一個位元組是 0,指示串流結束訊息

  • 最後 4 位元組是序列號碼(4 位元組,小端序位元組順序的無號整數)

資料串流序列#

如果單一伺服器正在處理資料和元數據串流,則資料訊息與元數據訊息並行開始發送到客戶端。否則,與元數據序列一樣,伺服器的常駐狀態是等待具有 <want_data> 標籤值的帶標籤訊息,其本體指示要發送到客戶端的資料集/資料串流。

對於資料串流中的每個 IPC 訊息,如果該訊息有本體(即記錄批次或字典訊息),則必須在資料串流上發送帶標籤的訊息。每個訊息的 標籤 應結構化如下

block-beta columns 8 S["Sequence number\nBytes 0-3"]:4 U["Unused (Reserved)\nBytes 4-6"]:3 T["Message type\nByte 7"]:1

  • 標籤的最低有效 4 位元組(位元 0 - 31)應為訊息的小端序無號 32 位元序列號碼。

  • 標籤的最高有效 位元組(位元 56 - 63)指示訊息本體類型為 8 位元無號整數。目前僅指定了兩種訊息類型,但可以根據需要新增更多類型以擴展協定

    1. 本體包含原始本體緩衝區位元組作為封裝緩衝區(即標準 IPC 格式本體位元組)

    2. 本體包含一系列無號、小端序 64 位元整數對,以表示共享或遠端記憶體,示意性地結構化為

      • 前兩個整數(例如,前 16 位元組)表示此訊息中所有緩衝區的大小(以位元組為單位)和緩衝區數量(因此是後續 uint64 對的數量)

      • 每個後續的 uint64 值對都是位址/偏移量,後跟該特定緩衝區的長度。

  • 標籤的所有未指定位元(位元 32 - 55)都保留供未來更新此協定時使用。目前,它們必須為 0。

注意

跨網路發送的任何共享/遠端記憶體位址必須由伺服器保持活動狀態,直到收到對應的帶標籤 <free_data> 訊息。如果客戶端在發送任何 <free_data> 訊息之前斷線,則可以認為伺服器可以安全地清理記憶體(如果需要)。

在發送最後一個帶標籤的 IPC 本體訊息後,伺服器應維持連線並等待帶標籤的 <free_data> 訊息。這些 <free_data> 訊息的結構很簡單:一個或多個無號、小端序 64 位元整數,指示可以釋放的位址/偏移量。

一旦沒有更多待釋放的未完成位址,此串流的工作就完成了。

客戶端序列#

此協定的客戶端需要並行處理訊息的資料和元數據串流,這些串流可能來自同一個伺服器或不同的伺服器。以下流程圖顯示了客戶端可能如何處理元數據和資料串流

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. graph LR client((Client))-->c1{{Send #60;want_data#gt; Msg}} subgraph meta [Meta Message] direction LR m1[/Msg Type #40;byte 0#41;<br/>Seq Num #40;bytes 1-5#41;/]-- type 1 -->m2[[Process IPC Header]] m2-- IPC has body -->m3[Get Corresponding<br/>Tagged Msg] m2-- Schema Msg -->m4[/Store Schema/] m1-- type 0 -->e[Indicate End of Stream] end subgraph data [Data Stream] direction LR d1[Request Msg<br/>for Seq Num]-->d2{Most Significant<br/>Byte} d2-- 0 -->d3[Construct from<br/>Metadata and Body] d2-- 1 -->d4[Get shared/remote<br/>buffers] d4 -->d5[Construct from<br/>Metadata and buffers] d3 & d5 -->e2[Output Batch] end client -- recv untagged msg --> meta client -- get tagged msg --> data

  1. 首先,客戶端使用它在 URI 中提供的 <want_data> 值作為標籤,並使用不透明 ID 作為本體,發送一個帶標籤的訊息。

    • 如果元數據和資料伺服器是分離的,則需要將 <want_data> 訊息分別發送到每個伺服器。

    • 在任一種情況下,元數據和資料串流都可以根據傳輸的性質並行和/或非同步處理。

  2. 對於客戶端在元數據串流中接收的每個未標籤訊息

    • 訊息的第一個位元組指示它是串流結束訊息(值 0)還是元數據訊息(值 1)。

    • 接下來的 4 位元組是訊息的序列號碼,一個小端序位元組順序的無號 32 位元整數。

    • 如果它不是串流結束訊息,則剩餘的位元組是可以正常解譯的 IPC Flatbuffer 位元組。

      • 如果訊息有本體(即記錄批次或字典訊息),則客戶端應使用相同的序列號碼從資料串流中檢索帶標籤的訊息。

    • 如果它是串流結束訊息,則如果接收到的序列號碼中沒有間隙,則可以安全地關閉元數據連線。

  3. 當接收到需要本體的元數據訊息時,0x00000000FFFFFFFF 的標籤遮罩與序列號碼一起使用,以匹配訊息,而與較高的位元組無關(例如,我們僅關心將較低的 4 位元組與序列號碼匹配)

    • 接收後,最高有效位元組的值決定了客戶端如何處理本體資料

      • 如果最高有效位元組為 0:則訊息的本體是原始 IPC 封裝的本體緩衝區,允許輕鬆地與對應的元數據標頭位元組一起處理。

      • 如果最高有效位元組為 1:訊息的本體將由一系列無號、小端序 64 位元整數對組成。

        • 前兩個整數表示 1) 所有本體緩衝區的總大小,以便於分配(如果需要中間緩衝區)和 2) 正在發送的緩衝區數量 (nbuf)。

        • 訊息的其餘部分將是 nbuf 對整數,每個緩衝區一對。每對都是 1) 緩衝區的位址/偏移量和 2) 該緩衝區的長度。然後可以根據底層傳輸透過共享或遠端記憶體常式檢索記憶體。這些位址/偏移量必須保留,以便稍後可以在 <free_data> 訊息中發回,向伺服器指示客戶端不再需要共享記憶體。

  4. 一旦收到串流結束訊息,客戶端應處理任何剩餘的未處理 IPC 元數據訊息。

  5. 在遠端伺服器能夠釋放個別記憶體位址/偏移量(在它已發送這些位址/偏移量而不是完整的本體位元組的情況下)後,客戶端應將對應的 <free_data> 訊息發送到伺服器。

    • 單一 <free_data> 訊息由任意數量的無號 64 位元整數值組成,表示可以釋放的位址/偏移量。它之所以是任意數字的原因是為了允許客戶端選擇是發送多個訊息來釋放多個位址,還是將多個位址合併為更少的訊息來釋放(從而使協定在需要時不那麼“冗長”)

持續開發#

如果您決定在您自己的環境和系統中嘗試此協定,我們很樂意收到回饋並了解您的使用案例。由於目前這是一個實驗性協定,因此我們需要真實世界的使用情況,以促進改進它並找到正確的概括以在各種傳輸中標準化。

請使用 Arrow 開發人員郵件列表發表意見:https://arrow.dev.org.tw/community/#mailing-lists