Integrating PyArrow with Java
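Arrow supports exchanging data within the same process through the Arrow C Data Interface.
This can be used to exchange data between Python and Java functions and methods, so that the two languages can interact without any cost of marshaling and unmarshaling data.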
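Note
This article assumes that you have a Python environment with pyarrow correctly installed and a Java environment with the arrow library correctly installed. The Arrow Java version must have been compiled with mvn -Parrow-c-data to ensure that CData exchange support is enabled. See the Python Install Instructions and the Java Documentation for further details.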
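Before following the steps below, it can help to confirm what the Python side can see. The following is a minimal sketch that uses only the standard library. check_environment is a hypothetical helper. It reports whether pyarrow, jpype (the module installed by the jpype1 package) and cffi are importable, and whether a java executable is on PATH. It cannot tell whether Arrow Java was built with -Parrow-c-data.

```python
import importlib.util
import shutil


def check_environment():
    """Report which parts of the PyArrow/Java setup are visible (hypothetical helper).

    Only checks importability and a `java` binary on PATH; it does not
    verify how Arrow Java was built.
    """
    report = {
        # Python packages used throughout this guide
        name: importlib.util.find_spec(name) is not None
        for name in ("pyarrow", "jpype", "cffi")
    }
    # JPype locates the JVM on its own; a java binary on PATH is only a hint
    report["java"] = shutil.which("java") is not None
    return report


if __name__ == "__main__":
    for component, found in check_environment().items():
        print(f"{component}: {'ok' if found else 'missing'}")
```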
Invoking Java methods from Python
Suppose we have a simple Java class that provides a number as its output:
public class Simple {
    public static int getNumber() {
        return 4;
    }
}
We save this class in a Simple.java file and compile it to Simple.class using javac Simple.java.
Once the Simple.class file is created, we can use the class from Python with the JPype library, which enables a Java runtime within the Python interpreter.
Like most Python libraries, jpype1 can be installed using pip:
$ pip install jpype1
The most basic thing we can do with our Simple class is to call the Simple.getNumber method from Python and see whether it returns the result.
To do so, we can create a simple.py file that uses jpype to import the Simple class from the Simple.class file and invoke the Simple.getNumber method:
import jpype
from jpype.types import *
jpype.startJVM(classpath=["./"])
Simple = JClass('Simple')
print(Simple.getNumber())
Running the simple.py file shows that our Python code is able to access the Java method and print the expected result:
$ python simple.py
4
Java to Python using pyarrow.jvm
PyArrow provides a pyarrow.jvm module that makes it easier to interact with Java classes and to convert Java objects into actual Python objects.
To showcase pyarrow.jvm, we can create a more complex class named FillTen.java:
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
public class FillTen {
    static RootAllocator allocator = new RootAllocator();
    public static BigIntVector createArray() {
        BigIntVector intVector = new BigIntVector("ints", allocator);
        intVector.allocateNew(10);
        intVector.setValueCount(10);
        FillTen.fillVector(intVector);
        return intVector;
    }
    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}
This class provides a public createArray method that anyone can invoke to get back an array containing the numbers from 1 to 10.
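Because this class now depends on a number of packages, compiling it with javac alone is no longer enough. We need to create a dedicated pom.xml file in which we can collect the dependencies: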
<project>
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.apache.arrow.py2java</groupId>
  <artifactId>FillTen</artifactId>
  <version>1</version>
  <properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-memory</artifactId>
      <version>8.0.0</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-memory-netty</artifactId>
      <version>8.0.0</version>
      <type>jar</type>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-vector</artifactId>
      <version>8.0.0</version>
      <type>pom</type>
    </dependency>
    <dependency>
      <groupId>org.apache.arrow</groupId>
      <artifactId>arrow-c-data</artifactId>
      <version>8.0.0</version>
      <type>jar</type>
    </dependency>
  </dependencies>
</project>
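Once the FillTen.java file containing the class has been created (for example as src/main/java/FillTen.java), we can use maven to compile the project with mvn package and make it available in the target directory.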
$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
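Now that we have built the package, we can make it available to Python. To do so, we need to make sure that not only the package itself is available, but also its dependencies.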
We can use maven to collect all dependencies and make them available in a single place (the dependencies directory), so that we can load them more easily from Python:
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
[INFO] Copying arrow-memory-core-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-core-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-c-data-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-c-data-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.pom
[INFO] Copying jackson-core-2.11.4.jar to /experiments/java2py/dependencies/jackson-core-2.11.4.jar
[INFO] Copying jackson-annotations-2.11.4.jar to /experiments/java2py/dependencies/jackson-annotations-2.11.4.jar
[INFO] Copying slf4j-api-1.7.25.jar to /experiments/java2py/dependencies/slf4j-api-1.7.25.jar
[INFO] Copying arrow-memory-netty-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-netty-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-format-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-format-8.0.0-SNAPSHOT.jar
[INFO] Copying flatbuffers-java-1.12.0.jar to /experiments/java2py/dependencies/flatbuffers-java-1.12.0.jar
[INFO] Copying arrow-memory-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-memory-8.0.0-SNAPSHOT.pom
[INFO] Copying netty-buffer-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-buffer-4.1.72.Final.jar
[INFO] Copying jackson-databind-2.11.4.jar to /experiments/java2py/dependencies/jackson-databind-2.11.4.jar
[INFO] Copying commons-codec-1.10.jar to /experiments/java2py/dependencies/commons-codec-1.10.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
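Before pointing a JVM at these directories, it can be worth checking that the build actually produced jars in both places. The sketch below uses collect_classpath, a hypothetical helper that relies only on the standard library. It lists the jar files in dependencies/ and target/ explicitly instead of relying on a wildcard. The copy-dependencies output above also contains .pom files, which the *.jar pattern skips.

```python
import glob
import os


def collect_classpath(base_dir="."):
    """Return the jar files under dependencies/ and target/ (hypothetical helper).

    Raises FileNotFoundError if either directory has no .jar file, so a
    missing `mvn package` or copy-dependencies step is caught early.
    """
    jars = []
    for sub in ("dependencies", "target"):
        # Only .jar files; the .pom files copied by maven are skipped
        found = sorted(glob.glob(os.path.join(base_dir, sub, "*.jar")))
        if not found:
            raise FileNotFoundError(f"no .jar files found in {os.path.join(base_dir, sub)}")
        jars.extend(found)
    return jars
```

Because jpype.startJVM takes the classpath as a list of entries, the result could be passed as jpype.startJVM(classpath=collect_classpath()) in place of the wildcard entries.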
Note
Instead of collecting dependencies manually, you could also rely on the maven-assembly-plugin to build a single jar that contains all dependencies.
Once our package and all its dependencies are available, we can invoke it from a fillten_pyarrowjvm.py script that imports the FillTen class and prints the result of invoking FillTen.createArray:
import jpype
import jpype.imports
from jpype.types import *
# Start a JVM making available all dependencies we collected
# and our class from target/FillTen-1.jar
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')
array = FillTen.createArray()
print("ARRAY", type(array), array)
# Convert the proxied BigIntVector to an actual pyarrow array
import pyarrow.jvm
pyarray = pyarrow.jvm.array(array)
print("ARRAY", type(pyarray), pyarray)
del pyarray
Running the Python script prints two lines:
ARRAY <java class 'org.apache.arrow.vector.BigIntVector'> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ARRAY <class 'pyarrow.lib.Int64Array'> [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10
]
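The first line is the raw result of invoking the FillTen.createArray method. The resulting object is a proxy to the actual Java object, so it is not really a pyarrow Array and lacks most of its capabilities and methods. That is why we then use pyarrow.jvm.array to convert it into an actual pyarrow array, which lets us treat it like any other pyarrow array. The result is the second line of the output, where the array is correctly reported as being of type pyarrow.lib.Int64Array and is printed in the pyarrow style.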
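Note
At the moment the pyarrow.jvm module is fairly limited in capabilities. Nested types such as structs are not supported, and it only works with a JVM running within the same process, as JPype does.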
Java to Python communication using the C Data Interface
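The C Data Interface is a protocol implemented in Arrow to exchange data between different environments without the cost of marshaling and copying data.
This allows data coming from Python or Java to be exposed to functions implemented in the other language.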
Note
In the future, pyarrow.jvm will be implemented to leverage the C Data Interface. At the moment it is written specifically for JPype.
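To show how C Data works, we are going to adapt our FillTen Java class slightly, along with the Python script, which we will save as fillten.py. Given a PyArrow array, we will expose a Java function that sets its contents to the numbers from 1 to 10.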
At the moment, using the C Data Interface in pyarrow requires cffi to be installed explicitly. Like most Python distributions, it can be installed with:
$ pip install cffi
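The first thing we have to do is adapt the Python script so that it sends Java the exported references to the Array and its Schema, as defined by the C Data Interface: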
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make FillTen class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')
# Create a Python array of 10 elements
import pyarrow as pa
array = pa.array([0]*10)
from pyarrow.cffi import ffi as arrow_c
# Export the Python array through C Data
c_array = arrow_c.new("struct ArrowArray*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
array._export_to_c(c_array_ptr)
# Export the Schema of the Array through C Data
c_schema = arrow_c.new("struct ArrowSchema*")
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
array.type._export_to_c(c_schema_ptr)
# Send Array and its Schema to the Java function
# that will populate the array with numbers from 1 to 10
FillTen.fillCArray(c_array_ptr, c_schema_ptr)
# See how the content of our Python array was changed from Java
# while it remained of the Python type.
print("ARRAY", type(array), array)
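Note
Changing the contents of arrays is not a safe operation. It is done here only for the purpose of this example, and it works mostly because the size, type and nulls of the array do not change.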
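The FillTen Java class already has the fillVector method. However, that method is private, and even if we made it public it would only accept a BigIntVector object, not the C Data array and schema references.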
So we have to extend the FillTen class with a fillCArray method that performs the work of fillVector on the C Data exchange entities instead of on a BigIntVector:
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.BigIntVector;
public class FillTen {
    static RootAllocator allocator = new RootAllocator();
    public static void fillCArray(long c_array_ptr, long c_schema_ptr) {
        ArrowArray arrow_array = ArrowArray.wrap(c_array_ptr);
        ArrowSchema arrow_schema = ArrowSchema.wrap(c_schema_ptr);
        FieldVector v = Data.importVector(allocator, arrow_array, arrow_schema, null);
        FillTen.fillVector((BigIntVector)v);
    }
    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}
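The goal of the fillCArray method is to take the Array and Schema received in the C Data exchange format and turn them back into an object of type FieldVector, so that Arrow Java knows how to work with it.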
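If we run mvn package again, update the maven dependencies and then run our Python script, we should see that the values printed by the Python script have been correctly changed by the Java code: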
$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python fillten.py
ARRAY <class 'pyarrow.lib.Int64Array'> [
1,
2,
3,
4,
5,
6,
7,
8,
9,
10
]
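We can also use the C Stream Interface to exchange a pyarrow.RecordBatchReader between Java and Python. We will use the following Java class as a demo. It lets you read an Arrow IPC file via the Java implementation, or write data to a JSON file: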
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;
public class PythonInteropDemo implements AutoCloseable {
    private final BufferAllocator allocator;
    public PythonInteropDemo() {
        this.allocator = new RootAllocator();
    }
    public void exportStream(String path, long cStreamPointer) throws Exception {
        try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
            ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
            Data.exportArrayStream(allocator, reader, stream);
        }
    }
    public void importStream(String path, long cStreamPointer) throws Exception {
        try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
             final ArrowReader input = Data.importArrayStream(allocator, stream);
             JsonFileWriter writer = new JsonFileWriter(new File(path))) {
            writer.start(input.getVectorSchemaRoot().getSchema(), input);
            while (input.loadNextBatch()) {
                writer.write(input.getVectorSchemaRoot());
            }
        }
    }
    @Override
    public void close() throws Exception {
        allocator.close();
    }
}
On the Python side, we will use JPype as before, except that this time we will send RecordBatchReaders back and forth:
import tempfile
import jpype
import jpype.imports
from jpype.types import *
# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()
# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
    ("ints", pa.int64()),
    ("strs", pa.string())
])
batches = [
    pa.record_batch([
        [0, 2, 4, 8],
        ["a", "b", "c", None],
    ], schema=schema),
    pa.record_batch([
        [None, 32, 64, None],
        ["e", None, None, "h"],
    ], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)
from pyarrow.cffi import ffi as arrow_c
# Export the Python reader through the C Stream Interface
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)
# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
    demo.importStream(temp.name, c_stream_ptr)
    # Read the JSON file back
    with open(temp.name) as source:
        print("JSON file written by Java:")
        print(source.read())
# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
    with pa.ipc.new_file(temp.name, schema) as sink:
        for batch in batches:
            sink.write_batch(batch)
    # Java reads the file and exports it back through the same stream struct
    demo.exportStream(temp.name, c_stream_ptr)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
        print("IPC file read by Java:")
        print(source.read_all())
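Saving the script as demo.py, rebuilding the package, refreshing the dependencies and running the script produces the following output:
$ mvn package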
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]