将 PyArrow 与 Java 集成#

Arrow 支持通过 Arrow C 数据接口 在同一进程中交换数据。

这可用于在 Python 和 Java 函数和方法之间交换数据,使这两种语言可以在没有任何数据编组和解组成本的情况下进行交互。

注意

本文档假设您拥有已正确安装 pyarrowPython 环境,以及已正确安装 arrow 库的 Java 环境。 Arrow Java 版本必须使用 mvn -Parrow-c-data 编译,以确保 CData 交换支持已启用。有关更多详细信息,请参阅 Python 安装说明Java 文档

从 Python 调用 Java 方法#

假设我们有一个简单的 Java 类,它提供一个数字作为输出

public class Simple {
    public static int getNumber() {
        return 4;
    }
}

我们将此类保存在 Simple.java 文件中,并使用 javac Simple.java 将其编译为 Simple.class

创建 Simple.class 文件后,我们可以使用 JPype 库从 Python 中使用该类,该库在 Python 解释器中启用 Java 运行时。

jpype1 可以使用 pip 安装,就像大多数 Python 库一样

$ pip install jpype1

我们对 Simple 类能做的最基本的事情是使用 Python 中的 Simple.getNumber 方法,看看它是否会返回结果。

为此,我们可以创建一个 simple.py 文件,它使用 jpypeSimple.class 文件中导入 Simple 类并调用 Simple.getNumber 方法

import jpype
from jpype.types import *

jpype.startJVM(classpath=["./"])

Simple = JClass('Simple')

print(Simple.getNumber())

运行 simple.py 文件将展示我们的 Python 代码如何访问 Java 方法并打印预期结果

$ python simple.py
4

使用 pyarrow.jvm 将 Java 转为 Python#

PyArrow 提供了一个 pyarrow.jvm 模块,它使与 Java 类交互和将 Java 对象转换为实际 Python 对象变得更加容易。

为了展示 pyarrow.jvm,我们可以创建一个更复杂的类,名为 FillTen.java

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;


public class FillTen {
    static RootAllocator allocator = new RootAllocator();

    public static BigIntVector createArray() {
        BigIntVector intVector = new BigIntVector("ints", allocator);
        intVector.allocateNew(10);
        intVector.setValueCount(10);
        FillTen.fillVector(intVector);
        return intVector;
    }

    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}

此类提供了一个公共 createArray 方法,任何人都可以调用该方法来获取包含从 1 到 10 的数字的数组。

鉴于此类现在依赖于一堆包,仅使用 javac 编译它是不够的。我们需要创建一个专门的 pom.xml 文件,我们可以在其中收集依赖项

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.arrow.py2java</groupId>
    <artifactId>FillTen</artifactId>
    <version>1</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-memory</artifactId>
        <version>8.0.0</version>
        <type>pom</type>
        </dependency>
        <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-memory-netty</artifactId>
        <version>8.0.0</version>
        <type>jar</type>
        </dependency>
        <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-vector</artifactId>
        <version>8.0.0</version>
        <type>pom</type>
        </dependency>
        <dependency>
        <groupId>org.apache.arrow</groupId>
        <artifactId>arrow-c-data</artifactId>
        <version>8.0.0</version>
        <type>jar</type>
        </dependency>
    </dependencies>
</project>

一旦 FillTen.java 文件与类一起创建为 src/main/java/FillTen.java,我们就可以使用 maven 使用 mvn package 编译项目,并使其在 target 目录中可用。

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

现在我们已经构建了包,我们可以将其提供给 Python 使用。为此,我们需要确保不仅包本身可用,而且它的依赖项也可用。

我们可以使用 maven 收集所有依赖项,并将它们放在一个地方(dependencies 目录),以便我们可以更轻松地从 Python 中加载它们

$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
[INFO] Copying arrow-memory-core-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-core-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-c-data-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-c-data-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.pom
[INFO] Copying jackson-core-2.11.4.jar to /experiments/java2py/dependencies/jackson-core-2.11.4.jar
[INFO] Copying jackson-annotations-2.11.4.jar to /experiments/java2py/dependencies/jackson-annotations-2.11.4.jar
[INFO] Copying slf4j-api-1.7.25.jar to /experiments/java2py/dependencies/slf4j-api-1.7.25.jar
[INFO] Copying arrow-memory-netty-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-netty-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-format-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-format-8.0.0-SNAPSHOT.jar
[INFO] Copying flatbuffers-java-1.12.0.jar to /experiments/java2py/dependencies/flatbuffers-java-1.12.0.jar
[INFO] Copying arrow-memory-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-memory-8.0.0-SNAPSHOT.pom
[INFO] Copying netty-buffer-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-buffer-4.1.72.Final.jar
[INFO] Copying jackson-databind-2.11.4.jar to /experiments/java2py/dependencies/jackson-databind-2.11.4.jar
[INFO] Copying commons-codec-1.10.jar to /experiments/java2py/dependencies/commons-codec-1.10.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

注意

除了手动收集依赖项之外,您还可以依赖于 maven-assembly-plugin 来构建一个包含所有依赖项的单个 jar

一旦我们的包及其所有依赖项都可用,我们就可以从 fillten_pyarrowjvm.py 脚本调用它,该脚本将导入 FillTen 类并打印调用 FillTen.createArray 的结果

import jpype
import jpype.imports
from jpype.types import *

# Start a JVM making available all dependencies we collected
# and our class from target/FillTen-1.jar
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])

FillTen = JClass('FillTen')

array = FillTen.createArray()
print("ARRAY", type(array), array)

# Convert the proxied BigIntVector to an actual pyarrow array
import pyarrow.jvm
pyarray = pyarrow.jvm.array(array)
print("ARRAY", type(pyarray), pyarray)
del pyarray

运行 Python 脚本将导致打印两行

ARRAY <java class 'org.apache.arrow.vector.BigIntVector'> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ARRAY <class 'pyarrow.lib.Int64Array'> [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10
]

第一行是调用 FillTen.createArray 方法的原始结果。结果对象是实际 Java 对象的代理,所以它并不是真正的 pyarrow 数组,它会缺少大部分功能和方法。这就是为什么我们随后使用 pyarrow.jvm.array 将其转换为实际的 pyarrow 数组。这使我们能够像对待任何其他 pyarrow 数组一样对待它。结果是输出中的第二行,其中数组被正确报告为类型 pyarrow.lib.Int64Array,并使用 pyarrow 样式打印。

注意

目前,pyarrow.jvm 模块的功能相当有限,不支持嵌套类型(如结构体),并且它只能在与 JPype 相同的进程中运行的 JVM 上工作。

使用 C 数据接口进行 Java 到 Python 通信#

C 数据接口是在 Arrow 中实现的一种协议,用于在不同的环境之间交换数据,而无需进行数据编组和复制的开销。

这允许将来自 Python 或 Java 的数据公开给在其他语言中实现的函数。

注意

将来,pyarrow.jvm 将实现以利用 C 数据接口,目前它专门针对 JPype 编写

为了展示 C Data 的工作原理,我们将稍微调整一下我们的 FillTen Java 类和我们的 fillten.py Python 脚本。给定一个 PyArrow 数组,我们将公开一个 Java 中的函数,该函数将使用从 1 到 10 的数字设置其内容。

目前,在 pyarrow 中使用 C Data 接口需要显式安装 cffi,就像大多数 Python 发行版一样,可以使用以下命令安装:

$ pip install cffi

我们首先要做的是调整 Python 脚本,以便它根据 C Data 接口将数组及其模式的导出引用发送到 Java

import jpype
import jpype.imports
from jpype.types import *

# Init the JVM and make FillTen class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')

# Create a Python array of 10 elements
import pyarrow as pa
array = pa.array([0]*10)

from pyarrow.cffi import ffi as arrow_c

# Export the Python array through C Data
c_array = arrow_c.new("struct ArrowArray*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
array._export_to_c(c_array_ptr)

# Export the Schema of the Array through C Data
c_schema = arrow_c.new("struct ArrowSchema*")
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
array.type._export_to_c(c_schema_ptr)

# Send Array and its Schema to the Java function
# that will populate the array with numbers from 1 to 10
FillTen.fillCArray(c_array_ptr, c_schema_ptr)

# See how the content of our Python array was changed from Java
# while it remained of the Python type.
print("ARRAY", type(array), array)

注意

更改数组内容不是安全的操作,它是为了创建这个示例而完成的,并且它主要只因为数组的大小、类型或空值没有改变而有效。

在 FillTen Java 类中,我们已经有了 fillVector 方法,但该方法是私有的,即使我们将其公开,它也只能接受 BigIntVector 对象,而不是 C Data 数组和模式引用。

所以我们必须扩展我们的 FillTen 类,添加一个 fillCArray 方法,该方法能够执行 fillVector 的工作,但在 C Data 交换的实体上,而不是在 BigIntVector

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.BigIntVector;


public class FillTen {
    static RootAllocator allocator = new RootAllocator();

    public static void fillCArray(long c_array_ptr, long c_schema_ptr) {
        ArrowArray arrow_array = ArrowArray.wrap(c_array_ptr);
        ArrowSchema arrow_schema = ArrowSchema.wrap(c_schema_ptr);

        FieldVector v = Data.importVector(allocator, arrow_array, arrow_schema, null);
        FillTen.fillVector((BigIntVector)v);
    }

    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}

fillCArray 方法的目标是获取以 C Data 交换格式接收的数组和模式,并将它们转换回类型为 FieldVector 的对象,以便 Arrow Java 知道如何处理它。

如果我们再次运行 mvn package,更新 Maven 依赖项,然后是我们的 Python 脚本,我们应该能够看到 Python 脚本打印的值是如何被 Java 代码正确更改的

$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python fillten.py
ARRAY <class 'pyarrow.lib.Int64Array'> [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10
]

我们还可以使用 C 流接口在 Java 和 Python 之间交换 pyarrow.RecordBatchReader。我们将使用此 Java 类作为演示,它允许您通过 Java 的实现读取 Arrow IPC 文件,或将数据写入 JSON 文件

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;

public class PythonInteropDemo implements AutoCloseable {
  private final BufferAllocator allocator;

  public PythonInteropDemo() {
    this.allocator = new RootAllocator();
  }

  public void exportStream(String path, long cStreamPointer) throws Exception {
    try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
      ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
      Data.exportArrayStream(allocator, reader, stream);
    }
  }

  public void importStream(String path, long cStreamPointer) throws Exception {
    try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
         final ArrowReader input = Data.importArrayStream(allocator, stream);
         JsonFileWriter writer = new JsonFileWriter(new File(path))) {
      writer.start(input.getVectorSchemaRoot().getSchema(), input);
      while (input.loadNextBatch()) {
        writer.write(input.getVectorSchemaRoot());
      }
    }
  }

  @Override
  public void close() throws Exception {
    allocator.close();
  }
}

在 Python 端,我们将像以前一样使用 JPype,只是这次我们将发送来回的 RecordBatchReader

import tempfile

import jpype
import jpype.imports
from jpype.types import *

# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()

# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
    ("ints", pa.int64()),
    ("strs", pa.string())
])
batches = [
    pa.record_batch([
        [0, 2, 4, 8],
        ["a", "b", "c", None],
    ], schema=schema),
    pa.record_batch([
        [None, 32, 64, None],
        ["e", None, None, "h"],
    ], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)

from pyarrow.cffi import ffi as arrow_c

# Export the Python reader through C Data
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)

# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
    demo.importStream(temp.name, c_stream_ptr)

    # Read the JSON file back
    with open(temp.name) as source:
        print("JSON file written by Java:")
        print(source.read())


# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
    with pa.ipc.new_file(temp.name, schema) as sink:
        for batch in batches:
            sink.write_batch(batch)

    demo.exportStream(temp.name, c_stream_ptr)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
        print("IPC file read by Java:")
        print(source.read_all())
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]