Acero User's Guide#

This page describes how to use Acero. It is recommended that you read the overview first and familiarize yourself with the basic concepts.

Using Acero#

The basic workflow for Acero is as follows:

  1. First, create a graph of Declaration objects describing the plan

  2. Call one of the DeclarationToXyz methods to execute the Declaration.

    1. A new ExecPlan is created from the graph of Declarations. Each Declaration will correspond to one ExecNode in the plan. In addition, a sink node will be added, depending on which DeclarationToXyz method was used.

    2. The ExecPlan is executed. Typically this happens as part of the DeclarationToXyz call, but in DeclarationToReader the reader is returned before the plan is finished executing.

    3. Once the plan is finished it is destroyed
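
A minimal sketch of this workflow is shown below, assuming the ac namespace alias (arrow::acero) used throughout this document and an existing std::shared_ptr<arrow::Table> named table; the helper name RunSimplePlan is ours and not part of the Acero API:

// A minimal sketch of the basic workflow: build a graph of Declarations,
// then run it with one of the DeclarationToXyz methods.
arrow::Status RunSimplePlan(std::shared_ptr<arrow::Table> table) {
  // 1. Build the graph of Declaration objects (here just a single node)
  ac::Declaration source{"table_source", ac::TableSourceNodeOptions{std::move(table)}};
  // 2. Execute it; DeclarationToTable adds the sink node for us and runs the plan
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> result,
                        ac::DeclarationToTable(std::move(source)));
  // 3. By this point the plan has finished and been destroyed
  std::cout << result->ToString() << std::endl;
  return arrow::Status::OK();
}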

Creating a Plan#

Using Substrait#

Substrait is the preferred mechanism for creating a plan (a graph of Declarations). There are a few reasons for this:

  • Substrait producers spend a lot of time and energy creating user-friendly APIs for producing complex execution plans in a simple way. For example, the pivot_wider operation can be achieved using a complex series of aggregate nodes. Rather than creating all of those aggregate nodes by hand, a producer will give you a much simpler API.

  • If you are using Substrait then you can easily switch to any other Substrait-consuming engine should you find, at some point, that it serves your needs better than Acero.

  • We hope that tools will eventually emerge for Substrait-based optimizers and planners. By using Substrait you will make it much easier to use these tools in the future.

You could create the Substrait plan yourself, but you will probably have a much easier time finding an existing Substrait producer. For example, you could use ibis-substrait to easily create Substrait plans from Python expressions. There are a number of different tools that are able to create Substrait plans from SQL. Eventually, we hope that C++-based Substrait producers will emerge. However, we are not aware of any at this time.

Detailed instructions on creating an execution plan from Substrait can be found in the Substrait page.

Programmatic Plan Creation#

Creating an execution plan programmatically is simpler than creating a plan from Substrait, though some flexibility and future-proofing guarantees are lost. The simplest way to create a Declaration is to instantiate one directly. You will need the name of the declaration, a vector of inputs, and an options object. For example:

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

The above code creates a scan declaration (which has no inputs) and a project declaration (which uses the scan as its input). This is simple enough, but we can make it slightly easier. If you are creating a linear sequence of declarations (as in the above example) then you can also use the Declaration::Sequence() function.

  // Inputs do not have to be passed to the project node when using Sequence
  ac::Declaration plan =
      ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
                                 {"project", ac::ProjectNodeOptions({a_times_2})}});

There will be more examples of programmatic plan creation later in this document.

Executing a Plan#

There are a number of different methods that can be used to execute a declaration. Each one provides the data in a slightly different form. Since all of these methods begin with DeclarationTo..., this guide often refers to them as the DeclarationToXyz methods.

DeclarationToTable#

The DeclarationToTable() method will accumulate all of the results into a single arrow::Table. This is perhaps the simplest way to collect results from Acero. The main disadvantage of this approach is that it requires accumulating all of the results in memory.
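
For example, a short sketch of collecting results this way, assuming plan is an ac::Declaration built as shown above:

// `plan` is assumed to be an ac::Declaration built as in the earlier examples
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table,
                      ac::DeclarationToTable(std::move(plan)));
std::cout << "Results : " << table->ToString() << std::endl;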

Note

Acero processes large datasets in small chunks. This is described in more detail in the developer's guide. As a result, you may be surprised to find that a table collected with DeclarationToTable is chunked differently than your input. For example, your input might be a large table with a single chunk of 2 million rows. Your output table might then have 64 chunks, each with 32Ki rows. There is a current request in GH-15155 to allow the chunk size of the output to be specified.

DeclarationToReader#

The DeclarationToReader() method allows you to consume the results iteratively. It will create an arrow::RecordBatchReader which you can read from at your leisure. If you do not read from the reader quickly enough then backpressure will be applied and the execution plan will pause. Closing the reader will cancel the running execution plan, and the reader's destructor will wait for the execution plan to finish whatever it is doing, so it may block.
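
A sketch of iterative consumption, assuming plan is an ac::Declaration and that DeclarationToReader returns a RecordBatchReader as described above:

// `plan` is assumed to be an ac::Declaration
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::RecordBatchReader> reader,
                      ac::DeclarationToReader(std::move(plan)));
std::shared_ptr<arrow::RecordBatch> batch;
while (true) {
  ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (!batch) break;  // the plan is exhausted
  // ... process batch ...
}
// Closing the reader early cancels the plan if it is still running
ARROW_RETURN_NOT_OK(reader->Close());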

DeclarationToStatus#

The DeclarationToStatus() method is useful if you want to run the plan but do not actually want to consume the results. For example, this is useful when benchmarking or when the plan has side effects such as a dataset write node. If the plan generates any results then they are immediately discarded.
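
For example, assuming plan is an ac::Declaration ending in a node with side effects (such as a write node):

// Any results the plan produces are discarded immediately
ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(plan)));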

Running a Plan Directly#

If one of the DeclarationToXyz methods is insufficient for some reason, then it is possible to run a plan directly. This should only be needed if you are doing something unique. For example, if you have created a custom sink node or if you need a plan that has multiple outputs.

Note

In academic literature and many existing systems there is a general assumption that an execution plan has at most one output. Some things in Acero, such as the DeclarationToXyz methods, expect this. However, nothing in the design strictly prevents having multiple sink nodes.

Detailed instructions on how to do this are out of scope for this guide, but the rough steps are as follows (a sketch follows the list):

  1. Create a new ExecPlan object.

  2. Add sink nodes to your graph of Declaration objects (this is the only time you will need to create declarations for sink nodes)

  3. Use Declaration::AddToPlan() to add your declaration to your plan (if you have more than one output then you will not be able to use this method and will need to add your nodes one at a time)

  4. Validate the plan with ExecPlan::Validate()

  5. Start the plan with ExecPlan::StartProducing()

  6. Wait for the future returned by ExecPlan::finished() to complete.
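
A rough sketch of these steps is shown below. Here decl_with_sink is assumed to be a Declaration graph that already ends in a sink node; calling StartProducing without checking a return value follows the examples later in this document:

// `decl_with_sink` is assumed to be an ac::Declaration that ends in a sink node
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan, ac::ExecPlan::Make());
ARROW_RETURN_NOT_OK(decl_with_sink.AddToPlan(plan.get()).status());
ARROW_RETURN_NOT_OK(plan->Validate());
plan->StartProducing();
// Wait for the plan to finish and surface any error it encountered
ARROW_RETURN_NOT_OK(plan->finished().status());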

Providing Input#

Input data for an exec plan can come from a variety of sources. It is often read from files stored on some kind of filesystem. It is also common for input to come from in-memory data. In-memory data is typical, for example, in a pandas-like frontend. Input could also come from network streams, such as a Flight request. Acero can support all of these cases and can even support unique and custom situations not mentioned here.

There are pre-defined source nodes that cover the most common input scenarios. These are listed below. However, if your source data is unique then you will need to use the generic source node. This node expects you to provide an asynchronous stream of batches; see here for more details.

Available ExecNode Implementations#

The following tables quickly summarize the available operators.

Sources#

These nodes can be used as sources of data.

Source nodes#

| Factory name | Options | Brief description |
| --- | --- | --- |
| source | SourceNodeOptions | A generic source node that wraps an asynchronous stream of data (example) |
| table_source | TableSourceNodeOptions | Generates data from an arrow::Table (example) |
| record_batch_source | RecordBatchSourceNodeOptions | Generates data from an iterator of arrow::RecordBatch |
| record_batch_reader_source | RecordBatchReaderSourceNodeOptions | Generates data from an arrow::RecordBatchReader |
| exec_batch_source | ExecBatchSourceNodeOptions | Generates data from an iterator of arrow::compute::ExecBatch |
| array_vector_source | ArrayVectorSourceNodeOptions | Generates data from an iterator of vectors of arrow::Array |
| scan | arrow::dataset::ScanNodeOptions | Generates data from an arrow::dataset::Dataset (requires the datasets module) (example) |

Compute Nodes#

These nodes perform computations on the data and may transform or reshape it.

Compute nodes#

| Factory name | Options | Brief description |
| --- | --- | --- |
| filter | FilterNodeOptions | Removes rows that do not match a given filter expression (example) |
| project | ProjectNodeOptions | Creates new columns by evaluating compute expressions. Can also drop and reorder columns (example) |
| aggregate | AggregateNodeOptions | Calculates summary statistics across the entire input stream or across groups of data (example) |
| pivot_longer | PivotLongerNodeOptions | Reshapes data by converting some columns into additional rows |

Arrangement Nodes#

These nodes reorder, combine, or slice streams of data.

Arrangement nodes#

| Factory name | Options | Brief description |
| --- | --- | --- |
| hash_join | HashJoinNodeOptions | Joins two inputs based on common columns (example) |
| asofjoin | AsofJoinNodeOptions | Joins multiple inputs to the first input based on a common ordered column (often time) |
| union | N/A | Merges two inputs with identical schemas (example) |
| order_by | OrderByNodeOptions | Reorders a stream |
| fetch | FetchNodeOptions | Slices a range of rows from a stream |

Sink Nodes#

These nodes terminate a plan. Users do not typically create sink nodes, as they are selected automatically based on the DeclarationToXyz method used to consume the plan. However, this list may be useful for those developing new sink nodes or using Acero in advanced ways.

Sink nodes#

| Factory name | Options | Brief description |
| --- | --- | --- |
| sink | SinkNodeOptions | Collects batches into a FIFO queue with optional backpressure |
| write | arrow::dataset::WriteNodeOptions | Writes batches to a filesystem (example) |
| consuming_sink | ConsumingSinkNodeOptions | Consumes batches using a user provided callback function |
| table_sink | TableSinkNodeOptions | Collects batches into an arrow::Table |
| order_by_sink | OrderBySinkNodeOptions | Deprecated |
| select_k_sink | SelectKSinkNodeOptions | Deprecated |

Examples#

The rest of this document contains example execution plans. Each example highlights the behavior of a specific execution node.

source#

The source operation can be considered as an entry point to create a streaming execution plan. SourceNodeOptions are used to create the source operation. The source operation is the most generic and flexible type of source currently available, but it can be quite tricky to configure. You should first review the other source node types to ensure there isn't a simpler choice.

The source node requires some kind of function that can be called to poll for more data. This function should take no arguments and should return an arrow::Future<std::optional<cp::ExecBatch>>. This function might be reading a file, iterating through an in-memory structure, or receiving data from a network connection. The arrow library refers to these functions as arrow::AsyncGenerator and there are a number of utilities for working with them. For this example we use a vector of record batches that have already been stored in memory. In addition, the schema of the data must be known up front. Acero must know the schema of the data at each stage of the execution graph before any processing begins. This means we must supply the schema for a source node separately from the data itself.

Here we define a struct to hold the data generator definition. This includes the in-memory batches, the schema, and a function that serves as a data generator:

struct BatchesWithSchema {
  std::vector<cp::ExecBatch> batches;
  std::shared_ptr<arrow::Schema> schema;
  // This method uses internal arrow utilities to
  // convert a vector of record batches to an AsyncGenerator of optional batches
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
    auto opt_batches = ::arrow::internal::MapVector(
        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
        batches);
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
    return gen;
  }
};

Generating sample batches for computation:

arrow::Result<BatchesWithSchema> MakeBasicBatches() {
  BatchesWithSchema out;
  auto field_vector = {arrow::field("a", arrow::int32()),
                       arrow::field("b", arrow::boolean())};
  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));

  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));

  ARROW_ASSIGN_OR_RAISE(auto b1,
                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b2,
                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
  ARROW_ASSIGN_OR_RAISE(auto b3,
                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));

  out.batches = {b1, b2, b3};
  out.schema = arrow::schema(field_vector);
  return out;
}

An example of using source (the usage of sink is explained in detail in sink):

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed.  In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

table_source#

In the previous example, source node, a source node was used to input the data. But when developing an application, if the data is already in memory as a table, it is much easier and more performant to use TableSourceNodeOptions. Here the input data can be passed as a std::shared_ptr<arrow::Table> along with a max_batch_size. The max_batch_size is used to break up large record batches so that they can be processed in parallel. It is important to note that table batches will not be merged to form larger batches when the source table has a smaller batch size.

Example of using table_source:

/// \brief An example showing a table source node
///
/// TableSource-Table Example
/// This example shows how a table_source can be used
/// in an execution plan. This includes a table source node
/// receiving data from a table.  This plan simply collects the
/// data back into a table but nodes could be added that modify
/// or transform the data as well (as is shown in later examples)
arrow::Status TableSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
  int max_batch_size = 2;
  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};

  ac::Declaration source{"table_source", std::move(table_source_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}

filter#

The filter operation, as the name suggests, provides an option to define data filtering criteria. It selects rows where the given expression evaluates to true. Filters can be written using arrow::compute::Expression, and the expression should have a return type of boolean. For example, if we wish to keep rows where the value of column a is greater than 3, then we can use the following expression.

Filter example:

/// \brief An example showing a filter node
///
/// Source-Filter-Table
/// This example shows how a filter can be used in an execution plan,
/// to filter data from a source. The output from the execution plan
/// is collected into a table.
arrow::Status ScanFilterSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // specify the filter.  This filter keeps only the rows where the
  // value of the "a" column is greater than 3.
  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
  // set filter for scanner : on-disk / push-down filtering.
  // This step can be skipped if you are not reading from disk.
  options->filter = filter_expr;
  // empty projection
  options->projection = cp::project({}, {});

  // construct the scan node
  std::cout << "Initialized Scanning Options" << std::endl;

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
  std::cout << "Scan node options created" << std::endl;

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  // pipe the scan node into the filter node
  // Need to set the filter in scan node options and filter node options.
  // At scan node it is used for on-disk / push-down filtering.
  // At filter node it is used for in-memory filtering.
  ac::Declaration filter{
      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};

  return ExecutePlanAndCollectAsTable(std::move(filter));
}

project#

The project operation rearranges, deletes, transforms, and creates columns. Each output column is computed by evaluating an expression against the source record batch. These must be scalar expressions (expressions consisting of scalar literals, field references, and scalar functions, i.e. elementwise functions that return one value for each input row, independent of the values of all other rows). This is exposed via ProjectNodeOptions, which requires an arrow::compute::Expression and a name for each of the output columns (if no name is provided, the string representation of the expr will be used).

Project example:

/// \brief An example showing a project node
///
/// Scan-Project-Table
/// This example shows how a Scan operation can be used to load the data
/// into the execution plan, how a project operation can be applied on the
/// data stream and how the output is collected into a table
arrow::Status ScanProjectSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // projection
  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};
  ac::Declaration project{
      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};

  return ExecutePlanAndCollectAsTable(std::move(project));
}

aggregate#

The aggregate node computes various types of aggregates over data.

Arrow supports two types of aggregates: "scalar" aggregates and "hash" aggregates. Scalar aggregates reduce an array or scalar input to a single scalar output (e.g. computing the mean of a column). Hash aggregates act like GROUP BY in SQL and first partition data based on one or more key columns, then reduce the data in each partition. The aggregate node supports both types of computation and can compute any number of aggregations at once.

AggregateNodeOptions is used to define the aggregation criteria. It takes a list of aggregation functions and their options; a list of target fields to aggregate, one per function; and a list of names for the output fields, one per function. Optionally, it takes a list of columns that are used to partition the data, in the case of a hash aggregation. The aggregation functions can be selected from this list of aggregation functions.

Note

This node is a "pipeline breaker" and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

The aggregation can provide results as a group or a scalar. For instance, an operation like hash_count provides the counts per each unique record as a grouped result, while an operation like sum provides a single record.

Scalar aggregation example:

/// \brief An example showing an aggregation node to aggregate an entire table
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in a scalar output. The source node loads the
/// data and the aggregation (the sum of column 'a')
/// is applied on this data. The output is collected into a table (that will
/// have exactly one row)
arrow::Status SourceScalarAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}

Group aggregation example:

/// \brief An example showing an aggregation node to perform a group-by operation
///
/// Source-Aggregation-Table
/// This example shows how an aggregation operation can be applied on a
/// execution plan resulting in grouped output. The source node loads the
/// data and the aggregation (counting non-null values in column 'a') is
/// applied on this data. The output is collected into a table that will contain
/// one row for each unique combination of group keys.
arrow::Status SourceGroupAggregateSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};
  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options =
      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
                               /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(source)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}

sink#

The sink operation provides output and is the final node of a streaming execution definition. The SinkNodeOptions interface is used to pass the required options. Similar to the source operator, the sink operator exposes the output with a function that returns a record batch future each time it is called. The caller is expected to call this function repeatedly until the generator function is exhausted (returns std::optional::nullopt). If this function is not called often enough then record batches will accumulate in memory. An execution plan should have only one "terminal" node (one sink node). An ExecPlan can terminate early due to cancellation or an error before the output is fully consumed. However, the plan can be destroyed safely independently of the sink, which will hold on to the unconsumed batches via exec_plan->finished().

A Sink operation is also included as part of the Source example:

/// \brief An example demonstrating a source and sink node
///
/// Source-Table Example
/// This example shows how a custom source can be used
/// in an execution plan. This includes source node using pregenerated
/// data and collecting it into a table.
///
/// This sort of custom source is often not needed.  In most cases you can
/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
/// exec_batch_source, or record_batch_source (for in-memory data)
arrow::Status SourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(source));
}
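
For reference, a minimal sketch of wiring up an explicit sink node (rather than letting a DeclarationToXyz method add one). Here plan and source are assumed to already exist, created with ac::ExecPlan::Make and ac::MakeExecNode as in the order_by_sink example below, and schema is assumed to be the schema of the plan's output:

// Collect the output of `source` through an explicit sink node
arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
ARROW_RETURN_NOT_OK(
    ac::MakeExecNode("sink", plan.get(), {source}, ac::SinkNodeOptions{&sink_gen}));
// The async generator can be polled directly, or wrapped in a reader as in the
// ExecutePlanAndCollectAsTableWithCustomSink helper shown later
std::shared_ptr<arrow::RecordBatchReader> sink_reader =
    ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());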

consuming_sink#

The consuming_sink operator is a sink operation that contains the consuming operation within the execution plan (i.e. the exec plan should not complete until the consumption has completed). Unlike the sink node, this node takes a callback function that is expected to consume the batch. Once this callback has finished, the execution plan will no longer hold any reference to the batch. The consuming function may be called before a previous invocation has completed. If the consuming function does not run quickly enough then many concurrent executions could pile up, blocking the CPU thread pool. The execution plan will not be marked finished until all consuming function callbacks have completed. Once all batches have been delivered, the execution plan will wait for the finish future to complete before marking the execution plan finished. This allows for workflows where the consumption function converts batches into async tasks (this is currently done internally for the dataset write node).

Example:

// define a Custom SinkNodeConsumer
std::atomic<uint32_t> batches_seen{0};
arrow::Future<> finish = arrow::Future<>::Make();
struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
  CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
      : batches_seen(batches_seen), finish(std::move(finish)) {}

  // Consumption logic can be written here
  arrow::Status Consume(cp::ExecBatch batch) override {
    // data can be consumed in the expected way
    // transfer to another system or just do some work
    // and write to disk
    (*batches_seen)++;
    return arrow::Status::OK();
  }

  arrow::Future<> Finish() override { return finish; }

  std::atomic<uint32_t>* batches_seen;
  arrow::Future<> finish;
};

std::shared_ptr<CustomSinkNodeConsumer> consumer =
    std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

ac::ExecNode* consuming_sink;

ARROW_ASSIGN_OR_RAISE(consuming_sink,
                      ac::MakeExecNode("consuming_sink", plan.get(), {source},
                                       ac::ConsumingSinkNodeOptions(consumer)));

Consuming-Sink example:

/// \brief An example showing a consuming sink node
///
/// Source-Consuming-Sink
/// This example shows how the data can be consumed within the execution plan
/// by using a ConsumingSink node. There is no data output from this execution plan.
arrow::Status SourceConsumingSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ac::Declaration source{"source", std::move(source_node_options)};

  std::atomic<uint32_t> batches_seen{0};
  arrow::Future<> finish = arrow::Future<>::Make();
  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
        : batches_seen(batches_seen), finish(std::move(finish)) {}

    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
                       ac::BackpressureControl* backpressure_control,
                       ac::ExecPlan* plan) override {
      // This will be called as the plan is started (before the first call to Consume)
      // and provides the schema of the data coming into the node, controls for pausing /
      // resuming input, and a pointer to the plan itself which can be used to access
      // other utilities such as the thread indexer or async task scheduler.
      return arrow::Status::OK();
    }

    arrow::Status Consume(cp::ExecBatch batch) override {
      (*batches_seen)++;
      return arrow::Status::OK();
    }

    arrow::Future<> Finish() override {
      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
      // output file handles and flushing remaining work
      return arrow::Future<>::MakeFinished();
    }

    std::atomic<uint32_t>* batches_seen;
    arrow::Future<> finish;
  };
  std::shared_ptr<CustomSinkNodeConsumer> consumer =
      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);

  ac::Declaration consuming_sink{"consuming_sink",
                                 {std::move(source)},
                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};

  // Since we are consuming the data within the plan there is no output and we simply
  // run the plan to completion instead of collecting into a table.
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));

  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
            << std::endl;
  return arrow::Status::OK();
}

order_by_sink#

The order_by_sink operation is an extension of the sink operation. This operation provides the ability to guarantee the ordering of the stream by providing OrderBySinkNodeOptions. Here, arrow::compute::SortOptions are provided to define which columns are used for sorting and whether to sort by ascending or descending values.

Note

This node is a "pipeline breaker" and will fully materialize the dataset in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

Order-By-Sink example:

arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // plan mark finished
  auto future = plan->finished();
  return future.status();
}

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and the data is ordered in the sink node. The order can be
/// ASCENDING or DESCENDING and it is configurable. The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}

select_k_sink#

The select_k_sink option enables selecting the top/bottom K elements, similar to a SQL ORDER BY ... LIMIT K clause. SelectKOptions is defined using the OrderBySinkNode definition. This option returns a sink node that receives the input and then computes the top_k/bottom_k.

Note

This node is a "pipeline breaker" and will fully materialize the input in memory. In the future, spillover mechanisms will be added which should alleviate this constraint.

SelectK example:

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how K number of elements can be selected
/// either from the top or bottom. The output node is a modified
/// sink node where output can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode * source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}

table_sink#

The table_sink node provides the ability to receive the output as an in-memory table. This is simpler to use than the other sink nodes provided by the streaming execution engine, but it only makes sense when the output fits comfortably in memory. The node is created using TableSinkNodeOptions.

Example of using table_sink:

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}

scan#

scan is an operation used for loading and processing datasets. It should be preferred over the more generic source node when your input is a dataset. The behavior is defined using arrow::dataset::ScanNodeOptions. More information on datasets and the various scan options can be found in Tabular Datasets.

This node is capable of applying pushdown filters to the file readers, which reduces the amount of data that needs to be read. This means you may supply the same filter expression to the scan node that you also supply to the FilterNode, because the filtering is done in these two different places.

Scan example:

/// \brief An example demonstrating a scan and sink node
///
/// Scan-Table
/// This example shows how scan operation can be applied on a dataset.
/// There are operations that can be applied on the scan (project, filter)
/// and the input data can be processed. The output is obtained as a table
arrow::Status ScanSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->projection = cp::project({}, {});  // create empty projection

  // construct the scan node
  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  return ExecutePlanAndCollectAsTable(std::move(scan));
}

write#

The write node saves query results as a dataset of files in a format like Parquet, Feather, CSV, etc., using the Tabular Datasets functionality in Arrow. The write options are provided via arrow::dataset::WriteNodeOptions, which in turn contains arrow::dataset::FileSystemDatasetWriteOptions. arrow::dataset::FileSystemDatasetWriteOptions provides control over the written dataset, including options like the output directory, file naming scheme, and so on.

Write example:

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how scan node can be used to load the data
/// and after processing how it can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line, if run repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.

  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}

union#

union merges multiple data streams with the same schema into one, similar to a SQL UNION ALL clause.

The following example demonstrates how this can be achieved using two data sources.

Union example:

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}

hash_join#

The hash_join operation provides the relational algebra join operation using hash-based algorithms. HashJoinNodeOptions contains the options required to define a join. hash_join supports left/right/full semi/anti/outer joins. Also, the join key (i.e. the column(s) to join on) and suffixes (e.g. a suffix term like "_x" which can be appended to column names duplicated in both the left and right relations) can be set via the join options. Read more on hash-joins.

Hash-Join example:

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}

Summary#

Examples of these nodes can be found in cpp/examples/arrow/execution_plan_documentation_examples.cc in the Arrow source.

Complete Example:

 19#include <arrow/array.h>
 20#include <arrow/builder.h>
 21
 22#include <arrow/acero/exec_plan.h>
 23#include <arrow/compute/api.h>
 24#include <arrow/compute/api_vector.h>
 25#include <arrow/compute/cast.h>
 26
 27#include <arrow/csv/api.h>
 28
 29#include <arrow/dataset/dataset.h>
 30#include <arrow/dataset/file_base.h>
 31#include <arrow/dataset/file_parquet.h>
 32#include <arrow/dataset/plan.h>
 33#include <arrow/dataset/scanner.h>
 34
 35#include <arrow/io/interfaces.h>
 36#include <arrow/io/memory.h>
 37
 38#include <arrow/result.h>
 39#include <arrow/status.h>
 40#include <arrow/table.h>
 41
 42#include <arrow/ipc/api.h>
 43
 44#include <arrow/util/future.h>
 45#include <arrow/util/range.h>
 46#include <arrow/util/thread_pool.h>
 47#include <arrow/util/vector.h>
 48
 49#include <iostream>
 50#include <memory>
 51#include <utility>
 52
 53// Demonstrate various operators in Arrow Streaming Execution Engine
 54
 55namespace cp = ::arrow::compute;
 56namespace ac = ::arrow::acero;
 57
 58constexpr char kSep[] = "******";
 59
 60void PrintBlock(const std::string& msg) {
 61  std::cout << "\n\t" << kSep << " " << msg << " " << kSep << "\n" << std::endl;
 62}
 63
 64template <typename TYPE,
 65          typename = typename std::enable_if<arrow::is_number_type<TYPE>::value |
 66                                             arrow::is_boolean_type<TYPE>::value |
 67                                             arrow::is_temporal_type<TYPE>::value>::type>
 68arrow::Result<std::shared_ptr<arrow::Array>> GetArrayDataSample(
 69    const std::vector<typename TYPE::c_type>& values) {
 70  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
 71  ArrowBuilderType builder;
 72  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 73  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 74  return builder.Finish();
 75}
 76
 77template <class TYPE>
 78arrow::Result<std::shared_ptr<arrow::Array>> GetBinaryArrayDataSample(
 79    const std::vector<std::string>& values) {
 80  using ArrowBuilderType = typename arrow::TypeTraits<TYPE>::BuilderType;
 81  ArrowBuilderType builder;
 82  ARROW_RETURN_NOT_OK(builder.Reserve(values.size()));
 83  ARROW_RETURN_NOT_OK(builder.AppendValues(values));
 84  return builder.Finish();
 85}
 86
 87arrow::Result<std::shared_ptr<arrow::RecordBatch>> GetSampleRecordBatch(
 88    const arrow::ArrayVector array_vector, const arrow::FieldVector& field_vector) {
 89  std::shared_ptr<arrow::RecordBatch> record_batch;
 90  ARROW_ASSIGN_OR_RAISE(auto struct_result,
 91                        arrow::StructArray::Make(array_vector, field_vector));
 92  return record_batch->FromStructArray(struct_result);
 93}
 94
 95/// \brief Create a sample table
 96/// The table's contents will be:
 97/// a,b
 98/// 1,null
 99/// 2,true
100/// null,true
101/// 3,false
102/// null,true
103/// 4,false
104/// 5,null
105/// 6,false
106/// 7,false
107/// 8,true
108/// \return The created table
109
110arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
111  auto null_long = std::numeric_limits<int64_t>::quiet_NaN();
112  ARROW_ASSIGN_OR_RAISE(auto int64_array,
113                        GetArrayDataSample<arrow::Int64Type>(
114                            {1, 2, null_long, 3, null_long, 4, 5, 6, 7, 8}));
115
116  arrow::BooleanBuilder boolean_builder;
117  std::shared_ptr<arrow::BooleanArray> bool_array;
118
119  std::vector<uint8_t> bool_values = {false, true,  true,  false, true,
120                                      false, false, false, false, true};
121  std::vector<bool> is_valid = {false, true,  true, true, true,
122                                true,  false, true, true, true};
123
124  ARROW_RETURN_NOT_OK(boolean_builder.Reserve(10));
125
126  ARROW_RETURN_NOT_OK(boolean_builder.AppendValues(bool_values, is_valid));
127
128  ARROW_RETURN_NOT_OK(boolean_builder.Finish(&bool_array));
129
130  auto record_batch =
131      arrow::RecordBatch::Make(arrow::schema({arrow::field("a", arrow::int64()),
132                                              arrow::field("b", arrow::boolean())}),
133                               10, {int64_array, bool_array});
134  ARROW_ASSIGN_OR_RAISE(auto table, arrow::Table::FromRecordBatches({record_batch}));
135  return table;
136}
137
138/// \brief Create a sample dataset
139/// \return An in-memory dataset based on GetTable()
140arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> GetDataset() {
141  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
142  auto ds = std::make_shared<arrow::dataset::InMemoryDataset>(table);
143  return ds;
144}
145
146arrow::Result<cp::ExecBatch> GetExecBatchFromVectors(
147    const arrow::FieldVector& field_vector, const arrow::ArrayVector& array_vector) {
148  std::shared_ptr<arrow::RecordBatch> record_batch;
149  ARROW_ASSIGN_OR_RAISE(auto res_batch, GetSampleRecordBatch(array_vector, field_vector));
150  cp::ExecBatch batch{*res_batch};
151  return batch;
152}
153
154// (Doc section: BatchesWithSchema Definition)
155struct BatchesWithSchema {
156  std::vector<cp::ExecBatch> batches;
157  std::shared_ptr<arrow::Schema> schema;
158  // This method uses internal arrow utilities to
159  // convert a vector of record batches to an AsyncGenerator of optional batches
160  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen() const {
161    auto opt_batches = ::arrow::internal::MapVector(
162        [](cp::ExecBatch batch) { return std::make_optional(std::move(batch)); },
163        batches);
164    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> gen;
165    gen = arrow::MakeVectorGenerator(std::move(opt_batches));
166    return gen;
167  }
168};
169// (Doc section: BatchesWithSchema Definition)
170
171// (Doc section: MakeBasicBatches Definition)
172arrow::Result<BatchesWithSchema> MakeBasicBatches() {
173  BatchesWithSchema out;
174  auto field_vector = {arrow::field("a", arrow::int32()),
175                       arrow::field("b", arrow::boolean())};
176  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({0, 4}));
177  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({5, 6, 7}));
178  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({8, 9, 10}));
179
180  ARROW_ASSIGN_OR_RAISE(auto b1_bool,
181                        GetArrayDataSample<arrow::BooleanType>({false, true}));
182  ARROW_ASSIGN_OR_RAISE(auto b2_bool,
183                        GetArrayDataSample<arrow::BooleanType>({true, false, true}));
184  ARROW_ASSIGN_OR_RAISE(auto b3_bool,
185                        GetArrayDataSample<arrow::BooleanType>({false, true, false}));
186
187  ARROW_ASSIGN_OR_RAISE(auto b1,
188                        GetExecBatchFromVectors(field_vector, {b1_int, b1_bool}));
189  ARROW_ASSIGN_OR_RAISE(auto b2,
190                        GetExecBatchFromVectors(field_vector, {b2_int, b2_bool}));
191  ARROW_ASSIGN_OR_RAISE(auto b3,
192                        GetExecBatchFromVectors(field_vector, {b3_int, b3_bool}));
193
194  out.batches = {b1, b2, b3};
195  out.schema = arrow::schema(field_vector);
196  return out;
197}
198// (Doc section: MakeBasicBatches Definition)
199
200arrow::Result<BatchesWithSchema> MakeSortTestBasicBatches() {
201  BatchesWithSchema out;
202  auto field = arrow::field("a", arrow::int32());
203  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({1, 3, 0, 2}));
204  ARROW_ASSIGN_OR_RAISE(auto b2_int,
205                        GetArrayDataSample<arrow::Int32Type>({121, 101, 120, 12}));
206  ARROW_ASSIGN_OR_RAISE(auto b3_int,
207                        GetArrayDataSample<arrow::Int32Type>({10, 110, 210, 121}));
208  ARROW_ASSIGN_OR_RAISE(auto b4_int,
209                        GetArrayDataSample<arrow::Int32Type>({51, 101, 2, 34}));
210  ARROW_ASSIGN_OR_RAISE(auto b5_int,
211                        GetArrayDataSample<arrow::Int32Type>({11, 31, 1, 12}));
212  ARROW_ASSIGN_OR_RAISE(auto b6_int,
213                        GetArrayDataSample<arrow::Int32Type>({12, 101, 120, 12}));
214  ARROW_ASSIGN_OR_RAISE(auto b7_int,
215                        GetArrayDataSample<arrow::Int32Type>({0, 110, 210, 11}));
216  ARROW_ASSIGN_OR_RAISE(auto b8_int,
217                        GetArrayDataSample<arrow::Int32Type>({51, 10, 2, 3}));
218
219  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors({field}, {b1_int}));
220  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors({field}, {b2_int}));
221  ARROW_ASSIGN_OR_RAISE(auto b3,
222                        GetExecBatchFromVectors({field, field}, {b3_int, b8_int}));
223  ARROW_ASSIGN_OR_RAISE(auto b4,
224                        GetExecBatchFromVectors({field, field, field, field},
225                                                {b4_int, b5_int, b6_int, b7_int}));
226  out.batches = {b1, b2, b3, b4};
227  out.schema = arrow::schema({field});
228  return out;
229}
230
231arrow::Result<BatchesWithSchema> MakeGroupableBatches(int multiplicity = 1) {
232  BatchesWithSchema out;
233  auto fields = {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())};
234  ARROW_ASSIGN_OR_RAISE(auto b1_int, GetArrayDataSample<arrow::Int32Type>({12, 7, 3}));
235  ARROW_ASSIGN_OR_RAISE(auto b2_int, GetArrayDataSample<arrow::Int32Type>({-2, -1, 3}));
236  ARROW_ASSIGN_OR_RAISE(auto b3_int, GetArrayDataSample<arrow::Int32Type>({5, 3, -8}));
237  ARROW_ASSIGN_OR_RAISE(auto b1_str, GetBinaryArrayDataSample<arrow::StringType>(
238                                         {"alpha", "beta", "alpha"}));
239  ARROW_ASSIGN_OR_RAISE(auto b2_str, GetBinaryArrayDataSample<arrow::StringType>(
240                                         {"alpha", "gamma", "alpha"}));
241  ARROW_ASSIGN_OR_RAISE(auto b3_str, GetBinaryArrayDataSample<arrow::StringType>(
242                                         {"gamma", "beta", "alpha"}));
243  ARROW_ASSIGN_OR_RAISE(auto b1, GetExecBatchFromVectors(fields, {b1_int, b1_str}));
244  ARROW_ASSIGN_OR_RAISE(auto b2, GetExecBatchFromVectors(fields, {b2_int, b2_str}));
245  ARROW_ASSIGN_OR_RAISE(auto b3, GetExecBatchFromVectors(fields, {b3_int, b3_str}));
246  out.batches = {b1, b2, b3};
247
248  size_t batch_count = out.batches.size();
249  for (int repeat = 1; repeat < multiplicity; ++repeat) {
250    for (size_t i = 0; i < batch_count; ++i) {
251      out.batches.push_back(out.batches[i]);
252    }
253  }
254
255  out.schema = arrow::schema(fields);
256  return out;
257}
258
259arrow::Status ExecutePlanAndCollectAsTable(ac::Declaration plan) {
260  // collect sink_reader into a Table
261  std::shared_ptr<arrow::Table> response_table;
262  ARROW_ASSIGN_OR_RAISE(response_table, ac::DeclarationToTable(std::move(plan)));
263
264  std::cout << "Results : " << response_table->ToString() << std::endl;
265
266  return arrow::Status::OK();
267}
268
269// (Doc section: Scan Example)
270
271/// \brief An example demonstrating a scan and sink node
272///
273/// Scan-Table
274/// This example shows how scan operation can be applied on a dataset.
275/// There are operations that can be applied on the scan (project, filter)
276/// and the input data can be processed. The output is obtained as a table
277arrow::Status ScanSinkExample() {
278  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
279
280  auto options = std::make_shared<arrow::dataset::ScanOptions>();
281  options->projection = cp::project({}, {});  // create empty projection
282
283  // construct the scan node
284  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
285
286  ac::Declaration scan{"scan", std::move(scan_node_options)};
287
288  return ExecutePlanAndCollectAsTable(std::move(scan));
289}
290// (Doc section: Scan Example)
291
292// (Doc section: Source Example)
293
294/// \brief An example demonstrating a source and sink node
295///
296/// Source-Table Example
297/// This example shows how a custom source can be used
298/// in an execution plan. This includes source node using pregenerated
299/// data and collecting it into a table.
300///
301/// This sort of custom source is often not needed.  In most cases you can
302/// use a scan (for a dataset source) or a source like table_source, array_vector_source,
303/// exec_batch_source, or record_batch_source (for in-memory data)
304arrow::Status SourceSinkExample() {
305  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
306
307  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
308
309  ac::Declaration source{"source", std::move(source_node_options)};
310
311  return ExecutePlanAndCollectAsTable(std::move(source));
312}
313// (Doc section: Source Example)
314
315// (Doc section: Table Source Example)
316
317/// \brief An example showing a table source node
318///
319/// TableSource-Table Example
320/// This example shows how a table_source can be used
321/// in an execution plan. This includes a table source node
322/// receiving data from a table.  This plan simply collects the
323/// data back into a table but nodes could be added that modify
324/// or transform the data as well (as is shown in later examples)
325arrow::Status TableSourceSinkExample() {
326  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
327
328  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
329  int max_batch_size = 2;
330  auto table_source_options = ac::TableSourceNodeOptions{table, max_batch_size};
331
332  ac::Declaration source{"table_source", std::move(table_source_options)};
333
334  return ExecutePlanAndCollectAsTable(std::move(source));
335}
336// (Doc section: Table Source Example)
337
338// (Doc section: Filter Example)
339
340/// \brief An example showing a filter node
341///
342/// Source-Filter-Table
343/// This example shows how a filter can be used in an execution plan,
344/// to filter data from a source. The output from the execution plan
345/// is collected into a table.
346arrow::Status ScanFilterSinkExample() {
347  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
348
349  auto options = std::make_shared<arrow::dataset::ScanOptions>();
350  // specify the filter.  This filter removes all rows where the
351  // value of the "a" column is greater than 3.
352  cp::Expression filter_expr = cp::greater(cp::field_ref("a"), cp::literal(3));
353  // set filter for scanner : on-disk / push-down filtering.
354  // This step can be skipped if you are not reading from disk.
355  options->filter = filter_expr;
356  // empty projection
357  options->projection = cp::project({}, {});
358
359  // construct the scan node
360  std::cout << "Initialized Scanning Options" << std::endl;
361
362  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
363  std::cout << "Scan node options created" << std::endl;
364
365  ac::Declaration scan{"scan", std::move(scan_node_options)};
366
367  // pipe the scan node into the filter node
368  // Need to set the filter in scan node options and filter node options.
369  // At scan node it is used for on-disk / push-down filtering.
370  // At filter node it is used for in-memory filtering.
371  ac::Declaration filter{
372      "filter", {std::move(scan)}, ac::FilterNodeOptions(std::move(filter_expr))};
373
374  return ExecutePlanAndCollectAsTable(std::move(filter));
375}
376
377// (Doc section: Filter Example)
378
379// (Doc section: Project Example)
380
381/// \brief An example showing a project node
382///
383/// Scan-Project-Table
384/// This example shows how a Scan operation can be used to load the data
385/// into the execution plan, how a project operation can be applied on the
386/// data stream and how the output is collected into a table
387arrow::Status ScanProjectSinkExample() {
388  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
389
390  auto options = std::make_shared<arrow::dataset::ScanOptions>();
391  // projection
392  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
393  options->projection = cp::project({}, {});
394
395  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
396
397  ac::Declaration scan{"scan", std::move(scan_node_options)};
398  ac::Declaration project{
399      "project", {std::move(scan)}, ac::ProjectNodeOptions({a_times_2})};
400
401  return ExecutePlanAndCollectAsTable(std::move(project));
402}
403
404// (Doc section: Project Example)
405
406// This is a variation of ScanProjectSinkExample introducing how to use the
407// Declaration::Sequence function
408arrow::Status ScanProjectSequenceSinkExample() {
409  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());
410
411  auto options = std::make_shared<arrow::dataset::ScanOptions>();
412  // projection
413  cp::Expression a_times_2 = cp::call("multiply", {cp::field_ref("a"), cp::literal(2)});
414  options->projection = cp::project({}, {});
415
416  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};
417
418  // (Doc section: Project Sequence Example)
419  // Inputs do not have to be passed to the project node when using Sequence
420  ac::Declaration plan =
421      ac::Declaration::Sequence({{"scan", std::move(scan_node_options)},
422                                 {"project", ac::ProjectNodeOptions({a_times_2})}});
423  // (Doc section: Project Sequence Example)
424
425  return ExecutePlanAndCollectAsTable(std::move(plan));
426}
427
428// (Doc section: Scalar Aggregate Example)
429
430/// \brief An example showing an aggregation node to aggregate an entire table
431///
432/// Source-Aggregation-Table
433/// This example shows how an aggregation operation can be applied on a
434/// execution plan resulting in a scalar output. The source node loads the
435/// data and the aggregation (counting unique types in column 'a')
436/// is applied on this data. The output is collected into a table (that will
437/// have exactly one row)
438arrow::Status SourceScalarAggregateSinkExample() {
439  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
440
441  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
442
443  ac::Declaration source{"source", std::move(source_node_options)};
444  auto aggregate_options =
445      ac::AggregateNodeOptions{/*aggregates=*/{{"sum", nullptr, "a", "sum(a)"}}};
446  ac::Declaration aggregate{
447      "aggregate", {std::move(source)}, std::move(aggregate_options)};
448
449  return ExecutePlanAndCollectAsTable(std::move(aggregate));
450}
451// (Doc section: Scalar Aggregate Example)
452
453// (Doc section: Group Aggregate Example)
454
455/// \brief An example showing an aggregation node to perform a group-by operation
456///
457/// Source-Aggregation-Table
458/// This example shows how an aggregation operation can be applied on a
459/// execution plan resulting in grouped output. The source node loads the
460/// data and the aggregation (counting unique types in column 'a') is
461/// applied on this data. The output is collected into a table that will contain
462/// one row for each unique combination of group keys.
463arrow::Status SourceGroupAggregateSinkExample() {
464  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
465
466  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;
467
468  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
469
470  ac::Declaration source{"source", std::move(source_node_options)};
471  auto options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
472  auto aggregate_options =
473      ac::AggregateNodeOptions{/*aggregates=*/{{"hash_count", options, "a", "count(a)"}},
474                               /*keys=*/{"b"}};
475  ac::Declaration aggregate{
476      "aggregate", {std::move(source)}, std::move(aggregate_options)};
477
478  return ExecutePlanAndCollectAsTable(std::move(aggregate));
479}
480// (Doc section: Group Aggregate Example)
481
482// (Doc section: ConsumingSink Example)
483
484/// \brief An example showing a consuming sink node
485///
486/// Source-Consuming-Sink
487/// This example shows how the data can be consumed within the execution plan
488/// by using a ConsumingSink node. There is no data output from this execution plan.
489arrow::Status SourceConsumingSinkExample() {
490  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
491
492  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
493
494  ac::Declaration source{"source", std::move(source_node_options)};
495
496  std::atomic<uint32_t> batches_seen{0};
497  arrow::Future<> finish = arrow::Future<>::Make();
498  struct CustomSinkNodeConsumer : public ac::SinkNodeConsumer {
499    CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
500        : batches_seen(batches_seen), finish(std::move(finish)) {}
501
502    arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema,
503                       ac::BackpressureControl* backpressure_control,
504                       ac::ExecPlan* plan) override {
505      // This will be called as the plan is started (before the first call to Consume)
506      // and provides the schema of the data coming into the node, controls for pausing /
507      // resuming input, and a pointer to the plan itself which can be used to access
508      // other utilities such as the thread indexer or async task scheduler.
509      return arrow::Status::OK();
510    }
511
512    arrow::Status Consume(cp::ExecBatch batch) override {
513      (*batches_seen)++;
514      return arrow::Status::OK();
515    }
516
517    arrow::Future<> Finish() override {
518      // Here you can perform whatever (possibly async) cleanup is needed, e.g. closing
519      // output file handles and flushing remaining work
520      return arrow::Future<>::MakeFinished();
521    }
522
523    std::atomic<uint32_t>* batches_seen;
524    arrow::Future<> finish;
525  };
526  std::shared_ptr<CustomSinkNodeConsumer> consumer =
527      std::make_shared<CustomSinkNodeConsumer>(&batches_seen, finish);
528
529  ac::Declaration consuming_sink{"consuming_sink",
530                                 {std::move(source)},
531                                 ac::ConsumingSinkNodeOptions(std::move(consumer))};
532
533  // Since we are consuming the data within the plan there is no output and we simply
534  // run the plan to completion instead of collecting into a table.
535  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(consuming_sink)));
536
537  std::cout << "The consuming sink node saw " << batches_seen.load() << " batches"
538            << std::endl;
539  return arrow::Status::OK();
540}
541// (Doc section: ConsumingSink Example)

// (Doc section: OrderBySink Example)

arrow::Status ExecutePlanAndCollectAsTableWithCustomSink(
    std::shared_ptr<ac::ExecPlan> plan, std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      ac::MakeGeneratorReader(schema, std::move(sink_gen), arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // collect sink_reader into a Table
  std::shared_ptr<arrow::Table> response_table;

  ARROW_ASSIGN_OR_RAISE(response_table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));

  std::cout << "Results : " << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();
  // wait for the plan to be marked finished
  auto future = plan->finished();
  return future.status();
}
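
// A simpler alternative to managing a sink generator by hand (a sketch, not
// part of the original example; the helper name is illustrative):
// DeclarationToReader wraps the plan in a RecordBatchReader, so the
// boilerplate above is only needed when a custom sink node (such as
// order_by_sink below) must be wired up manually.
arrow::Status CollectWithDeclarationToReader(ac::Declaration plan) {
  ARROW_ASSIGN_OR_RAISE(std::unique_ptr<arrow::RecordBatchReader> reader,
                        ac::DeclarationToReader(std::move(plan)));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table,
                        arrow::Table::FromRecordBatchReader(reader.get()));
  std::cout << "Results : " << table->ToString() << std::endl;
  return arrow::Status::OK();
}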

/// \brief An example showing an order-by node
///
/// Source-OrderBy-Sink
/// In this example, the data enters through the source node
/// and is ordered in the sink node. The sort order is
/// configurable (ascending or descending). The output
/// is obtained as a table from the sink node.
arrow::Status SourceOrderBySinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());

  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};
  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  ARROW_RETURN_NOT_OK(ac::MakeExecNode(
      "order_by_sink", plan.get(), {source},
      ac::OrderBySinkNodeOptions{
          cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}, &sink_gen}));

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, basic_data.schema, sink_gen);
}

// (Doc section: OrderBySink Example)
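
// The order_by_sink node requires the manual plan construction shown above.
// An alternative sketch (not from the original example; the function name is
// illustrative): collect the plan's output with DeclarationToTable and sort
// afterwards using the compute API's SortIndices and Take functions.
arrow::Status SortAfterCollectExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeSortTestBasicBatches());
  ac::Declaration source{"source",
                         ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table,
                        ac::DeclarationToTable(std::move(source)));
  // Compute a permutation that sorts by "a" descending, then apply it
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::Array> indices,
      cp::SortIndices(table,
                      cp::SortOptions{{cp::SortKey{"a", cp::SortOrder::Descending}}}));
  ARROW_ASSIGN_OR_RAISE(arrow::Datum sorted, cp::Take(table, indices));
  std::cout << "Results : " << sorted.table()->ToString() << std::endl;
  return arrow::Status::OK();
}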

// (Doc section: HashJoin Example)

/// \brief An example showing a hash join node
///
/// Source-HashJoin-Table
/// This example shows how a source node gets the data and how a self-join
/// is applied on the data. The join options are configurable. The output
/// is collected into a table.
arrow::Status SourceHashJoinSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::INNER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}

// (Doc section: HashJoin Example)
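
// A variant sketch (not part of the original example; the function name is
// illustrative): the same join graph with JoinType::LEFT_OUTER. Rows from the
// left source that have no match on "str" are kept, with the right-side
// columns filled with null.
arrow::Status SourceLeftOuterJoinExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());

  ac::Declaration left{"source", ac::SourceNodeOptions{input.schema, input.gen()}};
  ac::Declaration right{"source", ac::SourceNodeOptions{input.schema, input.gen()}};

  ac::HashJoinNodeOptions join_opts{
      ac::JoinType::LEFT_OUTER,
      /*left_keys=*/{"str"},
      /*right_keys=*/{"str"}, cp::literal(true), "l_", "r_"};

  ac::Declaration hashjoin{
      "hashjoin", {std::move(left), std::move(right)}, std::move(join_opts)};

  return ExecutePlanAndCollectAsTable(std::move(hashjoin));
}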

// (Doc section: KSelect Example)

/// \brief An example showing a select-k node
///
/// Source-KSelect
/// This example shows how the top or bottom K elements can be
/// selected from the data. The output node is a modified
/// sink node where the output can be obtained as a table.
arrow::Status SourceKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode * source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::TopKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}

// (Doc section: KSelect Example)
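
// A bottom-k variant sketch (not part of the original example; the function
// name is illustrative): identical to SourceKSelectExample except that
// SelectKOptions::BottomKDefault selects the k smallest "i32" values instead
// of the k largest.
arrow::Status SourceBottomKSelectExample() {
  ARROW_ASSIGN_OR_RAISE(auto input, MakeGroupableBatches());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));
  arrow::AsyncGenerator<std::optional<cp::ExecBatch>> sink_gen;

  ARROW_ASSIGN_OR_RAISE(
      ac::ExecNode * source,
      ac::MakeExecNode("source", plan.get(), {},
                       ac::SourceNodeOptions{input.schema, input.gen()}));

  cp::SelectKOptions options = cp::SelectKOptions::BottomKDefault(/*k=*/2, {"i32"});

  ARROW_RETURN_NOT_OK(ac::MakeExecNode("select_k_sink", plan.get(), {source},
                                       ac::SelectKSinkNodeOptions{options, &sink_gen}));

  auto schema = arrow::schema(
      {arrow::field("i32", arrow::int32()), arrow::field("str", arrow::utf8())});

  return ExecutePlanAndCollectAsTableWithCustomSink(plan, schema, sink_gen);
}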

// (Doc section: Write Example)

/// \brief An example showing a write node
/// \param file_path The destination to write to
///
/// Scan-Filter-Write
/// This example shows how a scan node can be used to load the data
/// and how, after processing, it can be written to disk.
arrow::Status ScanFilterWriteExample(const std::string& file_path) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::dataset::Dataset> dataset, GetDataset());

  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  // empty projection
  options->projection = cp::project({}, {});

  auto scan_node_options = arrow::dataset::ScanNodeOptions{dataset, options};

  ac::Declaration scan{"scan", std::move(scan_node_options)};

  std::string root_path = "";
  std::string uri = "file://" + file_path;
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri, &root_path));

  auto base_path = root_path + "/parquet_dataset";
  // Uncomment the following line if running the example repeatedly
  // ARROW_RETURN_NOT_OK(filesystem->DeleteDirContents(base_path));
  ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("a", arrow::int32())});
  // We'll use Hive-style partitioning,
  // which creates directories with "key=value" pairs.
  auto partitioning =
      std::make_shared<arrow::dataset::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();

  arrow::dataset::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";

  arrow::dataset::WriteNodeOptions write_node_options{write_options};

  ac::Declaration write{"write", {std::move(scan)}, std::move(write_node_options)};

  // Since the write node has no output we simply run the plan to completion and the
  // data should be written
  ARROW_RETURN_NOT_OK(ac::DeclarationToStatus(std::move(write)));

  std::cout << "Dataset written to " << base_path << std::endl;
  return arrow::Status::OK();
}

// (Doc section: Write Example)
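
// The example above is named Scan-Filter-Write but, as written, applies no
// filter. A sketch of adding one (an assumption, not from the original
// example): set the filter on ScanOptions so the scanner can push it down to
// the file format, and insert a "filter" node between the scan and the write
// so that any rows the format could not skip are removed exactly.
//
//   options->filter = cp::greater(cp::field_ref("a"), cp::literal(3));
//   ac::Declaration filter{
//       "filter", {std::move(scan)},
//       ac::FilterNodeOptions(cp::greater(cp::field_ref("a"), cp::literal(3)))};
//   ac::Declaration write{"write", {std::move(filter)},
//                         std::move(write_node_options)};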

// (Doc section: Union Example)

/// \brief An example showing a union node
///
/// Source-Union-Table
/// This example shows how a union operation can be applied on two
/// data sources. The output is collected into a table.
arrow::Status SourceUnionSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  lhs.label = "lhs";
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  rhs.label = "rhs";
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  return ExecutePlanAndCollectAsTable(std::move(union_plan));
}

// (Doc section: Union Example)
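
// A sketch of processing the unioned stream further (not part of the original
// example; the function name is illustrative): a grouped count over the
// union's output. Note that "union" requires all inputs to share the same
// schema and concatenates the streams without removing duplicate rows.
arrow::Status SourceUnionAggregateExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  ac::Declaration lhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  ac::Declaration rhs{"source",
                      ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  ac::Declaration union_plan{
      "union", {std::move(lhs), std::move(rhs)}, ac::ExecNodeOptions{}};

  // Count the non-null "a" values within each group of "b" values
  auto count_options = std::make_shared<cp::CountOptions>(cp::CountOptions::ONLY_VALID);
  auto aggregate_options = ac::AggregateNodeOptions{
      /*aggregates=*/{{"hash_count", count_options, "a", "count(a)"}},
      /*keys=*/{"b"}};
  ac::Declaration aggregate{
      "aggregate", {std::move(union_plan)}, std::move(aggregate_options)};

  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}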

// (Doc section: Table Sink Example)

/// \brief An example showing a table sink node
///
/// TableSink Example
/// This example shows how a table_sink can be used
/// in an execution plan. This includes a source node
/// receiving data as batches and the table sink node
/// which emits the output as a table.
arrow::Status TableSinkExample() {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ac::ExecPlan> plan,
                        ac::ExecPlan::Make(*cp::threaded_exec_context()));

  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());

  auto source_node_options = ac::SourceNodeOptions{basic_data.schema, basic_data.gen()};

  ARROW_ASSIGN_OR_RAISE(ac::ExecNode * source,
                        ac::MakeExecNode("source", plan.get(), {}, source_node_options));

  std::shared_ptr<arrow::Table> output_table;
  auto table_sink_options = ac::TableSinkNodeOptions{&output_table};

  ARROW_RETURN_NOT_OK(
      ac::MakeExecNode("table_sink", plan.get(), {source}, table_sink_options));
  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;
  // start the ExecPlan
  plan->StartProducing();

  // Wait for the plan to finish
  auto finished = plan->finished();
  ARROW_RETURN_NOT_OK(finished.status());
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}

// (Doc section: Table Sink Example)
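
// A sketch of the same result via DeclarationToTable (not part of the
// original example; the function name is illustrative): when no custom node
// wiring is needed, the declaration-based API replaces the manual
// Make/StartProducing/finished() sequence above.
arrow::Status TableViaDeclarationExample() {
  ARROW_ASSIGN_OR_RAISE(auto basic_data, MakeBasicBatches());
  ac::Declaration source{"source",
                         ac::SourceNodeOptions{basic_data.schema, basic_data.gen()}};
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> output_table,
                        ac::DeclarationToTable(std::move(source)));
  std::cout << "Results : " << output_table->ToString() << std::endl;
  return arrow::Status::OK();
}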

// (Doc section: RecordBatchReaderSource Example)

/// \brief An example showing the usage of a RecordBatchReader as the data source.
///
/// RecordBatchReaderSourceSink Example
/// This example shows how a record_batch_reader_source can be used
/// in an execution plan. This includes the source node
/// receiving data from a TableBatchReader.
arrow::Status RecordBatchReaderSourceSinkExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  std::shared_ptr<arrow::RecordBatchReader> reader =
      std::make_shared<arrow::TableBatchReader>(table);
  ac::Declaration reader_source{"record_batch_reader_source",
                                ac::RecordBatchReaderSourceNodeOptions{reader}};
  return ExecutePlanAndCollectAsTable(std::move(reader_source));
}

// (Doc section: RecordBatchReaderSource Example)
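
// A sketch (not part of the original example; the function name is
// illustrative) showing how the reader's batch size could be controlled
// before it feeds the plan: TableBatchReader slices the table into batches of
// at most the requested number of rows.
arrow::Status RecordBatchReaderChunkedSourceExample() {
  ARROW_ASSIGN_OR_RAISE(auto table, GetTable());
  auto table_reader = std::make_shared<arrow::TableBatchReader>(table);
  table_reader->set_chunksize(1024);  // at most 1024 rows per batch
  ac::Declaration reader_source{"record_batch_reader_source",
                                ac::RecordBatchReaderSourceNodeOptions{table_reader}};
  return ExecutePlanAndCollectAsTable(std::move(reader_source));
}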

enum ExampleMode {
  SOURCE_SINK = 0,
  TABLE_SOURCE_SINK = 1,
  SCAN = 2,
  FILTER = 3,
  PROJECT = 4,
  SCALAR_AGGREGATION = 5,
  GROUP_AGGREGATION = 6,
  CONSUMING_SINK = 7,
  ORDER_BY_SINK = 8,
  HASHJOIN = 9,
  KSELECT = 10,
  WRITE = 11,
  UNION = 12,
  TABLE_SOURCE_TABLE_SINK = 13,
  RECORD_BATCH_READER_SOURCE = 14,
  PROJECT_SEQUENCE = 15
};
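// Usage: pass a writable base path (used by the Write example) as the first
// argument and one of the ExampleMode values above as the second, for example
// `./executable /tmp/acero_examples 11` (the executable name here is
// illustrative). With fewer than two arguments the program exits successfully
// so that CI can run it without a writable destination.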
int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string base_save_path = argv[1];
  int mode = std::atoi(argv[2]);
  arrow::Status status;
  // ensure arrow::dataset node factories are in the registry
  arrow::dataset::internal::Initialize();
  switch (mode) {
    case SOURCE_SINK:
      PrintBlock("Source Sink Example");
      status = SourceSinkExample();
      break;
    case TABLE_SOURCE_SINK:
      PrintBlock("Table Source Sink Example");
      status = TableSourceSinkExample();
      break;
    case SCAN:
      PrintBlock("Scan Example");
      status = ScanSinkExample();
      break;
    case FILTER:
      PrintBlock("Filter Example");
      status = ScanFilterSinkExample();
      break;
    case PROJECT:
      PrintBlock("Project Example");
      status = ScanProjectSinkExample();
      break;
    case PROJECT_SEQUENCE:
      PrintBlock("Project Example (using Declaration::Sequence)");
      status = ScanProjectSequenceSinkExample();
      break;
    case GROUP_AGGREGATION:
      PrintBlock("Aggregate Example");
      status = SourceGroupAggregateSinkExample();
      break;
    case SCALAR_AGGREGATION:
      PrintBlock("Aggregate Example");
      status = SourceScalarAggregateSinkExample();
      break;
    case CONSUMING_SINK:
      PrintBlock("Consuming-Sink Example");
      status = SourceConsumingSinkExample();
      break;
    case ORDER_BY_SINK:
      PrintBlock("OrderBy Example");
      status = SourceOrderBySinkExample();
      break;
    case HASHJOIN:
      PrintBlock("HashJoin Example");
      status = SourceHashJoinSinkExample();
      break;
    case KSELECT:
      PrintBlock("KSelect Example");
      status = SourceKSelectExample();
      break;
    case WRITE:
      PrintBlock("Write Example");
      status = ScanFilterWriteExample(base_save_path);
      break;
    case UNION:
      PrintBlock("Union Example");
      status = SourceUnionSinkExample();
      break;
    case TABLE_SOURCE_TABLE_SINK:
      PrintBlock("TableSink Example");
      status = TableSinkExample();
      break;
    case RECORD_BATCH_READER_SOURCE:
      PrintBlock("RecordBatchReaderSource Example");
      status = RecordBatchReaderSourceSinkExample();
      break;
    default:
      break;
  }

  if (status.ok()) {
    return EXIT_SUCCESS;
  } else {
    std::cout << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
}