Arrow Flight RPC#

Arrow Flight 是一個 RPC 框架,用於透過網路有效率地傳輸 Arrow 資料。

另請參閱

Flight 協定文件

Flight 協定的文件,包含如何在概念上使用 Flight。

Java 食譜

在 Java 中使用 Arrow Flight 的方法。

撰寫 Flight 服務#

Flight 伺服器實作 FlightProducer 介面。為了方便起見,它們可以繼承 NoOpFlightProducer,它提供了所有 RPC 方法的預設實作。

public class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

每個 RPC 方法總是接受 CallContext 作為通用參數。若要指示失敗,請將例外狀況傳遞給「listener」(如果有的話),否則引發例外狀況。

// Server
@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
    // ...
    listener.onError(
        CallStatus.UNAUTHENTICATED.withDescription(
            "Custom UNAUTHENTICATED description message.").toRuntimeException());
    // ...
}

// Client
try{
    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
    // ...
} catch (FlightRuntimeException e){
    // Catch UNAUTHENTICATED exception
}

若要啟動伺服器,請建立一個 Location 以指定要監聽的位置,然後使用生產者的實例建立一個 FlightServer。這將啟動伺服器,但不會封鎖程式的其餘部分。呼叫 FlightServer.awaitTermination 以封鎖直到伺服器停止。

class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

Location location = Location.forGrpcInsecure("0.0.0.0", 0);
try(
    BufferAllocator allocator = new RootAllocator();
    FlightServer server = FlightServer.builder(
            allocator,
            location,
            new TutorialFlightProducer()
    ).build();
){
    server.start();
    System.out.println("Server listening on port " + server.getPort());
    server.awaitTermination();
} catch (Exception e) {
    e.printStackTrace();
}
Server listening on port 58104

使用 Flight 用戶端#

若要連線到 Flight 服務,請使用位置建立 FlightClient

Location location = Location.forGrpcInsecure("0.0.0.0", 58104);

try(BufferAllocator allocator = new RootAllocator();
    FlightClient client = FlightClient.builder(allocator, location).build()){
    // ... Consume operations exposed by Flight server
} catch (Exception e) {
    e.printStackTrace();
}

取消與逾時#

在進行呼叫時,用戶端可以選擇性地提供 CallOptions。這允許用戶端設定呼叫的逾時。此外,用戶端 RPC 呼叫傳回的一些物件公開了一個取消方法,允許提早終止呼叫。

Location location = Location.forGrpcInsecure("0.0.0.0", 58609);

try(BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){

    Iterator<Result> resultIterator = tutorialFlightClient.doAction(
            new Action("test-timeout"),
            CallOptions.timeout(2, TimeUnit.SECONDS)
    );
} catch (Exception e) {
    e.printStackTrace();
}

在伺服器端,逾時是透明的。對於取消,伺服器需要手動輪詢 setOnCancelHandlerisCancelled 以檢查用戶端是否已取消呼叫,如果是,則跳出伺服器目前正在執行的任何處理。

// Client
Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try(BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){
    try(FlightStream flightStream = flightClient.getStream(new Ticket(new byte[]{}))) {
        // ...
        flightStream.cancel("tutorial-cancel", new Exception("Testing cancellation option!"));
    }
} catch (Exception e) {
    e.printStackTrace();
}
// Server
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    // ...
    listener.setOnCancelHandler(()->{
                // Implement logic to handle cancellation option
            });
}

啟用 TLS#

在設定伺服器時,可以透過提供憑證和金鑰組給 FlightServer.Builder.useTls 來啟用 TLS。

在用戶端,使用 Location.forGrpcTls 建立用戶端的 Location。

啟用身分驗證#

警告

在未啟用 TLS 的情況下,身分驗證是不安全的。

基於交握的身分驗證可以透過實作 ServerAuthHandler 來啟用。身分驗證包含兩個部分:在初始用戶端連線時,伺服器和用戶端身分驗證實作可以執行任何需要的協商。然後,用戶端身分驗證處理常式會提供一個權杖,該權杖將附加到未來的呼叫。

用戶端傳送要透過 ClientAuthHandler.authenticate 驗證的資料。伺服器驗證透過 ServerAuthHandler.authenticate 收到的資料。

自訂中介軟體#

伺服器和用戶端支援自訂中介軟體(或攔截器),這些中介軟體會在每個請求上呼叫,並且可以有限度地修改請求。這些可以透過實作 FlightServerMiddlewareFlightClientMiddleware 介面來實作。

中介軟體的功能相當有限,但它們可以將標頭新增至請求/回應。在伺服器上,它們可以檢查傳入的標頭並使請求失敗;因此,它們可以用於實作自訂身分驗證方法。

新增服務#

伺服器可以新增其他 gRPC 服務。例如,新增 Health Check service

final HealthStatusManager statusManager = new HealthStatusManager();
final Consumer<NettyServerBuilder> consumer = (builder) -> {
  builder.addService(statusManager.getHealthService());
};
final Location location = forGrpcInsecure(LOCALHOST, 5555);
try (
    BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
    Producer producer = new Producer(a);
    FlightServer s = FlightServer.builder(a, location, producer)
        .transportHint("grpc.builderConsumer", consumer).build().start();
) {
  Channel channel = NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
  HealthCheckResponse response = HealthGrpc
          .newBlockingStub(channel)
          .check(HealthCheckRequest.getDefaultInstance());

  System.out.println(response.getStatus());
}

Flight 最佳實務#