DataFusion:適用於 Apache Arrow 的 Rust 原生查詢引擎


已發布 2019年2月4日
作者 Andy Grove (agrove)

我們很高興宣布 DataFusion 已捐贈給 Apache Arrow 專案。DataFusion 是 Apache Arrow 的 Rust 實作版本的記憶體內查詢引擎。

雖然 DataFusion 在兩年前開始啟動,但最近已重新實作為 Arrow 原生,目前功能有限,但支援對 RecordBatch 迭代器的 SQL 查詢,並支援 CSV 檔案。計畫增加對 Parquet 檔案的支援。

SQL 支援僅限於投影 (SELECT)、選擇 (WHERE) 和簡單的聚合函數 (MINMAXSUM),以及可選的 GROUP BY 子句。

支援的表達式包括識別符、字面值、簡單的數學運算 (+-*/)、二元表達式 (ANDOR)、相等和比較運算符 (=!=<<=>=>) 以及 CAST(expr AS type)

範例

以下範例示範了針對 CSV 檔案執行簡單的聚合 SQL 查詢。

// create execution context
let mut ctx = ExecutionContext::new();

// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
    Field::new("c1", DataType::Utf8, false),
    Field::new("c2", DataType::UInt32, false),
    Field::new("c3", DataType::Int8, false),
    Field::new("c4", DataType::Int16, false),
    Field::new("c5", DataType::Int32, false),
    Field::new("c6", DataType::Int64, false),
    Field::new("c7", DataType::UInt8, false),
    Field::new("c8", DataType::UInt16, false),
    Field::new("c9", DataType::UInt32, false),
    Field::new("c10", DataType::UInt64, false),
    Field::new("c11", DataType::Float32, false),
    Field::new("c12", DataType::Float64, false),
    Field::new("c13", DataType::Utf8, false),
]));

// register csv file with the execution context
let csv_datasource =
    CsvDataSource::new("test/data/aggregate_test_100.csv", schema.clone(), 1024);
ctx.register_datasource("aggregate_test_100", Rc::new(RefCell::new(csv_datasource)));

let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1";

// execute the query
let relation = ctx.sql(&sql).unwrap();
let mut results = relation.borrow_mut();

// iterate over the results
while let Some(batch) = results.next().unwrap() {
    println!(
        "RecordBatch has {} rows and {} columns",
        batch.num_rows(),
        batch.num_columns()
    );

    let c1 = batch
        .column(0)
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();

    let min = batch
        .column(1)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();

    let max = batch
        .column(2)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();

    for i in 0..batch.num_rows() {
        let c1_value: String = String::from_utf8(c1.value(i).to_vec()).unwrap();
        println!("{}, Min: {}, Max: {}", c1_value, min.value(i), max.value(i),);
    }
}

Roadmap

DataFusion 的 Roadmap 將取決於 Rust 社群的興趣,但以下是一些計畫中的短期項目

  • 擴展現有功能的測試覆蓋率
  • 增加對 Parquet 資料來源的支援
  • 實作更多 SQL 功能,例如 JOINORDER BYLIMIT
  • 實作 DataFrame API 作為 SQL 的替代方案
  • 增加對使用 Rust 的 async 和 await 功能進行分割和並行查詢執行的支援
  • 建立 Docker 映像檔,以便輕鬆地將 DataFusion 用作互動式和批次查詢的獨立查詢工具

歡迎貢獻者!

如果您對能夠使用 Rust 進行資料科學感到興奮,並希望為這項工作做出貢獻,那麼有很多種參與方式。最簡單的入門方法是針對您自己的資料來源試用 DataFusion,並針對您發現的任何問題提交錯誤報告。您也可以查看目前的 issue 清單,並嘗試修復其中一個。您也可以加入 user mailing list 提出問題。