C 数据接口#
Arrow 通过 Arrow C 数据接口 支持在同一进程内交换数据,而无需复制或序列化,即使在不同的语言运行时之间也是如此。
Java 到 Python#
请参阅 将 PyArrow 与 Java 集成 以使用 C 数据接口实现 Java 到 Python 的通信。
Java 到 C++#
请参阅 构建 Arrow C++ 以构建 Arrow C++ 库：
$ git clone https://github.com/apache/arrow.git
$ cd arrow/cpp
$ mkdir build # from inside the `cpp` subdirectory
$ cd build
$ cmake .. --preset ninja-debug-minimal
$ cmake --build .
$ tree debug/
debug/
├── libarrow.800.0.0.dylib
├── libarrow.800.dylib -> libarrow.800.0.0.dylib
└── libarrow.dylib -> libarrow.800.dylib
从 C++ 共享 Int64 数组到 Java#
C++ 端
在 CDataCppBridge.h 中实现一个函数,该函数通过 C 数据接口导出数组：
#include <iostream>
#include <arrow/api.h>
#include <arrow/c/bridge.h>
void FillInt64Array(const uintptr_t c_schema_ptr, const uintptr_t c_array_ptr) {
arrow::Int64Builder builder;
builder.Append(1);
builder.Append(2);
builder.Append(3);
builder.AppendNull();
builder.Append(5);
builder.Append(6);
builder.Append(7);
builder.Append(8);
builder.Append(9);
builder.Append(10);
std::shared_ptr<arrow::Array> array = *builder.Finish();
struct ArrowSchema* c_schema = reinterpret_cast<struct ArrowSchema*>(c_schema_ptr);
auto c_schema_status = arrow::ExportType(*array->type(), c_schema);
if (!c_schema_status.ok()) c_schema_status.Abort();
struct ArrowArray* c_array = reinterpret_cast<struct ArrowArray*>(c_array_ptr);
auto c_array_status = arrow::ExportArray(*array, c_array);
if (!c_array_status.ok()) c_array_status.Abort();
}
Java 端
对于此示例,我们将使用 JavaCPP 从 Java 调用我们的 C++ 函数,而无需自己编写 JNI 绑定。
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven project for the Java side of the "C++ to Java" example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>java-cdata-example</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 8. -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- Single version shared by every org.apache.arrow dependency below. -->
<arrow.version>9.0.0</arrow.version>
</properties>
<dependencies>
<!-- JavaCPP: used to call the C++ function from Java without hand-written JNI bindings. -->
<dependency>
<groupId>org.bytedeco</groupId>
<artifactId>javacpp</artifactId>
<version>1.5.7</version>
</dependency>
<!-- Arrow artifacts, all pinned to ${arrow.version}. -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-format</artifactId>
<version>${arrow.version}</version>
</dependency>
</dependencies>
</project>
import org.bytedeco.javacpp.annotation.Platform;
import org.bytedeco.javacpp.annotation.Properties;
import org.bytedeco.javacpp.tools.InfoMap;
import org.bytedeco.javacpp.tools.InfoMapper;
/**
 * JavaCPP configuration for the bridge: parses {@code CDataCppBridge.h} and
 * generates the {@code CDataJavaToCppExample} class, compiling as C++17 and
 * linking against the {@code arrow} library found under the configured link path.
 */
@Properties(
    target = "CDataJavaToCppExample",
    value = @Platform(
        include = "CDataCppBridge.h",
        compiler = "cpp17",
        linkpath = "/arrow/cpp/build/debug/",
        link = "arrow"
    )
)
public class CDataJavaConfig implements InfoMapper {
    /**
     * Registers no custom type mappings.
     *
     * @param infoMap mapping table supplied by JavaCPP; left untouched
     */
    @Override
    public void map(InfoMap infoMap) {
        // Intentionally empty.
    }
}
# Compile the JavaCPP configuration class
$ javac -cp javacpp-1.5.7.jar CDataJavaConfig.java
# Generate CDataJavaToCppExample.java (the target named in CDataJavaConfig)
$ java -jar javacpp-1.5.7.jar CDataJavaConfig.java
# Build the native library libjniCDataJavaToCppExample.dylib
$ java -jar javacpp-1.5.7.jar CDataJavaToCppExample.java
# Verify that libjniCDataJavaToCppExample.dylib was created and list what it links against
$ otool -L macosx-x86_64/libjniCDataJavaToCppExample.dylib
macosx-x86_64/libjniCDataJavaToCppExample.dylib:
libjniCDataJavaToCppExample.dylib (compatibility version 0.0.0, current version 0.0.0)
@rpath/libarrow.800.dylib (compatibility version 800.0.0, current version 800.0.0)
/usr/lib/libc++.1.dylib (compatibility version 1.0.0, current version 1200.3.0)
/usr/lib/libSystem.B.dylib (compatibility version 1.0.0, current version 1311.0.0)
Java 测试
让我们创建一个 Java 类来测试我们的桥接
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
/**
 * Smoke test for the bridge: allocates empty C Data Interface structures,
 * lets the C++ side fill them, then imports and prints the resulting vector.
 */
public class TestCDataInterface {
    public static void main(String[] args) {
        try (BufferAllocator rootAllocator = new RootAllocator();
             ArrowSchema cSchema = ArrowSchema.allocateNew(rootAllocator);
             ArrowArray cArray = ArrowArray.allocateNew(rootAllocator)) {
            // Hand the raw addresses of both structures to the native function.
            final long schemaAddress = cSchema.memoryAddress();
            final long arrayAddress = cArray.memoryAddress();
            CDataJavaToCppExample.FillInt64Array(schemaAddress, arrayAddress);

            // Import the structures that C++ populated as a Java vector.
            try (BigIntVector imported = (BigIntVector) Data.importVector(
                    rootAllocator, cArray, cSchema, null)) {
                System.out.println("C++-allocated array: " + imported);
            }
        }
    }
}
C++-allocated array: [1, 2, 3, null, 5, 6, 7, 8, 9, 10]
从 Java 共享 Int32 数组到 C++#
Java 端
对于此示例,我们将构建一个 JAR,其中包含所有依赖项。
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven project for the Java side of the "Java to C++" example; packaged as a single JAR with all dependencies. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>cpptojava</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 8. -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<!-- Single version shared by every org.apache.arrow dependency below. -->
<arrow.version>9.0.0</arrow.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-c-data</artifactId>
<version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<version>${arrow.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- During the package phase, also produce the *-jar-with-dependencies.jar that is copied to the C++ project. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.util.Arrays;
/**
 * Entry points invoked from C++ through JNI. Each method builds Arrow data in
 * Java and exports it into C Data Interface structures owned by the caller.
 */
public class ToBeCalledByCpp {
    /** Allocator shared by every vector created in this class; lives for the whole process. */
    final static BufferAllocator allocator = new RootAllocator();

    /**
     * Create a {@link FieldVector} and export it via the C Data Interface
     * @param schemaAddress Schema memory address to wrap
     * @param arrayAddress Array memory address to wrap
     */
    public static void fillVector(long schemaAddress, long arrayAddress){
        // The vector is closed once exported so that the Java-side reference is
        // not leaked; the exported structures keep their own reference to the
        // underlying buffers until the consumer releases them.
        try (ArrowArray arrow_array = ArrowArray.wrap(arrayAddress);
             ArrowSchema arrow_schema = ArrowSchema.wrap(schemaAddress);
             FieldVector vector = populateFieldVectorToExport()) {
            Data.exportVector(allocator, vector, null, arrow_array, arrow_schema);
        }
    }

    /**
     * Create a {@link VectorSchemaRoot} and export it via the C Data Interface
     * @param schemaAddress Schema memory address to wrap
     * @param arrayAddress Array memory address to wrap
     */
    public static void fillVectorSchemaRoot(long schemaAddress, long arrayAddress){
        // Same ownership rule as fillVector: close the Java-side root after export.
        try (ArrowArray arrow_array = ArrowArray.wrap(arrayAddress);
             ArrowSchema arrow_schema = ArrowSchema.wrap(schemaAddress);
             VectorSchemaRoot root = populateVectorSchemaRootToExport()) {
            Data.exportVectorSchemaRoot(allocator, root, null, arrow_array, arrow_schema);
        }
    }

    /**
     * Build the three-element vector [1, 2, 3] named "int-to-export" and print it.
     * @return the populated vector; the caller is responsible for closing it
     */
    private static FieldVector populateFieldVectorToExport(){
        IntVector intVector = new IntVector("int-to-export", allocator);
        intVector.allocateNew(3);
        intVector.setSafe(0, 1);
        intVector.setSafe(1, 2);
        intVector.setSafe(2, 3);
        intVector.setValueCount(3);
        System.out.println("[Java] FieldVector: \n" + intVector);
        return intVector;
    }

    /**
     * Build a single-column root ("age-to-export": 10, 20, 30) and print it.
     * @return the populated root; the caller is responsible for closing it
     */
    private static VectorSchemaRoot populateVectorSchemaRootToExport(){
        IntVector intVector = new IntVector("age-to-export", allocator);
        intVector.setSafe(0, 10);
        intVector.setSafe(1, 20);
        intVector.setSafe(2, 30);
        VectorSchemaRoot root = new VectorSchemaRoot(Arrays.asList(intVector));
        root.setRowCount(3);
        System.out.println("[Java] VectorSchemaRoot: \n" + root.contentToTSVString());
        return root;
    }
}
构建 JAR 并将其复制到 C++ 项目中。
$ mvn clean install
$ cp target/cpptojava-1.0-SNAPSHOT-jar-with-dependencies.jar <c++ project path>/cpptojava.jar
C++ 端
此应用程序使用 JNI 调用 Java 代码,但通过 C 数据接口传输数据(零拷贝)。
#include <iostream>
#include <jni.h>
#include <arrow/api.h>
#include <arrow/c/bridge.h>
/// Starts an in-process JVM with `cpptojava.jar` on its class path.
///
/// @param jvm Receives the created JavaVM handle.
/// @return The JNI environment for the calling thread, or nullptr if the JVM
///         could not be created (an error message is written to std::cerr).
JNIEnv *CreateVM(JavaVM **jvm) {
  // JavaVMOption::optionString is a non-const char*, so the options live in
  // writable arrays; assigning a string literal to char* is ill-formed in C++11+.
  char class_path_option[] = "-Djava.class.path=cpptojava.jar";
  // NOTE(review): as written this defines a system property named
  // "Xcheck:jni:pedantic"; it looks like an -Xcheck:jni style flag may have been
  // intended. Kept unchanged to preserve behavior - confirm.
  char check_jni_option[] = "-DXcheck:jni:pedantic";

  JavaVMOption options[2] = {};
  options[0].optionString = class_path_option;
  options[1].optionString = check_jni_option;

  // Value-initialize so no field is handed to the JVM uninitialized.
  JavaVMInitArgs vm_args = {};
  vm_args.version = JNI_VERSION_1_8;
  vm_args.nOptions = 2;
  vm_args.options = options;
  vm_args.ignoreUnrecognized = JNI_FALSE;

  JNIEnv *env = nullptr;
  const jint status =
      JNI_CreateJavaVM(jvm, reinterpret_cast<void **>(&env), &vm_args);
  if (status != JNI_OK) {
    std::cerr << "\n<<<<< Unable to Launch JVM >>>>>\n" << std::endl;
    return nullptr;
  }
  return env;
}
int main() {
JNIEnv *env;
JavaVM *jvm;
env = CreateVM(&jvm);
if (env == nullptr) return EXIT_FAILURE;
jclass javaClassToBeCalledByCpp = env->FindClass("ToBeCalledByCpp");
if (javaClassToBeCalledByCpp != nullptr) {
jmethodID fillVector = env->GetStaticMethodID(javaClassToBeCalledByCpp,
"fillVector",
"(JJ)V");
if (fillVector != nullptr) {
struct ArrowSchema arrowSchema;
struct ArrowArray arrowArray;
std::cout << "\n<<<<< C++ to Java for Arrays >>>>>\n" << std::endl;
env->CallStaticVoidMethod(javaClassToBeCalledByCpp, fillVector,
static_cast<jlong>(reinterpret_cast<uintptr_t>(&arrowSchema)),
static_cast<jlong>(reinterpret_cast<uintptr_t>(&arrowArray)));
auto resultImportArray = arrow::ImportArray(&arrowArray, &arrowSchema);
std::shared_ptr<arrow::Array> array = resultImportArray.ValueOrDie();
std::cout << "[C++] Array: " << array->ToString() << std::endl;
} else {
std::cerr << "Could not find fillVector method\n" << std::endl;
return EXIT_FAILURE;
}
jmethodID fillVectorSchemaRoot = env->GetStaticMethodID(javaClassToBeCalledByCpp,
"fillVectorSchemaRoot",
"(JJ)V");
if (fillVectorSchemaRoot != nullptr) {
struct ArrowSchema arrowSchema;
struct ArrowArray arrowArray;
std::cout << "\n<<<<< C++ to Java for RecordBatch >>>>>\n" << std::endl;
env->CallStaticVoidMethod(javaClassToBeCalledByCpp, fillVectorSchemaRoot,
static_cast<jlong>(reinterpret_cast<uintptr_t>(&arrowSchema)),
static_cast<jlong>(reinterpret_cast<uintptr_t>(&arrowArray)));
auto resultImportVectorSchemaRoot = arrow::ImportRecordBatch(&arrowArray, &arrowSchema);
std::shared_ptr<arrow::RecordBatch> recordBatch = resultImportVectorSchemaRoot.ValueOrDie();
std::cout << "[C++] RecordBatch: " << recordBatch->ToString() << std::endl;
} else {
std::cerr << "Could not find fillVectorSchemaRoot method\n" << std::endl;
return EXIT_FAILURE;
}
} else {
std::cout << "Could not find ToBeCalledByCpp class\n" << std::endl;
return EXIT_FAILURE;
}
jvm->DestroyJavaVM();
return EXIT_SUCCESS;
}
CMakeLists.txt 定义文件
# Builds the C++ launcher (main.cpp) that uses JNI and the Arrow C++ library.
cmake_minimum_required(VERSION 3.19)
project(cdatacpptojava)

# External dependencies: the JNI headers/libraries and Arrow C++.
find_package(JNI REQUIRED)
find_package(Arrow REQUIRED)
message(STATUS "Arrow version: ${ARROW_VERSION}")

set(CMAKE_CXX_STANDARD 17)

add_executable(${PROJECT_NAME} main.cpp)
target_include_directories(${PROJECT_NAME} PRIVATE ${JNI_INCLUDE_DIRS})
target_link_libraries(${PROJECT_NAME} PRIVATE
    Arrow::arrow_shared
    ${JNI_LIBRARIES})
结果
<<<<< C++ to Java for Arrays >>>>>
[Java] FieldVector:
[1, 2, 3]
[C++] Array: [
1,
2,
3
]
<<<<< C++ to Java for RecordBatch >>>>>
[Java] VectorSchemaRoot:
age-to-export
10
20
30
[C++] RecordBatch: age-to-export: [
10,
20,
30
]