Integrating PyArrow with Java#
Arrow supports exchanging data within the same process through the Arrow C Data Interface.
This can be used to exchange data between Python and Java functions and methods, so that the two languages can interoperate without the overhead of marshaling and unmarshaling data.
Note
This documentation assumes you have a working Python environment with pyarrow correctly installed and a working Java environment with the arrow library correctly installed. The Arrow Java version must be built with mvn -Parrow-c-data to ensure that CData exchange support is enabled. See the Python Install Instructions and the Java Documentation for more details.
Invoking Java methods from Python#
Suppose we have a simple Java class that provides a number as its output:
public class Simple {
    public static int getNumber() {
        return 4;
    }
}
We save this class in a Simple.java file and compile it into Simple.class with javac Simple.java.
Once the Simple.class file is created, we can use the class from Python thanks to the JPype library, which enables a Java runtime within the Python interpreter.
jpype1 can be installed with pip like most Python libraries:
$ pip install jpype1
The most basic thing we can do with our Simple class is to use the Simple.getNumber method from Python and see that it returns the result.
To do so, we can create a simple.py file that uses jpype to import the Simple class from the Simple.class file and invoke the Simple.getNumber method:
import jpype
from jpype.types import *
jpype.startJVM(classpath=["./"])
Simple = JClass('Simple')
print(Simple.getNumber())
Running the simple.py file shows that our Python code is able to access the Java method and print the expected result:
$ python simple.py
4
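Because JPype maps Java primitives to compatible Python types, the value returned by getNumber can be used like a regular Python integer. The following is only a sketch (not part of the original example) of how simple.py could be extended to verify this and to stop the JVM once we are done:
number = Simple.getNumber()
# JPype returns the Java int as an int-compatible Python object,
# so normal arithmetic and comparisons work.
print(number + 1)
print(int(number) == 4)
# Stop the JVM once we no longer need the Java class.
jpype.shutdownJVM()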
Java to Python using pyarrow.jvm#
PyArrow provides a pyarrow.jvm module that makes it easier to interact with Java classes and to convert the Java objects to actual Python objects.
To showcase pyarrow.jvm, we can create a slightly more complex class, named FillTen.java:
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
public class FillTen {
    static RootAllocator allocator = new RootAllocator();
    public static BigIntVector createArray() {
        BigIntVector intVector = new BigIntVector("ints", allocator);
        intVector.allocateNew(10);
        intVector.setValueCount(10);
        FillTen.fillVector(intVector);
        return intVector;
    }
    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}
This class provides a public createArray method that anyone can invoke to get an array containing the numbers from 1 to 10.
Given that the class now relies on a bunch of packages, compiling it with javac alone is no longer enough. We need to create a dedicated pom.xml file where we can collect the dependencies:
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.arrow.py2java</groupId>
  <artifactId>FillTen</artifactId>
  <version>1</version>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-memory</artifactId>
      <version>8.0.0</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-memory-netty</artifactId>
      <version>8.0.0</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-vector</artifactId>
      <version>8.0.0</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-c-data</artifactId>
      <version>8.0.0</version>
      <type>jar</type>
    </dependency>
  </dependencies>
</project>
Once the FillTen.java file containing the class has been created as src/main/java/FillTen.java, we can use maven to compile the project with mvn package and make it available in the target directory:
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
Now that we have built the package, we can make it available to Python. To do so, we need to make sure that not only the package itself but also its dependencies are available.
We can use maven to collect all the dependencies and put them in a single place (the dependencies directory), so that we can load them more easily from Python:
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
[INFO] Copying arrow-memory-core-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-core-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-c-data-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-c-data-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.pom
[INFO] Copying jackson-core-2.11.4.jar to /experiments/java2py/dependencies/jackson-core-2.11.4.jar
[INFO] Copying jackson-annotations-2.11.4.jar to /experiments/java2py/dependencies/jackson-annotations-2.11.4.jar
[INFO] Copying slf4j-api-1.7.25.jar to /experiments/java2py/dependencies/slf4j-api-1.7.25.jar
[INFO] Copying arrow-memory-netty-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-netty-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-format-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-format-8.0.0-SNAPSHOT.jar
[INFO] Copying flatbuffers-java-1.12.0.jar to /experiments/java2py/dependencies/flatbuffers-java-1.12.0.jar
[INFO] Copying arrow-memory-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-memory-8.0.0-SNAPSHOT.pom
[INFO] Copying netty-buffer-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-buffer-4.1.72.Final.jar
[INFO] Copying jackson-databind-2.11.4.jar to /experiments/java2py/dependencies/jackson-databind-2.11.4.jar
[INFO] Copying commons-codec-1.10.jar to /experiments/java2py/dependencies/commons-codec-1.10.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
Note
Instead of collecting the dependencies by hand, you could also rely on the maven-assembly-plugin to build a single jar that contains all the dependencies.
Once our package and all its dependencies are available, we can invoke it from a fillten_pyarrowjvm.py script that imports the FillTen class and prints the result of invoking FillTen.createArray:
import jpype
import jpype.imports
from jpype.types import *
# Start a JVM making available all dependencies we collected
# and our class from target/FillTen-1.jar
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')
array = FillTen.createArray()
print("ARRAY", type(array), array)
# Convert the proxied BigIntVector to an actual pyarrow array
import pyarrow.jvm
pyarray = pyarrow.jvm.array(array)
print("ARRAY", type(pyarray), pyarray)
del pyarray
Running the Python script will print two lines:
ARRAY <java class 'org.apache.arrow.vector.BigIntVector'> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ARRAY <class 'pyarrow.lib.Int64Array'> [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10
]
The first line is the raw result of invoking the FillTen.createArray method. The resulting object is a proxy of the actual Java object, so it is not really a pyarrow array and it will lack most of its capabilities and methods. That is why we subsequently convert it to an actual pyarrow array with pyarrow.jvm.array, which allows us to treat it like any other pyarrow array. The result is the second line of the output, where the array is correctly reported as being of type pyarrow.lib.Int64Array and is printed using the pyarrow style.
Note
At the moment the pyarrow.jvm module is fairly limited in capabilities: nested types like structs are not supported, and it only works with a JVM running in the same process as JPype.
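Once converted, the result supports the usual pyarrow operations. As a small sketch (assuming it is placed in fillten_pyarrowjvm.py before the final del pyarray), we could run a compute kernel on the array and convert it to a Python list:
import pyarrow.compute as pc
# The converted object is a regular pyarrow array, so compute kernels
# and conversions to Python objects work as usual.
print(pc.sum(pyarray))      # sum of 1..10 -> 55
print(pyarray.to_pylist())  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]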
Java to Python communication using the C Data Interface#
The C Data Interface is a protocol implemented in Arrow to exchange data between different environments without the overhead of marshaling and copying data.
This allows exposing data coming from Python or Java to functions implemented in the other language.
Note
In the future, pyarrow.jvm will be implemented to leverage the C Data Interface; at the moment it is instead written specifically for JPype.
To show how the C Data Interface works, we are going to tweak our FillTen Java class and the fillten.py Python script a bit. Given a pyarrow array, we are going to expose a function in Java that sets its content to the numbers from 1 to 10.
Using the C Data Interface in pyarrow currently requires that cffi is explicitly installed; like most Python libraries, it can be installed with:
$ pip install cffi
The first thing we are going to do is adapt the Python script so that it sends to Java the references to the array we want to export and to its schema, according to the C Data Interface:
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make FillTen class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')
# Create a Python array of 10 elements
import pyarrow as pa
array = pa.array([0]*10)
from pyarrow.cffi import ffi as arrow_c
# Export the Python array through C Data
c_array = arrow_c.new("struct ArrowArray*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
array._export_to_c(c_array_ptr)
# Export the Schema of the Array through C Data
c_schema = arrow_c.new("struct ArrowSchema*")
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
array.type._export_to_c(c_schema_ptr)
# Send Array and its Schema to the Java function
# that will populate the array with numbers from 1 to 10
FillTen.fillCArray(c_array_ptr, c_schema_ptr)
# See how the content of our Python array was changed from Java
# while it remained of the Python type.
print("ARRAY", type(array), array)
Note
Changing the content of an array is not a safe operation; it is done here only to build this example, and it mostly works because the size, type, and nullability of the array did not change.
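A safer alternative is to let Java create and fill its own vector and export it through the C Data Interface, importing it in Python as a new array instead of mutating an existing one in place. The following is only a sketch of that direction: it assumes a hypothetical FillTen.exportCArray(long arrayPtr, long schemaPtr) method on the Java side (not part of this recipe) that exports a freshly created vector, for example with Data.exportVector:
import pyarrow as pa
from pyarrow.cffi import ffi as arrow_c
# Allocate empty C Data structs for the Java side to fill in.
c_array = arrow_c.new("struct ArrowArray*")
c_schema = arrow_c.new("struct ArrowSchema*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
# Hypothetical Java method that creates a vector and exports it through C Data.
FillTen.exportCArray(c_array_ptr, c_schema_ptr)
# Import the Java-created data as a brand new pyarrow array.
array = pa.Array._import_from_c(c_array_ptr, c_schema_ptr)
print(array)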
In the FillTen Java class we already have the fillVector method, but it is private and, even if we made it public, it would only accept a BigIntVector object, not the C Data array and schema references.
So we have to expand our FillTen class with a fillCArray method that is able to do the work of fillVector, but on the C Data exchange entities instead of the BigIntVector ones:
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.BigIntVector;
public class FillTen {
    static RootAllocator allocator = new RootAllocator();
    public static void fillCArray(long c_array_ptr, long c_schema_ptr) {
        ArrowArray arrow_array = ArrowArray.wrap(c_array_ptr);
        ArrowSchema arrow_schema = ArrowSchema.wrap(c_schema_ptr);
        FieldVector v = Data.importVector(allocator, arrow_array, arrow_schema, null);
        FillTen.fillVector((BigIntVector)v);
    }
    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}
The goal of the fillCArray method is to take the array and schema received in the C Data exchange format and convert them back to an object of type FieldVector, so that Arrow Java knows how to deal with them.
If we run mvn package again, refresh the maven dependencies, and then run our Python script, we should see that the values printed by the Python script were properly changed by the Java code:
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python fillten.py
ARRAY <class 'pyarrow.lib.Int64Array'> [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10
]
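As an optional sanity check (a sketch, not part of the original recipe), fillten.py could record the address of the array's data buffer before calling into Java and compare it afterwards, confirming that the values were rewritten in place through shared memory rather than copied:
# Before the call to FillTen.fillCArray:
data_address_before = array.buffers()[1].address
FillTen.fillCArray(c_array_ptr, c_schema_ptr)
# The Python array still points at the same data buffer; only the values
# stored in it were rewritten by the Java code.
assert array.buffers()[1].address == data_address_before
print("ARRAY", type(array), array)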
We can also use the C Stream Interface to exchange pyarrow.RecordBatchReaders between Java and Python. We will use this Java class as a demo: it lets you read an Arrow IPC file through the Java implementation, or write data to a JSON file:
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;
public class PythonInteropDemo implements AutoCloseable {
    private final BufferAllocator allocator;
    public PythonInteropDemo() {
        this.allocator = new RootAllocator();
    }
    public void exportStream(String path, long cStreamPointer) throws Exception {
        try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
            ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
            Data.exportArrayStream(allocator, reader, stream);
        }
    }
    public void importStream(String path, long cStreamPointer) throws Exception {
        try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
             final ArrowReader input = Data.importArrayStream(allocator, stream);
             JsonFileWriter writer = new JsonFileWriter(new File(path))) {
            writer.start(input.getVectorSchemaRoot().getSchema(), input);
            while (input.loadNextBatch()) {
                writer.write(input.getVectorSchemaRoot());
            }
        }
    }
    @Override
    public void close() throws Exception {
        allocator.close();
    }
}
On the Python side, we will use JPype as before, but this time we will send RecordBatchReaders back and forth:
import tempfile
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()
# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
    ("ints", pa.int64()),
    ("strs", pa.string())
])
batches = [
    pa.record_batch([
        [0, 2, 4, 8],
        ["a", "b", "c", None],
    ], schema=schema),
    pa.record_batch([
        [None, 32, 64, None],
        ["e", None, None, "h"],
    ], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)
from pyarrow.cffi import ffi as arrow_c
# Export the Python reader through C Data
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)
# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
    demo.importStream(temp.name, c_stream_ptr)
    # Read the JSON file back
    with open(temp.name) as source:
        print("JSON file written by Java:")
        print(source.read())
# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
    with pa.ipc.new_file(temp.name, schema) as sink:
        for batch in batches:
            sink.write_batch(batch)
    demo.exportStream(temp.name, c_stream_ptr)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
        print("IPC file read by Java:")
        print(source.read_all())
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]
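Since PythonInteropDemo implements AutoCloseable, the Java-side allocator can also be released from Python once we are done. A minimal cleanup sketch for the end of demo.py:
# Release the Java RootAllocator held by the demo object and stop the JVM.
demo.close()
jpype.shutdownJVM()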