串流、序列化和 IPC#
寫入和讀取串流#
Arrow 定義了兩種二進位格式來序列化記錄批次
串流格式:用於發送任意長度的記錄批次序列。此格式必須從頭到尾處理,且不支援隨機存取
檔案或隨機存取格式:用於序列化固定數量的記錄批次。支援隨機存取,因此與記憶體映射一起使用時非常有用
為了理解本節,請務必先閱讀關於 記憶體和 IO 的章節。
使用串流#
首先,讓我們建立一個小的記錄批次
In [1]: import pyarrow as pa
In [2]: data = [
...: pa.array([1, 2, 3, 4]),
...: pa.array(['foo', 'bar', 'baz', None]),
...: pa.array([True, None, False, True])
...: ]
...:
In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
In [4]: batch.num_rows
Out[4]: 4
In [5]: batch.num_columns
Out[5]: 3
現在,我們可以開始寫入包含一些批次的串流。為此,我們使用 RecordBatchStreamWriter
,它可以寫入可寫入的 NativeFile
物件或可寫入的 Python 物件。為了方便起見,可以使用 new_stream()
來建立它
In [6]: sink = pa.BufferOutputStream()
In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
...: for i in range(5):
...: writer.write_batch(batch)
...:
在這裡,我們使用了記憶體內 Arrow 緩衝區串流 (sink
),但這也可以是 socket 或其他 IO sink。
在建立 StreamWriter
時,我們傳遞了綱要,因為綱要(欄位名稱和類型)對於此特定串流中發送的所有批次都必須相同。現在我們可以執行
In [8]: buf = sink.getvalue()
In [9]: buf.size
Out[9]: 1984
現在 buf
包含作為記憶體內位元組緩衝區的完整串流。我們可以通過 RecordBatchStreamReader
或方便函數 pyarrow.ipc.open_stream
讀取這樣的串流
In [10]: with pa.ipc.open_stream(buf) as reader:
....: schema = reader.schema
....: batches = [b for b in reader]
....:
In [11]: schema
Out[11]:
f0: int64
f1: string
f2: bool
In [12]: len(batches)
Out[12]: 5
我們可以檢查返回的批次是否與原始輸入相同
In [13]: batches[0].equals(batch)
Out[13]: True
重要的一點是,如果輸入來源支援零複製讀取(例如像記憶體映射或 pyarrow.BufferReader
),則返回的批次也是零複製的,並且在讀取時不會分配任何新記憶體。
寫入和讀取隨機存取檔案#
RecordBatchFileWriter
具有與 RecordBatchStreamWriter
相同的 API。您可以使用 new_file()
建立一個
In [14]: sink = pa.BufferOutputStream()
In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
....: for i in range(10):
....: writer.write_batch(batch)
....:
In [16]: buf = sink.getvalue()
In [17]: buf.size
Out[17]: 4226
RecordBatchFileReader
和 RecordBatchStreamReader
之間的區別在於,輸入來源必須具有用於隨機存取的 seek
方法。串流讀取器僅需要讀取操作。我們也可以使用 open_file()
方法開啟檔案
In [18]: with pa.ipc.open_file(buf) as reader:
....: num_record_batches = reader.num_record_batches
....:
In [19]: b = reader.get_batch(3)
由於我們可以存取整個有效負載,因此我們知道檔案中記錄批次的數量,並且可以隨機讀取任何批次。
In [20]: num_record_batches
Out[20]: 10
In [21]: b.equals(batch)
Out[21]: True
從串流和檔案格式讀取 pandas#
串流和檔案讀取器類別具有特殊的 read_pandas
方法,可簡化讀取多個記錄批次並將它們轉換為單個 DataFrame 輸出
In [22]: with pa.ipc.open_file(buf) as reader:
....: df = reader.read_pandas()
....:
In [23]: df[:5]
Out[23]:
f0 f1 f2
0 1 foo True
1 2 bar None
2 3 baz False
3 4 None True
4 1 foo True
高效地寫入和讀取 Arrow 資料#
Arrow 針對零複製和記憶體映射資料進行了最佳化,因此可以輕鬆讀取和寫入陣列,同時最大限度地減少常駐記憶體的使用量。
在寫入和讀取原始 Arrow 資料時,我們可以使用 Arrow 檔案格式或 Arrow 串流格式。
要將陣列轉儲到檔案,您可以使用 new_file()
,它將提供一個新的 RecordBatchFileWriter
實例,可用於將資料批次寫入該檔案。
例如,要寫入一個包含 10M 個整數的陣列,我們可以將其寫入 1000 個區塊,每個區塊包含 10000 個條目
In [24]: BATCH_SIZE = 10000
In [25]: NUM_BATCHES = 1000
In [26]: schema = pa.schema([pa.field('nums', pa.int32())])
In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
....: with pa.ipc.new_file(sink, schema) as writer:
....: for row in range(NUM_BATCHES):
....: batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
....: writer.write(batch)
....:
記錄批次支援多個欄位,因此在實務中,我們總是寫入相當於 Table
的內容。
以批次寫入是有效的,因為理論上我們只需要將當前正在寫入的批次保留在記憶體中。但是在讀取回寫時,我們可以通過直接映射磁碟中的資料來更有效率,並避免在讀取時分配任何新記憶體。
在正常情況下,讀取回我們的檔案將消耗幾百 MB 的記憶體
In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
....: loaded_array = pa.ipc.open_file(source).read_all()
....:
In [29]: print("LEN:", len(loaded_array))
LEN: 10000000
In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB
為了更有效率地從磁碟讀取大數據,我們可以記憶體映射檔案,以便 Arrow 可以直接引用從磁碟映射的資料,並避免必須分配自己的記憶體。在這種情況下,作業系統將能夠延遲分頁載入映射的記憶體,並在壓力下將其分頁移出,而不會產生任何寫回成本,從而可以更輕鬆地讀取大於總記憶體的陣列。
In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
....: loaded_array = pa.ipc.open_file(source).read_all()
....:
In [32]: print("LEN:", len(loaded_array))
LEN: 10000000
In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB
注意
其他高階 API(如 read_table()
)也提供了 memory_map
選項。但在這些情況下,記憶體映射無法幫助減少常駐記憶體消耗。有關詳細資訊,請參閱 讀取 Parquet 和記憶體映射。