運算函數#

Arrow 支援對可能具有不同類型的輸入執行邏輯運算。

標準運算操作由 pyarrow.compute 模組提供,可以直接使用

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> pc.sum(a)
<pyarrow.Int64Scalar: 7>

分組聚合函數會引發例外,需要透過 pyarrow.Table.group_by() 功能使用。詳情請參閱 分組聚合

標準運算函數#

許多運算函數同時支援陣列(分塊或非分塊)和純量輸入,但有些函數會強制要求其中一種。例如,sort_indices 要求其第一個也是唯一的輸入必須是陣列。

以下是一些簡單的範例

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> a = pa.array([1, 1, 2, 3])
>>> b = pa.array([4, 1, 2, 8])
>>> pc.equal(a, b)
<pyarrow.lib.BooleanArray object at 0x7f686e4eef30>
[
  false,
  true,
  true,
  false
]
>>> x, y = pa.scalar(7.8), pa.scalar(9.3)
>>> pc.multiply(x, y)
<pyarrow.DoubleScalar: 72.54>

這些函數可以執行的操作不僅僅是逐元素運算。以下是一個表格排序的範例

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> t = pa.table({'x':[1,2,3],'y':[3,2,1]})
>>> i = pc.sort_indices(t, sort_keys=[('y', 'ascending')])
>>> i
<pyarrow.lib.UInt64Array object at 0x7fcee5df75e8>
[
  2,
  1,
  0
]

如需 PyArrow 提供的完整運算函數列表,您可以參考 運算函數 參考文件。

分組聚合#

PyArrow 透過 pyarrow.Table.group_by() 方法,支援對 pyarrow.Table 進行分組聚合。此方法會傳回一個分組宣告,可以對其應用雜湊聚合函數

>>> import pyarrow as pa
>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([("values", "sum")])
pyarrow.Table
values_sum: int64
keys: string
----
values_sum: [[3,7,5]]
keys: [["a","b","c"]]

在上一個範例中,傳遞給 aggregate 方法的 "sum" 聚合是 hash_sum 運算函數。

可以同時執行多個聚合,方法是將它們提供給 aggregate 方法

>>> import pyarrow as pa
>>> t = pa.table([
...       pa.array(["a", "a", "b", "b", "c"]),
...       pa.array([1, 2, 3, 4, 5]),
... ], names=["keys", "values"])
>>> t.group_by("keys").aggregate([
...    ("values", "sum"),
...    ("keys", "count")
... ])
pyarrow.Table
values_sum: int64
keys_count: int64
keys: string
----
values_sum: [[3,7,5]]
keys_count: [[2,2,1]]
keys: [["a","b","c"]]

也可以為每個聚合函數提供聚合選項,例如,我們可以使用 CountOptions 來變更我們計算空值的方式

>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> table_with_nulls = pa.table([
...    pa.array(["a", "a", "a"]),
...    pa.array([1, None, None])
... ], names=["keys", "values"])
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="all"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[3]]
keys: [["a"]]
>>> table_with_nulls.group_by(["keys"]).aggregate([
...    ("values", "count", pc.CountOptions(mode="only_valid"))
... ])
pyarrow.Table
values_count: int64
keys: string
----
values_count: [[1]]
keys: [["a"]]

以下是所有支援的分組聚合函數列表。您可以使用它們,無論是否帶有 "hash_" 前綴。

hash_all

判斷每個群組中的所有元素是否評估為 true

ScalarAggregateOptions

hash_any

判斷每個群組中是否有任何元素評估為 true

ScalarAggregateOptions

hash_approximate_median

計算每個群組中數值的近似中位數

ScalarAggregateOptions

hash_count

計算每個群組中空值/非空值的數量

CountOptions

hash_count_all

計算每個群組中的列數

hash_count_distinct

計算每個群組中的相異值

CountOptions

hash_distinct

保留每個群組中的相異值

CountOptions

hash_first

計算每個群組中的第一個值

ScalarAggregateOptions

hash_first_last

計算每個群組中數值的第一個和最後一個值

ScalarAggregateOptions

hash_last

計算每個群組中的第一個值

ScalarAggregateOptions

hash_list

列出每個群組中的所有值

hash_max

計算每個群組中數值的最小值或最大值

ScalarAggregateOptions

hash_mean

計算每個群組中數值的平均值

ScalarAggregateOptions

hash_min

計算每個群組中數值的最小值或最大值

ScalarAggregateOptions

hash_min_max

計算每個群組中數值的最小值和最大值

ScalarAggregateOptions

hash_one

從每個群組取得一個值

hash_product

計算每個群組中數值的乘積

ScalarAggregateOptions

hash_stddev

計算每個群組中數值的標準差

hash_sum

加總每個群組中的數值

ScalarAggregateOptions

hash_tdigest

計算每個群組中數值的近似分位數

TDigestOptions

hash_variance

計算每個群組中數值的變異數

表格與資料集聯結#

TableDataset 都透過 Table.join()Dataset.join() 方法支援聯結操作。

這些方法接受一個右側表格或資料集,它將與初始表格或資料集聯結,以及一個或多個應從兩個實體使用的鍵,以執行聯結。

預設情況下,會執行 left outer join,但可以要求任何支援的聯結類型

  • left semi

  • right semi

  • left anti

  • right anti

  • inner

  • left outer

  • right outer

  • full outer

基本的聯結可以透過僅提供一個表格和一個聯結應依據的鍵來執行

import pyarrow as pa

table1 = pa.table({'id': [1, 2, 3],
                   'year': [2020, 2022, 2019]})

table2 = pa.table({'id': [3, 4],
                   'n_legs': [5, 100],
                   'animal': ["Brittle stars", "Centipede"]})

joined_table = table1.join(table2, keys="id")

結果將是一個新的表格,透過在 id 鍵上使用 left outer join 聯結 table1table2 而建立

pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
n_legs: [[5,null,null]]
animal: [["Brittle stars",null,null]]

我們可以執行其他類型的聯結,例如 full outer join,方法是將它們傳遞給 join_type 引數

table1.join(table2, keys='id', join_type="full outer")

在這種情況下,結果將會是

pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: [[3,1,2,4]]
year: [[2019,2020,2022,null]]
n_legs: [[5,null,null,100]]
animal: [["Brittle stars",null,null,"Centipede"]]

也可以提供額外的聯結鍵,以便聯結在兩個鍵而不是一個鍵上發生。例如,我們可以將 year 欄新增至 table2,以便我們可以在 ('id', 'year') 上聯結

table2_withyear = table2.append_column("year", pa.array([2019, 2022]))
table1.join(table2_withyear, keys=["id", "year"])

結果將是一個表格,其中只有 id=3year=2019 的條目有資料,其餘條目將為 null

pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]

相同的能力也適用於 Dataset.join(),因此您可以取得兩個資料集並將它們聯結

import pyarrow.dataset as ds

ds1 = ds.dataset(table1)
ds2 = ds.dataset(table2)

joined_ds = ds1.join(ds2, keys="id")

產生的資料集將是一個包含聯結資料的 InMemoryDataset

>>> joined_ds.head(5)

pyarrow.Table
id: int64
year: int64
animal: string
n_legs: int64
----
id: [[3,1,2]]
year: [[2019,2020,2022]]
animal: [["Brittle stars",null,null]]
n_legs: [[5,null,null]]

依運算式篩選#

TableDataset 都可以使用布林值 Expression 進行篩選。

運算式可以從 pyarrow.compute.field() 開始建立。然後可以將比較和轉換應用於一個或多個欄位,以建立您關心的篩選運算式。

大多數 運算函數 可以用於對 field 執行轉換。

例如,我們可以建立一個篩選器,以尋找 "nums" 欄中所有偶數的列

import pyarrow.compute as pc
even_filter = (pc.bit_wise_and(pc.field("nums"), pc.scalar(1)) == pc.scalar(0))

注意

此篩選器透過對數字與 1 執行位元 AND 運算來尋找偶數。由於 1 的二進位形式為 00000001,因此只有最後一個位元設定為 1 的數字,才會從 bit_wise_and 運算中傳回非零結果。 這樣我們就能識別出所有奇數。 考慮到我們對偶數感興趣,因此我們接著檢查 bit_wise_and 運算傳回的數字是否等於 0。 只有最後一個位元為 0 的數字才會傳回 0 作為 num & 1 的結果,且由於最後一個位元為 0 的所有數字都是 2 的倍數,因此我們將僅篩選偶數。

一旦我們有了篩選器,就可以將其提供給 Table.filter() 方法,以僅針對符合的列篩選表格

>>> table = pa.table({'nums': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
...                   'chars': ["a", "b", "c", "d", "e", "f", "g", "h", "i", "l"]})
>>> table.filter(even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[2,4,6,8,10]]
chars: [["b","d","f","h","l"]]

多個篩選器可以使用 &|~ 結合,以執行 andornot 運算。 例如,使用 ~even_filter 實際上最終會篩選出所有奇數

>>> table.filter(~even_filter)
pyarrow.Table
nums: int64
chars: string
----
nums: [[1,3,5,7,9]]
chars: [["a","c","e","g","i"]]

我們可以透過將 even_filterpc.field("nums") > 5 篩選器結合,來建立一個尋找所有大於 5 的偶數的篩選器

>>> table.filter(even_filter & (pc.field("nums") > 5))
pyarrow.Table
nums: int64
chars: string
----
nums: [[6,8,10]]
chars: [["f","h","l"]]

Dataset 也可以使用 Dataset.filter() 方法進行篩選。 此方法將傳回 Dataset 的一個實例,該實例將在存取資料集的實際資料後立即延遲套用篩選器

>>> dataset = ds.dataset(table)
>>> filtered = dataset.filter(pc.field("nums") < 5).filter(pc.field("nums") > 2)
>>> filtered.to_table()
pyarrow.Table
nums: int64
chars: string
----
nums: [[3,4]]
chars: [["c","d"]]

使用者定義函數#

警告

此 API 為實驗性

PyArrow 允許定義和註冊自訂的計算函數。 這些函數隨後可以從 Python 以及 C++(以及可能任何其他包裝 Arrow C++ 的實作,例如 R arrow 套件)中使用其註冊的函數名稱來呼叫。

UDF 支援僅限於純量函數。 純量函數是對陣列或純量執行逐元素運算的函數。 一般而言,純量函數的輸出不取決於引數中值的順序。 請注意,此類函數大致對應於 SQL 運算式中使用的函數,或 NumPy 通用函數

若要註冊 UDF,需要定義函數名稱、函數文件、輸入類型和輸出類型。 使用 pyarrow.compute.register_scalar_function()

import numpy as np

import pyarrow as pa
import pyarrow.compute as pc

function_name = "numpy_gcd"
function_docs = {
      "summary": "Calculates the greatest common divisor",
      "description":
         "Given 'x' and 'y' find the greatest number that divides\n"
         "evenly into both x and y."
}

input_types = {
   "x" : pa.int64(),
   "y" : pa.int64()
}

output_type = pa.int64()

def to_np(val):
    if isinstance(val, pa.Scalar):
       return val.as_py()
    else:
       return np.array(val)

def gcd_numpy(ctx, x, y):
    np_x = to_np(x)
    np_y = to_np(y)
    return pa.array(np.gcd(np_x, np_y))

pc.register_scalar_function(gcd_numpy,
                           function_name,
                           function_docs,
                           input_types,
                           output_type)

使用者定義函數的實作始終採用第一個內容參數(在上面的範例中命名為 ctx),它是 pyarrow.compute.UdfContext 的一個實例。 此內容公開了幾個有用的屬性,特別是 memory_pool,用於使用者定義函數內容中的分配。

您可以使用 pyarrow.compute.call_function() 直接呼叫使用者定義函數

>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.scalar(63)])
<pyarrow.Int64Scalar: 9>
>>> pc.call_function("numpy_gcd", [pa.scalar(27), pa.array([81, 12, 5])])
<pyarrow.lib.Int64Array object at 0x7fcfa0e7b100>
[
  27,
  3,
  1
]

使用資料集#

更廣泛地說,使用者定義函數在可以透過名稱引用計算函數的任何地方都可用。 例如,可以使用 Expression._call() 在資料集的欄位上呼叫它們。

考慮一個資料位於表格中的情況,我們想要計算一個欄位與純量值 30 的 GCD。 我們將重複使用上面建立的 “numpy_gcd” 使用者定義函數

>>> import pyarrow.dataset as ds
>>> data_table = pa.table({'category': ['A', 'B', 'C', 'D'], 'value': [90, 630, 1827, 2709]})
>>> dataset = ds.dataset(data_table)
>>> func_args = [pc.scalar(30), ds.field("value")]
>>> dataset.to_table(
...             columns={
...                 'gcd_value': ds.field('')._call("numpy_gcd", func_args),
...                 'value': ds.field('value'),
...                 'category': ds.field('category')
...             })
pyarrow.Table
gcd_value: int64
value: int64
category: string
----
gcd_value: [[30,30,3,3]]
value: [[90,630,1827,2709]]
category: [["A","B","C","D"]]

請注意,ds.field('')._call(...) 傳回 pyarrow.compute.Expression()。 傳遞給此函數呼叫的引數是運算式,而不是純量值(請注意 pyarrow.scalar()pyarrow.compute.scalar() 之間的差異,後者產生運算式)。 當投影運算子執行它時,會評估此運算式。

投影運算式#

在上面的範例中,我們使用運算式將新欄位(gcd_value)新增至我們的表格。 將新的、動態計算的欄位新增至表格稱為「投影」,並且可以投影運算式中使用的函數種類有限制。 投影函數必須為每個輸入列發出單個輸出值。 該輸出值應完全從輸入列計算得出,並且不應取決於任何其他列。 例如,我們一直用作範例的 “numpy_gcd” 函數是可在投影中使用的有效函數。 「累積總和」函數將不是有效函數,因為每個輸入列的結果取決於之前的列。 「捨棄空值」函數也將無效,因為它不會為某些列發出值。