使用 Apache Arrow 加速 R 和 Apache Spark


已發布 2019 年 1 月 25 日
作者 Javier Luraschi

Javier LuraschiRStudio 的軟體工程師

在 Apache Spark 中使用 Apache Arrow 支援 R 的功能目前正在 sparklyrSparkR 專案中積極開發。這篇文章探討了在使用 R 與 Apache Spark、Arrow 和 sparklyr 時,早期但有前景的效能改進。

設定

由於這項工作仍在積極開發中,請從 GitHub 安裝 sparklyrarrow,如下所示

devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.12.0")
devtools::install_github("rstudio/sparklyr", ref = "apache-arrow-0.12.0")

在這個基準測試中,我們將使用 dplyr,但預計在使用 DBIsparklyr 中的 Spark DataFrames 時也能獲得類似的改進。具有 1 千萬個數值列的本機 Spark 連線和資料框架初始化如下

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local", config = list("sparklyr.shell.driver-memory" = "6g"))
data <- data.frame(y = runif(10^7, 0, 1))

複製

目前,使用 sparklyr 將資料複製到 Spark 是透過將資料從 R 持續儲存到磁碟,然後從 Spark 讀回資料來完成的。這原本是用於小型資料集,因為有更好的工具可以將資料傳輸到分散式儲存系統中。然而,許多使用者已要求支援以更快的速度將更多資料傳輸到 Spark 中。

使用 arrowsparklyr,我們可以將資料直接從 R 傳輸到 Spark,而無需在 R 中序列化這些資料或持續儲存到磁碟中。

以下範例使用 sparklyr 在啟用和未啟用 arrow 的情況下,將 1 千萬列從 R 複製到 Spark,使用 arrow 時效能提升了將近 16 倍。

此基準測試使用 microbenchmark R 套件,該套件會多次執行程式碼,提供總執行時間的統計資料,並繪製每次執行時間以了解每次迭代的分佈情況。

microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    sparklyr_df <<- copy_to(sc, data, overwrite = T)
    count(sparklyr_df) %>% collect()
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    sparklyr_df <<- copy_to(sc, data, overwrite = T)
    count(sparklyr_df) %>% collect()
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
 Unit: seconds
      expr       min        lq       mean    median         uq       max neval
  arrow_on  3.011515  4.250025   7.257739  7.273011   8.974331  14.23325    10
 arrow_off 50.051947 68.523081 119.946947 71.898908 138.743419 390.44028    10
Copying data with R into Spark with and without Arrow

收集

同樣地,arrowsparklyr 現在可以避免在從 Spark 收集資料到 R 時在 R 中還原序列化資料。這些改進不如複製資料那麼顯著,因為 sparklyr 已經以欄狀格式收集資料。

以下基準測試從 Spark 收集 1 千萬列到 R,並顯示 arrow 可以帶來 3 倍的效能提升。

microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    collect(sparklyr_df)
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    collect(sparklyr_df)
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr      min        lq      mean    median        uq       max neval
  arrow_on 4.520593  5.609812  6.154509  5.928099  6.217447  9.432221    10
 arrow_off 7.882841 13.358113 16.670708 16.127704 21.051382 24.373331    10
Collecting data with R from Spark with and without Arrow

轉換

如今,使用 R 函數自訂轉換資料在 sparklyr 中執行,方法是將資料以列格式從 Spark 透過 socket 連線移動到 R 程序中,以列格式傳輸資料效率低下,因為每個列都需要還原序列化多種資料類型,然後資料會轉換為欄狀格式(R 最初設計為使用欄狀資料),一旦 R 完成此計算,資料會再次轉換為列格式,逐列序列化,然後透過 socket 連線傳送回 Spark。

透過在 sparklyr 中新增對 arrow 的支援,它使 Spark 能夠在 Spark 中並行執行列格式到欄格式的轉換。然後資料會透過 socket 傳輸,但不會發生自訂序列化。R 程序需要做的就是將資料從 socket 複製到其堆積中,轉換它,然後將其複製回 socket 連線。

以下範例在啟用和未啟用 arrow 的情況下轉換 10 萬列,arrow 使使用 R 函數的轉換速度提高了將近 41 倍。

microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr        min         lq       mean     median         uq        max neval
  arrow_on   3.881293   4.038376   5.136604   4.772739   5.759082   7.873711    10
 arrow_off 178.605733 183.654887 213.296238 227.182018 233.601885 238.877341    10
Transforming data with R in Spark with and without Arrow

其他基準測試和微調參數可以在 sparklyr /rstudio/sparklyr/pull/1611SparkR /apache/spark/pull/22954 下找到。期待將此功能帶給 Spark、Arrow 和 R 社群。