Arrow Flight
這個章節包含許多運用 Arrow Flight 的食譜,Arrow Flight 是特別針對表格型資料集提供的 RPC 函式庫。關於 Flight 的更多資訊,請參閱 format/Flight。
使用 Arrow Flight 進行單純的 Parquet 儲存服務
我們將實作一項服務,它會提供表格資料的鍵值儲存,使用 Flight 處理上傳/要求,並使用 Parquet 儲存實際資料。
首先,我們將實作服務本身。為了簡單起見,我們不會使用 Datasets API,而是直接使用 Parquet API。
1class ParquetStorageService : public arrow::flight::FlightServerBase {
2 public:
3 const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
4
5 explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
6 : root_(std::move(root)) {}
7
8 arrow::Status ListFlights(
9 const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
10 std::unique_ptr<arrow::flight::FlightListing>* listings) override {
11 arrow::fs::FileSelector selector;
12 selector.base_dir = "/";
13 ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
14
15 std::vector<arrow::flight::FlightInfo> flights;
16 for (const auto& file_info : listing) {
17 if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
18 ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
19 flights.push_back(std::move(info));
20 }
21
22 *listings = std::unique_ptr<arrow::flight::FlightListing>(
23 new arrow::flight::SimpleFlightListing(std::move(flights)));
24 return arrow::Status::OK();
25 }
26
27 arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
28 const arrow::flight::FlightDescriptor& descriptor,
29 std::unique_ptr<arrow::flight::FlightInfo>* info) override {
30 ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
31 ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
32 *info = std::unique_ptr<arrow::flight::FlightInfo>(
33 new arrow::flight::FlightInfo(std::move(flight_info)));
34 return arrow::Status::OK();
35 }
36
37 arrow::Status DoPut(const arrow::flight::ServerCallContext&,
38 std::unique_ptr<arrow::flight::FlightMessageReader> reader,
39 std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
40 ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
41 ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
42 ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, reader->ToTable());
43
44 ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
45 sink, /*chunk_size=*/65536));
46 return arrow::Status::OK();
47 }
48
49 arrow::Status DoGet(const arrow::flight::ServerCallContext&,
50 const arrow::flight::Ticket& request,
51 std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
52 ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
53 std::unique_ptr<parquet::arrow::FileReader> reader;
54 ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
55 arrow::default_memory_pool(), &reader));
56
57 std::shared_ptr<arrow::Table> table;
58 ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
59 // Note that we can't directly pass TableBatchReader to
60 // RecordBatchStream because TableBatchReader keeps a non-owning
61 // reference to the underlying Table, which would then get freed
62 // when we exit this function
63 std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
64 arrow::TableBatchReader batch_reader(*table);
65 ARROW_ASSIGN_OR_RAISE(batches, batch_reader.ToRecordBatches());
66
67 ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
68 std::move(batches), table->schema()));
69 *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
70 new arrow::flight::RecordBatchStream(owning_reader));
71
72 return arrow::Status::OK();
73 }
74
75 arrow::Status ListActions(const arrow::flight::ServerCallContext&,
76 std::vector<arrow::flight::ActionType>* actions) override {
77 *actions = {kActionDropDataset};
78 return arrow::Status::OK();
79 }
80
81 arrow::Status DoAction(const arrow::flight::ServerCallContext&,
82 const arrow::flight::Action& action,
83 std::unique_ptr<arrow::flight::ResultStream>* result) override {
84 if (action.type == kActionDropDataset.type) {
85 *result = std::unique_ptr<arrow::flight::ResultStream>(
86 new arrow::flight::SimpleResultStream({}));
87 return DoActionDropDataset(action.body->ToString());
88 }
89 return arrow::Status::NotImplemented("Unknown action type: ", action.type);
90 }
91
92 private:
93 arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
94 const arrow::fs::FileInfo& file_info) {
95 ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
96 std::unique_ptr<parquet::arrow::FileReader> reader;
97 ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
98 arrow::default_memory_pool(), &reader));
99
100 std::shared_ptr<arrow::Schema> schema;
101 ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
102
103 auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
104
105 arrow::flight::FlightEndpoint endpoint;
106 endpoint.ticket.ticket = file_info.base_name();
107 arrow::flight::Location location;
108 ARROW_ASSIGN_OR_RAISE(location,
109 arrow::flight::Location::ForGrpcTcp("localhost", port()));
110 endpoint.locations.push_back(location);
111
112 int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
113 int64_t total_bytes = file_info.size();
114
115 return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
116 total_bytes);
117 }
118
119 arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
120 const arrow::flight::FlightDescriptor& descriptor) {
121 if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
122 return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
123 } else if (descriptor.path.size() != 1) {
124 return arrow::Status::Invalid(
125 "Must provide PATH-type FlightDescriptor with one path component");
126 }
127 return root_->GetFileInfo(descriptor.path[0]);
128 }
129
130 arrow::Status DoActionDropDataset(const std::string& key) {
131 return root_->DeleteFile(key);
132 }
133
134 std::shared_ptr<arrow::fs::FileSystem> root_;
135}; // end ParquetStorageService
首先,我們將啟動我們的伺服器
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
new ParquetStorageService(std::move(root)));
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Listening on port 34553
然後,我們可以建立一個用戶端並連線到伺服器
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
rout << "Connected to " << location.ToString() << std::endl;
Connected to grpc+tcp://localhost:34553
首先,我們將建立並上傳一個表格,然後伺服器會將其儲存在 Parquet 檔案中。
// Open example data file to upload
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
fs->OpenInputFile(airquality_path));
std::unique_ptr<parquet::arrow::FileReader> reader;
ARROW_RETURN_NOT_OK(
parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool(), &reader));
auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
std::shared_ptr<arrow::Schema> schema;
ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
// Start the RPC call
std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
writer = std::move(put_stream.writer);
metadata_reader = std::move(put_stream.reader);
// Upload data
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
std::vector<int> row_groups(reader->num_row_groups());
std::iota(row_groups.begin(), row_groups.end(), 0);
ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
int64_t batches = 0;
while (true) {
ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
if (!batch) break;
ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
batches++;
}
ARROW_RETURN_NOT_OK(writer->Close());
rout << "Wrote " << batches << " batches" << std::endl;
Wrote 1 batches
這樣做之後,我們可以擷取該資料集的元資料
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
<FlightDescriptor path='airquality.parquet'>
=== Schema ===
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
==============
然後再取得資料
std::unique_ptr<arrow::flight::FlightStreamReader> stream;
ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
----
Ozone:
[
[
41,
36,
...
18,
20
]
]
Solar.R:
[
[
190,
118,
...
131,
223
]
]
Wind:
[
[
7.4,
8,
...
8,
11.5
]
]
Temp:
[
[
67,
72,
...
76,
68
]
]
Month:
[
[
5,
5,
...
9,
9
]
]
Day:
[
[
1,
2,
...
29,
30
]
]
再來,我們將刪除資料集
arrow::flight::Action action{"drop_dataset",
arrow::Buffer::FromString("airquality.parquet")};
std::unique_ptr<arrow::flight::ResultStream> results;
ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
rout << "Deleted dataset" << std::endl;
Deleted dataset
並確認它已刪除
std::unique_ptr<arrow::flight::FlightListing> listing;
ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
while (true) {
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
if (!flight_info) break;
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
}
rout << "End of listing" << std::endl;
End of listing
最後,我們將停止我們的伺服器
ARROW_RETURN_NOT_OK(server->Shutdown());
rout << "Server shut down successfully" << std::endl;
Server shut down successfully
設定 gRPC 用戶端選項
gRPC 用戶端的選項可以透過 arrow::flight::FlightClientOptions 的 generic_options 欄位傳入。可用的用戶端選項清單請參閱 gRPC API 文件。
例如,您可以用下列方式變更傳送訊息的最大長度:
auto client_options = arrow::flight::FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
client_options.generic_options.emplace_back(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 2);
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
arrow::flight::Location::ForGrpcTcp("localhost", server->port()));
std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
Connected to grpc+tcp://localhost:41329
Flight Service 和其他 gRPC 終端點
如果您使用 gRPC 後端,您可以為 Flight 伺服器新增其他 gRPC 終端點。雖然 Flight 用戶端無法辨識這些終端點,但一般的 gRPC 用戶端可以。
注意
如果以靜態方式連結 Arrow Flight,Protobuf 和 gRPC 也必須以靜態方式連結;動態連結時亦然。更多資訊請參閱 https://arrow.dev.org.tw/docs/cpp/build_system.html#a-note-on-linking
建立伺服器
若要建立 gRPC 服務,請先使用 protobuf 定義服務。
syntax = "proto3";

// Minimal greeting service registered next to the Flight service.
service HelloWorldService {
  // Unary call: takes a HelloRequest and returns a HelloResponse.
  rpc SayHello(HelloRequest) returns (HelloResponse);
}

// Request carrying the name to greet.
message HelloRequest {
  string name = 1;
}

// Response carrying the greeting text.
message HelloResponse {
  string reply = 1;
}
接下來,您需要編譯這份定義,以取得 protobuf 和 gRPC 產生的檔案。詳細資訊請參閱 gRPC 的「產生用戶端與伺服器程式碼」文件。
然後為 gRPC 服務撰寫實作
1class HelloWorldServiceImpl : public HelloWorldService::Service {
2 grpc::Status SayHello(grpc::ServerContext*, const HelloRequest* request,
3 HelloResponse* reply) override {
4 const std::string& name = request->name();
5 if (name.empty()) {
6 return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Must provide a name!");
7 }
8 reply->set_reply("Hello, " + name);
9 return grpc::Status::OK;
10 }
11}; // end HelloWorldServiceImpl
最後,使用 arrow::flight::FlightServerOptions
上的 builder_hook
掛勾來註冊額外的 gRPC 服務。
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
arrow::flight::Location::ForGrpcTcp("0.0.0.0", 5000));
arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
new ParquetStorageService(std::move(root)));
// Create hello world service
HelloWorldServiceImpl grpc_service;
// Use builder_hook to register grpc service
options.builder_hook = [&](void* raw_builder) {
auto* builder = reinterpret_cast<grpc::ServerBuilder*>(raw_builder);
builder->RegisterService(&grpc_service);
};
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
Listening on port 5000
建立客戶端
Flight 用戶端實作並不知道任何自訂的 gRPC 服務,所以若要呼叫它們,您需要建立一般的 gRPC 用戶端。對於 Hello World 服務,我們使用 HelloWorldService stub,它由已編譯的 gRPC 定義提供。
// Open a plain (insecure) gRPC channel to the server and create the generated stub
auto client_channel =
    grpc::CreateChannel("0.0.0.0:5000", grpc::InsecureChannelCredentials());
auto stub = HelloWorldService::NewStub(client_channel);

// Fill in the request
HelloRequest request;
request.set_name("Arrow User");

// Make the call and turn a gRPC failure into an Arrow error
grpc::ClientContext context;
HelloResponse response;
grpc::Status status = stub->SayHello(&context, request, &response);
if (!status.ok()) return arrow::Status::IOError(status.error_message());

rout << response.reply();
Hello, Arrow User