串流、序列化和 IPC#

寫入和讀取串流#

Arrow 定義了兩種二進位格式來序列化記錄批次

  • 串流格式:用於發送任意長度的記錄批次序列。此格式必須從頭到尾處理,且不支援隨機存取

  • 檔案或隨機存取格式:用於序列化固定數量的記錄批次。支援隨機存取,因此與記憶體映射一起使用時非常有用

為了理解本節,請務必先閱讀關於 記憶體和 IO 的章節。

使用串流#

首先,讓我們建立一個小的記錄批次

In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...: 

In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3

現在,我們可以開始寫入包含一些批次的串流。為此,我們使用 RecordBatchStreamWriter,它可以寫入可寫入的 NativeFile 物件或可寫入的 Python 物件。為了方便起見,可以使用 new_stream() 來建立它

In [6]: sink = pa.BufferOutputStream()

In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
   ...:    for i in range(5):
   ...:       writer.write_batch(batch)
   ...: 

在這裡,我們使用了記憶體內 Arrow 緩衝區串流 (sink),但這也可以是 socket 或其他 IO sink。

在建立 StreamWriter 時,我們傳遞了綱要,因為綱要(欄位名稱和類型)對於此特定串流中發送的所有批次都必須相同。現在我們可以執行

In [8]: buf = sink.getvalue()

In [9]: buf.size
Out[9]: 1984

現在 buf 包含作為記憶體內位元組緩衝區的完整串流。我們可以通過 RecordBatchStreamReader 或方便函數 pyarrow.ipc.open_stream 讀取這樣的串流

In [10]: with pa.ipc.open_stream(buf) as reader:
   ....:       schema = reader.schema
   ....:       batches = [b for b in reader]
   ....: 

In [11]: schema
Out[11]: 
f0: int64
f1: string
f2: bool

In [12]: len(batches)
Out[12]: 5

我們可以檢查返回的批次是否與原始輸入相同

In [13]: batches[0].equals(batch)
Out[13]: True

重要的一點是,如果輸入來源支援零複製讀取(例如像記憶體映射或 pyarrow.BufferReader),則返回的批次也是零複製的,並且在讀取時不會分配任何新記憶體。

寫入和讀取隨機存取檔案#

RecordBatchFileWriter 具有與 RecordBatchStreamWriter 相同的 API。您可以使用 new_file() 建立一個

In [14]: sink = pa.BufferOutputStream()

In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
   ....:    for i in range(10):
   ....:       writer.write_batch(batch)
   ....: 

In [16]: buf = sink.getvalue()

In [17]: buf.size
Out[17]: 4226

RecordBatchFileReaderRecordBatchStreamReader 之間的區別在於,輸入來源必須具有用於隨機存取的 seek 方法。串流讀取器僅需要讀取操作。我們也可以使用 open_file() 方法開啟檔案

In [18]: with pa.ipc.open_file(buf) as reader:
   ....:    num_record_batches = reader.num_record_batches
   ....: 

In [19]: b = reader.get_batch(3)

由於我們可以存取整個有效負載,因此我們知道檔案中記錄批次的數量,並且可以隨機讀取任何批次。

In [20]: num_record_batches
Out[20]: 10

In [21]: b.equals(batch)
Out[21]: True

從串流和檔案格式讀取 pandas#

串流和檔案讀取器類別具有特殊的 read_pandas 方法,可簡化讀取多個記錄批次並將它們轉換為單個 DataFrame 輸出

In [22]: with pa.ipc.open_file(buf) as reader:
   ....:    df = reader.read_pandas()
   ....: 

In [23]: df[:5]
Out[23]: 
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True

高效地寫入和讀取 Arrow 資料#

Arrow 針對零複製和記憶體映射資料進行了最佳化,因此可以輕鬆讀取和寫入陣列,同時最大限度地減少常駐記憶體的使用量。

在寫入和讀取原始 Arrow 資料時,我們可以使用 Arrow 檔案格式或 Arrow 串流格式。

要將陣列轉儲到檔案,您可以使用 new_file(),它將提供一個新的 RecordBatchFileWriter 實例,可用於將資料批次寫入該檔案。

例如,要寫入一個包含 10M 個整數的陣列,我們可以將其寫入 1000 個區塊,每個區塊包含 10000 個條目

In [24]: BATCH_SIZE = 10000

In [25]: NUM_BATCHES = 1000

In [26]: schema = pa.schema([pa.field('nums', pa.int32())])

In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
   ....:    with pa.ipc.new_file(sink, schema) as writer:
   ....:       for row in range(NUM_BATCHES):
   ....:             batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
   ....:             writer.write(batch)
   ....: 

記錄批次支援多個欄位,因此在實務中,我們總是寫入相當於 Table 的內容。

以批次寫入是有效的,因為理論上我們只需要將當前正在寫入的批次保留在記憶體中。但是在讀取回寫時,我們可以通過直接映射磁碟中的資料來更有效率,並避免在讀取時分配任何新記憶體。

在正常情況下,讀取回我們的檔案將消耗幾百 MB 的記憶體

In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
   ....:    loaded_array = pa.ipc.open_file(source).read_all()
   ....: 

In [29]: print("LEN:", len(loaded_array))
LEN: 10000000

In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB

為了更有效率地從磁碟讀取大數據,我們可以記憶體映射檔案,以便 Arrow 可以直接引用從磁碟映射的資料,並避免必須分配自己的記憶體。在這種情況下,作業系統將能夠延遲分頁載入映射的記憶體,並在壓力下將其分頁移出,而不會產生任何寫回成本,從而可以更輕鬆地讀取大於總記憶體的陣列。

In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
   ....:    loaded_array = pa.ipc.open_file(source).read_all()
   ....: 

In [32]: print("LEN:", len(loaded_array))
LEN: 10000000

In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB

注意

其他高階 API(如 read_table())也提供了 memory_map 選項。但在這些情況下,記憶體映射無法幫助減少常駐記憶體消耗。有關詳細資訊,請參閱 讀取 Parquet 和記憶體映射