使用 Apache Arrow 加速 PySpark


已發布 26 Jul 2017
作者 BryanCutler

Bryan Cutler 是 IBM Spark 技術中心的軟體工程師 STC

Apache Spark 2.3 版本開始,Apache Arrow 將會是支援的相依性套件,並開始提供使用柱狀資料傳輸時更高的效能。如果您是偏好使用 Python 和 Pandas 的 Spark 使用者,這絕對是令人興奮的消息!初步的工作僅限於使用 toPandas() 收集 Spark DataFrame,我將在下方討論,然而目前還有許多額外的改進正在進行中。

最佳化 Spark 轉換為 Pandas

先前在 PySpark 中使用 DataFrame.toPandas() 將 Spark DataFrame 轉換為 Pandas 的方式非常沒有效率。基本上,它的運作方式是先將所有列收集到 Spark driver。接著,每列都會序列化為 Python 的 pickle 格式,並傳送到 Python worker process。這個子程序會將每列解 pickle 成一個巨大的元組列表。最後,使用 pandas.DataFrame.from_records() 從列表建立 Pandas DataFrame。

這一切可能看起來像是標準程序,但有 2 個明顯的問題:1) 即使使用 CPickle,Python 序列化也是一個緩慢的過程,以及 2) 使用 from_records 建立 pandas.DataFrame 必須緩慢地迭代純 Python 資料列表,並將每個值轉換為 Pandas 格式。請參閱此處以取得詳細的分析。

這正是 Arrow 真正發光發熱以協助最佳化這些步驟的地方:1) 一旦資料採用 Arrow 記憶體格式,就不再需要序列化/pickle,因為 Arrow 資料可以直接傳送到 Python process,2) 當 Python 收到 Arrow 資料時,pyarrow 可以使用零複製方法,從整個資料區塊一次建立 pandas.DataFrame,而不是處理個別的純量值。此外,轉換為 Arrow 資料可以在 JVM 上完成,並推送回 Spark executors 以平行執行,大幅降低 driver 的負載。

隨著 SPARK-13534 的合併,在呼叫 toPandas() 時使用 Arrow 需要透過將 SQLConf “spark.sql.execution.arrow.enabled” 設定為 “true” 來啟用。讓我們來看一個簡單的使用範例。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.functions import rand
   ...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
   ...: df.printSchema()
   ...: 
root
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)


In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s

In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms                                 
Wall time: 737 ms

In [5]: pdf.describe()
Out[5]: 
                 id             x
count  4.194304e+06  4.194304e+06
mean   2.097152e+06  4.998996e-01
std    1.210791e+06  2.887247e-01
min    0.000000e+00  8.291929e-07
25%    1.048576e+06  2.498116e-01
50%    2.097152e+06  4.999210e-01
75%    3.145727e+06  7.498380e-01
max    4.194303e+06  9.999996e-01

這個範例是在我的筆記型電腦上使用 Spark 預設值在本機執行的,因此顯示的時間不應被視為精確值。儘管如此,很明顯效能有大幅提升,而使用 Arrow 將原本極其緩慢的操作加速到幾乎難以察覺。

使用注意事項

在使用這項新功能之前,請記住以下幾點。在撰寫本文時,pyarrow 不會與 pyspark 一起自動安裝,需要手動安裝,請參閱安裝說明。計畫將 pyarrow 新增為 pyspark 的相依性套件,以便 > pip install pyspark 也會安裝 pyarrow。

目前,控制 SQLConf 預設為停用。可以透過程式化方式啟用,如上述範例所示,或將行 “spark.sql.execution.arrow.enabled=true” 新增至 SPARK_HOME/conf/spark-defaults.conf

此外,並非所有 Spark 資料類型目前都受到支援,且僅限於基本類型。擴展的類型支援正在開發中,預計也會包含在 Spark 2.3 版本中。

未來改進

如前所述,這只是使用 Arrow 讓 Spark Python 使用者更輕鬆的第一步。目前正在進行一些令人興奮的計畫,包括允許向量化 UDF 評估 (SPARK-21190, SPARK-21404),以及能夠使用 Pandas DataFrame 在分組資料上套用函數 (SPARK-20396)。正如 Arrow 在將 Spark 轉換為 Pandas 時有所助益一樣,在從現有的 Pandas DataFrame 建立 Spark DataFrame 時,它也可以朝另一個方向運作 (SPARK-20791)。敬請期待更多資訊!

協作者

達成這個首個里程碑是 Apache Arrow 和 Spark 社群共同努力的成果。感謝 Wes McKinneyLi JinHolden Karau、Reynold Xin、Wenchen Fan、Shane Knapp 以及許多其他協助推動這項工作的人員的辛勤付出。