我們在 F5 使用 Apache Arrow 的旅程(第 2 部分):自適應 Schema 和排序以最佳化 Arrow 的使用
已發布 2023 年 6 月 26 日
作者 Laurent Quérel
在前一篇文章中,我們討論了在 OpenTelemetry 專案中使用 Apache Arrow 的情況。我們研究了各種技術,以最大化 Apache Arrow 的效率,目標是找到資料壓縮率和可查詢性之間的最佳平衡。壓縮結果不言而喻,改進幅度從比原始 OTLP 協定好 1.5 倍到 5 倍不等。在本文中,我們將深入探討三種技術,這些技術使我們能夠在目前版本的 OTel Arrow 協定中,同時提高 Apache Arrow 緩衝區的壓縮率和記憶體使用量。
我們將討論的第一種技術旨在最佳化 Schema 的記憶體使用量。您將會看到,收益可能相當可觀,在某些情況下可能會將記憶體使用量減半。第二節將更深入地探討可用於處理遞迴 Schema 定義的各種方法。最後,我們將強調 Schema 的設計,以及您可以在記錄層級應用的排序,在最大化 Apache Arrow 及其欄狀表示法的優勢方面,發揮著關鍵作用。
處理動態和未知的資料分佈
在某些情況下,為了涵蓋您打算以欄狀形式表示的所有可能情況,Arrow Schema 的完整定義最終可能會過於廣泛和複雜。然而,正如複雜 Schema 的情況經常發生的那樣,只有 Schema 的子集會實際用於特定的部署。同樣地,事先判斷一個或多個欄位的最佳字典編碼並非總是可能的。採用涵蓋所有情況的廣泛且非常通用的 Schema 通常會更耗費記憶體。這是因為,對於大多數實作而言,沒有值的欄位仍然會繼續消耗記憶體空間。同樣地,索引 uint64 的字典編碼欄位將佔用比相同欄位(基於 uint8 的字典編碼)多四倍的記憶體。
為了更具體地說明這一點,讓我們考慮一個位於生產環境輸出的 OTel 收集器,它接收由大量且動態的伺服器集產生的遙測資料流。不可避免地,此遙測資料流的內容會隨著時間在數量和性質上發生變化。在這種情況下預測最佳 Schema 具有挑戰性,並且事先知道通過此點的特定遙測資料屬性的分佈同樣困難。
為了最佳化此類情況,我們採用了一種我們稱為動態 Arrow Schema的中間方法,旨在根據觀察到的資料逐步調整 Schema。一般原則相對簡單。我們從一個通用 Schema 開始,該 Schema 定義了應表示內容的最大範圍。此 Schema 的某些欄位將被宣告為可選,而其他欄位將根據觀察到的分佈以多種可能的選項進行編碼。理論上,此原則可以應用於其他類型的轉換(例如,遞迴欄位建立),但我們將讓您的想像力探索其他選項。因此,如果您遇到某些欄位未被使用、某些聯合變體保持未使用狀態,和/或欄位的值分佈無法先驗確定的資料流,則可能值得投入時間來實作此模型。這可以提高壓縮率、記憶體使用量和處理速度方面的效率。
以下 Go Arrow Schema 定義提供了一個此類 Schema 的範例,其中包含一組註解。這些註解將由增強型 Record Builder 處理,該 Record Builder 配備了動態調整 Schema 的能力。此系統的結構如圖 1 所示。
var (
// Arrow schema for the OTLP Arrow Traces record (without attributes, links, and events).
TracesSchema = arrow.NewSchema([]arrow.Field{
// Nullabe:true means the field is optional, in this case of 16 bit unsigned integers
{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
{Name: constants.Resource, Type: arrow.StructOf([]arrow.Field{
{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
// --- Use dictionary with 8 bit integers initially ----
{Name: constants.SchemaUrl,Type: arrow.BinaryTypes.String,Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
{Name: constants.DroppedAttributesCount,Type: arrow.PrimitiveTypes.Uint32,Nullable: true},
}...), Nullable: true},
{Name: constants.Scope, Type: arrow.StructOf([]arrow.Field{
{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Metadata: acommon.Metadata(acommon.DeltaEncoding), Nullable: true},
// --- Use dictionary with 8 bit integers initially ----
{Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
{Name: constants.Version, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
{Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
}...), Nullable: true},
{Name: constants.SchemaUrl, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
{Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
{Name: constants.DurationTimeUnixNano, Type: arrow.FixedWidthTypes.Duration_ms, Metadata: schema.Metadata(schema.Dictionary8)},
{Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
{Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
{Name: constants.TraceState, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
{Name: constants.ParentSpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}, Nullable: true},
{Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8)},
{Name: constants.KIND, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
{Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
{Name: constants.DroppedEventsCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
{Name: constants.DroppedLinksCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
{Name: constants.Status, Type: arrow.StructOf([]arrow.Field{
{Name: constants.StatusCode, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
{Name: constants.StatusMessage, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
}...), Nullable: true},
}, nil)
)
在此範例中,Arrow 欄位層級中繼資料用於指定欄位何時為可選(Nullable:true)或指定適用於特定欄位的最小字典編碼(中繼資料 Dictionary8/16/…)。現在,讓我們想像一個在簡單場景中使用此 Schema 的情況,其中實際上只使用了少數幾個欄位,並且大多數字典編碼欄位的基數都很低(即低於 2^8)。理想情況下,我們希望系統能夠動態建構以下簡化的 Schema,從本質上講,它是原始 Schema 的嚴格子集。
var (
// Simplified schema definition generated by the Arrow Record encoder based on
// the data observed.
TracesSchema = arrow.NewSchema([]arrow.Field{
{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
{Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
{Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
{Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
{Name: constants.Name, Type: &arrow.DictionaryType {
IndexType: arrow.PrimitiveTypes.Uint8,
ValueType: arrow.BinaryTypes.String}},
{Name: constants.KIND, Type: &arrow.DictionaryType {
IndexType: arrow.PrimitiveTypes.Uint8,
ValueType: arrow.PrimitiveTypes.Int32,
}, Nullable: true},
}, nil)
)
此外,我們希望系統能夠在未來的批次中遇到新欄位或現有欄位的基數超過目前字典定義大小的情況下,自動調整上述 Schema。在極端情況下,如果特定欄位的基數超過某個閾值,我們希望系統自動還原為非字典表示法(字典溢位機制)。這正是我們將在本節的其餘部分中詳細闡述的內容。
圖 1 描繪了用於實作此方法的不同組件和事件的概觀。
整體自適應 Arrow Schema 組件會取得分段成批次的資料流,並產生一個或多個 Arrow 記錄流(每個流一個 Schema)。這些記錄中的每一筆都使用 Arrow Schema 定義,該 Schema 基於帶註解的 Arrow Schema 和傳入資料中觀察到的欄位形狀。
更具體地說,自適應 Arrow Schema 組件的流程包含四個主要階段
初始化階段
在初始化階段,Arrow Record Encoder 會讀取帶註解的 Arrow Schema(即參考 Schema),並產生一組轉換。當這些轉換應用於參考 Schema 時,它們會產生第一個最小的 Arrow Schema,該 Schema 符合這些註解所描述的約束。在此初始迭代中,所有可選欄位都會被消除,並且所有字典編碼欄位都會被配置為使用註解定義的最小編碼(在先前的範例中僅為 Dictionary8
)。這些轉換形成一個樹狀結構,反映了參考 Schema 的結構。
饋送階段
初始化之後是饋送階段。在這裡,Arrow Record Encoder 會掃描批次,並嘗試將所有欄位儲存到 Arrow Record Builder 中,該 Record Builder 由先前步驟中建立的 Schema 定義。如果資料中存在欄位,但未包含在 Schema 中,則編碼器將觸發 遺失欄位
事件。此流程會持續進行,直到目前批次完全處理完畢。在 Arrow Record Builder 中的所有字典編碼欄位上,都會執行額外的內部檢查,以確保沒有字典溢位(即,唯一條目多於索引允許的基數)。如果偵測到這種情況,則會產生 字典溢位
事件。因此,到最後,所有未知的欄位和字典溢位都將被偵測到,或者,如果資料與 Schema 完全對齊,則不會出現任何差異。
修正階段
如果至少產生一個事件,則將啟動修正階段以修正 Schema。此可選階段會考量先前階段中產生的所有事件,並相應地調整轉換樹狀結構,以與觀察到的資料對齊。遺失欄位
事件將移除對應欄位的 NoField 轉換。字典溢位
事件將修改字典轉換以反映事件(例如,將索引類型從 uint8 變更為 uint16,或者如果已達到最大索引大小,則轉換將移除字典編碼並還原為原始的非字典編碼類型)。隨後,更新後的轉換樹狀結構會用於建立新的 Schema 和新的 Arrow Record Builder。然後,此 Record Builder 會用於重播先前的饋送階段,並處理未正確處理的批次。
路由階段
一旦 Record Builder 已正確饋送,就會建立 Arrow 記錄,並且系統會轉換到路由階段。路由器組件會計算記錄的 Schema 簽章,並使用此簽章將記錄路由到與簽章相容的現有 Arrow 流,或者如果沒有相符項,則啟動新的流。
這種四階段流程應逐步調整和穩定 Schema,使其達到針對特定資料流最佳化的結構和定義。未使用的欄位將永遠不會不必要地消耗記憶體。字典編碼欄位將根據觀察到的資料基數,使用最佳索引大小進行定義,並且基數超過特定閾值(由組態定義)的欄位將自動還原為其非字典編碼版本。
為了有效地執行此方法,您必須確保接收端具有足夠的彈性。至關重要的是,即使 Schema 中缺少某些欄位,或者使用了各種字典索引組態,您的下游管線仍然可以正常運作。雖然如果不在接收時實作額外的轉換,這可能並非總是可行的,但在某些情況下,這證明是值得的。
以下結果突顯了透過應用各種最佳化技術,在記憶體使用量方面實現的顯著減少。這些結果是使用類似於先前呈現的 Schema 收集的。顯著的記憶體效率突顯了此方法的有效性。

轉換樹狀結構的概念使通用方法能夠根據從資料中獲得的知識,執行各種 Schema 最佳化。此架構非常靈活;目前的實作允許移除未使用的欄位、應用最特定的字典編碼,以及最佳化聯合類型變體。未來,有可能引入可以表示為初始 Schema 轉換的額外最佳化。此方法的實作可在此處取得。
處理遞迴 Schema 定義
Apache Arrow 不支援遞迴 Schema 定義,這表示無法直接表示具有可變深度的資料結構。圖 3 例示了此類遞迴定義,其中屬性的值可以是簡單資料類型、值清單或值對應。此定義的深度無法預先決定。
可以採用幾種策略來規避此限制。從技術上講,我們提出的動態 Schema 概念可以擴展為動態更新 Schema,以包含任何遺失的遞迴層級。但是,對於此使用案例,此方法很複雜,並且具有無法保證 Schema 最大大小的顯著缺點。這種約束的缺乏可能會造成安全性問題;因此,此方法未詳細說明。
第二種方法包括透過使用支援遞迴 Schema 定義的序列化格式來中斷遞迴。然後,可以將此序列化的結果作為二進位類型欄位整合到 Arrow 記錄中,從而在特定層級有效地停止遞迴。為了充分利用欄狀表示法的優勢,至關重要的是在資料結構中盡可能深入地應用此特別序列化。在 OpenTelemetry 的上下文中,這是在屬性層級執行的 – 更具體地說,是在屬性的第二層級。
各種序列化格式(例如 protobuf 或 CBOR)可用於編碼遞迴資料。如果沒有特別處理,現有的 Arrow 查詢引擎可能不容易查詢這些二進位欄位。因此,仔細確定何時以及在何處應用此技術至關重要。雖然我不知道 Arrow 系統內有任何嘗試解決此限制的方法,但這似乎並非無法克服,並且將構成有價值的擴展。這將有助於降低將 Arrow 與其他依賴此類遞迴定義的系統整合的複雜性。
排序的重要性
在我們之前的文章中,我們探索了多種策略來表示階層式資料模型,包括基於 struct/list/map/union 的巢狀結構、反正規化和平坦化表示法,以及多記錄方法。每種方法都有其獨特的優點和缺點。但是,在最後一節中,我們將更深入地探討多記錄方法,特別關注其提供多功能排序選項的能力,以及這些選項如何有助於提高壓縮率。
在 OTel Arrow 協定中,我們利用多記錄方法來表示指標、記錄和追蹤。以下實體關係圖提供了各種記錄 Schema 的簡化版本,並說明了它們之間的關係,特別是用於表示量表和總和的關係。OpenTelemetry 中使用的 Arrow 資料模型的完整描述可在此處存取此處。
這些 Arrow 記錄,也稱為表格,形成一個以 METRICS
作為主要進入點的階層結構。每個表格都可以根據一個或多個欄位獨立排序。這種排序策略有助於對重複資料進行分組,從而提高壓縮率。

主要 METRICS
表格與次要 RESOURCE_ATTRS
、SCOPE_ATTRS
和 NUMBER_DATA_POINTS
表格之間的關係是透過主要表格中的唯一 id
和每個次要表格中的 parent_id
欄位建立的。此 {id,parent_id} 配對表示壓縮後應盡可能減少的額外負荷。
為了實現這一點,不同表格的排序流程遵循階層結構,從主要表格到葉節點。主要表格已排序(按一個或多個欄位),然後為每一列分配一個遞增的 id。此數字 id 使用 delta 編碼儲存,delta 編碼是在 Arrow 之上實作的。
直接連接到主要表格的次要表格使用相同的原則進行排序,但 parent_id
欄位始終用作排序陳述式中的最後一個欄位。在排序陳述式中包含 parent_id
欄位可以使用 delta 編碼的變體。下圖總結了此方法的效率。

第二欄顯示了不同大小批次的 OTLP 批次的平均大小(壓縮前和壓縮後)。此欄作為後續兩欄的參考點。第三欄顯示了未應用任何排序的 OTel Arrow 協定的結果,而最後一欄顯示了啟用排序的 OTel Arrow 協定的結果。
在壓縮之前,兩種 OTel Arrow 組態的平均批次大小可預測地相似。但是,在壓縮之後,對每個個別表格進行排序對壓縮率的優勢立即顯現。在不排序的情況下,OTel Arrow 協定的壓縮率比參考值好 1.40 到 1.67 倍。啟用排序後,OTel Arrow 協定的效能比參考值高出 4.94 到 7.21 倍!
壓縮方面的收益顯然取決於您的資料以及資料批次中存在的資訊冗餘。根據我們的觀察,選擇良好的排序通常會將壓縮率提高 1.5 到 8 倍。
將複雜的 Schema 分解為多個更簡單的 Schema 以增強排序能力,再加上有效編碼表示關係的識別碼的目標方法,成為提高整體資料壓縮的有效策略。此方法還消除了複雜的 Arrow 資料類型,例如清單、對應和聯合。因此,它不僅提高了資料可查詢性,而且還簡化了資料可查詢性。這種簡化證明對現有的查詢引擎有利,這些引擎可能難以對複雜的 Schema 進行操作。
結論與後續步驟
本文總結了我們關於 Apache Arrow 的兩部分系列文章,在其中我們探索了各種策略,以在特定情況下最大化 Apache Arrow 的實用性。本系列文章第二部分中介紹的自適應 Schema 架構為未來的最佳化可能性鋪平了道路。我們期待看到社群可以根據此貢獻新增什麼內容。
Apache Arrow 是一個傑出的專案,不斷受到蓬勃發展的生態系統的增強。但是,在我們的探索過程中,我們注意到某些差距或摩擦點,如果解決這些問題,可能會顯著豐富整體體驗。
- 在某些情況下,設計有效的 Arrow Schema 可能被證明是一項具有挑戰性的任務。收集記錄層級的統計資料的能力可以促進此設計階段(每個欄位的資料分佈、字典統計資料、壓縮前後的 Arrow 陣列大小等等)。這些統計資料也有助於識別最有效的欄位,以作為記錄排序的基礎。
- 對遞迴 Schema 的原生支援也將透過簡化在複雜情況下 Arrow 的使用來增加採用率。雖然我不知道 Arrow 系統內有任何嘗試解決此限制的方法,但這似乎並非無法克服,並且將構成有價值的擴展。這將有助於降低將 Arrow 與其他依賴此類遞迴定義的系統整合的複雜性。
- 協調對資料類型以及 IPC 流功能的支援也將是一項主要優勢。主要的用戶端程式庫支援巢狀和階層式 Schema,但由於生態系統其餘部分缺乏完全支援,因此它們的使用受到限制。例如,查詢引擎或 Parquet 橋接器無法很好地支援清單和/或聯合類型。此外,IPC 流中的進階字典支援在不同的實作中不一致(即,並非所有實作都支援 delta 字典和替換字典)。
- 透過原生整合本文中介紹的動態 Schema 概念,可以改善對複雜 Schema 的支援進行最佳化,包括記憶體消耗和壓縮率。
- 偵測字典溢位(索引層級)並非易於即時測試的事情。可以改進 API,以便在發生插入時立即指示此溢位。
我們在結合 OpenTelemetry 使用 Apache Arrow 的努力已產生令人鼓舞的結果。雖然這需要在開發、探索和基準測試方面投入大量資源,但我們希望這些文章將有助於加速您使用 Apache Arrow 的旅程。展望未來,我們設想與 Apache Arrow 進行端對端整合,並計畫大幅擴展我們對 Arrow 生態系統的使用。此擴展涉及提供與 Parquet 的橋接器,以及與 DataFusion 等查詢引擎整合,目標是在收集器內處理遙測資料流。