開發人員指南#

本頁更詳細地介紹 Acero 的設計。它討論如何建立自訂的 exec 節點,並描述 Acero 設計和實作背後的一些理念。最後,它概述如何使用新的行為擴展 Acero,以及如何將這種新的行為向上游整合到核心 Arrow 儲存庫中。

了解 ExecNode#

ExecNode 是一個抽象類別,具有幾個純虛擬方法,用於控制節點的運作方式

ExecNode::StartProducing()#

此方法在計畫開始時呼叫一次。大多數節點會忽略此方法(任何必要的初始化都應在建構函式或 Init 中進行)。但是,來源節點通常會提供自訂實作。來源節點應排程啟動讀取和提供資料所需的任何任務。來源節點通常是計畫中任務的主要建立者。

注意

ExecPlan 以推送模型運作。來源通常是拉取模型。例如,您的來源可能是迭代器。然後,來源節點通常會排程任務,以從來源拉取一個項目,並透過 InputReceived 將該項目推送到來源的輸出節點。

範例#

  • table_source 節點中,輸入表格會分成批次。會為每個批次建立一個任務,該任務會在節點的輸出上呼叫 InputReceived

  • scan 節點中,會建立一個任務以開始列出資料集中的片段。然後,每個列出任務都會建立任務,以非同步方式從片段讀取批次。當批次完整讀取時,接續會排程一個包含 exec 計畫的新任務。此任務會在掃描節點的輸出上呼叫 InputReceived

ExecNode::InputReceived()#

此方法在計畫執行期間會呼叫多次。它是節點之間傳遞資料的方式。輸入節點會在其輸出上呼叫 InputReceived。Acero 的執行模型是基於推送的。每個節點都會透過呼叫 InputReceived 並傳入一批資料,將資料推送到其輸出中。

InputReceived 方法通常是節點實際執行工作的地方。例如,投影節點會執行其表達式,並建立新的擴充輸出批次。然後,它會在輸出上呼叫 InputReceived。永遠不會在來源節點上呼叫 InputReceived。永遠不會呼叫 InputReceived 的是接收節點。所有其他節點都會經歷這兩者。

某些節點(通常稱為「管線中斷器」)必須累積輸入,才能產生任何輸出。例如,排序節點必須累積所有輸入,才能排序資料並產生輸出。在這些節點中,InputReceived 方法通常會將資料放入某種累積佇列中。如果節點沒有足夠的資料來運作,則它不會呼叫 InputReceived。那麼這將是目前任務的結束。

範例#

  • project 節點執行其表達式,並使用接收到的批次作為表達式的輸入。從輸入批次和表達式的結果建立新的批次。新的批次會被賦予與輸入批次相同的順序索引,然後節點會在輸出上呼叫 InputReceived

  • order_by 節點將批次插入到累積佇列中。如果這是最後一個批次,則節點將對累積佇列中的所有內容進行排序。然後,節點將在排序結果中針對每個批次在其輸出上呼叫 InputReceived。將為每個批次分配新的批次索引。請注意,此最終輸出步驟也可能因呼叫 InputFinished(如下所述)而發生。

ExecNode::InputFinished()#

每個輸入都會呼叫此方法一次。節點一旦知道它將發送到該輸出的批次數量,就會在其輸出上呼叫 InputFinished。通常,這會在節點完成工作時發生。例如,掃描節點在完成讀取檔案後會呼叫 InputFinished。但是,如果它知道(可能從檔案中繼資料得知)將建立多少批次,則它可以更早呼叫它。

某些節點將使用此訊號來觸發某些處理。例如,排序節點需要等到它接收到所有輸入後才能排序資料。它依賴 InputFinished 呼叫來知道這已經發生。

即使節點在完成時沒有執行任何特殊處理(例如,投影節點或篩選節點不需要執行任何串流結束處理),該節點仍會在輸出上呼叫 InputFinished。

警告

InputFinished 呼叫可能會在最後一次呼叫 InputReceived 之前到達。事實上,它甚至可能在任何 InputReceived 呼叫開始之前發送出去。例如,表格來源節點始終確切知道它將產生多少批次。它可以選擇在呼叫 InputReceived 之前呼叫 InputFinished。如果節點需要執行「串流結束」處理,則它通常會使用 AtomicCounter,這是一個輔助類別,用於找出所有資料何時到達。

範例#

  • order_by 檢查它是否已接收到所有批次。如果是,則執行 InputReceived 範例中描述的排序步驟。在開始發送輸出資料之前,它會檢查它有多少輸出批次(批次大小可能會因為累積或排序而改變),並在節點的輸出上呼叫 InputFinished

  • fetch 節點在呼叫 InputReceived 期間意識到它已接收到要求的所有列。它立即在其輸出上呼叫 InputFinished(即使它自己的 InputFinished 方法尚未被呼叫)

ExecNode::PauseProducing() / ExecNode::ResumeProducing()#

這些方法控制背壓。某些節點可能需要暫停其輸入,以避免累積過多資料。例如,當使用者使用 RecordBatchReader 取用計畫時,我們使用 SinkNode。SinkNode 將資料放入 RecordBatchReader 從中拉取的佇列中(這是從推送模型到拉取模型的轉換)。如果使用者讀取 RecordBatchReader 的速度很慢,則此佇列可能會開始填滿。對於另一個範例,我們可以考慮寫入節點。此節點將資料寫入檔案系統。如果寫入速度很慢,則資料可能會累積在寫入節點。因此,寫入節點將需要施加背壓。

當節點意識到它需要施加一些背壓時,它會在輸入上呼叫 PauseProducing。一旦節點有足夠的空間繼續,它就會在其輸入上呼叫 ResumeProducing。例如,當 SinkNode 的佇列過滿時,它會暫停。當使用者繼續從 RecordBatchReader 讀取時,我們可以預期佇列會慢慢排空。一旦佇列排空到足夠的程度,SinkNode 就可以呼叫 ResumeProducing。

來源節點通常需要為 PauseProducing 和 ResumeProducing 提供特殊行為。例如,從檔案讀取的掃描節點可以暫停讀取檔案。但是,某些來源節點可能無法以任何有意義的方式暫停。表格來源節點暫停沒有太大意義,因為其資料已在記憶體中。

既不是來源也不是接收節點的節點仍應轉發背壓訊號。例如,當在投影節點上呼叫 PauseProducing 時,它應在其輸入上呼叫 PauseProducing。如果節點有多個輸入,則應將訊號轉發到每個輸入。

範例#

  • write 節點在其 InputReceived 方法中,將批次新增至資料集寫入器的佇列。如果資料集寫入器已滿,它將傳回一個未完成的 future,該 future 將在有更多空間時完成。write 節點然後在其輸入上呼叫 PauseProducing。然後,它將接續新增到 future,這將在其輸入上呼叫 ResumeProducing

  • scan 節點使用 AsyncTaskScheduler 來追蹤它排程的所有任務。此排程器受到節流,以限制 scan 節點允許執行的並行 I/O 量。當呼叫 PauseProducing 時,節點將暫停排程器。這表示節流閥後面的任何排隊任務都不會提交。但是,任何正在進行的 I/O 都將繼續(背壓無法立即生效)。當呼叫 ResumeProducing 時,scan 節點將取消暫停排程器。

ExecNode::StopProducing()#

當計畫需要提早結束時,會呼叫 StopProducing。這可能是因為使用者取消了計畫,也可能是因為發生了錯誤。大多數節點不需要在此處執行任何操作。沒有期望或要求節點發送它擁有的任何剩餘資料。任何排程任務的節點(例如,來源節點)都應停止產生新資料。

除了計畫範圍的取消之外,如果節點已決定它已接收到它需要的所有資料,則它可以在其輸入上呼叫此方法。但是,由於並行性,節點在停止其輸入後,可能仍會收到一些對 InputReceived 的呼叫。

如果使用了任何外部資源,則應在此呼叫中完成清理。

範例#

  • asofjoin 節點有一個專用的處理執行緒,該執行緒使用佇列與主要的 Acero 執行緒進行通訊。當呼叫 StopProducing 時,節點會將毒丸插入佇列。這會告知處理執行緒立即停止。一旦處理執行緒停止,它會將其外部任務(如下所述)標記為已完成,這允許計畫完成。

  • fetch 節點在 InputReceived 中可能會決定它已擁有所有需要的資料。然後,它可以在其輸入上呼叫 StopProducing

初始化/建構/解構#

簡單的初始化邏輯(不會產生錯誤)可以在建構函式中完成。如果初始化邏輯可能會傳回無效的狀態,則可以在 exec 節點的 factory 方法或 Init 方法中完成。對於簡單的驗證,factory 方法是首選。Init 方法是首選,如果初始化可能會執行昂貴的分配或其他資源消耗。Init 將始終在呼叫 StartProducing 之前呼叫。初始化也可以在 StartProducing 中完成,但請記住,其他節點可能已在那時啟動。

此外,還有一個 Validate 方法可以被覆寫以提供自訂驗證。此方法通常在 Init 之前但在新增所有輸入和輸出之後呼叫。

最終化今天在解構函式中發生。今天有一些例子可能很慢。例如,在寫入節點中,如果計畫期間發生錯誤,那麼我們可能會在此處關閉一些打開的檔案。如果應該有顯著的最終化是異步的或可能觸發錯誤,那麼我們可以為 ExecNode 生命周期引入 Finalize 方法。尚未完成只是因為還沒有需要。

摘要#

ExecNode 生命周期#

方法名稱

在以下情況下呼叫…

節點在以下情況下呼叫…

StartProducing

計畫正在開始

不適用

InputReceived

從輸入接收到資料

將資料發送到輸出

InputFinished

輸入知道有多少批次

節點可以告訴其輸出有多少批次

StopProducing

計畫中止或輸出有足夠的資料

節點擁有它需要的所有資料

擴展 Acero#

Acero 實例化一個單例 ExecFactoryRegistry,它在名稱和 exec 節點 factory(從選項建立 ExecNode 的方法)之間建立映射。要建立新的 ExecNode,您可以向此登錄表註冊節點,您的節點現在即可供 Acero 使用。如果您希望能夠將此節點與 Substrait 計畫一起使用,您還需要配置 Substrait 登錄表,以便它知道如何將 Substrait 映射到您的自訂節點。

這表示您可以建立新的節點並將其新增到 Acero,而無需從原始碼重新編譯 Acero。

排程和並行性#

資料引擎可以透過多種方式利用多個運算資源(例如,多個核心)。在我們深入探討 Acero 排程的細節之前,我們將涵蓋一些高階主題。

計畫的並行執行#

使用者可能希望並行執行多個計畫,他們可以隨時這樣做。但是,Acero 沒有跨計畫排程的概念。每個計畫都會嘗試最大化其運算資源的使用率,並且可能會存在 CPU、記憶體和磁碟資源的爭用。如果計畫使用預設的 CPU 和 I/O 執行緒池,則這將在一定程度上得到緩解,因為它們將共用相同的執行緒池。

本機分散式計畫#

解決多執行緒的常見方法是將輸入分割成多個分割區,然後為每個分割區建立一個計畫,然後以某種方式合併這些計畫的結果。例如,假設您有 20 個檔案和 10 個核心,並且您想要讀取和排序所有資料。您可以為每 2 個檔案建立一個計畫,以讀取和排序這些檔案。然後,您可以建立一個額外的計畫,該計畫從這 10 個子計畫接收輸入,並以排序方式合併這 10 個輸入串流。

這種方法很受歡迎,因為它是查詢在多個伺服器之間分散的方式,因此它被廣泛支援且廣為人知。Acero 今天沒有這樣做,但沒有理由阻止它。將 shuffle 和 partition 節點新增到 Acero 應該是高度優先事項,並且將使 Acero 能夠被分散式系統使用。一旦完成,如果需要,應該可以進行本機 shuffle(本機是指在單一系統上的多個 exec 計畫實例之間交換)。

../../_images/dist_plan.svg

即使計畫本身是串列執行的,分散式計畫也可以提供並行性#

管線並行性#

Acero 嘗試使用管線並行性來最大化並行性。當每個資料批次從來源到達時,我們會立即建立一個任務並開始處理它。這表示我們可能會在批次 X-1 的處理完成之前開始處理批次 X。這非常靈活且功能強大。但是,這也表示正確實作 ExecNode 很困難。

例如,ExecNode 的 InputReceived 方法應該是可重入的。換句話說,應該預期會在先前的 InputReceived 呼叫完成之前呼叫 InputReceived。這表示具有任何種類可變狀態的節點將需要互斥鎖或類似機制來保護該狀態免於競爭條件。這也表示任務很容易變得無序,並且節點不應期望其輸入有任何特定的排序(稍後會詳細介紹)。

../../_images/pipeline.svg

在具有 3 個 CPU 執行緒和 2 個 I/O 執行緒的系統上進行管線並行性的範例#

非同步性#

某些操作需要很長時間,並且可能不需要 CPU。從檔案系統讀取資料就是一個範例。如果每個核心只有一個執行緒,那麼在等待這些操作完成時,時間將被浪費。解決此問題有兩種常見的解決方案。同步解決方案通常是建立比核心更多的執行緒,並期望它們中的一些會被封鎖,這沒關係。這種方法往往更簡單,但可能會導致過多的執行緒爭用,並且需要微調。

另一種解決方案是使緩慢的操作變成非同步的。當緩慢的操作開始時,呼叫者會放棄執行緒,並允許其他任務在此期間執行。一旦緩慢的操作完成,就會建立一個新任務來取得結果並繼續處理。這有助於最大程度地減少執行緒爭用,但往往更難實作。

由於缺乏標準的 C++ 非同步 API,Acero 使用了這兩種方法的組合。Acero 有兩個執行緒池。第一個是 CPU 執行緒池。此執行緒池每個核心有一個執行緒。此執行緒池中的任務永遠不應封鎖(超出同步的次要延遲),並且通常應盡可能積極地使用 CPU。I/O 執行緒池中的執行緒預計大部分時間都處於閒置狀態。它們應避免執行任何 CPU 密集型工作。它們的工作基本上是等待資料可用,並在 CPU 執行緒池上排程後續任務。

../../_images/async.svg

Arrow 透過結合 CPU 和 I/O 執行緒池來實現非同步執行#

注意

Acero 中的大多數節點都不需要擔心非同步性。它們是完全同步的,並且不產生任務。

每個管線的任務(有時甚至更多)#

引擎可以選擇為節點的每次執行建立執行緒任務。但是,如果沒有仔細的排程,這會導致快取局部性問題。例如,假設我們有一個由三個 exec 節點組成的基本計畫:scan、project,然後是 filter(這是一個非常常見的用例)。現在假設有 100 個批次。在每個運算元的任務模型中,我們將擁有諸如「掃描批次 5」、「投影批次 5」和「篩選批次 5」之類的任務。這些任務中的每一個都可能存取相同的資料。例如,也許 projectfilter 節點需要讀取相同的欄。一個最初在 scan 節點的解碼階段中建立的欄。為了最大化快取利用率,我們需要仔細排程我們的任務,以確保所有這三個任務都連續執行並分配到相同的 CPU 核心。

為了避免這個問題,我們設計了在任務結束之前盡可能多地執行節點的任務。節點的此序列通常稱為「管線」,而結束管線(從而結束任務)的節點通常稱為「管線中斷器」。有些節點甚至可能介於兩者之間。例如,在雜湊聯結節點中,當我們在探測端接收到批次,並且雜湊表已建立時,我們不需要結束任務,而是繼續執行。這表示任務有時可能會在聯結節點結束,有時可能會繼續通過聯結節點。

../../_images/pipeline_task.svg

計畫中管線的邏輯視圖和兩個任務,顯示管線邊界在計畫期間可能會有所不同#

執行緒池和排程器#

CPU 和 I/O 執行緒池是核心 Arrow-C++ 程式庫的一部分。它們包含任務的 FIFO 佇列,並將在執行緒可用時執行它們。對於 Acero,我們需要額外的功能。為此,我們使用 AsyncTaskScheduler。在最簡單的運作模式下,排程器只是將任務提交到基礎執行緒池。但是,它也能够建立子排程器,這些子排程器可以應用節流、優先順序和任務追蹤

  • 節流排程器將成本與每個任務關聯。僅當有空間時,任務才會提交到基礎排程器。如果沒有,則任務會放入佇列中。寫入節點使用大小為 1 的節流閥,以避免重新進入呼叫資料集寫入器(資料集寫入器執行其自己的內部排程)。節流排程器可以手動暫停和取消暫停。暫停時,所有任務都會排隊,即使有空間,排隊的任務也不會提交。這在來源節點中可用於實作 PauseProducing 和 ResumeProducing。

  • 優先順序可以應用於節流排程器,以控制排隊任務的提交順序。如果有空間,則立即提交任務(無論優先順序如何)。但是,如果節流閥已滿,則任務會排隊並受優先順序影響。掃描節點節流控制它產生的讀取請求數量,並在可能的情況下優先依序讀取資料集。

  • 任務群組可用於追蹤任務集合,並在所有任務完成時執行最終化任務。這對於 fork-join 風格的問題很有用。寫入節點使用任務群組在檔案的所有未完成寫入任務完成後關閉檔案。

有很多關於在執行引擎中優先處理任務的不同方法的研究和範例。Acero 尚未解決此問題。讓我們來看一些常見情況

  • 引擎通常會優先從聯結節點的建置端讀取,然後再從探測端讀取。透過應用背壓,這在 Acero 中可以更輕鬆地處理。

  • 另一個常見的用例是控制記憶體累積。引擎將優先處理更接近接收節點的任務,以減輕記憶體壓力。但是,Acero 目前假設溢出將新增到管線中斷器中,並且計畫中的記憶體使用量將或多或少是靜態的(每個核心),並且遠低於硬體限制。如果需要在具有許多運算資源和有限記憶體(例如 GPU)的環境中使用 Acero,則這種情況可能會改變

  • 引擎通常會使用工作竊取演算法,優先處理在同一個核心上執行的任務,以提升快取局部性。然而,由於 Acero 使用的是 task-per-pipeline 模型,排程器可以回收的快取平行處理機會損失並不多。任務只會在沒有更多資料可以處理時才會結束。

雖然 Acero 目前沒有太多的優先順序機制,但我們確實擁有在需要時應用它的工具。

注意

除了 AsyncTaskScheduler 之外,還有另一個名為 TaskScheduler 的類別。這個類別比 AsyncTaskScheduler 更早出現,旨在為高效能的同步 fork-join 工作負載提供任務追蹤。如果這個特殊用途符合您的需求,您可以考慮使用它。將它與 AsyncTaskScheduler 進行效能分析並比較兩者有多接近會很有趣。

節點內平行處理#

某些節點可能會利用任務內的平行處理。例如,在掃描節點中,我們可以平行解碼多個資料行。在雜湊聯結節點中,有時會利用平行處理來處理複雜的任務,例如建立雜湊表。這種平行處理方式比較不常見,但並非不鼓勵。不過,應先進行效能分析,以確保這種額外的平行處理對您的工作負載有所幫助。

所有工作都在任務中完成#

Acero 中的所有工作都作為任務的一部分執行。當一個計畫開始時,AsyncTaskScheduler 會被建立並賦予一個初始任務。這個初始任務會呼叫節點上的 StartProducing。任務可能會排程額外的任務。例如,來源節點通常會在呼叫 StartProducing 期間排程任務。Pipeline breakers 通常會在它們累積了所有需要的資料時排程任務。一旦計畫中的所有任務都完成,則該計畫被視為完成。

有些節點使用外部執行緒。這些執行緒必須使用 BeginExternalTask 方法註冊為外部任務。例如,asof join 節點使用專用的處理執行緒來實現序列執行。這個專用執行緒被註冊為外部任務。應盡可能避免外部任務,因為它們需要仔細處理,以避免在錯誤情況下發生死鎖。

有序執行#

有些節點會建立其輸出批次的順序,或者它們需要能夠按順序處理批次。Acero 使用 ExecBatch 上的 batch_index 屬性來處理排序。如果節點具有確定性的輸出順序,則它應該在其發出的批次上應用批次索引。例如,OrderByNode 會對批次應用新的排序(無論輸入排序如何)。掃描節點能夠將隱含的排序附加到批次,這反映了掃描檔案中列的順序。

如果節點需要按順序處理資料,則會比較複雜。由於執行的平行性質,我們無法保證批次會按順序到達節點。然而,它們通常可以預期是「大致有序的」。因此,我們可以將批次插入到排序佇列中。排序佇列被賦予一個回呼,該回呼保證以串行方式按順序在批次上執行。例如,fetch 節點使用排序佇列。回呼檢查我們是否需要包含批次的部分或全部,然後在需要時切分批次。

即使節點不關心順序,它也應該盡可能嘗試維護批次索引。project 和 filter 節點不關心順序,但它們確保輸出批次保持與輸入批次相同的索引。filter 節點甚至會在需要時發出空批次,以便它可以維護批次順序而沒有間隙。

../../_images/ordered.svg

有序執行的範例#

分割執行#

如果資料列以某種方式分組在一起,則串流會被分割(或有時稱為分段)。目前,沒有分割的正式概念。然而,一個概念正在開始發展(例如,分段聚合),我們最終也可能會在 Acero 中引入更正式的分割概念。

溢出#

溢出尚未在 Acero 中實作。

分散式執行#

當引擎在分散式環境中使用時,某些 exec 節點非常有用。術語可能會有所不同,因此我們將使用 Substrait 術語。exchange 節點將資料傳送到不同的 worker。通常,這是一個分割的 exchange,因此 Acero 預期會分割每個批次,並將分割區分散到 N 個不同的 worker。在另一端,我們有 capture 節點。這個節點接收來自不同 worker 的資料。

這些節點今天在 Acero 中不存在。然而,它們將在範圍內,我們希望有一天能擁有這樣的節點。

效能分析與追蹤#

Acero 的追蹤目前只實作了一半,並且在效能分析工具中存在重大缺陷。然而,已經有一些使用 open telemetry 進行追蹤的努力,並且大多數必要的組件都已到位。目前主要缺乏的是對追蹤結果的一些有效視覺化呈現。

為了使用今天存在的追蹤功能,您需要使用 ARROW_WITH_OPENTELEMETRY=ON 建置 Arrow。然後您需要設定環境變數 ARROW_TRACING_BACKEND=otlp_http。這將配置 open telemetry 將追蹤結果 (以 OTLP 形式) 匯出到 HTTP 端點 https://127.0.0.1:4318/v1/traces。您需要配置一個 open telemetry 收集器來收集該端點上的結果,並且您需要配置某種類型的追蹤檢視器,例如 Jaeger:https://jaeger.dev.org.tw/docs/1.21/opentelemetry/

基準測試#

Acero 最完整的巨觀基準測試由 voltrondata-labs/arrowbench 提供。這些基準測試包括一組 TPC-H 基準測試,從 R-dplyr 整合執行,這些基準測試在每次 Arrow 提交時執行,並報告給 Conbench,網址為 https://conbench.ursa.dev/

除了這些 TPC-H 基準測試之外,還有許多針對各種節點(hash-join、asof-join 等)的微基準測試。最後,計算函數本身應大多具有微基準測試。有關微基準測試的更多資訊,您可以參考 https://arrow.dev.org.tw/docs/developers/benchmarks.html

任何新功能都應包含微基準測試,以避免效能衰退。

繫結#

公開 API#

Acero 的公開 API 由 Declaration 和各種 DeclarationToXyz 方法組成。此外,每個節點的選項類別也是公開 API 的一部分。然而,節點是可擴展的,因此這個 API 也是可擴展的。

R (dplyr)#

Dplyr 是一個 R 函式庫,用於以程式設計方式建置查詢。arrow-r 套件具有 dplyr 繫結,可調整 dplyr API 以建立 Acero 執行計畫。此外,還有一個正在開發中的 dplyr-substrait 後端,最終可能會取代 Acero 感知的繫結。

Python#

pyarrow 函式庫以兩種不同的方式繫結到 Acero。首先,pyarrow.acero 中有一個直接繫結,它直接繫結到公開 API。其次,有許多計算實用程式,例如 pyarrow.Table.group_by,它使用 Acero,儘管這對使用者來說是不可見的。

Java#

Java 實作公開了來自 Arrow datasets 的一些功能。這些功能隱含地使用 Acero。目前 Java 實作中沒有 Acero 或 Substrait 的直接繫結。

設計哲學#

引擎獨立計算#

如果節點需要複雜的計算,那麼它應該將該工作封裝在不依賴任何特定引擎設計的抽象中。例如,hash join 節點使用諸如 row encoder、hash table 和 exec batch builder 之類的實用程式。其他地方共享排序佇列和 row segmenter 的實作。節點本身應保持最小化,並且僅從 Acero 映射到抽象。

這有助於將設計與 Acero 的設計細節解耦,並使它們更能適應引擎的變更。它還有助於將這些抽象提升為它們自己的功能。無論是用於其他引擎,還是用於 pyarrow 的潛在新增功能作為計算實用程式。

建立任務而非執行緒#

如果您需要在平行環境中執行某些操作,那麼您應該使用執行緒任務,而不是專用執行緒。

  • 這可以減少執行緒計數(減少執行緒爭用和上下文切換)

  • 這可以防止死鎖(任務在發生故障時會自動取消)

  • 這簡化了效能分析(任務可以輕鬆測量,更容易知道所有工作在哪裡)

  • 這使得在沒有執行緒的情況下執行成為可能(有時使用者正在進行自己的執行緒處理,有時我們需要在執行緒受限的環境中執行,例如 emscripten)

注意:我們目前並非總是遵循這個建議。asof join 節點中有一個專用的處理執行緒。專用執行緒對於實驗性使用來說是「可以接受的」,但我們希望遷移到不再使用它們。

不要阻塞 CPU 執行緒#

如果您需要執行可能長時間運行的活動,而該活動沒有積極使用 CPU 資源(例如,從磁碟讀取、網路 I/O、等待使用自己的執行緒的外部函式庫),那麼您應該使用非同步實用程式,以確保您不會阻塞 CPU 執行緒。

不要重新發明輪子#

每個節點都不應該是實用程式的獨立島嶼。在可能的情況下,計算應推送到計算函數或常見的共享實用程式中。這是像這樣龐大的專案能夠維持下去的唯一方法。

避免查詢最佳化#

編寫高效的 Acero 計畫可能具有挑戰性。例如,篩選表達式和資料行選擇應向下推送到掃描節點中,以便不會從磁碟讀取資料。應簡化表達式並分解出常見的子表達式。hash join 節點的建置端應該是兩個輸入中較小的一個。

然而,找出這些問題是查詢規劃器或查詢最佳化器的挑戰。建立查詢最佳化器是一項具有挑戰性的任務,超出了 Acero 的範圍。隨著 Substrait 的採用,我們希望最終會出現解決這些問題的實用程式。因此,我們通常避免在 Acero 內進行任何類型的查詢最佳化。Acero 應盡可能字面地解釋宣告。這有助於減少維護並避免意外。

我們也意識到這並非總是可能的。例如,hash join 節點目前會偵測是否存在 hash join 運算子的鏈,如果存在,它會在運算子之間配置 bloom filter。從技術上講,這是一個可以留給查詢最佳化器的任務。然而,這種行為相當特定於 Acero 且相當小眾,因此不太可能很快將其引入到最佳化器中。

效能指南#

批次大小#

也許最常討論的效能標準是批次大小。Acero 最初是根據研究設計的,以遵循 morsel-batch 模型。任務是根據大量的資料列批次(一個 morsel)建立的。目標是讓 morsel 足够大,以證明任務的開銷是合理的。在任務中,資料會進一步細分為批次。每個批次都應該足够小,以舒適地放入 CPU 快取(通常是 L2 快取)。

這設定了兩個迴圈。外迴圈是平行的,而內迴圈不是

for morsel in dataset: # parallel
  for batch in morsel:
    run_pipeline(batch)

這種執行風格的優點是,存取同一資料行的後續節點(或 exec 節點內的後續運算)可能會受益於快取。這對於需要隨機存取資料的函數來說也是至關重要的。它最大限度地提高了平行處理能力,同時最大限度地減少了從主記憶體到 CPU 快取的資料傳輸。

../../_images/microbatch.svg

如果需要多次傳遞資料(或隨機存取),並且批次遠大於快取,則效能會受到影響。將任務分解為較小的批次有助於提高任務局部性。#

morsel/batch 模型反映在 Acero 的幾個地方

  • 在大多數來源節點中,我們將嘗試獲取 1Mi 資料列的批次。這通常是可配置的。

  • 在來源節點中,我們然後迭代並切分出 32Ki 資料列的批次。目前這不可配置。

  • hash join 節點目前要求批次包含 32Ki 資料列或更少,因為它在某些地方使用 16 位元有號整數作為資料列索引。

然而,這個指南是有爭議的。效能分析表明,我們從移動到更小的批次大小中沒有獲得任何實際的好處。我們獲得的任何優勢似乎都因每批次的開銷而損失。大多數這種開銷似乎是由於各種每批次分配造成的。此外,根據您的硬體,CPU 快取 <-> RAM 是否始終是瓶頸尚不清楚。線性存取、預取和高 CPU <-> RAM 頻寬的組合可以減輕快取未命中的懲罰。

因此,本節包含在本指南中以提供歷史背景,但不應被視為具有約束力。

進行中與已棄用的工作#

以下工作正在進行中。在此處描述它們是為了解釋程式碼庫中的某些重複,以及解釋即將消失的類型。

Scanner v2#

掃描器目前是 datasets 模組中的一個節點,在工廠註冊表中註冊為「scan」。這個節點是在 Acero 之前編寫的,並且廣泛使用 AsyncGenerator 來平行掃描多個檔案。不幸的是,AsyncGenerator 的使用使得掃描難以進行效能分析、難以偵錯且無法取消。一個新的掃描節點正在進行中。它目前以名稱「scan2」註冊。新的掃描節點使用 AsyncTaskScheduler 而不是 AsyncGenerator,並且應提供額外功能,例如跳過資料列和處理巢狀資料行投影(對於支援它的格式)。

OrderBySink 和 SelectKSink#

這兩個 exec 節點提供了自訂的 sink 實作。它們是在將有序執行添加到 Acero 之前編寫的,並且是產生有序輸出的唯一方法。然而,它們必須放置在計畫的末尾,並且它們是自訂 sink 節點的事實使得它們難以用 Declaration 描述。OrderByNode 和 FetchNode 取代了它們。這些節點目前被保留,直到現有的繫結不再使用它們。

向上游變更#

Acero 的設計使其可以在不重新編譯的情況下進行擴展。您可以輕鬆地新增計算函數和 exec 節點,而無需建立分支或編譯 Acero。然而,當您開發出通常有用的新功能時,我們希望您能撥出時間將您的變更向上游推送。

即使我們歡迎這些變更,我們也必須承認這個過程是有成本的。向上游推送程式碼需要新模組的行為正確,但這通常是比較容易審查的部分。更重要的是,向上游推送程式碼是一個將維護負擔從您自己轉移到更廣泛的 Arrow C++ 專案維護者的過程。這需要維護者深入了解程式碼,需要程式碼與專案的風格一致,並且需要程式碼通過單元測試進行充分測試,以幫助進行迴歸。

因此,我們強烈建議採取以下步驟

  • 當您剛開始時,您應該向郵件列表發送訊息,宣布您的意圖和設計。這將幫助您確定對該功能是否有更廣泛的興趣,並且其他人可能會在流程的早期階段提出想法或建議。

    • 如果對該功能沒有太多興趣,那麼請記住,最終可能難以將變更向上游推送。團隊的維護能力是有限的,我們嘗試優先處理需求高的功能。

  • 我們建議在您自己的分支上開發和測試變更,直到您對事情正常運作相當有信心為止。如果變更很大,那麼您也可以考慮如何將變更分解為更小的部分。當您執行此操作時,您可以同時共享較大的 PR(作為草稿 PR 或本地分支上的分支)和較小的 PR。這樣我們就可以看到較小 PR 的上下文。但是,如果您確實分解了事情,那麼較小的 PR 仍然應該理想地獨立存在。

  • 任何 PR 都需要具有以下內容

    • 轉換新功能的單元測試

    • 如果正在進行任何重要的計算工作,則需要微基準測試

    • 示範如何使用新功能的範例

    • API 參考和本指南的更新

    • 通過 CI(您可以在您的分支上啟用 GitHub Actions,這將允許大多數 CI 作業在您建立 PR 之前運行)