跳到內容

Apache Arrow 讓您有效率地處理單一和多檔案資料集,即使資料集大到無法載入記憶體也沒問題。藉由 Arrow Dataset 物件的協助,您可以使用熟悉的 dplyr 語法來分析這類資料。本文介紹 Dataset,並示範如何使用 dplyr 和 arrow 分析它們:我們先從確保兩個套件都已載入開始

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

範例:NYC 計程車資料

Arrow 的 Datasets 物件的主要動機是讓使用者能夠分析極大的資料集。舉例來說,考慮到 紐約市計程車行程記錄資料,該資料廣泛用於大數據練習和競賽中。為了展示 Apache Arrow 的功能,我們在公開的 Amazon S3 儲存桶中託管了此資料的 Parquet 格式版本:完整形式的資料集是一個非常大的表格,約有 17 億列和 24 欄,其中每一列對應於 2009 年至 2022 年之間的單次計程車行程。資料字典 也適用於此版本的 NYC 計程車資料。

此多檔案資料集包含 158 個不同的 Parquet 檔案,每個檔案對應於一個月的資料。單個檔案通常約為 400-500MB 大小,而完整資料集約為 70GB 大小。這不是一個小資料集 – 下載速度很慢,並且不適合典型機器的記憶體 🙂 – 因此我們也託管了一個「微型」版本的 NYC 計程車資料,其格式完全相同,但僅包含原始資料集中每千筆條目中的一筆(即,單個檔案小於 1MB 大小,而「微型」資料集僅為 70MB)

如果您在 arrow 中啟用了 Amazon S3 支援(對於大多數使用者來說是這樣;如果您需要針對此問題進行疑難排解,請參閱本文末尾的連結),您可以使用此命令連線到儲存在 S3 上的「微型計程車資料」副本

bucket <- s3_bucket("voltrondata-labs-datasets/nyc-taxi-tiny")

或者,您可以使用以下命令連線到 Google Cloud Storage (GCS) 上的資料副本

bucket <- gs_bucket("voltrondata-labs-datasets/nyc-taxi-tiny", anonymous = TRUE)

如果您想使用完整資料集,請將程式碼中的 nyc-taxi-tiny 替換為 nyc-taxi。除了大小 – 以及隨之而來的時間、頻寬使用量和 CPU 週期成本 – 這兩個版本的資料沒有任何差異:您可以使用微型計程車資料測試您的程式碼,然後檢查它如何使用完整資料集進行擴展。

若要將儲存在 bucket 中的資料集本機副本複製到名為 "nyc-taxi" 的資料夾,請使用 copy_files() 函數

copy_files(from = bucket, to = "nyc-taxi")

就本文而言,我們假設 NYC 計程車資料集(完整資料或微型版本)已在本機下載,並且存在於 "nyc-taxi" 目錄中。

開啟 Datasets

此程序的第一步是建立一個 Dataset 物件,指向資料目錄

ds <- open_dataset("nyc-taxi")

務必注意,當我們執行此操作時,資料值不會載入記憶體。相反地,Arrow 會掃描資料目錄以尋找相關檔案,解析檔案路徑以尋找「Hive 樣式分割」(見下文),並讀取資料檔案的標頭以建構一個 Schema,其中包含描述資料結構的 metadata。有關 Schemas 的更多資訊,請參閱 metadata 文章

自然而然地會產生兩個問題:open_dataset() 尋找哪種檔案,以及它期望在檔案路徑中找到什麼結構?讓我們先看看檔案類型。

預設情況下,open_dataset() 尋找 Parquet 檔案,但您可以使用 format 參數覆寫此設定。例如,如果資料編碼為 CSV 檔案,我們可以設定 format = "csv" 以連線到資料。Arrow Dataset 介面支援多種檔案格式,包括

  • "parquet"(預設值)
  • "feather""ipc""arrow" 的別名;因為 Feather 版本 2 是 Arrow 檔案格式)
  • "csv"(逗號分隔檔案)和 "tsv"(Tab 分隔檔案)
  • "text"(通用文字分隔檔案 - 使用 delimiter 參數指定要使用的分隔符號)

對於文字檔案,您可以將以下解析選項傳遞給 open_dataset(),以確保檔案正確讀取

  • delim
  • quote
  • escape_double
  • escape_backslash
  • skip_empty_rows

使用文字檔案時的另一種選擇是使用 open_delim_dataset()open_csv_dataset()open_tsv_dataset()。這些函數是 open_dataset() 的包裝函式,但具有反映 read_csv_arrow()read_delim_arrow()read_tsv_arrow() 的參數,以便在開啟單個檔案的函數和開啟資料集的函數之間輕鬆切換。

例如

ds <- open_csv_dataset("nyc-taxi/csv/")

有關這些參數以及一般解析分隔文字檔案的更多資訊,請參閱 read_delim_arrow()open_delim_dataset() 的說明文件。

接下來,open_dataset() 期望在檔案路徑中找到什麼資訊?預設情況下,Dataset 介面尋找 Hive 樣式分割結構,其中資料夾使用「key=value」慣例命名,並且資料夾中的資料檔案包含金鑰具有相關值的資料子集。例如,在 NYC 計程車資料中,檔案路徑如下所示

year=2009/month=1/part-0.parquet
year=2009/month=2/part-0.parquet
...

由此,open_dataset() 推斷第一個列出的 Parquet 檔案包含 2009 年 1 月的資料。從這個意義上來說,Hive 樣式分割是自我描述的:資料夾名稱明確說明了 Dataset 如何跨檔案分割。

有時目錄分割不是自我描述的;也就是說,它不包含欄位名稱。例如,假設 NYC 計程車資料使用如下檔案路徑

2009/01/part-0.parquet
2009/02/part-0.parquet
...

在這種情況下,open_dataset() 將需要一些關於如何使用檔案路徑的提示。在這種情況下,您可以將 c("year", "month") 提供給 partitioning 參數,說明第一個路徑段給出 year 的值,第二個段是 month2009/01/part-0.parquet 中的每一列都具有 year 的值 2009 和 month 的值 1,即使這些欄可能不存在於檔案中。換句話說,我們會像這樣開啟資料

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

無論哪種方式,當您查看 Dataset 時,您可以看到除了每個檔案中存在的欄之外,還有 yearmonth 欄。這些欄不存在於檔案本身中:它們是從分割結構推斷出來的。

ds
## 
## FileSystemDataset with 158 Parquet files
## vendor_name: string
## pickup_datetime: timestamp[ms]
## dropoff_datetime: timestamp[ms]
## passenger_count: int64
## trip_distance: double
## pickup_longitude: double
## pickup_latitude: double
## rate_code: string
## store_and_fwd: string
## dropoff_longitude: double
## dropoff_latitude: double
## payment_type: string
## fare_amount: double
## extra: double
## mta_tax: double
## tip_amount: double
## tolls_amount: double
## total_amount: double
## improvement_surcharge: double
## congestion_surcharge: double
## pickup_location_id: int64
## dropoff_location_id: int64
## year: int32
## month: int32

查詢 Datasets

現在我們有一個 Dataset 物件指向我們的資料,我們可以建構 dplyr 樣式的查詢。這是可能的,因為 arrow 提供了一個後端,允許使用者使用 dplyr 動詞操作表格化的 Arrow 資料。這是一個範例:假設您對最長計程車行程中的小費行為感到好奇。讓我們找出 2015 年票價超過 100 美元的行程的中位數小費百分比,並按乘客數量細分

system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  collect() %>%
  print())
## 
## # A tibble: 10 x 3
##    passenger_count median_tip_pct      n
##              <int>          <dbl>  <int>
##  1               1           16.6 143087
##  2               2           16.2  34418
##  3               5           16.7   5806
##  4               4           11.4   4771
##  5               6           16.7   3338
##  6               3           14.6   8922
##  7               0           10.1    380
##  8               8           16.7     32
##  9               9           16.7     42
## 10               7           16.7     11
## 
##    user  system elapsed
##   4.436   1.012   1.402

您剛剛從包含約 20 億列的 Dataset 中選取了一個子集,計算了一個新欄,並對其進行了彙總。所有這些都在現代筆記型電腦上幾秒鐘內完成。這是如何運作的?

Arrow 能夠如此快速地完成此任務有三個原因

首先,arrow 採用延遲評估方法來處理查詢:當在 Dataset 上呼叫 dplyr 動詞時,它們會記錄其動作,但在您執行 collect() 之前,不會在資料上評估這些動作。我們可以透過採用與之前相同的程式碼並省略最後一步來看到這一點

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  )
## 
## FileSystemDataset (query)
## passenger_count: int64
## median_tip_pct: double
## n: int32
## 
## See $.data for the source Arrow object

此版本的程式碼會立即傳回輸出,並顯示您所做的操作,而無需從檔案載入資料。由於這些查詢的評估是延遲的,因此您可以建立一個查詢,選取到一個小的子集,而無需產生可能很大的中間資料集。

其次,所有工作都下推到個別資料檔案,並根據檔案格式,下推到檔案內的資料區塊。因此,您可以透過從每個檔案收集較小的切片,從更大的資料集中選取資料子集:您不必將整個資料集載入記憶體即可從中切片。

第三,由於分割,您可以完全忽略某些檔案。在本範例中,透過篩選 year == 2015,立即排除所有對應於其他年份的檔案:您不必載入它們即可找到沒有列符合篩選條件。對於 Parquet 檔案 – 其中包含具有群組內資料統計資訊的行組 – 可能會有整個資料區塊您可以避免掃描,因為它們沒有 total_amount > 100 的列。

關於查詢 Datasets 的最後一件事要注意。假設您嘗試在 Arrow Dataset 上呼叫不支援的 dplyr 動詞或未實作的函數。在這種情況下,arrow 套件會引發錯誤。但是,對於 Arrow Table 物件(已在記憶體中)上的 dplyr 查詢,套件會在處理該 dplyr 動詞之前自動呼叫 collect()。若要瞭解有關 dplyr 後端的更多資訊,請參閱 資料整理文章

批次處理(實驗性)

有時您想在整個 Dataset 上執行 R 程式碼,但該 Dataset 遠大於記憶體。您可以使用 Dataset 查詢上的 map_batches 來逐批處理它。

注意map_batches 是實驗性的,不建議用於生產環境。

作為範例,若要隨機採樣 Dataset,請使用 map_batches 從每個批次中採樣一定百分比的列

sampled_data <- ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  collect()

str(sampled_data)
## 
## tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)
##  $ tip_amount     : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...
##  $ total_amount   : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...
##  $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...
##  $ tip_pct        : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...

此函數也可用於透過計算每個批次的部分結果,然後彙總這些部分結果,來彙總 Dataset 上的摘要統計資訊。擴展上面的範例,您可以將模型擬合到樣本資料,然後使用 map_batches 來計算完整 Dataset 上的 MSE。

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)

ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  map_batches(function(batch) {
    batch %>%
      as.data.frame() %>%
      mutate(pred_tip_pct = predict(model, newdata = .)) %>%
      filter(!is.nan(tip_pct)) %>%
      summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) %>%
      as_record_batch()
  }) %>%
  summarize(mse = sum(sse_partial) / sum(n_partial)) %>%
  pull(mse)
## 
## [1] 0.1304284

Dataset 選項

您可以透過幾種方式控制 Dataset 建立,以適應特殊用例。

處理目錄中的檔案

如果您正在處理單個檔案或一組不在同一目錄中的檔案,您可以將檔案路徑或多個檔案路徑的向量提供給 open_dataset()。如果您有一個太大的 CSV 檔案而無法讀取到記憶體中,這會很有用。您可以將檔案路徑傳遞給 open_dataset(),使用 group_by() 將 Dataset 分割成可管理的大小區塊,然後使用 write_dataset() 將每個區塊寫入到單獨的 Parquet 檔案中 — 所有這些都無需將完整的 CSV 檔案讀取到 R 中。

明確宣告欄名和資料類型

您可以指定 schema 參數給 open_dataset(),以宣告欄及其資料類型。如果您有具有不同儲存結構描述的資料檔案(例如,一個欄在一個檔案中可能是 int32,而在另一個檔案中可能是 int8),並且您想確保產生的 Dataset 具有特定類型,這會很有用。

需要明確的是,即使在這個混合整數類型範例中,也無需指定 schema,因為 Dataset 建構函式會協調這些差異。schema 規格只是讓您宣告您希望結果是什麼。

明確宣告分割格式

同樣地,您可以在 open_dataset()partitioning 參數中提供 Schema,以便宣告定義分割的虛擬欄的類型。如果您想在 NYC 計程車資料範例中將 month 保留為字串而不是整數,這會很有用。

處理多個資料來源

Datasets 的另一個功能是它們可以由多個資料來源組成。也就是說,您可能在一個位置有一個分割的 Parquet 檔案目錄,而在另一個目錄中,有未分割的檔案。或者,您可以指向 S3 儲存桶中的 Parquet 資料和本機檔案系統上的 CSV 目錄,並將它們作為單個 Dataset 一起查詢。若要建立多來源 Dataset,請將 Dataset 列表提供給 open_dataset(),而不是檔案路徑,或使用類似 big_dataset <- c(ds1, ds2) 的命令將它們串連起來。

寫入 Datasets

如您所見,透過以有效率的二進制欄狀格式(如 Parquet 或 Feather)儲存,並根據常用於篩選的欄進行分割,可以使大型 Dataset 的查詢速度非常快。但是,資料並不總是這樣儲存。有時您可能會從一個巨大的 CSV 開始。分析資料的第一步是清理並將其重塑為更可用的形式。

write_dataset() 函數允許您取得 Dataset 或另一個表格資料物件 — Arrow Table 或 RecordBatch,或 R 資料框 — 並將其寫入不同的檔案格式,分割成多個檔案。

假設您有一個 CSV 格式的 NYC 計程車資料版本

ds <- open_dataset("nyc-taxi/csv/", format = "csv")

您可以將其寫入新位置,並透過呼叫 write_dataset() 將檔案轉換為 Feather 格式

write_dataset(ds, "nyc-taxi/feather", format = "feather")

接下來,讓我們想像一下,payment_type 欄是您經常篩選的內容,因此您想要依該變數分割資料。這樣做可確保類似 payment_type == "Cash" 的篩選只會觸及 payment_type 始終為 "Cash" 的檔案子集。

表達您想要分割的欄的一種自然方式是使用 group_by() 方法

ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

這會將檔案寫入到如下所示的目錄樹

system("tree nyc-taxi/feather")
## feather
## ├── payment_type=1
## │   └── part-18.arrow
## ├── payment_type=2
## │   └── part-19.arrow
## ...
## └── payment_type=UNK
##     └── part-17.arrow
##
## 18 directories, 23 files

請注意,目錄名稱為 payment_type=Cash 和類似名稱:這是上面描述的 Hive 樣式分割。這表示當您在此目錄上呼叫 open_dataset() 時,您不必宣告分割是什麼,因為可以從檔案路徑讀取它們。(若要改為寫入分割段的裸值,即 Cash 而不是 payment_type=Cash,請使用 hive_style = FALSE 呼叫 write_dataset()。)

不過,也許 payment_type == "Cash" 是您唯一關心的資料,而您只想捨棄其餘資料並擁有較小的工作集。為此,您可以在寫入時 filter() 掉它們

ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")

寫入 Datasets 時,您可以做的另一件事是選取欄的子集或重新排序它們。假設您從不關心 vendor_id,並且作為字串欄,它在您讀取時可能會佔用大量空間,因此讓我們捨棄它

ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

請注意,雖然您可以選取欄的子集,但目前在寫入 Dataset 時無法重新命名欄。

分割效能考量

分割 Datasets 有兩個影響效能的方面:它增加了檔案數量,並在檔案周圍建立了一個目錄結構。這兩者都有好處和成本。根據組態和 Dataset 的大小,成本可能會超過收益。

由於分割將 Dataset 分割成多個檔案,因此可以平行處理讀取和寫入分割的 Datasets。但是,每個額外檔案都會增加檔案系統互動處理中的少量額外負擔。它還會增加整體 Dataset 大小,因為每個檔案都有一些共用的 metadata。例如,每個 parquet 檔案都包含 schema 和群組級別統計資訊。分割數量是檔案數量的下限。如果您按日期分割 Dataset,其中包含一年的資料,您將至少有 365 個檔案。如果您進一步按另一個具有 1,000 個唯一值的維度進行分割,您將最多有 365,000 個檔案。這種精細的分割通常會導致主要由 metadata 組成的小檔案。

分割的 Datasets 建立巢狀資料夾結構,這些結構允許我們修剪在掃描中載入哪些檔案。但是,這會增加探索 Dataset 中檔案的額外負擔,因為我們需要遞迴「列出目錄」才能找到資料檔案。過於精細的分割可能會在此處造成問題:按日期分割資料集一年份的資料將需要 365 個列表呼叫才能找到所有檔案;新增另一個基數為 1,000 的欄將使呼叫次數達到 365,365 次。

最佳分割佈局將取決於您的資料、存取模式以及哪些系統將讀取資料。大多數系統(包括 Arrow)應在各種檔案大小和分割佈局中運作,但您應避免一些極端情況。這些指南可以幫助避免一些已知的最壞情況

  • 避免小於 20MB 和大於 2GB 的檔案。
  • 避免分割佈局具有超過 10,000 個不同的分割。

對於具有檔案內群組概念的檔案格式(例如 Parquet),適用類似的指南。行組可以在讀取時提供平行處理,並允許基於統計資訊進行資料跳過,但非常小的群組可能會導致 metadata 成為檔案大小的很大一部分。在大多數情況下,Arrow 的檔案寫入器為群組大小提供了合理的預設值。

交易 / ACID 保證

Dataset API 不提供交易支援或任何 ACID 保證。這會影響讀取和寫入。並行讀取沒有問題。並行寫入或與讀取同時發生的寫入可能會產生意外行為。可以使用各種方法來避免操作相同檔案,例如為每個寫入器使用唯一的基礎名稱範本、新檔案的暫存目錄,或單獨儲存檔案列表,而不是依賴目錄探索。

在寫入過程中意外終止程序可能會使系統處於不一致的狀態。寫入呼叫通常在要寫入的位元組已完全傳遞到作業系統頁面快取後立即傳回。即使寫入操作已完成,如果在寫入呼叫後立即發生突然斷電,也可能會遺失部分檔案。

大多數檔案格式都有在末尾寫入的魔術數字。這表示可以安全地偵測和捨棄部分檔案寫入。CSV 檔案格式沒有任何此類概念,並且部分寫入的 CSV 檔案可能會被偵測為有效。

進一步閱讀