DuckDB 呱 Arrow:Apache Arrow 與 DuckDB 之間的零複製資料整合


已發布 2021 年 12 月 03 日
作者 Pedro Holanda, Jonathan Keane

TLDR:DuckDB 和 Apache Arrow 之間的零複製整合,允許在 Python 和 R 中使用 SQL 或關聯式 API 快速分析大於記憶體的資料集。

這篇文章是與 DuckDB 部落格 合作並交叉發布的。

作為 Apache Arrow 的一部分,它是一種針對分析函式庫最佳化的記憶體內資料格式。與 Pandas 和 R Dataframe 類似,它使用欄狀資料模型。但 Arrow 專案包含的不僅僅是格式:Arrow C++ 函式庫,可透過綁定在 Python、R 和 Ruby 中存取,還具有額外功能,可讓您有效率地計算資料集。這些額外功能建立在上述記憶體內格式的實作之上。資料集可能跨越多個 Parquet、CSV 或其他格式的檔案,而檔案甚至可能位於遠端或雲端儲存空間,例如 HDFS 或 Amazon S3。Arrow C++ 查詢引擎支援查詢結果的串流,具有複雜資料類型(例如,列表、結構、地圖)的有效率實作,並且可以執行重要的掃描最佳化,例如投影和篩選下推。

DuckDB 是一個新的分析資料管理系統,旨在在其他程序中執行複雜的 SQL 查詢。DuckDB 具有 R 和 Python 等的綁定。DuckDB 可以直接查詢 Arrow 資料集,並將查詢結果串流回 Arrow。這種整合允許使用者使用 DuckDB 的 SQL 介面和 API 查詢 Arrow 資料,同時利用 DuckDB 的平行向量化執行引擎,而無需任何額外的資料複製。此外,這種整合充分利用了 Arrow 的謂詞和篩選下推,同時掃描資料集。

這種整合是獨一無二的,因為它使用 DuckDB 和 Arrow 之間以及反之亦然的零複製資料串流,因此您可以將兩者一起組合查詢。這產生了三個主要優點

  1. 大於記憶體的分析: 由於兩個函式庫都支援串流查詢結果,因此我們能夠在不必從磁碟完全載入資料的情況下執行。相反地,我們可以一次執行一個批次。這讓我們能夠對大於記憶體的資料執行查詢。
  2. 複雜資料類型: DuckDB 可以有效率地處理可以儲存在 Arrow 向量中的複雜資料類型,包括任意巢狀的結構、列表和地圖。
  3. 進階最佳化器: DuckDB 的最先進最佳化器可以直接將篩選器和投影下推到 Arrow 掃描中。因此,只會讀取相關的欄位和分割區,從而允許系統利用例如 Parquet 檔案中的分割區消除。這顯著加速了查詢執行。

對於那些只對基準測試感興趣的人,您可以跳到下面的基準測試章節

快速導覽

在深入探討整合細節之前,在本節中,我們提供了一個快速的動機範例,說明 DuckDB-Arrow 整合功能強大且易於使用。只需幾行程式碼,您就可以開始查詢 Arrow 資料集。假設您想分析惡名昭彰的 NYC 計程車資料集,並找出團體小費是否比單人乘客多或少。

R

Arrow 和 DuckDB 都支援 dplyr 管道,適合更習慣使用 dplyr 進行資料分析的人。Arrow 套件包含兩個輔助函數,讓我們可以在 Arrow 和 DuckDB 之間來回傳遞資料(to_duckdb()to_arrow())。這在 Arrow 或 DuckDB 其中一個支援,但另一個不支援的情況下特別有用。例如,如果您找到一個複雜的 dplyr 管道,其中 SQL 轉換不適用於 DuckDB,請在管道之前使用 to_arrow() 來使用 Arrow 引擎。或者,如果您有一個在 Arrow 中尚未實作的函數(例如,視窗聚合),請使用 to_duckdb() 來使用 DuckDB 引擎。所有這些都不需要為來回傳遞資料時(重新)序列化資料付出任何成本!

library(duckdb)
library(arrow)
library(dplyr)

# Open dataset using year,month folder partition
ds <- arrow::open_dataset("nyc-taxi", partitioning = c("year", "month"))

ds %>%
  # Look only at 2015 on, where the number of passenger is positive, the trip distance is
  # greater than a quarter mile, and where the fare amount is positive
  filter(year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0) %>%
  # Pass off to DuckDB
  to_duckdb() %>%
  group_by(passenger_count) %>%
  mutate(tip_pct = tip_amount / fare_amount) %>%
  summarise(
    fare_amount = mean(fare_amount, na.rm = TRUE),
    tip_amount = mean(tip_amount, na.rm = TRUE),
    tip_pct = mean(tip_pct, na.rm = TRUE)
  ) %>%
  arrange(passenger_count) %>%
  collect()

Python

Python 中的工作流程與 R 中一樣簡單。在這個範例中,我們使用 DuckDB 的關聯式 API。

import duckdb
import pyarrow as pa
import pyarrow.dataset as ds

# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# We transform the nyc dataset into a DuckDB relation
nyc = duckdb.arrow(nyc)

# Run same query again
nyc.filter("year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0")
    .aggregate("SELECT AVG(fare_amount), AVG(tip_amount), AVG(tip_amount / fare_amount) as tip_pct","passenger_count").arrow()

DuckDB 和 Arrow:基礎知識

在本節中,我們將查看一些在 Python 和 R 中讀取和輸出 Arrow 表格所需程式碼的基本範例。

設定

首先,我們需要安裝 DuckDB 和 Arrow。下面顯示了 Python 和 R 中兩個函式庫的安裝過程。

# Python Install
pip install duckdb
pip install pyarrow
# R Install
install.packages("duckdb")
install.packages("arrow")

為了執行本節中的範例,我們需要下載以下自訂 parquet 檔案

  • https://github.com/duckdb/duckdb-web/blob/master/_posts/data/integers.parquet?raw=true
  • https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet

Python

在 Python 中,有兩種方法可以查詢 Arrow 中的資料

  1. 透過關聯式 API

     # Reads Parquet File to an Arrow Table
     arrow_table = pq.read_table('integers.parquet')
    
     # Transforms Arrow Table -> DuckDB Relation
     rel_from_arrow = duckdb.arrow(arrow_table)
    
     # we can run a SQL query on this and print the result
     print(rel_from_arrow.query('arrow_table', 'SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone())
    
     # Transforms DuckDB Relation -> Arrow Table
     arrow_table_from_duckdb = rel_from_arrow.arrow()
    
  2. 透過使用替換掃描並使用 SQL 直接查詢物件

     # Reads Parquet File to an Arrow Table
     arrow_table = pq.read_table('integers.parquet')
    
     # Gets Database Connection
     con = duckdb.connect()
    
     # we can run a SQL query on this and print the result
     print(con.execute('SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone())
    
     # Transforms Query Result from DuckDB to Arrow Table
     # We can directly read the arrow object through DuckDB's replacement scans.
     con.execute("SELECT * FROM arrow_table").fetch_arrow_table()
    

可以將 DuckDB 關係和查詢結果都轉換回 Arrow。

R

在 R 中,您可以透過將表格註冊為視圖來與 DuckDB 中的 Arrow 資料互動(另一種方法是使用 dplyr,如上所示)。

library(duckdb)
library(arrow)
library(dplyr)

# Reads Parquet File to an Arrow Table
arrow_table <- arrow::read_parquet("integers.parquet", as_data_frame = FALSE)

# Gets Database Connection
con <- dbConnect(duckdb::duckdb())

# Registers arrow table as a DuckDB view
arrow::to_duckdb(arrow_table, table_name = "arrow_table", con = con)

# we can run a SQL query on this and print the result
print(dbGetQuery(con, "SELECT SUM(data) FROM arrow_table WHERE data > 50"))

# Transforms Query Result from DuckDB to Arrow Table
result <- dbSendQuery(con, "SELECT * FROM arrow_table")

從 Arrow 串流資料/到 Arrow 串流資料

在上一節中,我們描述了如何與 Arrow 表格互動。但是,Arrow 也允許使用者以串流方式與資料互動。無論是使用它(例如,從 Arrow 資料集)還是產生它(例如,傳回 RecordBatchReader)。當然,DuckDB 能夠使用資料集並產生 RecordBatchReader。此範例使用 NYC 計程車資料集,該資料集儲存在按年份和月份分割的 Parquet 檔案中,我們可以透過 Arrow R 套件下載

arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")

Python

# Reads dataset partitioning it in year/month folder
nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# Gets Database Connection
con = duckdb.connect()

query = con.execute("SELECT * FROM nyc_dataset")
# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader = query.fetch_record_batch()
# Which means we can stream the whole query per batch.
# This retrieves the first batch
chunk = record_batch_reader.read_next_batch()

R

# Reads dataset partitioning it in year/month folder
nyc_dataset = open_dataset("nyc-taxi/", partitioning = c("year", "month"))

# Gets Database Connection
con <- dbConnect(duckdb::duckdb())

# We can use the same function as before to register our arrow dataset
duckdb::duckdb_register_arrow(con, "nyc", nyc_dataset)

res <- dbSendQuery(con, "SELECT * FROM nyc", arrow = TRUE)
# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)

# Which means we can stream the whole query per batch.
# This retrieves the first batch
cur_batch <- record_batch_reader$read_next_batch()

前面的 R 程式碼以低階細節顯示了資料如何串流。我們在 Arrow 套件中提供了輔助程式碼 to_arrow(),它是圍繞此程式碼的包裝器,可輕鬆將此串流合併到 dplyr 管道中。1

基準測試比較

在這裡,我們在一個簡單的基準測試中展示了使用 DuckDB 查詢 Arrow 資料集和使用 Pandas 查詢 Arrow 資料集之間的效能差異。對於投影和篩選下推比較,我們都將使用 Arrow 表格。這是因為 Pandas 無法使用 Arrow 串流物件。

對於 NYC 計程車基準測試,我們使用了 scilens diamonds 配置,對於 TPC-H 基準測試,我們使用了 m1 MacBook Pro。在這兩種情況下,都使用了 DuckDB 中的平行處理(現在預設為開啟)。

在與 Pandas 的比較中,請注意 DuckDB 以平行方式執行,而 pandas 僅支援單執行緒執行。除此之外,應該注意的是,我們正在比較自動最佳化。DuckDB 的查詢最佳化器可以自動下推篩選器和投影。pandas 不支援這種自動最佳化,但使用者可以透過在 read_parquet() 呼叫中手動指定它們來手動執行其中一些謂詞和篩選下推。

投影下推

在這個範例中,我們對我們的 lineitem 表格的兩列執行簡單的聚合。

# DuckDB
lineitem = pq.read_table('lineitemsf1.snappy.parquet')
con = duckdb.connect()

# Transforms Query Result from DuckDB to Arrow Table
con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
                FROM
                lineitem;""").fetch_arrow_table()

# Pandas
arrow_table = pq.read_table('lineitemsf1.snappy.parquet')

# Converts an Arrow table to a Dataframe
df = arrow_table.to_pandas()

# Runs aggregation
res =  pd.DataFrame({'sum': [(df.l_extendedprice * df.l_discount).sum()]})

# Creates an Arrow Table from a Dataframe
new_table = pa.Table.from_pandas(res)

名稱 時間 (秒)
DuckDB 0.19
Pandas 2.13

lineitem 表格由 16 列組成,但是,要執行此查詢,只需要兩列 l_extendedprice 和 * l_discount。由於 DuckDB 可以下推這些列的投影,因此它執行此查詢的速度比 Pandas 快一個數量級左右。

篩選下推

對於我們的篩選下推,我們重複使用上一節中使用的相同聚合,但在另外 4 個欄位上新增篩選器。

# DuckDB
lineitem = pq.read_table('lineitemsf1.snappy.parquet')

# Get database connection
con = duckdb.connect()

# Transforms Query Result from DuckDB to Arrow Table
con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
        FROM
            lineitem
        WHERE
            l_shipdate >= CAST('1994-01-01' AS date)
            AND l_shipdate < CAST('1995-01-01' AS date)
            AND l_discount BETWEEN 0.05
            AND 0.07
            AND l_quantity < 24; """).fetch_arrow_table()

# Pandas
arrow_table = pq.read_table('lineitemsf1.snappy.parquet')

df = arrow_table.to_pandas()
filtered_df = lineitem[
        (lineitem.l_shipdate >= "1994-01-01") &
        (lineitem.l_shipdate < "1995-01-01") &
        (lineitem.l_discount >= 0.05) &
        (lineitem.l_discount <= 0.07) &
        (lineitem.l_quantity < 24)]

res =  pd.DataFrame({'sum': [(filtered_df.l_extendedprice * filtered_df.l_discount).sum()]})
new_table = pa.Table.from_pandas(res)
名稱 時間 (秒)
DuckDB 0.04
Pandas 2.29

現在 DuckDB 和 Pandas 之間的差異更加顯著,比 Pandas 快兩個數量級。同樣,由於篩選器和投影都下推到 Arrow,DuckDB 讀取的資料量比 Pandas 少,而 Pandas 無法自動執行此最佳化。

串流

如先前所示,DuckDB 能夠以串流方式使用和產生 Arrow 資料。在本節中,我們執行一個簡單的基準測試,以展示與完全物化和 Pandas 相比,在速度和記憶體使用方面的優勢。此範例使用完整的 NYC 計程車資料集,您可以下載

# DuckDB
# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# Get database connection
con = duckdb.connect()

# Run query that selects part of the data
query = con.execute("SELECT total_amount, passenger_count,year FROM nyc where total_amount > 100 and year > 2014")

# Create Record Batch Reader from Query Result.
# "fetch_record_batch()" also accepts an extra parameter related to the desired produced chunk size.
record_batch_reader = query.fetch_record_batch()

# Retrieve all batch chunks
chunk = record_batch_reader.read_next_batch()
while len(chunk) > 0:
    chunk = record_batch_reader.read_next_batch()
# Pandas
# We must exclude one of the columns of the NYC dataset due to an unimplemented cast in Arrow.
working_columns = ["vendor_id","pickup_at","dropoff_at","passenger_count","trip_distance","pickup_longitude",
    "pickup_latitude","store_and_fwd_flag","dropoff_longitude","dropoff_latitude","payment_type",
    "fare_amount","extra","mta_tax","tip_amount","tolls_amount","total_amount","year", "month"]

# Open dataset using year,month folder partition
nyc_dataset = ds.dataset(dir, partitioning=["year", "month"])
# Generate a scanner to skip problematic column
dataset_scanner = nyc_dataset.scanner(columns=working_columns)

# Materialize dataset to an Arrow Table
nyc_table = dataset_scanner.to_table()

# Generate Dataframe from Arow Table
nyc_df = nyc_table.to_pandas()

# Apply Filter
filtered_df = nyc_df[
    (nyc_df.total_amount > 100) &
    (nyc_df.year >2014)]

# Apply Projection
res = filtered_df[["total_amount", "passenger_count","year"]]

# Transform Result back to an Arrow Table
new_table = pa.Table.from_pandas(res)
名稱 時間 (秒) 峰值記憶體使用量 (GB)
DuckDB 0.05 0.3
Pandas 146.91 248

DuckDB 和 Pandas 之間的時間差異是我們在本文中探討的所有整合優勢的組合。在 DuckDB 中,應用篩選器下推來執行分割區消除(即,我們跳過讀取年份 <= 2014 的 Parquet 檔案)。篩選器下推也用於消除不相關的 row_groups(即,總金額始終 <= 100 的 row groups)。由於我們的投影下推,Arrow 只需要從 Parquet 檔案中讀取感興趣的欄位,這使其只需讀取 20 列中的 4 列。另一方面,Pandas 無法自動下推任何這些最佳化,這表示必須讀取完整的資料集。這導致查詢執行時間相差 4 個數量級。

在上表中,我們還描述了 DuckDB(串流)和 Pandas(完全物化)之間峰值記憶體使用量的比較。在 DuckDB 中,我們只需要將感興趣的 row-group 載入記憶體中。因此,我們的記憶體使用量很低。我們也具有恆定的記憶體使用量,因為我們一次只需要將其中一個 row group 保存在記憶體中。另一方面,Pandas 在執行查詢時必須完全物化所有 Parquet 檔案。因此,我們看到其記憶體消耗持續急劇增加。兩種解決方案的總記憶體消耗差異約為 3 個數量級。

結論與回饋

在這篇部落格文章中,我們主要展示了如何使用 DuckDB 對 Arrow 資料集執行查詢。還有其他函式庫也可以使用 Arrow 格式,但它們具有不同的用途和功能。與往常一樣,如果您希望在未來的文章中看到與不同工具的基準測試,我們很高興聽到您的意見!請隨時發送電子郵件給我們 email 或直接在 Hacker News 文章中分享您的想法。

最後但並非最不重要的一點是,如果您在使用我們的整合時遇到任何問題,請在 DuckDB 的 issue trackerArrow 的 issue tracker 中開啟 issue,具體取決於哪個函式庫有問題。

  1. 在 Arrow 6.0.0 中,to_arrow() 目前傳回完整的表格,但在我們即將發布的 7.0.0 版本中將允許完全串流。