整合 PyArrow 與 Java#
Arrow 支援在同一程序內透過 Arrow C 資料介面 交換資料。
這可用於在 Python 和 Java 函數及方法之間交換資料,以便兩種語言可以互動,而無需編組和解組資料的成本。
注意
本文假設您已正確安裝 pyarrow
的 Python
環境,以及正確安裝 arrow
函式庫的 Java
環境。Arrow Java
版本必須使用 mvn -Parrow-c-data
編譯,以確保啟用 CData 交換支援。請參閱 Python 安裝說明 和 Java 文件 以取得更多詳細資訊。
從 Python 呼叫 Java 方法#
假設我們有一個簡單的 Java 類別,提供數字作為其輸出
public class Simple {
public static int getNumber() {
return 4;
}
}
我們會將此類別儲存在 Simple.java
檔案中,並繼續使用 javac Simple.java
將其編譯為 Simple.class
。
一旦建立 Simple.class
檔案,我們就可以使用 JPype 函式庫從 Python 使用該類別,這會在 Python 直譯器中啟用 Java 執行時期環境。
jpype1
可以像大多數 Python 函式庫一樣使用 pip
安裝
$ pip install jpype1
我們可以使用 Simple
類別做的最基本的事情是從 Python 使用 Simple.getNumber
方法,看看它是否會傳回結果。
為此,我們可以建立一個 simple.py
檔案,該檔案使用 jpype
從 Simple.class
檔案匯入 Simple
類別,並呼叫 Simple.getNumber
方法
import jpype
from jpype.types import *
jpype.startJVM(classpath=["./"])
Simple = JClass('Simple')
print(Simple.getNumber())
執行 simple.py
檔案將顯示我們的 Python 程式碼如何能夠存取 Java
方法並印出預期的結果
$ python simple.py
4
使用 pyarrow.jvm 從 Java 到 Python#
PyArrow 提供 pyarrow.jvm
模組,可更輕鬆地與 Java 類別互動,並將 Java 物件轉換為實際的 Python 物件。
為了展示 pyarrow.jvm
,我們可以建立一個更複雜的類別,名為 FillTen.java
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
public class FillTen {
static RootAllocator allocator = new RootAllocator();
public static BigIntVector createArray() {
BigIntVector intVector = new BigIntVector("ints", allocator);
intVector.allocateNew(10);
intVector.setValueCount(10);
FillTen.fillVector(intVector);
return intVector;
}
private static void fillVector(BigIntVector iv) {
iv.setSafe(0, 1);
iv.setSafe(1, 2);
iv.setSafe(2, 3);
iv.setSafe(3, 4);
iv.setSafe(4, 5);
iv.setSafe(5, 6);
iv.setSafe(6, 7);
iv.setSafe(7, 8);
iv.setSafe(8, 9);
iv.setSafe(9, 10);
}
}
此類別提供一個公用 createArray
方法,任何人都可以呼叫該方法以取回包含數字 1 到 10 的陣列。
鑑於此類別現在依賴於一堆套件,因此僅使用 javac
編譯它已不夠。我們需要建立一個專用的 pom.xml
檔案,我們可以在其中收集依賴項
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.arrow.py2java</groupId>
<artifactId>FillTen</artifactId>
<version>1</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<version>8.0.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>8.0.0</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>8.0.0</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>8.0.0</version>
<type>jar</type>
</dependency>
</dependencies>
</project>
一旦建立了包含類別的 FillTen.java
檔案作為 src/main/java/FillTen.java
,我們就可以使用 maven
與 mvn package
編譯專案,並使其在 target
目錄中可用。
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
現在我們已經建置了套件,我們可以使其在 Python 中可用。為此,我們需要確保不僅套件本身可用,而且其依賴項也可用。
我們可以使用 maven
收集所有依賴項並使其在單個位置(dependencies
目錄)中可用,以便我們可以更輕鬆地從 Python 載入它們
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
[INFO] Copying arrow-memory-core-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-core-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-c-data-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-c-data-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.pom
[INFO] Copying jackson-core-2.11.4.jar to /experiments/java2py/dependencies/jackson-core-2.11.4.jar
[INFO] Copying jackson-annotations-2.11.4.jar to /experiments/java2py/dependencies/jackson-annotations-2.11.4.jar
[INFO] Copying slf4j-api-1.7.25.jar to /experiments/java2py/dependencies/slf4j-api-1.7.25.jar
[INFO] Copying arrow-memory-netty-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-netty-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-format-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-format-8.0.0-SNAPSHOT.jar
[INFO] Copying flatbuffers-java-1.12.0.jar to /experiments/java2py/dependencies/flatbuffers-java-1.12.0.jar
[INFO] Copying arrow-memory-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-memory-8.0.0-SNAPSHOT.pom
[INFO] Copying netty-buffer-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-buffer-4.1.72.Final.jar
[INFO] Copying jackson-databind-2.11.4.jar to /experiments/java2py/dependencies/jackson-databind-2.11.4.jar
[INFO] Copying commons-codec-1.10.jar to /experiments/java2py/dependencies/commons-codec-1.10.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
注意
除了手動收集依賴項之外,您還可以依賴 maven-assembly-plugin
來建置包含所有依賴項的單個 jar
。
一旦我們的套件及其所有依賴項都可用,我們就可以從 fillten_pyarrowjvm.py
腳本中呼叫它,該腳本將匯入 FillTen
類別並印出呼叫 FillTen.createArray
的結果
import jpype
import jpype.imports
from jpype.types import *
# Start a JVM making available all dependencies we collected
# and our class from target/FillTen-1.jar
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')
array = FillTen.createArray()
print("ARRAY", type(array), array)
# Convert the proxied BigIntVector to an actual pyarrow array
import pyarrow.jvm
pyarray = pyarrow.jvm.array(array)
print("ARRAY", type(pyarray), pyarray)
del pyarray
執行 python 腳本將導致印出兩行
ARRAY <java class 'org.apache.arrow.vector.BigIntVector'> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ARRAY <class 'pyarrow.lib.Int64Array'> [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10
]
第一行是呼叫 FillTen.createArray
方法的原始結果。產生的物件是實際 Java 物件的代理,因此它並非真正的 pyarrow Array,它將缺少其大多數功能和方法。這就是為什麼我們隨後使用 pyarrow.jvm.array
將其轉換為實際的 pyarrow
陣列。這使我們可以像對待任何其他 pyarrow
陣列一樣對待它。結果是輸出中的第二行,其中陣列被正確報告為 pyarrow.lib.Int64Array
類型,並使用 pyarrow
樣式印出。
注意
目前,pyarrow.jvm
模組的功能相當有限,不支援結構等巢狀類型,並且僅適用於在與 JPype 相同的程序中執行的 JVM。
使用 C 資料介面進行 Java 到 Python 通訊#
C 資料介面是在 Arrow 中實作的協定,用於在不同環境中交換資料,而無需編組和複製資料的成本。
這允許將來自 Python 或 Java 的資料公開給以另一種語言實作的函數。
注意
在未來,pyarrow.jvm
將被實作為利用 C 資料介面,目前它是專門為 JPype 撰寫的
為了展示 C 資料的工作原理,我們將稍微調整我們的 FillTen
Java 類別和我們的 fillten.py
Python 腳本。給定一個 PyArrow 陣列,我們將在 Java 中公開一個函數,該函數將其內容設定為數字 1 到 10。
目前在 pyarrow
中使用 C 資料介面需要明確安裝 cffi
,與大多數 Python 發行版一樣,可以使用以下命令安裝
$ pip install cffi
我們要做的第一件事是調整 Python 腳本,以便根據 C 資料介面將匯出的參考傳送給 Java,以參考陣列及其結構描述
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make FillTen class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')
# Create a Python array of 10 elements
import pyarrow as pa
array = pa.array([0]*10)
from pyarrow.cffi import ffi as arrow_c
# Export the Python array through C Data
c_array = arrow_c.new("struct ArrowArray*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
array._export_to_c(c_array_ptr)
# Export the Schema of the Array through C Data
c_schema = arrow_c.new("struct ArrowSchema*")
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
array.type._export_to_c(c_schema_ptr)
# Send Array and its Schema to the Java function
# that will populate the array with numbers from 1 to 10
FillTen.fillCArray(c_array_ptr, c_schema_ptr)
# See how the content of our Python array was changed from Java
# while it remained of the Python type.
print("ARRAY", type(array), array)
注意
變更陣列內容不是安全的操作,這樣做是為了建立此範例,並且它主要僅在陣列的大小、類型或 null 未變更的情況下才有效。
在 FillTen Java 類別中,我們已經有 fillVector
方法,但該方法是 private 的,即使我們將其設為 public,它也只會接受 BigIntVector
物件,而不會接受 C Data 陣列和 schema 參考。
因此,我們必須擴展 FillTen
類別,新增一個 fillCArray
方法,該方法能夠執行與 fillVector
相同的工作,但作用於 C Data 交換實體,而不是 BigIntVector
物件。
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.BigIntVector;
public class FillTen {
static RootAllocator allocator = new RootAllocator();
public static void fillCArray(long c_array_ptr, long c_schema_ptr) {
ArrowArray arrow_array = ArrowArray.wrap(c_array_ptr);
ArrowSchema arrow_schema = ArrowSchema.wrap(c_schema_ptr);
FieldVector v = Data.importVector(allocator, arrow_array, arrow_schema, null);
FillTen.fillVector((BigIntVector)v);
}
private static void fillVector(BigIntVector iv) {
iv.setSafe(0, 1);
iv.setSafe(1, 2);
iv.setSafe(2, 3);
iv.setSafe(3, 4);
iv.setSafe(4, 5);
iv.setSafe(5, 6);
iv.setSafe(6, 7);
iv.setSafe(7, 8);
iv.setSafe(8, 9);
iv.setSafe(9, 10);
}
}
fillCArray
方法的目標是取得以 C Data 交換格式接收的陣列和 Schema,並將它們轉換回 FieldVector
類型的物件,以便 Arrow Java 知道如何處理它。
如果我們再次執行 mvn package
,更新 maven 依賴,然後更新我們的 Python 腳本,我們應該能夠看到 Python 腳本列印的值已由 Java 程式碼正確地更改。
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python fillten.py
ARRAY <class 'pyarrow.lib.Int64Array'> [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10
]
我們也可以使用 C Stream 介面來交換 Java 和 Python 之間的 pyarrow.RecordBatchReader
。我們將使用這個 Java 類別作為示範,它可以讓您透過 Java 的實作讀取 Arrow IPC 檔案,或將資料寫入 JSON 檔案。
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;
public class PythonInteropDemo implements AutoCloseable {
private final BufferAllocator allocator;
public PythonInteropDemo() {
this.allocator = new RootAllocator();
}
public void exportStream(String path, long cStreamPointer) throws Exception {
try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
Data.exportArrayStream(allocator, reader, stream);
}
}
public void importStream(String path, long cStreamPointer) throws Exception {
try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
final ArrowReader input = Data.importArrayStream(allocator, stream);
JsonFileWriter writer = new JsonFileWriter(new File(path))) {
writer.start(input.getVectorSchemaRoot().getSchema(), input);
while (input.loadNextBatch()) {
writer.write(input.getVectorSchemaRoot());
}
}
}
@Override
public void close() throws Exception {
allocator.close();
}
}
在 Python 端,我們將像以前一樣使用 JPype,但這次我們將來回傳送 RecordBatchReaders。
import tempfile
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()
# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
("ints", pa.int64()),
("strs", pa.string())
])
batches = [
pa.record_batch([
[0, 2, 4, 8],
["a", "b", "c", None],
], schema=schema),
pa.record_batch([
[None, 32, 64, None],
["e", None, None, "h"],
], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)
from pyarrow.cffi import ffi as arrow_c
# Export the Python reader through C Data
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)
# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
demo.importStream(temp.name, c_stream_ptr)
# Read the JSON file back
with open(temp.name) as source:
print("JSON file written by Java:")
print(source.read())
# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
with pa.ipc.new_file(temp.name, schema) as sink:
for batch in batches:
sink.write_batch(batch)
demo.exportStream(temp.name, c_stream_ptr)
with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
print("IPC file read by Java:")
print(source.read_all())
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]