表格資料集#

pyarrow.dataset 模組提供了高效處理表格資料、可能大於記憶體以及多檔案資料集的功能。這包括:

  • 一個統一的介面,支援不同的來源和檔案格式,以及不同的檔案系統(本機、雲端)。

  • 來源探索(爬行目錄、處理基於目錄的分區資料集、基本綱要正規化等等)

  • 最佳化讀取,具有謂詞下推(篩選行)、投影(選擇和衍生欄位),以及可選的並行讀取。

目前支援的檔案格式為 Parquet、Feather / Arrow IPC、CSV 和 ORC(請注意,目前 ORC 資料集僅能讀取,尚無法寫入)。目標是在未來擴展對其他檔案格式和資料來源(例如資料庫連線)的支援。

對於熟悉現有 pyarrow.parquet.ParquetDataset 以讀取 Parquet 資料集的使用者:pyarrow.dataset 的目標類似,但不僅限於 Parquet 格式,也不僅限於 Python:相同的資料集 API 也暴露在 R 綁定或 Arrow 中。此外,pyarrow.dataset 擁有更高的效能和新功能(例如,在檔案內篩選,而不僅僅是在分區鍵上篩選)。

讀取資料集#

對於以下範例,讓我們建立一個小型資料集,其中包含一個目錄,內有兩個 Parquet 檔案:

In [1]: import tempfile

In [2]: import pathlib

In [3]: import pyarrow as pa

In [4]: import pyarrow.parquet as pq

In [5]: import numpy as np

In [6]: base = pathlib.Path(tempfile.mkdtemp(prefix="pyarrow-"))

In [7]: (base / "parquet_dataset").mkdir(exist_ok=True)

# creating an Arrow Table
In [8]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5})

# writing it into two parquet files
In [9]: pq.write_table(table.slice(0, 5), base / "parquet_dataset/data1.parquet")

In [10]: pq.write_table(table.slice(5, 10), base / "parquet_dataset/data2.parquet")

資料集探索#

Dataset 物件可以使用 dataset() 函數建立。我們可以將包含資料檔案的目錄路徑傳遞給它:

In [11]: import pyarrow.dataset as ds

In [12]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [13]: dataset
Out[13]: <pyarrow._dataset.FileSystemDataset at 0x7fe41378bb20>

除了搜尋基礎目錄之外,dataset() 也接受單一檔案的路徑或檔案路徑列表。

建立 Dataset 物件並不會開始讀取資料本身。如有需要,它只會爬行目錄以尋找所有檔案:

In [14]: dataset.files
Out[14]: 
['/tmp/pyarrow-c54gjofy/parquet_dataset/data1.parquet',
 '/tmp/pyarrow-c54gjofy/parquet_dataset/data2.parquet']

... 並推斷資料集的綱要(預設情況下從第一個檔案推斷):

In [15]: print(dataset.schema.to_string(show_field_metadata=False))
a: int64
b: double
c: int64

使用 Dataset.to_table() 方法,我們可以將資料集(或其一部分)讀取到 pyarrow Table 中(請注意,根據資料集的大小,這可能需要大量記憶體,請參閱下文關於篩選/迭代載入的說明):

In [16]: dataset.to_table()
Out[16]: 
pyarrow.Table
a: int64
b: double
c: int64
----
a: [[0,1,2,3,4],[5,6,7,8,9]]
b: [[-1.1423398343185012,-0.9461515760216167,2.0972627853077754,0.5151372091045947,0.249422801290706],[0.9102115779123913,3.634010291375531,0.4046825981197749,-0.9952879610759159,1.5447254069340937]]
c: [[1,2,1,2,1],[2,1,2,1,2]]

# converting to pandas to see the contents of the scanned table
In [17]: dataset.to_table().to_pandas()
Out[17]: 
   a         b  c
0  0 -1.142340  1
1  1 -0.946152  2
2  2  2.097263  1
3  3  0.515137  2
4  4  0.249423  1
5  5  0.910212  2
6  6  3.634010  1
7  7  0.404683  2
8  8 -0.995288  1
9  9  1.544725  2

讀取不同的檔案格式#

上面的範例使用 Parquet 檔案作為資料集來源,但 Dataset API 在多種檔案格式和檔案系統中提供了一致的介面。目前,支援 Parquet、ORC、Feather / Arrow IPC 和 CSV 檔案格式;未來計畫支援更多格式。

如果我們將表格儲存為 Feather 檔案而不是 Parquet 檔案:

In [18]: import pyarrow.feather as feather

In [19]: feather.write_feather(table, base / "data.feather")

...那麼我們可以使用相同的函數讀取 Feather 檔案,但需指定 format="feather"

In [20]: dataset = ds.dataset(base / "data.feather", format="feather")

In [21]: dataset.to_table().to_pandas().head()
Out[21]: 
   a         b  c
0  0 -1.142340  1
1  1 -0.946152  2
2  2  2.097263  1
3  3  0.515137  2
4  4  0.249423  1

自訂檔案格式#

格式名稱作為字串,例如:

ds.dataset(..., format="parquet")

是預設建構的 ParquetFileFormat 的簡寫:

ds.dataset(..., format=ds.ParquetFileFormat())

FileFormat 物件可以使用關鍵字自訂。例如:

parquet_format = ds.ParquetFileFormat(read_options={'dictionary_columns': ['a']})
ds.dataset(..., format=parquet_format)

將配置欄位 "a" 在掃描時進行字典編碼。

篩選資料#

為了避免在只需要子集時讀取所有資料,可以使用 columnsfilter 關鍵字。

columns 關鍵字可用於僅讀取指定的欄位:

In [22]: dataset = ds.dataset(base / "parquet_dataset", format="parquet")

In [23]: dataset.to_table(columns=['a', 'b']).to_pandas()
Out[23]: 
   a         b
0  0 -1.142340
1  1 -0.946152
2  2  2.097263
3  3  0.515137
4  4  0.249423
5  5  0.910212
6  6  3.634010
7  7  0.404683
8  8 -0.995288
9  9  1.544725

使用 filter 關鍵字,不符合篩選條件的列將不會包含在傳回的表格中。此關鍵字預期一個布林值 Expression,至少參考到一個欄位

In [24]: dataset.to_table(filter=ds.field('a') >= 7).to_pandas()
Out[24]: 
   a         b  c
0  7  0.404683  2
1  8 -0.995288  1
2  9  1.544725  2

In [25]: dataset.to_table(filter=ds.field('c') == 2).to_pandas()
Out[25]: 
   a         b  c
0  1 -0.946152  2
1  3  0.515137  2
2  5  0.910212  2
3  7  0.404683  2
4  9  1.544725  2

建構這些 Expression 物件最簡單的方式是使用 field() 輔助函數。任何欄位(而不僅僅是分割區欄位)都可以使用 field() 函數(它會建立一個 FieldExpression)來參考。提供了運算子多載來組合篩選器,包括比較(等於、大於/小於等等)、集合成員資格測試和布林組合(&|~

In [26]: ds.field('a') != 3
Out[26]: <pyarrow.compute.Expression (a != 3)>

In [27]: ds.field('a').isin([1, 2, 3])
Out[27]: 
<pyarrow.compute.Expression is_in(a, {value_set=int64:[
  1,
  2,
  3
], null_matching_behavior=MATCH})>

In [28]: (ds.field('a') > ds.field('b')) & (ds.field('b') > 1)
Out[28]: <pyarrow.compute.Expression ((a > b) and (b > 1))>

請注意,Expression 物件不能使用 Python 邏輯運算子 andornot 來組合。

投影欄位#

可以使用 columns 關鍵字,並傳遞欄位名稱列表,來讀取資料集中欄位的子集。此關鍵字也可以與表達式結合使用,以進行更複雜的投影。

在這種情況下,我們傳遞一個字典,其中鍵是結果欄位名稱,值是用於建構欄位值的表達式

In [29]: projection = {
   ....:     "a_renamed": ds.field("a"),
   ....:     "b_as_float32": ds.field("b").cast("float32"),
   ....:     "c_1": ds.field("c") == 1,
   ....: }
   ....: 

In [30]: dataset.to_table(columns=projection).to_pandas().head()
Out[30]: 
   a_renamed  b_as_float32    c_1
0          0     -1.142340   True
1          1     -0.946152  False
2          2      2.097263   True
3          3      0.515137  False
4          4      0.249423   True

該字典也決定了欄位的選擇(只有字典中的鍵才會作為結果表格中的欄位出現)。如果您想在現有欄位的基礎上包含衍生欄位,您可以從資料集結構描述建立字典

In [31]: projection = {col: ds.field(col) for col in dataset.schema.names}

In [32]: projection.update({"b_large": ds.field("b") > 1})

In [33]: dataset.to_table(columns=projection).to_pandas().head()
Out[33]: 
   a         b  c  b_large
0  0 -1.142340  1    False
1  1 -0.946152  2    False
2  2  2.097263  1     True
3  3  0.515137  2    False
4  4  0.249423  1    False

讀取分割資料#

上面展示了一個由包含檔案的扁平目錄組成的資料集。但是,資料集可以利用巢狀目錄結構來定義分割資料集,其中子目錄名稱包含有關該目錄中儲存的資料子集資訊。

例如,按年份和月份分割的資料集在磁碟上可能如下所示

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

上面的分割方案使用 “/key=value/” 目錄名稱,這在 Apache Hive 中很常見。

讓我們建立一個小的分割資料集。write_to_dataset() 函數可以寫入這種類似 Hive 的分割資料集。

In [34]: table = pa.table({'a': range(10), 'b': np.random.randn(10), 'c': [1, 2] * 5,
   ....:                   'part': ['a'] * 5 + ['b'] * 5})
   ....: 

In [35]: pq.write_to_dataset(table, "parquet_dataset_partitioned",
   ....:                     partition_cols=['part'])
   ....: 

上面建立了一個包含兩個子目錄(“part=a” 和 “part=b”)的目錄,並且寫入這些目錄的 Parquet 檔案不再包含 “part” 欄位。

使用 dataset() 讀取此資料集時,我們現在指定資料集應使用類似 Hive 的分割方案,並使用 partitioning 關鍵字

In [36]: dataset = ds.dataset("parquet_dataset_partitioned", format="parquet",
   ....:                      partitioning="hive")
   ....: 

In [37]: dataset.files
Out[37]: 
['parquet_dataset_partitioned/part=a/0830537e25e449b68b3160184c6302df-0.parquet',
 'parquet_dataset_partitioned/part=b/0830537e25e449b68b3160184c6302df-0.parquet']

儘管分割區欄位未包含在實際的 Parquet 檔案中,但在掃描此資料集時,它們將被新增回結果表格中

In [38]: dataset.to_table().to_pandas().head(3)
Out[38]: 
   a         b  c part
0  0  1.750073  1    a
1  1  1.345434  2    a
2  2  0.413815  1    a

我們現在可以根據分割區鍵進行篩選,如果檔案不符合篩選條件,則可以完全避免載入檔案

In [39]: dataset.to_table(filter=ds.field("part") == "b").to_pandas()
Out[39]: 
   a         b  c part
0  5 -1.002496  2    b
1  6  0.164121  1    b
2  7 -0.598744  2    b
3  8 -0.469405  1    b
4  9  2.311087  2    b

不同的分割方案#

上面的範例使用類似 Hive 的目錄方案,例如 “/year=2009/month=11/day=15”。我們透過傳遞 partitioning="hive" 關鍵字來指定這一點。在這種情況下,分割區鍵的類型是從檔案路徑推斷出來的。

也可以使用 partitioning() 函數來明確定義分割區鍵的結構描述。例如

part = ds.partitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int32())]),
    flavor="hive"
)
dataset = ds.dataset(..., partitioning=part)

也支援「目錄分割區」,其中檔案路徑中的區段表示分割區鍵的值,但不包含名稱(欄位名稱隱含在區段的索引中)。例如,給定欄位名稱 “year”、“month” 和 “day”,一個路徑可能是 “/2019/11/15”。

由於名稱未包含在檔案路徑中,因此在建構目錄分割區時必須指定這些名稱

part = ds.partitioning(field_names=["year", "month", "day"])

目錄分割區也支援提供完整的結構描述,而不是從檔案路徑推斷類型。

從雲端儲存空間讀取#

除了本機檔案之外,pyarrow 也支援從雲端儲存空間讀取。目前,支援 HDFSAmazon S3 相容 儲存空間

當傳遞檔案 URI 時,將會推斷檔案系統。例如,指定 S3 路徑

dataset = ds.dataset("s3://voltrondata-labs-datasets/nyc-taxi/")

通常,您會想要自訂連線參數,然後可以建立檔案系統物件並將其傳遞給 filesystem 關鍵字

from pyarrow import fs

s3  = fs.S3FileSystem(region="us-east-2")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=s3)

目前可用的類別為 S3FileSystemHadoopFileSystem。請參閱 檔案系統介面 文件以瞭解更多詳細資訊。

從 Minio 讀取#

除了雲端儲存空間之外,pyarrow 也支援從模擬 S3 API 的 MinIO 物件儲存執行個體讀取。與 toxiproxy 配對使用時,這對於測試或基準測試非常有用。

from pyarrow import fs

# By default, MinIO will listen for unencrypted HTTP traffic.
minio = fs.S3FileSystem(scheme="http", endpoint_override="localhost:9000")
dataset = ds.dataset("voltrondata-labs-datasets/nyc-taxi/", filesystem=minio)

使用 Parquet 資料集#

雖然 Datasets API 為不同的檔案格式提供了統一的介面,但 Parquet 資料集存在一些特定的方法。

諸如 Dask 等一些處理框架(可選地)在分割資料集中使用 _metadata 檔案,其中包含有關完整資料集的結構描述和列組元資料的資訊。使用此類檔案可以更有效率地建立 Parquet Dataset,因為它不需要推斷結構描述並爬梳目錄以尋找所有 Parquet 檔案(對於存取檔案成本高昂的檔案系統尤其如此)。parquet_dataset() 函數允許我們從具有 _metadata 檔案的分割資料集建立 Dataset

dataset = ds.parquet_dataset("/path/to/dir/_metadata")

預設情況下,為 Parquet 資料集建構的 Dataset 物件會將每個片段對應到單個 Parquet 檔案。如果您想要對應到 Parquet 檔案的每個列組的片段,則可以使用片段的 split_by_row_group() 方法

fragments = list(dataset.get_fragments())
fragments[0].split_by_row_group()

此方法傳回一個新的片段列表,這些片段對應到原始片段(Parquet 檔案)的每個列組。get_fragments()split_by_row_group() 都接受可選的篩選表達式,以取得篩選後的片段列表。

手動指定 Dataset#

dataset() 函數可以輕鬆建立 Dataset,以檢視目錄、爬梳所有子目錄以尋找檔案和分割資訊。但是,有時不需要探索,並且資料集的檔案和分割區已為人所知(例如,當此資訊儲存在元資料中時)。在這種情況下,可以明確地建立 Dataset,而無需任何自動探索或推斷。

對於此處的範例,我們將使用一個檔案名稱包含其他分割資訊的資料集

# creating a dummy dataset: directory with two files
In [40]: table = pa.table({'col1': range(3), 'col2': np.random.randn(3)})

In [41]: (base / "parquet_dataset_manual").mkdir(exist_ok=True)

In [42]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2018.parquet")

In [43]: pq.write_table(table, base / "parquet_dataset_manual" / "data_2019.parquet")

若要從檔案列表建立 Dataset,我們需要手動指定路徑、結構描述、格式、檔案系統和分割區表達式

In [44]: from pyarrow import fs

In [45]: schema = pa.schema([("year", pa.int64()), ("col1", pa.int64()), ("col2", pa.float64())])

In [46]: dataset = ds.FileSystemDataset.from_paths(
   ....:     ["data_2018.parquet", "data_2019.parquet"], schema=schema, format=ds.ParquetFileFormat(),
   ....:     filesystem=fs.SubTreeFileSystem(str(base / "parquet_dataset_manual"), fs.LocalFileSystem()),
   ....:     partitions=[ds.field('year') == 2018, ds.field('year') == 2019])
   ....: 

由於我們為檔案指定了「分割區表達式」,因此此資訊在讀取資料時會實體化為欄位,並且可以用於篩選

In [47]: dataset.to_table().to_pandas()
Out[47]: 
   year  col1      col2
0  2018     0 -1.564310
1  2018     1 -1.397956
2  2018     2 -1.809460
3  2019     0 -1.564310
4  2019     1 -1.397956
5  2019     2 -1.809460

In [48]: dataset.to_table(filter=ds.field('year') == 2019).to_pandas()
Out[48]: 
   year  col1      col2
0  2019     0 -1.564310
1  2019     1 -1.397956
2  2019     2 -1.809460

手動列出檔案的另一個好處是,檔案的順序控制資料的順序。當執行排序讀取(或讀取到表格)時,傳回的列將符合給定檔案的順序。這僅適用於使用檔案列表建構資料集的情況。當檔案是透過掃描目錄來探索時,不保證順序。

迭代式(核心外或串流)讀取#

先前的範例示範了如何使用 to_table() 將資料讀取到表格中。如果資料集很小,或者只需要讀取少量資料,這會很有用。dataset API 包含額外的方法,可以串流方式讀取和處理大量資料。

最簡單的方法是使用 Dataset.to_batches() 方法。此方法傳回記錄批次的迭代器。例如,我們可以使用此方法來計算欄位的平均值,而無需將整個欄位載入記憶體

In [49]: import pyarrow.compute as pc

In [50]: col2_sum = 0

In [51]: count = 0

In [52]: for batch in dataset.to_batches(columns=["col2"], filter=~ds.field("col2").is_null()):
   ....:     col2_sum += pc.sum(batch.column("col2")).as_py()
   ....:     count += batch.num_rows
   ....: 

In [53]: mean_a = col2_sum/count

自訂批次大小#

資料集的迭代式讀取通常稱為資料集的「掃描」,而 pyarrow 使用稱為 Scanner 的物件來執行此操作。Scanner 是由資料集的 to_table()to_batches() 方法自動為您建立的。您傳遞給這些方法的任何引數都將傳遞給 Scanner 建構函式。

其中一個參數是 batch_size。這控制掃描器傳回的批次的最大大小。如果資料集由小檔案組成,或這些檔案本身由小列組組成,則批次仍然可以小於 batch_size。例如,除非將 batch_size 設定為較小的值,否則每個列組具有 10,000 列的 parquet 檔案將產生最多 10,000 列的批次。

預設批次大小為一百萬列,這通常是一個很好的預設值,但如果您要讀取大量欄位,則可能需要自訂它。

關於交易和 ACID 保證的注意事項#

dataset API 不提供交易支援或任何 ACID 保證。這會影響讀取和寫入。並行讀取沒有問題。並行寫入或與讀取同時發生的寫入可能會產生非預期的行為。可以使用各種方法來避免對相同檔案進行操作,例如為每個寫入器使用唯一的 basename 範本、用於新檔案的暫存目錄,或單獨儲存檔案列表,而不是依賴目錄探索。

在寫入正在進行時意外終止程序可能會使系統處於不一致的狀態。寫入呼叫通常在要寫入的位元組完全傳遞到 OS 頁面快取後立即傳回。即使寫入操作已完成,如果在寫入呼叫後立即發生突然斷電,檔案的一部分仍可能遺失。

大多數檔案格式都有在結尾寫入的魔術數字。這表示可以安全地偵測並捨棄部分檔案寫入。CSV 檔案格式沒有任何此類概念,並且部分寫入的 CSV 檔案可能會被偵測為有效。

寫入資料集#

dataset API 也簡化了使用 write_dataset() 將資料寫入資料集。當您想要分割資料或需要寫入大量資料時,這會很有用。基本的資料集寫入類似於寫入表格,不同之處在於您指定的是目錄而不是檔案名稱。

In [54]: table = pa.table({"a": range(10), "b": np.random.randn(10), "c": [1, 2] * 5})

In [55]: ds.write_dataset(table, "sample_dataset", format="parquet")

上面的範例將在我們的 sample_dataset 目錄中建立一個名為 part-0.parquet 的單個檔案。

警告

如果您再次執行範例,它將取代現有的 part-0.parquet 檔案。將檔案附加到現有資料集需要為每次呼叫 ds.write_dataset 指定新的 basename_template,以避免覆寫。

寫入分割資料#

可以使用分割物件來指定應如何分割輸出資料。這使用與我們用於讀取資料集相同的分割物件類型。若要將上面的資料寫出到分割目錄,我們只需要指定我們希望如何分割資料集。例如

In [56]: part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [57]: ds.write_dataset(table, "partitioned_dataset", format="parquet", partitioning=part)

這將建立兩個檔案。我們一半的資料將位於 dataset_root/c=1 目錄中,另一半將位於 dataset_root/c=2 目錄中。

分割效能考量#

分割資料集有兩個方面會影響效能:它增加了檔案的數量,並在檔案周圍建立目錄結構。這兩者都有好處和成本。根據組態和資料集的大小,成本可能超過好處。

由於分割區將資料集分割成多個檔案,因此可以使用平行處理讀取和寫入分割資料集。但是,每個額外的檔案都會增加少量處理檔案系統互動的額外負荷。它也會增加整體資料集大小,因為每個檔案都有一些共用的元資料。例如,每個 parquet 檔案都包含結構描述和群組層級統計資訊。分割區的數量是檔案數量的下限。如果您按日期分割資料集,並包含一年的資料,您將至少有 365 個檔案。如果您進一步按另一個具有 1,000 個唯一值的維度進行分割,您將最多有 365,000 個檔案。這種精細的分割通常會導致主要由元資料組成的小檔案。

分割資料集會建立巢狀資料夾結構,這些結構允許我們修剪掃描中載入的檔案。但是,這會增加探索資料集中檔案的額外負荷,因為我們需要遞迴地「列出目錄」才能找到資料檔案。過於精細的分割可能會在此處造成問題:按日期分割一年的資料的資料集將需要 365 次列表呼叫才能找到所有檔案;新增另一個基數為 1,000 的欄位將使其達到 365,365 次呼叫。

最佳分割配置將取決於您的資料、存取模式以及將讀取資料的系統。大多數系統(包括 Arrow)都應適用於各種檔案大小和分割配置,但有些極端情況您應避免。這些準則可以幫助避免一些已知的最壞情況

  • 避免小於 20MB 和大於 2GB 的檔案。

  • 避免分割配置超過 10,000 個不同的分割區。

對於具有檔案內群組概念的檔案格式(例如 Parquet),適用類似的準則。列組可以在讀取時提供平行處理,並允許根據統計資訊跳過資料,但非常小的群組可能會導致元資料成為檔案大小的很大一部分。在大多數情況下,Arrow 的檔案寫入器為群組大小調整提供了合理的預設值。

設定寫入期間開啟的檔案#

將資料寫入磁碟時,有一些參數對於最佳化寫入非常重要,例如每個檔案的列數和寫入期間允許的最大開啟檔案數。

使用 write_dataset()max_open_files 參數設定最大開啟檔案數。

如果 max_open_files 設定為大於 0,則這將限制可以保持開啟的最大檔案數。這僅適用於寫入分割資料集,其中列會根據其分割區值分派到適當的檔案。如果嘗試開啟過多檔案,則最近最少使用的檔案將會關閉。如果此設定設定得太低,您最終可能會將資料分割成許多小檔案。

如果您的程序同時使用其他檔案處理常式(透過資料集掃描器或其他方式),您可能會達到系統檔案處理常式限制。例如,如果您正在掃描具有 300 個檔案的資料集並寫出到 900 個檔案,則總共 1200 個檔案可能會超過系統限制。(在 Linux 上,這可能是「開啟檔案過多」錯誤。)您可以減少此 max_open_files 設定,或增加系統上的檔案處理常式限制。預設值為 900,這允許掃描器開啟一些檔案,然後才會達到 Linux 預設限制 1024。

write_dataset() 中使用的另一個重要組態是 max_rows_per_file

使用 write_dataset()max_rows_per_files 參數設定每個檔案中寫入的最大列數。

如果 max_rows_per_file 設定為大於 0,則這將限制放置在任何單個檔案中的列數。否則,將沒有限制,並且將在每個輸出目錄中建立一個檔案,除非需要關閉檔案以符合 max_open_files。此設定是控制檔案大小的主要方法。對於寫入大量資料的工作負載,檔案可能會在沒有列數上限的情況下變得非常大,從而導致下游讀取器發生記憶體不足錯誤。列數和檔案大小之間的關係取決於資料集結構描述以及資料的壓縮程度(如果有的話)。

設定寫入期間每個群組的列數#

可以設定每個群組寫入磁碟的資料量。此組態包括下限和上限。形成列組所需的最小列數是使用 write_dataset()min_rows_per_group 參數定義的。

注意

如果 min_rows_per_group 設定為大於 0,則這將導致資料集寫入器批次處理傳入的資料,並且僅在累積足夠的列時才將列組寫入磁碟。如果其他選項(例如 max_open_filesmax_rows_per_file)強制使用較小的列組大小,則最終列組大小可能會小於此值。

每個群組允許的最大列數是使用 write_dataset()max_rows_per_group 參數定義的。

如果 max_rows_per_group 設定為大於 0,則資料集寫入器可能會將大型傳入批次分割成多個列組。如果設定了此值,則也應設定 min_rows_per_group,否則您最終可能會得到非常小的列組(例如,如果傳入的列組大小僅略大於此值)。

列組內建於 Parquet 和 IPC/Feather 格式中,但不影響 JSON 或 CSV。在 Arrow 中讀回 Parquet 和 IPC 格式時,列組邊界會變成記錄批次邊界,從而決定下游讀取器的預設批次大小。此外,Parquet 檔案中的列組具有欄位統計資訊,這可以幫助讀取器跳過不相關的資料,但可能會增加檔案大小。作為一個極端範例,如果有人在 Parquet 中設定 max_rows_per_group=1,他們將擁有大型檔案,因為大多數檔案將是列組統計資訊。

寫入大量資料#

上面的範例從表格寫入資料。如果您要寫入大量資料,您可能無法將所有內容載入到單個記憶體內表格中。幸運的是,write_dataset() 方法也接受記錄批次的迭代器。這使得重新分割大型資料集變得非常簡單,例如,無需將整個資料集載入記憶體

In [58]: old_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor="hive"
   ....: )
   ....: 

In [59]: new_part = ds.partitioning(
   ....:     pa.schema([("c", pa.int16())]), flavor=None
   ....: )
   ....: 

In [60]: input_dataset = ds.dataset("partitioned_dataset", partitioning=old_part)

# A scanner can act as an iterator of record batches but you could also receive
# data from the network (e.g. via flight), from your own scanning, or from any
# other method that yields record batches.  In addition, you can pass a dataset
# into write_dataset directly but this method is useful if you want to customize
# the scanner (e.g. to filter the input dataset or set a maximum batch size)
In [61]: scanner = input_dataset.scanner()

In [62]: ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part)

在上面的範例執行後,我們的資料將位於 dataset_root/1 和 dataset_root/2 目錄中。在這個簡單的範例中,我們沒有變更資料的結構(僅變更目錄命名結構描述),但您也可以使用此機制來變更用於分割資料集的欄位。當您期望以特定方式查詢資料,並且可以利用分割區來減少需要讀取的資料量時,這非常有用。

自訂和檢查寫入的檔案#

預設情況下,dataset API 將建立名為 “part-i.format” 的檔案,其中 “i” 是在寫入期間產生的整數,“format” 是在 write_dataset 呼叫中指定的檔案格式。對於簡單的資料集,可能可以知道將建立哪些檔案,但對於較大或分割的資料集,這並不容易。file_visitor 關鍵字可用於提供一個訪問器,該訪問器將在每次建立檔案時被呼叫

In [63]: def file_visitor(written_file):
   ....:     print(f"path={written_file.path}")
   ....:     print(f"size={written_file.size} bytes")
   ....:     print(f"metadata={written_file.metadata}")
   ....: 
In [64]: ds.write_dataset(table, "dataset_visited", format="parquet", partitioning=part,
   ....:                  file_visitor=file_visitor)
   ....: 
path=dataset_visited/c=1/part-0.parquet
size=793 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe412578a90>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0
path=dataset_visited/c=2/part-0.parquet
size=795 bytes
metadata=<pyarrow._parquet.FileMetaData object at 0x7fe4138da9d0>
  created_by: parquet-cpp-arrow version 19.0.0
  num_columns: 2
  num_rows: 5
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 0

這將允許您收集屬於資料集的檔案名稱,並將它們儲存在其他位置,當您想要避免下次需要讀取資料時掃描目錄時,這會很有用。它也可以用於產生 Dask 或 Spark 等其他工具使用的 _metadata 索引檔案,以建立資料集的索引。

設定寫入期間格式特定的參數#

除了所有格式共用的通用選項之外,還有特定於格式的選項,這些選項對於特定格式而言是唯一的。例如,若要在寫入 Parquet 檔案時允許截斷的時間戳記

In [65]: parquet_format = ds.ParquetFileFormat()

In [66]: write_options = parquet_format.make_write_options(allow_truncated_timestamps=True)

In [67]: ds.write_dataset(table, "sample_dataset2", format="parquet", partitioning=part,
   ....:                  file_options=write_options)
   ....: