Integrating PyArrow with Java

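Arrow supports exchanging data within the same process through the Arrow C Data Interface.

This can be used to exchange data between Python and Java functions and methods, so that the two languages can interact without the cost of marshaling and unmarshaling data.

Note

This article assumes that you have a Python environment with pyarrow correctly installed and a Java environment with the arrow library correctly installed. The Arrow Java version must have been compiled with mvn -Parrow-c-data to ensure that CData exchange support is enabled. See the Python installation instructions and the Java documentation for more details.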

Calling Java methods from Python

Suppose we have a simple Java class that provides a number as its output:

public class Simple {
    public static int getNumber() {
        return 4;
    }
}

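We save this class in the Simple.java file and compile it to Simple.class using javac Simple.java.

Once the Simple.class file is created, we can use the class from Python through the JPype library, which enables a Java runtime within the Python interpreter.

jpype1 can be installed with pip like most Python libraries: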

$ pip install jpype1

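The most basic thing we can do with the Simple class is to call the Simple.getNumber method from Python and check that it returns the result.

To do so, we can create a simple.py file that uses jpype to import the Simple class from the Simple.class file and invokes the Simple.getNumber method: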

import jpype
from jpype.types import *

jpype.startJVM(classpath=["./"])

Simple = JClass('Simple')

print(Simple.getNumber())

Running the simple.py file shows that our Python code can access the Java method and print the expected result:

$ python simple.py
4
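
Each script in this article calls jpype.startJVM once at the top. In an interactive session or a notebook the same code often runs more than once, and JPype can start a JVM only once per Python process. A minimal sketch of a guard follows; ensure_jvm is a hypothetical helper name, not part of JPype, and it relies only on jpype.isJVMStarted and jpype.startJVM.

```python
def ensure_jvm(classpath):
    """Start the JVM with the given classpath unless one is already running.

    Returns True if this call started the JVM, False if it was already up.
    ensure_jvm is a hypothetical helper, not a JPype function.
    """
    # Imported lazily so that defining the helper does not require JPype.
    import jpype

    if jpype.isJVMStarted():
        # A running JVM keeps the classpath it was started with.
        return False
    jpype.startJVM(classpath=list(classpath))
    return True
```

With this helper, the jpype.startJVM(classpath=["./"]) line in simple.py could be replaced by ensure_jvm(["./"]). A JVM that is already running is left untouched, so a different classpath passed on a later call has no effect.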

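Java to Python using pyarrow.jvm

PyArrow provides a pyarrow.jvm module that makes it easier to interact with Java classes and to convert Java objects to actual Python objects.

To showcase pyarrow.jvm, we can create a more complex class in a file named FillTen.java: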

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;


public class FillTen {
    static RootAllocator allocator = new RootAllocator();

    public static BigIntVector createArray() {
        BigIntVector intVector = new BigIntVector("ints", allocator);
        intVector.allocateNew(10);
        intVector.setValueCount(10);
        FillTen.fillVector(intVector);
        return intVector;
    }

    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}

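This class provides a public createArray method that anyone can invoke to get back an array containing the numbers from 1 to 10.

Because this class now depends on several packages, compiling it with javac alone is no longer enough. We need to create a dedicated pom.xml file where we can collect the dependencies: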

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.arrow.py2java</groupId>
    <artifactId>FillTen</artifactId>
    <version>1</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory</artifactId>
            <version>8.0.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>8.0.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>8.0.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-c-data</artifactId>
            <version>8.0.0</version>
            <type>jar</type>
        </dependency>
    </dependencies>
</project>

Once the FillTen.java file containing the class is created as src/main/java/FillTen.java, we can use maven to compile the project with mvn package and make it available in the target directory.

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
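
Because a jar is an ordinary zip archive, it is possible to check from Python that the build produced what we expect before handing the jar to the JVM. The sketch below uses only the standard library; jar_contains_class is a hypothetical helper introduced for illustration.

```python
import zipfile


def jar_contains_class(jar_path, class_name):
    """Return True if the jar at jar_path holds the compiled class class_name.

    class_name uses Java dotted notation, e.g. "FillTen" or "org.example.Foo".
    jar_contains_class is a hypothetical helper, not part of any library.
    """
    # Inside a jar, packages map to directories and classes to .class files.
    entry = class_name.replace(".", "/") + ".class"
    # A jar is a zip archive, so the standard zipfile module can list it.
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

For the build above, jar_contains_class("target/FillTen-1.jar", "FillTen") should be true, since FillTen is declared in the default package.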

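Now that the package is built, we can make it available to Python. To do so, we need to make sure that not only the package itself is available, but also its dependencies.

We can use maven to collect all dependencies and place them in a single location (the dependencies directory) so that we can load them more easily from Python: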

$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
[INFO] Copying arrow-memory-core-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-core-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-c-data-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-c-data-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.pom
[INFO] Copying jackson-core-2.11.4.jar to /experiments/java2py/dependencies/jackson-core-2.11.4.jar
[INFO] Copying jackson-annotations-2.11.4.jar to /experiments/java2py/dependencies/jackson-annotations-2.11.4.jar
[INFO] Copying slf4j-api-1.7.25.jar to /experiments/java2py/dependencies/slf4j-api-1.7.25.jar
[INFO] Copying arrow-memory-netty-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-netty-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-format-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-format-8.0.0-SNAPSHOT.jar
[INFO] Copying flatbuffers-java-1.12.0.jar to /experiments/java2py/dependencies/flatbuffers-java-1.12.0.jar
[INFO] Copying arrow-memory-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-memory-8.0.0-SNAPSHOT.pom
[INFO] Copying netty-buffer-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-buffer-4.1.72.Final.jar
[INFO] Copying jackson-databind-2.11.4.jar to /experiments/java2py/dependencies/jackson-databind-2.11.4.jar
[INFO] Copying commons-codec-1.10.jar to /experiments/java2py/dependencies/commons-codec-1.10.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
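
A quick way to see exactly which jars will end up on the classpath is to list them from Python. The following is a minimal sketch using only the standard library; collect_jars is a hypothetical helper, and it skips the .pom files that the plugin copied alongside the jars.

```python
from pathlib import Path


def collect_jars(*directories):
    """Return the sorted paths of all .jar files directly inside the given directories.

    collect_jars is a hypothetical helper introduced for illustration.
    """
    jars = []
    for directory in directories:
        # Only .jar files are collected; copied .pom files are ignored.
        jars.extend(str(path) for path in Path(directory).glob("*.jar"))
    return sorted(jars)
```

For the layout used in this article, collect_jars("dependencies", "target") would return the dependency jars followed by the project jar, and the resulting list can be passed as the classpath argument of jpype.startJVM in place of wildcard entries.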

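Note

Instead of collecting the dependencies manually, you could also rely on the maven-assembly-plugin to build a single jar that contains all dependencies.

Once our package and all its dependencies are available, we can invoke it from a fillten_pyarrowjvm.py script that imports the FillTen class and prints the result of invoking FillTen.createArray: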

import jpype
import jpype.imports
from jpype.types import *

# Start a JVM making available all dependencies we collected
# and our class from target/FillTen-1.jar
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])

FillTen = JClass('FillTen')

array = FillTen.createArray()
print("ARRAY", type(array), array)

# Convert the proxied BigIntVector to an actual pyarrow array
import pyarrow.jvm
pyarray = pyarrow.jvm.array(array)
print("ARRAY", type(pyarray), pyarray)
del pyarray

Running the Python script prints two lines:

ARRAY <java class 'org.apache.arrow.vector.BigIntVector'> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ARRAY <class 'pyarrow.lib.Int64Array'> [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10
]

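The first line is the raw result of invoking the FillTen.createArray method. The resulting object is a proxy to the actual Java object, so it is not really a pyarrow Array and lacks most of its capabilities and methods. That is why we subsequently use pyarrow.jvm.array to convert it to an actual pyarrow array, which lets us treat it like any other pyarrow array. The result is the second line of the output, where the array is correctly reported as being of type pyarrow.lib.Int64Array and is printed in the pyarrow style.

Note

At the moment, the pyarrow.jvm module is fairly limited in capabilities: nested types such as structs are not supported, and it only works with a JVM running within the same process, as is the case with JPype.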

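Java to Python communication using the C Data Interface

The C Data Interface is a protocol implemented in Arrow to exchange data between different environments without the cost of marshaling and copying data.

This makes it possible to expose data coming from Python or Java to functions implemented in the other language.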
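
To make the protocol a little more concrete: what crosses the language boundary is the address of a small C struct that describes the data. The sketch below mirrors the two structs from the Arrow C Data Interface specification with ctypes, purely as an illustration of their layout. It does not export any real data, and the Python class definitions are written here for illustration rather than taken from pyarrow.

```python
import ctypes


class ArrowSchema(ctypes.Structure):
    """Describes a data type: format string, name, children, and so on."""


class ArrowArray(ctypes.Structure):
    """Describes the data itself: length, null count, buffers, children."""


# Field order follows the struct definitions in the C Data Interface spec.
ArrowSchema._fields_ = [
    ("format", ctypes.c_char_p),
    ("name", ctypes.c_char_p),
    ("metadata", ctypes.c_char_p),
    ("flags", ctypes.c_int64),
    ("n_children", ctypes.c_int64),
    ("children", ctypes.POINTER(ctypes.POINTER(ArrowSchema))),
    ("dictionary", ctypes.POINTER(ArrowSchema)),
    ("release", ctypes.CFUNCTYPE(None, ctypes.POINTER(ArrowSchema))),
    ("private_data", ctypes.c_void_p),
]

ArrowArray._fields_ = [
    ("length", ctypes.c_int64),
    ("null_count", ctypes.c_int64),
    ("offset", ctypes.c_int64),
    ("n_buffers", ctypes.c_int64),
    ("n_children", ctypes.c_int64),
    ("buffers", ctypes.POINTER(ctypes.c_void_p)),
    ("children", ctypes.POINTER(ctypes.POINTER(ArrowArray))),
    ("dictionary", ctypes.POINTER(ArrowArray)),
    ("release", ctypes.CFUNCTYPE(None, ctypes.POINTER(ArrowArray))),
    ("private_data", ctypes.c_void_p),
]

# An empty, zero-initialised struct; a producer would fill it in on export.
c_array = ArrowArray()
# Its address is a plain integer, which is what gets handed to the other language.
c_array_ptr = ctypes.addressof(c_array)
```

The buffers themselves are never copied: the struct only holds pointers to them, together with a release callback that the consumer calls when it is done with the data.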

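Note

In the future, pyarrow.jvm will be implemented to leverage the C Data Interface. At the moment it is written specifically for JPype.

To showcase how C Data works, we are going to tweak both our FillTen Java class and our fillten.py Python script. Given a PyArrow array, we will expose a function in Java that sets its content to the numbers from 1 to 10.

Using the C Data Interface in pyarrow currently requires installing cffi explicitly. Like most Python packages, it can be installed with: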

$ pip install cffi

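The first thing we have to do is tweak the Python script so that it sends Java the exported references to the Array and its Schema, following the C Data Interface: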

import jpype
import jpype.imports
from jpype.types import *

# Init the JVM and make FillTen class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')

# Create a Python array of 10 elements
import pyarrow as pa
array = pa.array([0]*10)

from pyarrow.cffi import ffi as arrow_c

# Export the Python array through C Data
c_array = arrow_c.new("struct ArrowArray*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
array._export_to_c(c_array_ptr)

# Export the Schema of the Array through C Data
c_schema = arrow_c.new("struct ArrowSchema*")
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
array.type._export_to_c(c_schema_ptr)

# Send Array and its Schema to the Java function
# that will populate the array with numbers from 1 to 10
FillTen.fillCArray(c_array_ptr, c_schema_ptr)

# See how the content of our Python array was changed from Java
# while it remained of the Python type.
print("ARRAY", type(array), array)

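Note

Changing the content of arrays is not a safe operation. It was done here for the sake of this example, and it mostly works only because the array's size, type, and nulls have not changed.

In the FillTen Java class we already have the fillVector method, but that method is private, and even if we made it public it would only accept a BigIntVector object, not the C Data array and schema references.

So we have to extend the FillTen class with a fillCArray method that performs the same work as fillVector, but on the entities exchanged through C Data instead of on a BigIntVector: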

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.BigIntVector;


public class FillTen {
    static RootAllocator allocator = new RootAllocator();

    public static void fillCArray(long c_array_ptr, long c_schema_ptr) {
        ArrowArray arrow_array = ArrowArray.wrap(c_array_ptr);
        ArrowSchema arrow_schema = ArrowSchema.wrap(c_schema_ptr);

        FieldVector v = Data.importVector(allocator, arrow_array, arrow_schema, null);
        FillTen.fillVector((BigIntVector)v);
    }

    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}

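The goal of the fillCArray method is to take the Array and Schema received in the C Data exchange format and turn them back into an object of type FieldVector, so that Arrow Java knows how to deal with it.

If we run mvn package again, update the maven dependencies, and then run our Python script, we should see that the values printed by the Python script have been changed by the Java code: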

$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python fillten.py
ARRAY <class 'pyarrow.lib.Int64Array'> [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10
]

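We can also use the C Stream Interface to exchange a pyarrow.RecordBatchReader between Java and Python. We will use the following Java class as a demo; it lets you read an Arrow IPC file through Java's implementation, or write data to a JSON file: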

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;

public class PythonInteropDemo implements AutoCloseable {
  private final BufferAllocator allocator;

  public PythonInteropDemo() {
    this.allocator = new RootAllocator();
  }

  public void exportStream(String path, long cStreamPointer) throws Exception {
    try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
      ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
      Data.exportArrayStream(allocator, reader, stream);
    }
  }

  public void importStream(String path, long cStreamPointer) throws Exception {
    try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
         final ArrowReader input = Data.importArrayStream(allocator, stream);
         JsonFileWriter writer = new JsonFileWriter(new File(path))) {
      writer.start(input.getVectorSchemaRoot().getSchema(), input);
      while (input.loadNextBatch()) {
        writer.write(input.getVectorSchemaRoot());
      }
    }
  }

  @Override
  public void close() throws Exception {
    allocator.close();
  }
}

On the Python side, we use JPype as before, except that this time we send RecordBatchReaders back and forth:

import tempfile

import jpype
import jpype.imports
from jpype.types import *

# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()

# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
    ("ints", pa.int64()),
    ("strs", pa.string())
])
batches = [
    pa.record_batch([
        [0, 2, 4, 8],
        ["a", "b", "c", None],
    ], schema=schema),
    pa.record_batch([
        [None, 32, 64, None],
        ["e", None, None, "h"],
    ], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)

from pyarrow.cffi import ffi as arrow_c

# Export the Python reader through C Data
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)

# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
    demo.importStream(temp.name, c_stream_ptr)

    # Read the JSON file back
    with open(temp.name) as source:
        print("JSON file written by Java:")
        print(source.read())


# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
    with pa.ipc.new_file(temp.name, schema) as sink:
        for batch in batches:
            sink.write_batch(batch)

    demo.exportStream(temp.name, c_stream_ptr)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
        print("IPC file read by Java:")
        print(source.read_all())

$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]
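
The JSON printed above stores every column as parallel VALIDITY and DATA lists, with a placeholder value in each null slot. A minimal sketch for turning that output back into Python values follows. decode_column and decode_batches are hypothetical helpers, and they only handle the two column types that appear in this example (64-bit integers, written as strings, and utf8 strings).

```python
import json


def decode_column(column, type_name):
    """Turn one column of the Java JSON output into a list of Python values.

    VALIDITY marks which slots hold real values; slots marked 0 become None.
    """
    values = []
    for valid, raw in zip(column["VALIDITY"], column["DATA"]):
        if not valid:
            values.append(None)
        elif type_name == "int":
            # 64-bit integers appear as strings in this output.
            values.append(int(raw))
        else:
            values.append(raw)
    return values


def decode_batches(text):
    """Return one {column name: values} dict per batch in the JSON text."""
    doc = json.loads(text)
    # Map each field name to its type name, taken from the schema section.
    types = {f["name"]: f["type"]["name"] for f in doc["schema"]["fields"]}
    return [
        {col["name"]: decode_column(col, types[col["name"]])
         for col in batch["columns"]}
        for batch in doc["batches"]
    ]
```

Applied to the file written by importStream, decode_batches gives back the same values, including the nulls, that were placed in the record batches on the Python side.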