擴展 pyarrow#

透過 PyCapsule 介面控制轉換為 (Py)Arrow 的方式#

Arrow C 資料介面 允許在 Arrow 的不同實作之間移動 Arrow 資料。這是一個通用的跨語言介面,並非專門針對 Python,但對於 Python 函式庫,此介面透過一個 Python 特定的層級進行擴展:Arrow PyCapsule 介面

此 Python 介面確保支援 C Data 介面的不同函式庫,能夠以標準方式匯出 Arrow 資料結構,並識別彼此的物件。

如果您有一個 Python 函式庫提供資料結構,這些結構在底層保存 Arrow 相容的資料,您可以在這些物件上實作以下方法

  • __arrow_c_schema__ 用於綱要或類似類型的物件。

  • __arrow_c_array__ 用於陣列和記錄批次(連續表格)。

  • __arrow_c_stream__ 用於分塊陣列、表格和資料串流。

這些方法會回傳 PyCapsule 物件,更詳細的語意可以在 規格 中找到。

當您的資料結構定義了這些方法,PyArrow 建構函式(見下方)將會辨識這些物件支援此協定,並將它們零複製地轉換為 PyArrow 資料結構。對於任何其他支援此協定的函式庫在引入資料時,情況也是如此。

同樣地,如果您的函式庫有接受使用者提供資料的函式,您可以透過檢查這些方法是否存在來新增對此協定的支援,因此可以接受任何 Arrow 資料(而不是硬式編碼對特定 Arrow 生產者(如 PyArrow)的支援)。

為了透過此協定使用 PyArrow 的資料,可以使用以下建構函式來建立各種 PyArrow 物件

可以透過使用 pyarrow.field() 取用相容於綱要的物件,然後存取結果欄位的 .type,來建立 DataType

透過 __arrow_array__ 協定控制轉換為 pyarrow.Array 的方式#

pyarrow.array() 函式內建支援 Python 序列、numpy 陣列和 pandas 1D 物件(Series、Index、Categorical 等),以將它們轉換為 Arrow 陣列。這可以透過實作 __arrow_array__ 方法(類似於 numpy 的 __array__ 協定)來擴展到其他類似陣列的物件。

例如,為了支援將您的 duck array 類別轉換為 Arrow 陣列,請定義 __arrow_array__ 方法以回傳 Arrow 陣列

class MyDuckArray:

    ...

    def __arrow_array__(self, type=None):
        # convert the underlying array values to a pyarrow Array
        import pyarrow
        return pyarrow.array(..., type=type)

__arrow_array__ 方法接受一個可選的 type 關鍵字,該關鍵字從 pyarrow.array() 傳遞而來。此方法可以回傳 ArrayChunkedArray

注意

對於更通用的方式來控制 Python 物件轉換為 Arrow 資料,請考慮 Arrow PyCapsule 介面。它不只針對 PyArrow,並且支援轉換其他物件,例如表格和綱要。

定義擴展類型(「使用者定義類型」)#

Arrow 提供擴展類型的概念,允許使用者使用額外的語意來註解資料類型。這讓開發人員可以指定自訂的序列化和反序列化常式(例如,用於 Python 純量pandas),並更輕鬆地解譯資料。

在 Arrow 中,擴展類型 是透過使用自訂類型名稱和可選的位元組字串(可用於提供額外的元資料,在本文件中稱為「參數」)來註解任何內建的 Arrow 資料類型(「儲存類型」)來指定的。這些會以欄位的 custom_metadata 中的 ARROW:extension:nameARROW:extension:metadata 鍵的形式出現。

請注意,由於這些註解是 Arrow 規格的一部分,它們可能會被其他(非 Python)Arrow 消費者(例如 PySpark)識別。

PyArrow 允許您透過子類別化 ExtensionType 並為衍生類別提供自己的擴展名稱和機制來(反)序列化任何參數,從 Python 定義擴展類型。例如,我們可以為分數定義一個自訂的有理類型,該類型可以表示為一對整數

class RationalType(pa.ExtensionType):

    def __init__(self, data_type: pa.DataType):
        if not pa.types.is_integer(data_type):
            raise TypeError(f"data_type must be an integer type not {data_type}")

        super().__init__(
            pa.struct(
                [
                    ("numer", data_type),
                    ("denom", data_type),
                ],
            ),
            "my_package.rational",
        )

    def __arrow_ext_serialize__(self) -> bytes:
        # No parameters are necessary
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        # Sanity checks, not required but illustrate the method signature.
        assert pa.types.is_struct(storage_type)
        assert pa.types.is_integer(storage_type[0].type)
        assert storage_type[0].type == storage_type[1].type
        assert serialized == b""

        # return an instance of this subclass
        return RationalType(storage_type[0].type)

特殊方法 __arrow_ext_serialize____arrow_ext_deserialize__ 定義了擴展類型實例的序列化和反序列化。

現在可以使用它來建立保存擴展類型的陣列和表格

>>> rational_type = RationalType(pa.int32())
>>> rational_type.extension_name
'my_package.rational'
>>> rational_type.storage_type
StructType(struct<numer: int32, denom: int32>)

>>> storage_array = pa.array(
...     [
...         {"numer": 10, "denom": 17},
...         {"numer": 20, "denom": 13},
...     ],
...     type=rational_type.storage_type,
... )
>>> arr = rational_type.wrap_array(storage_array)
>>> # or equivalently
>>> arr = pa.ExtensionArray.from_storage(rational_type, storage_array)
>>> arr
<pyarrow.lib.ExtensionArray object at 0x1067f5420>
-- is_valid: all not null
-- child 0 type: int32
  [
    10,
    20
  ]
-- child 1 type: int32
  [
    17,
    13
  ]

此陣列可以包含在 RecordBatch 中,透過 IPC 發送,並在另一個 Python 程序中接收。接收程序必須明確註冊擴展類型以進行反序列化,否則它將回退到儲存類型

>>> pa.register_extension_type(RationalType(pa.int32()))

例如,建立 RecordBatch 並使用 IPC 協定將其寫入串流

>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
...    writer.write_batch(batch)
>>> buf = sink.getvalue()

然後讀回它會產生正確的類型

>>> with pa.ipc.open_stream(buf) as reader:
...    result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int32, denom: int32>))

此外,請注意,雖然我們註冊了具體類型 RationalType(pa.int32()),但相同的擴展名稱 ("my_package.rational") 由 RationalType(integer_type) 用於所有 Arrow 整數類型。因此,上述程式碼也允許使用者(反)序列化這些資料類型

>>> big_rational_type = RationalType(pa.int64())
>>> storage_array = pa.array(
...     [
...         {"numer": 10, "denom": 17},
...         {"numer": 20, "denom": 13},
...     ],
...     type=big_rational_type.storage_type,
... )
>>> arr = big_rational_type.wrap_array(storage_array)
>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
>>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
...    writer.write_batch(batch)
>>> buf = sink.getvalue()
>>> with pa.ipc.open_stream(buf) as reader:
...    result = reader.read_all()
>>> result.column("ext").type
RationalType(StructType(struct<numer: int64, denom: int64>))

接收應用程式不需要是 Python,但如果它已實作自己的擴展類型來接收它,則仍然可以將擴展類型識別為「my_package.rational」類型。如果類型未在接收應用程式中註冊,它將回退到儲存類型。

參數化擴展類型#

上面的範例說明了如何建構不需要儲存類型以外的額外元資料的擴展類型。但 Arrow 也提供了更彈性、參數化的擴展類型。

此處給出的範例實作了 pandas「period」資料類型 的擴展類型,表示時間跨度(例如,一天、一個月、一季等的頻率)。它儲存為 int64 陣列,該陣列被解譯為自 1970 年以來給定頻率的時間跨度數。

class PeriodType(pa.ExtensionType):

    def __init__(self, freq):
        # attributes need to be set first before calling
        # super init (as that calls serialize)
        self._freq = freq
        super().__init__(pa.int64(), "my_package.period")

    @property
    def freq(self):
        return self._freq

    def __arrow_ext_serialize__(self):
        return "freq={}".format(self.freq).encode()

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        # Return an instance of this subclass given the serialized
        # metadata.
        serialized = serialized.decode()
        assert serialized.startswith("freq=")
        freq = serialized.split("=")[1]
        return PeriodType(freq)

在此,我們確保將重建實例所需的所有資訊儲存在序列化的元資料中(在 __arrow_ext_deserialize__ 類別方法中),在本例中為頻率字串。

請注意,一旦建立,資料類型實例就被視為不可變的。在上面的範例中,因此 freq 參數儲存在具有公用唯讀屬性的私有屬性中,以供存取。

自訂擴展陣列類別#

預設情況下,所有具有擴展類型的陣列都會建構或反序列化為內建的 ExtensionArray 物件。然而,可能有人想要子類別化 ExtensionArray,以便新增一些特定於擴展類型的自訂邏輯。Arrow 允許透過將特殊方法 __arrow_ext_class__ 新增到擴展類型的定義中來做到這一點。

例如,讓我們考慮 Numpy Quickstart 中 3D 空間中的點的範例。我們可以將它們儲存為固定大小的列表,我們希望能夠將資料提取為 2-D Numpy 陣列 (N, 3) 而無需任何複製

class Point3DArray(pa.ExtensionArray):
    def to_numpy_array(self):
        return self.storage.flatten().to_numpy().reshape((-1, 3))


class Point3DType(pa.ExtensionType):
    def __init__(self):
        super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return Point3DType()

    def __arrow_ext_class__(self):
        return Point3DArray

使用此擴展類型建構的陣列現在具有預期的自訂陣列類別

>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr
<__main__.Point3DArray object at 0x7f40dea80670>
[
    [
        1,
        2,
        3
    ],
    [
        4,
        5,
        6
    ]
]

然後,擴展類別中的其他方法可供使用者使用

>>> arr.to_numpy_array()
array([[1., 2., 3.],
   [4., 5., 6.]], dtype=float32)

此陣列可以透過 IPC 發送,在另一個 Python 程序中接收,並且自訂擴展陣列類別將被保留(只要接收程序在使用 register_extension_type() 註冊擴展類型後再讀取 IPC 資料)。

自訂純量轉換#

如果您希望自訂擴充類型 (extension type) 的純量 (scalar) 在呼叫 ExtensionScalar.as_py() 時轉換為自訂類型,您可以透過子類別化 (subclassing) ExtensionScalar.as_py() 方法來覆寫 (override) ExtensionScalar.as_py() 方法。例如,如果我們希望上述的 3D 點類型範例傳回自訂的 3D 點類別而不是列表,我們會實作:

from collections import namedtuple

Point3D = namedtuple("Point3D", ["x", "y", "z"])

class Point3DScalar(pa.ExtensionScalar):
    def as_py(self) -> Point3D:
        return Point3D(*self.value.as_py())

class Point3DType(pa.ExtensionType):
    def __init__(self):
        super().__init__(pa.list_(pa.float32(), 3), "my_package.Point3DType")

    def __arrow_ext_serialize__(self):
        return b""

    @classmethod
    def __arrow_ext_deserialize__(cls, storage_type, serialized):
        return Point3DType()

    def __arrow_ext_scalar_class__(self):
        return Point3DScalar

現在,使用此擴充類型建構的陣列會提供可轉換為我們的 Point3D 類別的純量

>>> storage = pa.array([[1, 2, 3], [4, 5, 6]], pa.list_(pa.float32(), 3))
>>> arr = pa.ExtensionArray.from_storage(Point3DType(), storage)
>>> arr[0].as_py()
Point3D(x=1.0, y=2.0, z=3.0)

>>> arr.to_pylist()
[Point3D(x=1.0, y=2.0, z=3.0), Point3D(x=4.0, y=5.0, z=6.0)]

轉換為 pandas#

若您的擴充類型有對應的 pandas 擴充陣列,則可以控制擴充類型欄位轉換為 pandas 的行為 (在 Table.to_pandas() 中)。

為此,需要實作 ExtensionType.to_pandas_dtype() 方法,且應傳回 pandas.api.extensions.ExtensionDtype 子類別實例。

以上述的 pandas period 類型為例,會如下所示:

class PeriodType(pa.ExtensionType):
    ...

    def to_pandas_dtype(self):
        import pandas as pd
        return pd.PeriodDtype(freq=self.freq)

其次,pandas 的 ExtensionDtype 本身需要實作 __from_arrow__ 方法:此方法在給定擴充類型的 pyarrow Array 或 ChunkedArray 時,可以建構對應的 pandas ExtensionArray。此方法應具有以下簽章:

class MyExtensionDtype(pd.api.extensions.ExtensionDtype):
    ...

    def __from_arrow__(self, array: pyarrow.Array/ChunkedArray) -> pandas.ExtensionArray:
        ...

如此一來,您可以控制 pyarrow 擴充類型的 Array 轉換為 pandas ExtensionArray 的行為,而 pandas ExtensionArray 可以儲存在 DataFrame 中。

標準擴充類型#

您可以在標準擴充類型章節中找到標準擴充類型的官方列表。在此我們新增關於如何在 pyarrow 中使用它們的範例。

固定大小張量#

若要建立具有相等形狀的張量陣列 (固定形狀張量陣列),我們首先需要定義一個具有值類型和形狀的固定形狀張量擴充類型:

>>> tensor_type = pa.fixed_shape_tensor(pa.int32(), (2, 2))

接著,我們需要具有 pyarrow.list_() 類型的儲存陣列,其中 value_type` 是固定形狀張量的值類型,而列表大小是 tensor_type 形狀元素的乘積。然後,我們可以使用 pa.ExtensionArray.from_storage() 方法建立張量陣列:

>>> arr = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
>>> storage = pa.array(arr, pa.list_(pa.int32(), 4))
>>> tensor_array = pa.ExtensionArray.from_storage(tensor_type, storage)

我們也可以建立另一個具有不同值類型的張量陣列:

>>> tensor_type_2 = pa.fixed_shape_tensor(pa.float32(), (2, 2))
>>> storage_2 = pa.array(arr, pa.list_(pa.float32(), 4))
>>> tensor_array_2 = pa.ExtensionArray.from_storage(tensor_type_2, storage_2)

擴充陣列可以用作 pyarrow.Tablepyarrow.RecordBatch 中的欄位:

>>> data = [
...     pa.array([1, 2, 3]),
...     pa.array(["foo", "bar", None]),
...     pa.array([True, None, True]),
...     tensor_array,
...     tensor_array_2
... ]
>>> my_schema = pa.schema([("f0", pa.int8()),
...                        ("f1", pa.string()),
...                        ("f2", pa.bool_()),
...                        ("tensors_int", tensor_type),
...                        ("tensors_float", tensor_type_2)])
>>> table = pa.Table.from_arrays(data, schema=my_schema)
>>> table
pyarrow.Table
f0: int8
f1: string
f2: bool
tensors_int: extension<arrow.fixed_shape_tensor[value_type=int32, shape=[2,2]]>
tensors_float: extension<arrow.fixed_shape_tensor[value_type=float, shape=[2,2]]>
----
f0: [[1,2,3]]
f1: [["foo","bar",null]]
f2: [[true,null,true]]
tensors_int: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]
tensors_float: [[[1,2,3,4],[10,20,30,40],[100,200,300,400]]]

我們也可以將張量陣列轉換為單個多維 numpy ndarray。轉換後,arrow 陣列的長度會變成 numpy ndarray 中的第一個維度:

>>> numpy_tensor = tensor_array_2.to_numpy_ndarray()
>>> numpy_tensor
array([[[  1.,   2.],
        [  3.,   4.]],
       [[ 10.,  20.],
        [ 30.,  40.]],
       [[100., 200.],
        [300., 400.]]])
 >>> numpy_tensor.shape
(3, 2, 2)

注意

可選參數 permutationdim_names 旨在向使用者提供關於資料邏輯佈局 (logical layout) 相對於物理佈局 (physical layout) 的資訊。

轉換為 numpy ndarray 僅適用於簡單的排列 (None[0, 1, ... N-1],其中 N 是張量維度的數量)。

反之亦然,我們也可以將 numpy ndarray 轉換為固定形狀張量陣列:

>>> pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
<pyarrow.lib.FixedShapeTensorArray object at ...>
[
  [
    1,
    2,
    3,
    4
  ],
  [
    10,
    20,
    30,
    40
  ],
  [
    100,
    200,
    300,
    400
  ]
]

轉換後,ndarray 的第一個維度會變成 pyarrow 擴充陣列的長度。我們可以在範例中看到,形狀為 (3, 2, 2) 的 ndarray 變成一個長度為 3 的 arrow 陣列,其張量元素形狀為 (2, 2)

# ndarray of shape (3, 2, 2)
>>> numpy_tensor.shape
(3, 2, 2)

# arrow array of length 3 with tensor elements of shape (2, 2)
>>> pyarrow_tensor_array = pa.FixedShapeTensorArray.from_numpy_ndarray(numpy_tensor)
>>> len(pyarrow_tensor_array)
3
>>> pyarrow_tensor_array.type.shape
[2, 2]

擴充類型也可以定義 permutationdim_names。例如:

>>> tensor_type = pa.fixed_shape_tensor(pa.float64(), [2, 2, 3], permutation=[0, 2, 1])

>>> tensor_type = pa.fixed_shape_tensor(pa.bool_(), [2, 2, 3], dim_names=["C", "H", "W"])

針對 NCHW 格式,其中:

  • N:影像數量,在我們的例子中,這是陣列的長度,且始終位於第一個維度

  • C:影像的通道數

  • H:影像的高度

  • W:影像的寬度