Gandiva Expression、Projector 與 Filter#

建構 Expression#

Gandiva 提供通用的 expression 表示法,其中 expression 由節點樹表示。TreeExprBuilder 用於建構 expression 樹。expression 樹的葉節點通常是欄位參照(由 TreeExprBuilder::MakeField() 建立)和常值(由 TreeExprBuilder::MakeLiteral() 建立)。節點可以使用以下方式組合成更複雜的 expression 樹

  • TreeExprBuilder::MakeFunction() 建立函數節點。(您可以呼叫 GetRegisteredFunctionSignatures() 以取得有效函數簽章的清單。)

  • TreeExprBuilder::MakeIf() 建立 if-else 邏輯。

  • TreeExprBuilder::MakeAnd()TreeExprBuilder::MakeOr() 建立布林 expression。(對於 “not”,請在 MakeFunction 中使用 not(bool) 函數。)

  • TreeExprBuilder::MakeInExpressionInt32() 和其他 “in expression” 函數建立集合成員資格測試。

這些函數中的每一個都會建立新的複合節點,其中包含葉節點(常值和欄位參照)或其他複合節點作為子節點。透過組合這些節點,您可以建立任意複雜的 expression 樹。

一旦建構了 expression 樹,它們就會封裝在 ExpressionCondition 中,具體取決於它們的使用方式。Expression 用於投影,而 Condition 用於篩選。

例如,以下是如何建立表示 x + 3 的 Expression 和表示 x < 3 的 Condition

std::shared_ptr<arrow::Field> field_x_raw = arrow::field("x", arrow::int32());
std::shared_ptr<Node> field_x = TreeExprBuilder::MakeField(field_x_raw);
std::shared_ptr<Node> literal_3 = TreeExprBuilder::MakeLiteral(3);
std::shared_ptr<arrow::Field> field_result = arrow::field("result", arrow::int32());

std::shared_ptr<Node> add_node =
    TreeExprBuilder::MakeFunction("add", {field_x, literal_3}, arrow::int32());
std::shared_ptr<Expression> expression =
    TreeExprBuilder::MakeExpression(add_node, field_result);

std::shared_ptr<Node> less_than_node =
    TreeExprBuilder::MakeFunction("less_than", {field_x, literal_3}, arrow::boolean());
std::shared_ptr<Condition> condition = TreeExprBuilder::MakeCondition(less_than_node);

Projector 與 Filter#

Gandiva 的兩個執行核心是 ProjectorFilterProjector 消耗 record batch 並投影到新的 record batch 中。Filter 消耗 record batch 並產生 SelectionVector,其中包含符合條件的索引。

對於 ProjectorFilter,expression IR 的最佳化發生在建立實例時。它們是針對靜態 schema 編譯的,因此 record batch 的 schema 必須在此時已知。

繼續使用先前章節中建立的 expressioncondition,以下是建立 Projector 和 Filter 的範例

std::shared_ptr<arrow::Schema> input_schema = arrow::schema({field_x_raw});
std::shared_ptr<arrow::Schema> output_schema = arrow::schema({field_result});
std::shared_ptr<Projector> projector;
Status status;
std::vector<std::shared_ptr<Expression>> expressions = {expression};
status = Projector::Make(input_schema, expressions, &projector);
ARROW_RETURN_NOT_OK(status);

std::shared_ptr<Filter> filter;
status = Filter::Make(input_schema, condition, &filter);
ARROW_RETURN_NOT_OK(status);

一旦建立 Projector 或 Filter,就可以在 Arrow record batch 上評估它。這些執行核心本身是單執行緒的,但設計為可重複使用以平行處理不同的 record batch。

評估投影#

執行是透過 Projector::Evaluate() 執行的。這會輸出陣列向量,可以與輸出 schema 一起傳遞至 arrow::RecordBatch::Make()

auto pool = arrow::default_memory_pool();
int num_records = 4;
arrow::Int32Builder builder;
int32_t values[4] = {1, 2, 3, 4};
ARROW_RETURN_NOT_OK(builder.AppendValues(values, 4));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> array, builder.Finish());
auto in_batch = arrow::RecordBatch::Make(input_schema, num_records, {array});

arrow::ArrayVector outputs;
status = projector->Evaluate(*in_batch, pool, &outputs);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::RecordBatch> result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs);

評估篩選#

Filter::Evaluate() 產生 SelectionVector,這是一個符合篩選條件的列索引向量。selection vector 是 arrow 整數陣列周圍的包裝器,由位元寬度參數化。在建立 selection vector 時(您必須在傳遞至 Evaluate() 之前 初始化它),您必須選擇位元寬度,這決定了它可以容納的最大索引值,以及最大槽數,這決定了它可以包含多少索引。一般來說,最大槽數應設定為您的批次大小,而位元寬度應設定為可以表示小於批次大小的所有整數的最小整數大小。例如,如果您的批次大小為 10 萬,請將最大槽數設定為 10 萬,位元寬度設定為 32(因為 2^16 = 64k 太小)。

一旦 Evaluate() 執行完畢且 SelectionVector 已填入,請使用 SelectionVector::ToArray() 方法取得底層陣列,然後使用 ::arrow::compute::Take() 具體化輸出 record batch。

std::shared_ptr<gandiva::SelectionVector> result_indices;
// Use 16-bit integers for indices. Result can be no longer than input size,
// so use batch num_rows as max_slots.
status = gandiva::SelectionVector::MakeInt16(/*max_slots=*/in_batch->num_rows(), pool,
                                             &result_indices);
ARROW_RETURN_NOT_OK(status);
status = filter->Evaluate(*in_batch, result_indices);
ARROW_RETURN_NOT_OK(status);
std::shared_ptr<arrow::Array> take_indices = result_indices->ToArray();
Datum maybe_batch;
ARROW_ASSIGN_OR_RAISE(maybe_batch,
                      arrow::compute::Take(Datum(in_batch), Datum(take_indices),
                                           TakeOptions::NoBoundsCheck()));
result = maybe_batch.record_batch();

評估投影和篩選#

最後,您也可以在使用 selection vector 的同時進行投影,透過 Projector::Evaluate()。若要執行此操作,請先確保使用 SelectionVector::GetMode() 初始化 Projector,以便 projector 以正確的位元寬度編譯。然後,您可以將 SelectionVector 傳遞至 Projector::Evaluate() 方法。

// Make sure the projector is compiled for the appropriate selection vector mode
status = Projector::Make(input_schema, expressions, result_indices->GetMode(),
                         ConfigurationBuilder::DefaultConfiguration(), &projector);
ARROW_RETURN_NOT_OK(status);

arrow::ArrayVector outputs_filtered;
status = projector->Evaluate(*in_batch, result_indices.get(), pool, &outputs_filtered);
ARROW_RETURN_NOT_OK(status);

result =
    arrow::RecordBatch::Make(output_schema, outputs[0]->length(), outputs_filtered);