Substrait
The arrow-substrait module implements support for the Substrait format, enabling conversion to and from Arrow objects.
The arrow-dataset module can execute Substrait plans via the Acero query engine.
Working with Schemas
Arrow schemas can be encoded and decoded using the pyarrow.substrait.serialize_schema() and pyarrow.substrait.deserialize_schema() functions.
import pyarrow as pa
import pyarrow.substrait as pa_substrait
arrow_schema = pa.schema([
    pa.field("x", pa.int32()),
    pa.field("y", pa.string())
])
substrait_schema = pa_substrait.serialize_schema(arrow_schema)
The schema marshalled as a Substrait NamedStruct is directly available as substrait_schema.schema:
>>> print(substrait_schema.schema)
b'\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01'
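If Arrow custom types are used, the schema needs extensions for those types to be usable. For this reason, the schema is also available as an Extended Expression that includes all the extension types: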
>>> print(substrait_schema.expression)
b'"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04b\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'
If Substrait Python is installed, the schema can also be converted to a substrait-python object:
>>> print(substrait_schema.to_pysubstrait())
version {
  minor_number: 44
  producer: "Acero 17.0.0"
}
base_schema {
  names: "x"
  names: "y"
  struct {
    types {
      i32 {
        nullability: NULLABILITY_NULLABLE
      }
    }
    types {
      string {
        nullability: NULLABILITY_NULLABLE
      }
    }
  }
}
Working with Expressions
Arrow compute expressions can be encoded and decoded using the pyarrow.substrait.serialize_expressions() and pyarrow.substrait.deserialize_expressions() functions.
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.substrait as pa_substrait
arrow_schema = pa.schema([
    pa.field("x", pa.int32()),
    pa.field("y", pa.int32())
])
substrait_expr = pa_substrait.serialize_expressions(
    exprs=[pc.field("x") + pc.field("y")],
    names=["total"],
    schema=arrow_schema
)
Serializing an expression to Substrait yields the raw data of a protobuf ExtendedExpression message:
>>> print(bytes(substrait_expr))
b'\nZ\x12Xhttps://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml\x12\x07\x1a\x05\x1a\x03add\x1a>\n5\x1a3\x1a\x04*\x02\x10\x01"\n\x1a\x08\x12\x06\n\x02\x12\x00"\x00"\x0c\x1a\n\x12\x08\n\x04\x12\x02\x08\x01"\x00*\x11\n\x08overflow\x12\x05ERROR\x1a\x05total"\x14\n\x01x\n\x01y\x12\x0c\n\x04*\x02\x10\x01\n\x04*\x02\x10\x01:\x19\x10,*\x15Acero 17.0.0'
So if a Substrait Python object is required, the expression has to be decoded with substrait-python itself:
>>> from substrait import proto
>>> pysubstrait_expr = proto.ExtendedExpression.FromString(bytes(substrait_expr))
>>> print(pysubstrait_expr)
version {
  minor_number: 44
  producer: "Acero 17.0.0"
}
extension_uris {
  uri: "https://github.com/substrait-io/substrait/blob/main/extensions/functions_arithmetic.yaml"
}
extensions {
  extension_function {
    name: "add"
  }
}
referred_expr {
  expression {
    scalar_function {
      arguments {
        value {
          selection {
            direct_reference {
              struct_field {
              }
            }
            root_reference {
            }
          }
        }
      }
      arguments {
        value {
          selection {
            direct_reference {
              struct_field {
                field: 1
              }
            }
            root_reference {
            }
          }
        }
      }
      options {
        name: "overflow"
        preference: "ERROR"
      }
      output_type {
        i32 {
          nullability: NULLABILITY_NULLABLE
        }
      }
    }
  }
  output_names: "total"
}
base_schema {
  names: "x"
  names: "y"
  struct {
    types {
      i32 {
        nullability: NULLABILITY_NULLABLE
      }
    }
    types {
      i32 {
        nullability: NULLABILITY_NULLABLE
      }
    }
  }
}
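Executing Queries Using Substrait Extended Expressions
Dataset supports executing queries using Substrait's Extended Expression. The expressions can be passed to the dataset scanner as pyarrow.substrait.BoundExpressions objects: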
import pyarrow.dataset as ds
import pyarrow.substrait as pa_substrait
# Use substrait-python to create the queries
from substrait import proto
dataset = ds.dataset("./data/index-0.parquet")
substrait_schema = pa_substrait.serialize_schema(dataset.schema).to_pysubstrait()
# SELECT project_name FROM dataset WHERE project_name = 'pyarrow'
projection = proto.ExtendedExpression(referred_expr=[
    {"expression": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}},
     "output_names": ["project_name"]}
])
projection.MergeFrom(substrait_schema)
filtering = proto.ExtendedExpression(
    extension_uris=[{"extension_uri_anchor": 99, "uri": "/functions_comparison.yaml"}],
    extensions=[{"extension_function": {"extension_uri_reference": 99, "function_anchor": 199, "name": "equal:any1_any1"}}],
    referred_expr=[
        {"expression": {"scalar_function": {"function_reference": 199, "arguments": [
            {"value": {"selection": {"direct_reference": {"struct_field": {"field": 0}}}}},
            {"value": {"literal": {"string": "pyarrow"}}}
        ], "output_type": {"bool": {"nullability": False}}}}}
    ]
)
filtering.MergeFrom(substrait_schema)
results = dataset.scanner(
    columns=pa_substrait.BoundExpressions.from_substrait(projection),
    filter=pa_substrait.BoundExpressions.from_substrait(filtering)
).head(5)
  project_name
0      pyarrow
1      pyarrow
2      pyarrow
3      pyarrow
4      pyarrow