讀寫數據

關於使用 Apache Arrow 從磁碟讀寫資料的食譜。

寫一個 Parquet 檔案

假如有 100 個數字陣列,從 0 到 99

import numpy as np
import pyarrow as pa

arr = pa.array(np.arange(100))

print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

為將其寫入 Parquet 檔案,由於 Parquet 是包含多個命名欄位的格式,我們必須建立一個 pyarrow.Table,如此一來我們便可以得到一個單一欄位的資料表,之後就能將其寫入 Parquet 檔案。

table = pa.Table.from_arrays([arr], names=["col1"])

一旦我們有了一個資料表,就可以使用 pyarrow.parquet 模組提供的函數將其寫入 Parquet 檔案

import pyarrow.parquet as pq

pq.write_table(table, "example.parquet", compression=None)

讀取一個 Parquet 檔案

假如有 Parquet 檔案,使用 pyarrow.parquet.read_table() 函數可以將其讀回 pyarrow.Table

import pyarrow.parquet as pq

table = pq.read_table("example.parquet")

產生的資料表將包含存在於 Parquet 檔案中的相同欄位,如 ChunkedArray

print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]

讀取 Parquet 資料的子集

在使用 pyarrow.parquet.read_table() 讀取 Parquet 檔案時,可以透過使用 filterscolumns 參數,來限制要讀入記憶體中的哪些欄位和列

import pyarrow.parquet as pq

table = pq.read_table("example.parquet",
                      columns=["col1"],
                      filters=[
                          ("col1", ">", 5),
                          ("col1", "<", 10),
                      ])

產生的表格會僅包含投射的欄位和篩選的列。請參閱 pyarrow.parquet.read_table() 文件,以取得有關篩選程式語法變化的詳細資料。

print(table)
pyarrow.Table
col1: int64
----
col1: [[6,7,8,9]]

將 Arrow Arrays 儲存到磁碟

除了使用 arrow 來讀取和儲存 Parquet 等常見檔案格式外,也可將資料傾印到 raw arrow 格式,此格式允許從磁碟中直接進行資料記憶體對應。此格式稱為 Arrow IPC 格式。

假如有 100 個數字陣列,從 0 到 99

import numpy as np
import pyarrow as pa

arr = pa.array(np.arange(100))

print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

透過建立 pyarrow.RecordBatch 並將記錄批次寫入磁碟,即可儲存陣列。

schema = pa.schema([
    pa.field('nums', arr.type)
])

with pa.OSFile('arraydata.arrow', 'wb') as sink:
    with pa.ipc.new_file(sink, schema=schema) as writer:
        batch = pa.record_batch([arr], schema=schema)
        writer.write(batch)

如果要將多個陣列儲存到同一檔案,只需適當地調整 schema,並將它們全部加入 record_batch 呼叫即可。

從磁碟中記憶體對應 Arrow Arrays

已使用 Arrow IPC 格式寫入磁碟的 Arrow arrays 可從磁碟中直接記憶體對應回來。

with pa.memory_map('arraydata.arrow', 'r') as source:
    loaded_arrays = pa.ipc.open_file(source).read_all()
arr = loaded_arrays[0]
print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

處理 CSV 檔案

可以使用 pyarrow.csv.write_csv() 函數將 Arrow pyarrow.Table 寫入 CSV 檔案

arr = pa.array(range(100))
table = pa.Table.from_arrays([arr], names=["col1"])

import pyarrow.csv
pa.csv.write_csv(table, "table.csv",
                 write_options=pa.csv.WriteOptions(include_header=True))

增量處理 CSV 檔案

如果您需要在產生或擷取資料時遞增方式將資料寫入 CSV 檔案,且不希望將整個資料表留在記憶體中以一次寫入,則可以使用 pyarrow.csv.CSVWriter 以遞增方式寫入資料

schema = pa.schema([("col1", pa.int32())])
with pa.csv.CSVWriter("table.csv", schema=schema) as writer:
    for chunk in range(10):
        datachunk = range(chunk*10, (chunk+1)*10)
        table = pa.Table.from_arrays([pa.array(datachunk)], schema=schema)
        writer.write(table)

同樣也可以通過傳遞 pyarrow.RecordBatch(就像您對資料表所做的那樣)來寫入資料

讀取 CSV 檔案

Arrow 可以使用經過最佳化且可利用多個執行緒的程式碼路徑,從 CSV 中讀取 pyarrow.Table 實體

import pyarrow.csv

table = pa.csv.read_csv("table.csv")

Arrow 會盡全力推斷資料類型。可提供進一步的選項給 pyarrow.csv.read_csv(),以驅動 pyarrow.csv.ConvertOptions

print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]

寫入分割的資料集

當您的資料集龐大的時候,通常會想將其分割成多個不同的檔案。您可以手動執行此動作,或使用 pyarrow.dataset.write_dataset() 讓 Arrow 協助您將資料切割成區塊

partitioning 參數允許您告知 pyarrow.dataset.write_dataset() 應針對哪些欄位分割資料

例如,給定 100 個生日,介於 2000 到 2009 年之間

import numpy.random
data = pa.table({"day": numpy.random.randint(1, 31, size=100),
                 "month": numpy.random.randint(1, 12, size=100),
                 "year": [2000 + x // 10 for x in range(100)]})

然後,我們可以透過 year 欄位分割資料,這樣的話會儲存成 10 個不同的檔案

import pyarrow as pa
import pyarrow.dataset as ds

ds.write_dataset(data, "./partitioned", format="parquet",
                 partitioning=ds.partitioning(pa.schema([("year", pa.int16())])))

Arrow 會預設將資料集分割到子目錄中,這將導致產生 10 個不同的目錄,其中每個目錄都以分割欄位的值命名,且每個目錄中都有一個檔案包含對應分割區段的資料子集

from pyarrow import fs

localfs = fs.LocalFileSystem()
partitioned_dir_content = localfs.get_file_info(fs.FileSelector("./partitioned", recursive=True))
files = sorted((f.path for f in partitioned_dir_content if f.type == fs.FileType.File))

for file in files:
    print(file)
./partitioned/2000/part-0.parquet
./partitioned/2001/part-0.parquet
./partitioned/2002/part-0.parquet
./partitioned/2003/part-0.parquet
./partitioned/2004/part-0.parquet
./partitioned/2005/part-0.parquet
./partitioned/2006/part-0.parquet
./partitioned/2007/part-0.parquet
./partitioned/2008/part-0.parquet
./partitioned/2009/part-0.parquet

讀取分割的資料

某些情況下,您的資料集可能由多個個別檔案組成,每個檔案包含一個資料部分。

在此情況下,pyarrow.dataset.dataset() 函數提供一個介面來偵測和讀取所有那些檔案作為一個單一的巨型資料集。

例如,如果我們有如下結構

examples/
├── dataset1.parquet
├── dataset2.parquet
└── dataset3.parquet

然後,將pyarrow.dataset.dataset()函數指向examples 目錄會偵測那些parquet 檔案,並將它們全部顯示為單一的 pyarrow.dataset.Dataset

import pyarrow.dataset as ds

dataset = ds.dataset("./examples", format="parquet")
print(dataset.files)
['./examples/dataset1.parquet', './examples/dataset2.parquet', './examples/dataset3.parquet']

整個資料集可以使用pyarrow.dataset.Dataset.to_table() 視為一個單一的巨型表格。儘管每個 parquet 檔案只包含 10 列,將資料集轉換成表格會顯示它們為一個單一的 Table。

table = dataset.to_table()
print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,5,6,7,8,9],[10,11,12,13,14,15,16,17,18,19],[20,21,22,23,24,25,26,27,28,29]]

請注意轉換成表格將強制載入所有資料到記憶體中。對大型資料集來說,這通常不是您想要的。

因此,依賴於pyarrow.dataset.Dataset.to_batches() 方法可能會更好,此方法會對資料集迭代載入資料,並針對每個資料傳回一個pyarrow.RecordBatch

for record_batch in dataset.to_batches():
    col1 = record_batch.column("col1")
    print(f"{col1._name} = {col1[0]} .. {col1[-1]}")
col1 = 0 .. 9
col1 = 10 .. 19
col1 = 20 .. 29

從 S3 讀取區隔資料

pyarrow.dataset.Dataset 也能抽象化來自遠端來源的區隔資料,例如 S3 或 HDFS。

from pyarrow import fs

# List content of s3://ursa-labs-taxi-data/2011
s3 = fs.SubTreeFileSystem(
    "ursa-labs-taxi-data",
    fs.S3FileSystem(region="us-east-2", anonymous=True)
)
for entry in s3.get_file_info(fs.FileSelector("2011", recursive=True)):
    if entry.type == fs.FileType.File:
        print(entry.path)
2011/01/data.parquet
2011/02/data.parquet
2011/03/data.parquet
2011/04/data.parquet
2011/05/data.parquet
2011/06/data.parquet
2011/07/data.parquet
2011/08/data.parquet
2011/09/data.parquet
2011/10/data.parquet
2011/11/data.parquet
2011/12/data.parquet

儲存在存放區的資料可以用 month 為基準進行區隔,將其載入為一個單一的巨型資料集

dataset = ds.dataset("s3://ursa-labs-taxi-data/2011",
                     partitioning=["month"])
for f in dataset.files[:10]:
    print(f)
print("...")
ursa-labs-taxi-data/2011/01/data.parquet
ursa-labs-taxi-data/2011/02/data.parquet
ursa-labs-taxi-data/2011/03/data.parquet
ursa-labs-taxi-data/2011/04/data.parquet
ursa-labs-taxi-data/2011/05/data.parquet
ursa-labs-taxi-data/2011/06/data.parquet
ursa-labs-taxi-data/2011/07/data.parquet
ursa-labs-taxi-data/2011/08/data.parquet
ursa-labs-taxi-data/2011/09/data.parquet
ursa-labs-taxi-data/2011/10/data.parquet
...

然後可以使用 pyarrow.dataset.Dataset.to_table()pyarrow.dataset.Dataset.to_batches() 來使用資料集,就像您處理本机資料集一樣。

註解

也可以載入分割的資料,其格式為 ipc 箭號或羽毛。

警告

如果上述程式碼擲回錯誤,原因很可能在於您尚未設定 AWS 認證。請依照以下說明取得 AWS 存取金鑰識別碼AWS 秘密存取金鑰AWS 認證

認證通常儲存在 ~/.aws/credentials(Mac 或 Linux)或 C:\Users\<USERNAME>\.aws\credentials(Windows)檔案中。您需要在適當位置建立或更新這個檔案。

檔案內容應如下所示

[default]
aws_access_key_id=<YOUR_AWS_ACCESS_KEY_ID>
aws_secret_access_key=<YOUR_AWS_SECRET_ACCESS_KEY>

撰寫羽毛檔案

假如有 100 個數字陣列,從 0 到 99

import numpy as np
import pyarrow as pa

arr = pa.array(np.arange(100))

print(f"{arr[0]} .. {arr[-1]}")
0 .. 99

要寫入羽毛檔案,由於羽毛儲存多個欄,我們必須建立一個 pyarrow.Table ,以便取得可寫入羽毛檔案的單一欄表格。

table = pa.Table.from_arrays([arr], names=["col1"])

取得表格後,可以使用 pyarrow.feather 模組提供的函數將其寫入羽毛檔案

import pyarrow.feather as ft

ft.write_feather(table, 'example.feather')

讀取羽毛檔案

如果已存在羽毛檔案,可以使用 pyarrow.feather.read_table() 函數將其讀回 pyarrow.Table

import pyarrow.feather as ft

table = ft.read_table("example.feather")

產生的資料表將包含存在於 Parquet 檔案中的相同欄位,如 ChunkedArray

print(table)
pyarrow.Table
col1: int64
----
col1: [[0,1,2,3,4,...,95,96,97,98,99]]

讀取線分隔 JSON

Arrow 內建支援區隔換行 JSON。每行都以 JSON 物件代表一列資料。

假設一檔案中的資料,每行都有一個包含資料列的 JSON 物件

import tempfile

with tempfile.NamedTemporaryFile(delete=False, mode="w+") as f:
    f.write('{"a": 1, "b": 2.0, "c": 1}\n')
    f.write('{"a": 3, "b": 3.0, "c": 2}\n')
    f.write('{"a": 5, "b": 4.0, "c": 3}\n')
    f.write('{"a": 7, "b": 5.0, "c": 4}\n')

檔案內容可使用 pyarrow.Table 透過 pyarrow.json.read_json() 讀回

import pyarrow as pa
import pyarrow.json

table = pa.json.read_json(f.name)
print(table.to_pydict())
{'a': [1, 3, 5, 7], 'b': [2.0, 3.0, 4.0, 5.0], 'c': [1, 2, 3, 4]}

寫入壓縮資料

Arrow 支援以壓縮格式寫入檔案,支援原本就提供壓縮的格式(例如 Parquet 或 Feather),也支援原本不支援壓縮的格式(例如 CSV)。

假設有一個表格

table = pa.table([
    pa.array([1, 2, 3, 4, 5])
], names=["numbers"])

寫入壓縮的 Parquet 或 Feather 資料是由 pyarrow.feather.write_feather()pyarrow.parquet.write_table() 函式的 compression 參數決定的

pa.feather.write_feather(table, "compressed.feather",
                         compression="lz4")
pa.parquet.write_table(table, "compressed.parquet",
                       compression="lz4")

您可以參考各個函式的說明文件取得支援的壓縮格式清單。

註解

實際上,寫入 Parquet 或 Feather 檔案時,Arrow 預設會使用壓縮。Feather 預設使用 lz4 壓縮,而 Parquet 預設使用 snappy 壓縮。

對於不支援原本壓縮的格式,例如 CSV,可以使用 pyarrow.CompressedOutputStream 儲存壓縮資料

with pa.CompressedOutputStream("compressed.csv.gz", "gzip") as out:
    pa.csv.write_csv(table, out)

這需要在讀回檔案時解壓縮,而這可以用下一段 دستور提到的 pyarrow.CompressedInputStream 做到。

讀取壓縮資料

Arrow 可支援讀取壓縮檔案,包括原生支援壓縮格式的 Parquet 或 Feather,以及原生不支援壓縮格式的檔案,例如 CSV,但已被應用程式壓縮。

讀取原生支援壓縮的壓縮格式時不需要任何特殊處理。例如,我們可以透過單純呼叫 pyarrow.feather.read_table()pyarrow.parquet.read_table() 來讀回我們在先前食譜中編寫的 Parquet 與 Feather 檔案

table_feather = pa.feather.read_table("compressed.feather")
print(table_feather)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]
table_parquet = pa.parquet.read_table("compressed.parquet")
print(table_parquet)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]

相反地,讀取原生不支援壓縮格式的資料時,需要在解碼之前先將其解壓縮。可以使用會在提供實際讀取函式結果之前,使用解壓縮作業包裝檔案的 pyarrow.CompressedInputStream 類別。

例如,若要讀取壓縮的 CSV 檔案

with pa.CompressedInputStream(pa.OSFile("compressed.csv.gz"), "gzip") as input:
    table_csv = pa.csv.read_csv(input)
    print(table_csv)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]

註解

在 CSV 的情況下,Arrow 其實很聰明,可以根據副檔名試著偵測壓縮檔案。因此,您的檔案若命名為 *.gz*.bz2pyarrow.csv.read_csv() 函式就會嘗試依此解壓縮

table_csv2 = pa.csv.read_csv("compressed.csv.gz")
print(table_csv2)
pyarrow.Table
numbers: int64
----
numbers: [[1,2,3,4,5]]