使用 Apache Arrow 加速 PySpark
已發布 26 Jul 2017
作者 BryanCutler
Bryan Cutler 是 IBM Spark 技術中心的軟體工程師 STC
從 Apache Spark 2.3 版本開始,Apache Arrow 將會是支援的相依性套件,並開始提供使用柱狀資料傳輸時更高的效能。如果您是偏好使用 Python 和 Pandas 的 Spark 使用者,這絕對是令人興奮的消息!初步的工作僅限於使用 toPandas()
收集 Spark DataFrame,我將在下方討論,然而目前還有許多額外的改進正在進行中。
最佳化 Spark 轉換為 Pandas
先前在 PySpark 中使用 DataFrame.toPandas()
將 Spark DataFrame 轉換為 Pandas 的方式非常沒有效率。基本上,它的運作方式是先將所有列收集到 Spark driver。接著,每列都會序列化為 Python 的 pickle 格式,並傳送到 Python worker process。這個子程序會將每列解 pickle 成一個巨大的元組列表。最後,使用 pandas.DataFrame.from_records()
從列表建立 Pandas DataFrame。
這一切可能看起來像是標準程序,但有 2 個明顯的問題:1) 即使使用 CPickle,Python 序列化也是一個緩慢的過程,以及 2) 使用 from_records
建立 pandas.DataFrame
必須緩慢地迭代純 Python 資料列表,並將每個值轉換為 Pandas 格式。請參閱此處以取得詳細的分析。
這正是 Arrow 真正發光發熱以協助最佳化這些步驟的地方:1) 一旦資料採用 Arrow 記憶體格式,就不再需要序列化/pickle,因為 Arrow 資料可以直接傳送到 Python process,2) 當 Python 收到 Arrow 資料時,pyarrow 可以使用零複製方法,從整個資料區塊一次建立 pandas.DataFrame
,而不是處理個別的純量值。此外,轉換為 Arrow 資料可以在 JVM 上完成,並推送回 Spark executors 以平行執行,大幅降低 driver 的負載。
隨著 SPARK-13534 的合併,在呼叫 toPandas()
時使用 Arrow 需要透過將 SQLConf “spark.sql.execution.arrow.enabled” 設定為 “true” 來啟用。讓我們來看一個簡單的使用範例。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0-SNAPSHOT
/_/
Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.
In [1]: from pyspark.sql.functions import rand
...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
...: df.printSchema()
...:
root
|-- id: long (nullable = false)
|-- x: double (nullable = false)
In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s
In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")
In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms
Wall time: 737 ms
In [5]: pdf.describe()
Out[5]:
id x
count 4.194304e+06 4.194304e+06
mean 2.097152e+06 4.998996e-01
std 1.210791e+06 2.887247e-01
min 0.000000e+00 8.291929e-07
25% 1.048576e+06 2.498116e-01
50% 2.097152e+06 4.999210e-01
75% 3.145727e+06 7.498380e-01
max 4.194303e+06 9.999996e-01
這個範例是在我的筆記型電腦上使用 Spark 預設值在本機執行的,因此顯示的時間不應被視為精確值。儘管如此,很明顯效能有大幅提升,而使用 Arrow 將原本極其緩慢的操作加速到幾乎難以察覺。
使用注意事項
在使用這項新功能之前,請記住以下幾點。在撰寫本文時,pyarrow 不會與 pyspark 一起自動安裝,需要手動安裝,請參閱安裝說明。計畫將 pyarrow 新增為 pyspark 的相依性套件,以便 > pip install pyspark
也會安裝 pyarrow。
目前,控制 SQLConf 預設為停用。可以透過程式化方式啟用,如上述範例所示,或將行 “spark.sql.execution.arrow.enabled=true” 新增至 SPARK_HOME/conf/spark-defaults.conf
。
此外,並非所有 Spark 資料類型目前都受到支援,且僅限於基本類型。擴展的類型支援正在開發中,預計也會包含在 Spark 2.3 版本中。
未來改進
如前所述,這只是使用 Arrow 讓 Spark Python 使用者更輕鬆的第一步。目前正在進行一些令人興奮的計畫,包括允許向量化 UDF 評估 (SPARK-21190, SPARK-21404),以及能夠使用 Pandas DataFrame 在分組資料上套用函數 (SPARK-20396)。正如 Arrow 在將 Spark 轉換為 Pandas 時有所助益一樣,在從現有的 Pandas DataFrame 建立 Spark DataFrame 時,它也可以朝另一個方向運作 (SPARK-20791)。敬請期待更多資訊!
協作者
達成這個首個里程碑是 Apache Arrow 和 Spark 社群共同努力的成果。感謝 Wes McKinney、Li Jin、Holden Karau、Reynold Xin、Wenchen Fan、Shane Knapp 以及許多其他協助推動這項工作的人員的辛勤付出。