Arrow Flight RPC#
Arrow Flight 是一個 RPC 框架,用於透過網路有效率地傳輸 Flight 資料。
另請參閱
- Flight 協定文件
Flight 協定的文件,包括如何在概念上使用 Flight。
- Flight API 文件
C++ API 文件,列出所有各種用戶端和伺服器類型。
- C++ 食譜
在 C++ 中使用 Arrow Flight 的食譜。
撰寫 Flight 服務#
伺服器是 arrow::flight::FlightServerBase
的子類別。若要實作個別的 RPC,請覆寫此類別上的 RPC 方法。
class MyFlightServer : public FlightServerBase {
Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
std::unique_ptr<FlightListing>* listings) override {
std::vector<FlightInfo> flights = ...;
*listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
return Status::OK();
}
};
每個 RPC 方法都會採用 arrow::flight::ServerCallContext
作為通用參數,並傳回 arrow::Status
以指示成功或失敗。Flight 特定錯誤碼可以透過 arrow::flight::MakeFlightError()
傳回。
除了狀態之外還傳回值的 RPC 方法將使用 out 參數,如上所示。通常,會有輔助程式類別提供這些 out 參數的基本實作。例如,在上方,arrow::flight::SimpleFlightListing
使用 arrow::flight::FlightInfo
物件的向量作為 ListFlights
RPC 的結果。
若要啟動伺服器,請建立 arrow::flight::Location
以指定要接聽的位置,並呼叫 arrow::flight::FlightServerBase::Init()
。這將啟動伺服器,但不會封鎖程式的其餘部分。使用 arrow::flight::FlightServerBase::SetShutdownOnSignals()
以在收到中斷訊號時啟用停止伺服器,然後呼叫 arrow::flight::FlightServerBase::Serve()
以封鎖直到伺服器停止。
std::unique_ptr<arrow::flight::FlightServerBase> server;
// Initialize server
arrow::flight::Location location;
// Listen to all interfaces on a free port
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);
// Start the server
ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));
std::cout << "Server listening on localhost:" << server->port() << std::endl;
ARROW_CHECK_OK(server->Serve());
使用 Flight 用戶端#
若要連線至 Flight 服務,請呼叫 Connect
來建立 arrow::flight::FlightClient
的執行個體。
每個 RPC 方法都會傳回 arrow::Result
以指示請求的成功/失敗,以及請求成功時的結果物件。有些呼叫是串流呼叫,因此它們會傳回讀取器和/或寫入器物件;最終呼叫狀態在串流完成之前未知。
取消和逾時#
在進行呼叫時,用戶端可以選擇性地提供 FlightCallOptions
。這允許用戶端設定呼叫逾時或提供自訂 HTTP 標頭,以及其他功能。此外,用戶端 RPC 呼叫傳回的一些物件會公開 Cancel
方法,該方法允許提早終止呼叫。
在伺服器端,不需要額外的程式碼來實作逾時。對於取消,伺服器需要手動輪詢 ServerCallContext::is_cancelled
以檢查用戶端是否已取消呼叫,如果是,則跳出伺服器目前正在執行的任何處理。
啟用 TLS#
在設定伺服器時,可以透過將憑證和金鑰組提供給 FlightServerBase::Init
來啟用 TLS。
在用戶端,使用 Location::ForGrpcTls
來建構要接聽的 arrow::flight::Location
。
啟用驗證#
警告
在未啟用 TLS 的情況下,驗證是不安全的。
可以透過實作 ServerAuthHandler
並在建構期間將其提供給伺服器來啟用基於交握的驗證。
驗證包含兩個部分:在初始用戶端連線時,伺服器和用戶端驗證實作可以執行任何所需的協商。然後,用戶端驗證處理常式會提供一個權杖,該權杖將附加到未來的呼叫。這是透過使用所需的用戶端驗證實作呼叫 Authenticate
來完成的。
在之後的每個 RPC 上,用戶端處理常式的權杖都會自動新增至請求中的呼叫標頭。伺服器驗證處理常式會驗證權杖並提供用戶端的身分。在伺服器上,可以從 arrow::flight::ServerCallContext
取得此身分。
自訂中介軟體#
伺服器和用戶端支援自訂中介軟體(或攔截器),這些中介軟體會在每個請求上呼叫,並且可以有限度地修改請求。這些可以透過子類別化 ServerMiddleware
和 ClientMiddleware
來實作,然後在建立用戶端或伺服器時提供它們。
中介軟體相當有限,但它們可以將標頭新增至請求/回應。在伺服器上,它們可以檢查傳入的標頭並使請求失敗;因此,它們可以用於實作自訂驗證方法。
最佳實務#
gRPC#
當使用預設 gRPC 傳輸時,可以透過 arrow::flight::FlightClientOptions::generic_options
將選項傳遞給它。例如
auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
# Set the period after which a keepalive ping is sent on transport.
generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
另請參閱 最佳 gRPC 實務 和可用的 gRPC 金鑰。
盡可能重複使用用戶端#
建立和關閉用戶端需要在用戶端和伺服器端進行設定和關閉,這可能會佔用實際處理 RPC 的時間。盡可能重複使用用戶端以避免這種情況。請注意,用戶端是執行緒安全的,因此單一用戶端可以在多個執行緒之間共用。
請勿循環配置負載平衡#
循環配置負載平衡 表示每個用戶端都可以與每個伺服器建立開放連線,從而導致意外數量的開放連線並耗盡伺服器資源。
偵錯連線問題#
當長時間執行的連線遇到意外斷線時,請使用 netstat 監控開放連線的數量。如果連線數量遠大於用戶端數量,則可能會導致問題。
為了進行偵錯,某些環境變數會在 gRPC 中啟用記錄。例如,env GRPC_VERBOSITY=info GRPC_TRACE=http
將列印初始標頭(在兩側),以便您可以查看 gRPC 是否已建立連線。它也會在傳送訊息時列印,以便您可以判斷連線是否已開啟。
gRPC 可能不會在實際進行呼叫之前報告連線錯誤。因此,為了在建立用戶端時偵測連線錯誤,應進行某種虛擬 RPC。
記憶體管理#
Flight 嘗試重複使用 gRPC 進行的配置,以避免多餘的資料複製。但是,這表示這些配置可能不會由 Arrow 記憶體池追蹤,並且記憶體使用行為(例如,是否將可用記憶體傳回系統)取決於 gRPC 使用的配置器(通常是系統配置器)。
一種快速測試方法:使用偵錯工具附加到程序並呼叫 malloc_trim
,或在系統池上呼叫 ReleaseUnused
。如果記憶體使用量下降,則很可能存在 gRPC 或應用程式配置的記憶體,而系統配置器一直保留著這些記憶體。這可以在平台特定的方式中進行調整;請參閱 ARROW-16697 中的調查,以取得有關此如何在 Linux/glibc 上運作的範例。可以明確告知 glibc malloc 轉儲快取。
過多的流量#
gRPC 將產生最多執行緒配額的執行緒,以用於並行用戶端。這些執行緒不一定會清除(Java 用語中的「快取執行緒池」)。glibc malloc 清除了一些每個執行緒的狀態,並且預設調整永遠不會清除某些工作負載中的快取。
gRPC 的預設行為允許一個伺服器接受來自許多不同用戶端的許多連線,但如果請求執行大量工作(就像它們在 Flight 下可能做的那樣),則伺服器可能無法跟上。將用戶端設定為使用退避重試(並可能連線到不同的節點)將提供更一致的服務品質。
auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
# Set the minimum time between subsequent connection attempts.
generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
限制 DoPut 批次大小#
您可能希望限制用戶端可以透過 DoPut 提交給伺服器的最大批次大小,以防止請求佔用伺服器上過多的記憶體。在用戶端,設定 arrow::flight::FlightClientOptions::write_size_limit_bytes
。在伺服器端,設定 gRPC 選項 GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH
。用戶端選項將傳回一個錯誤,可以使用較小的批次重試,而伺服器端限制將關閉連線。同時設定兩者可能是明智之舉,因為前者提供更好的使用者體驗,但後者可能是防禦不禮貌用戶端所必需的。
關閉無回應的連線#
可以使用
arrow::flight::FlightCallOptions::stop_token
關閉過時的呼叫。這需要在呼叫建立時間記錄停止權杖。StopSource stop_source; FlightCallOptions options; options.stop_token = stop_source.token(); stop_source.RequestStop(Status::Cancelled("StopSource")); flight_client->DoAction(options, {});
使用呼叫逾時。(這是一般 gRPC 最佳實務。)
FlightCallOptions options; options.timeout = TimeoutDuration{0.2}; Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
options = pyarrow.flight.FlightCallOptions(timeout=0.2) result = client.do_action(action, options=options)
用戶端逾時對於長時間執行的串流呼叫來說不太好,在這種情況下,可能很難為整個操作選擇逾時。相反,通常需要的是每個讀取或每個寫入逾時,以便在操作沒有進展時失敗。這可以使用背景執行緒來實作,該執行緒在計時器上呼叫 Cancel(),而主執行緒在每次操作成功完成時重設計時器。如需完整運作的範例,請參閱食譜。
注意
有一個長期存在的票證,用於每個寫入/每個讀取逾時,而不是每個呼叫逾時 (ARROW-6062),但這並非(容易)使用封鎖 gRPC API 實作。