使用 Apache Arrow 加速 R 和 Apache Spark
已發布 2019 年 1 月 25 日
作者 Javier Luraschi
Javier Luraschi 是 RStudio 的軟體工程師
在 Apache Spark 中使用 Apache Arrow 支援 R 的功能目前正在 sparklyr 和 SparkR 專案中積極開發。這篇文章探討了在使用 R 與 Apache Spark、Arrow 和 sparklyr
時,早期但有前景的效能改進。
設定
由於這項工作仍在積極開發中,請從 GitHub 安裝 sparklyr
和 arrow
,如下所示
devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.12.0")
devtools::install_github("rstudio/sparklyr", ref = "apache-arrow-0.12.0")
在這個基準測試中,我們將使用 dplyr,但預計在使用 DBI 或 sparklyr
中的 Spark DataFrames 時也能獲得類似的改進。具有 1 千萬個數值列的本機 Spark 連線和資料框架初始化如下
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", config = list("sparklyr.shell.driver-memory" = "6g"))
data <- data.frame(y = runif(10^7, 0, 1))
複製
目前,使用 sparklyr
將資料複製到 Spark 是透過將資料從 R 持續儲存到磁碟,然後從 Spark 讀回資料來完成的。這原本是用於小型資料集,因為有更好的工具可以將資料傳輸到分散式儲存系統中。然而,許多使用者已要求支援以更快的速度將更多資料傳輸到 Spark 中。
使用 arrow
和 sparklyr
,我們可以將資料直接從 R 傳輸到 Spark,而無需在 R 中序列化這些資料或持續儲存到磁碟中。
以下範例使用 sparklyr
在啟用和未啟用 arrow
的情況下,將 1 千萬列從 R 複製到 Spark,使用 arrow
時效能提升了將近 16 倍。
此基準測試使用 microbenchmark R 套件,該套件會多次執行程式碼,提供總執行時間的統計資料,並繪製每次執行時間以了解每次迭代的分佈情況。
microbenchmark::microbenchmark(
setup = library(arrow),
arrow_on = {
sparklyr_df <<- copy_to(sc, data, overwrite = T)
count(sparklyr_df) %>% collect()
},
arrow_off = {
if ("arrow" %in% .packages()) detach("package:arrow")
sparklyr_df <<- copy_to(sc, data, overwrite = T)
count(sparklyr_df) %>% collect()
},
times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
expr min lq mean median uq max neval
arrow_on 3.011515 4.250025 7.257739 7.273011 8.974331 14.23325 10
arrow_off 50.051947 68.523081 119.946947 71.898908 138.743419 390.44028 10

收集
同樣地,arrow
和 sparklyr
現在可以避免在從 Spark 收集資料到 R 時在 R 中還原序列化資料。這些改進不如複製資料那麼顯著,因為 sparklyr
已經以欄狀格式收集資料。
以下基準測試從 Spark 收集 1 千萬列到 R,並顯示 arrow
可以帶來 3 倍的效能提升。
microbenchmark::microbenchmark(
setup = library(arrow),
arrow_on = {
collect(sparklyr_df)
},
arrow_off = {
if ("arrow" %in% .packages()) detach("package:arrow")
collect(sparklyr_df)
},
times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
expr min lq mean median uq max neval
arrow_on 4.520593 5.609812 6.154509 5.928099 6.217447 9.432221 10
arrow_off 7.882841 13.358113 16.670708 16.127704 21.051382 24.373331 10

轉換
如今,使用 R 函數自訂轉換資料在 sparklyr
中執行,方法是將資料以列格式從 Spark 透過 socket 連線移動到 R 程序中,以列格式傳輸資料效率低下,因為每個列都需要還原序列化多種資料類型,然後資料會轉換為欄狀格式(R 最初設計為使用欄狀資料),一旦 R 完成此計算,資料會再次轉換為列格式,逐列序列化,然後透過 socket 連線傳送回 Spark。
透過在 sparklyr
中新增對 arrow
的支援,它使 Spark 能夠在 Spark 中並行執行列格式到欄格式的轉換。然後資料會透過 socket 傳輸,但不會發生自訂序列化。R 程序需要做的就是將資料從 socket 複製到其堆積中,轉換它,然後將其複製回 socket 連線。
以下範例在啟用和未啟用 arrow
的情況下轉換 10 萬列,arrow
使使用 R 函數的轉換速度提高了將近 41 倍。
microbenchmark::microbenchmark(
setup = library(arrow),
arrow_on = {
sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
},
arrow_off = {
if ("arrow" %in% .packages()) detach("package:arrow")
sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
},
times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
expr min lq mean median uq max neval
arrow_on 3.881293 4.038376 5.136604 4.772739 5.759082 7.873711 10
arrow_off 178.605733 183.654887 213.296238 227.182018 233.601885 238.877341 10

其他基準測試和微調參數可以在 sparklyr
/rstudio/sparklyr/pull/1611 和 SparkR
/apache/spark/pull/22954 下找到。期待將此功能帶給 Spark、Arrow 和 R 社群。