Arrow Flight RPC¶
Arrow Flight 是一个用于在网络上高效传输 Arrow 数据的 RPC 框架。
另请参阅
- Flight 协议文档
Flight 协议的文档,包括如何概念性地使用 Flight。
- Java 指南
在 Java 中使用 Arrow Flight 的方案。
编写 Flight 服务¶
Flight 服务器实现 FlightProducer 接口。为方便起见,它们可以改用 NoOpFlightProducer 的子类,该类提供了所有 RPC 方法的默认实现。
public class TutorialFlightProducer implements FlightProducer {
@Override
// Override methods or use NoOpFlightProducer for only methods needed
}
每个 RPC 方法都始终接受一个用于通用参数的 CallContext。若要指示失败,请将异常传递给“监听器”(如果存在),否则请抛出异常。
// Server
@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
// ...
listener.onError(
CallStatus.UNAUTHENTICATED.withDescription(
"Custom UNAUTHENTICATED description message.").toRuntimeException());
// ...
}
// Client
try{
Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
// ...
} catch (FlightRuntimeException e){
// Catch UNAUTHENTICATED exception
}
要启动服务器,请创建一个 Location 来指定监听位置,然后使用一个 producer 实例创建一个 FlightServer。这将启动服务器,但不会阻塞程序的其余部分。调用 FlightServer.awaitTermination 可阻塞直到服务器停止。
class TutorialFlightProducer implements FlightProducer {
@Override
// Override methods or use NoOpFlightProducer for only methods needed
}
Location location = Location.forGrpcInsecure("0.0.0.0", 0);
try(
BufferAllocator allocator = new RootAllocator();
FlightServer server = FlightServer.builder(
allocator,
location,
new TutorialFlightProducer()
).build();
){
server.start();
System.out.println("Server listening on port " + server.getPort());
server.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
Server listening on port 58104
使用 Flight 客户端¶
要连接到 Flight 服务,请使用一个 location 创建一个 FlightClient。
Location location = Location.forGrpcInsecure("0.0.0.0", 58104);
try(BufferAllocator allocator = new RootAllocator();
FlightClient client = FlightClient.builder(allocator, location).build()){
// ... Consume operations exposed by Flight server
} catch (Exception e) {
e.printStackTrace();
}
取消与超时¶
发起调用时,客户端可以选择提供 CallOptions。这允许客户端设置调用的超时时间。此外,某些由客户端 RPC 调用返回的对象会公开一个 cancel 方法,允许提前终止调用。
Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try(BufferAllocator allocator = new RootAllocator();
FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){
Iterator<Result> resultIterator = tutorialFlightClient.doAction(
new Action("test-timeout"),
CallOptions.timeout(2, TimeUnit.SECONDS)
);
} catch (Exception e) {
e.printStackTrace();
}
在服务器端,超时是透明的。对于取消操作,服务器需要手动轮询 setOnCancelHandler 或 isCancelled 来检查客户端是否已取消调用,如果是,则中断服务器当前正在进行的任何处理。
// Client
Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try(BufferAllocator allocator = new RootAllocator();
FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){
try(FlightStream flightStream = flightClient.getStream(new Ticket(new byte[]{}))) {
// ...
flightStream.cancel("tutorial-cancel", new Exception("Testing cancellation option!"));
}
} catch (Exception e) {
e.printStackTrace();
}
// Server
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
// ...
listener.setOnCancelHandler(()->{
// Implement logic to handle cancellation option
});
}
启用 TLS¶
在设置服务器时,通过向 FlightServer.Builder.useTls 提供证书和密钥对,可以启用 TLS。
在客户端,使用 Location.forGrpcTls 为客户端创建 Location。
启用身份验证¶
警告
如果没有启用 TLS,身份验证是不安全的。
通过实现 ServerAuthHandler 可以启用基于握手的身份验证。身份验证由两部分组成:在初始客户端连接时,服务器和客户端的身份验证实现可以执行所需的任何协商。然后,客户端身份验证处理器会提供一个令牌,该令牌将被附加到后续的调用中。
客户端通过 ClientAuthHandler.authenticate 发送待验证的数据。服务器通过 ServerAuthHandler.authenticate 验证接收到的数据。
自定义中间件¶
服务器和客户端支持自定义中间件(或拦截器),它们在每次请求时被调用,并能以有限的方式修改请求。这些可以通过实现 FlightServerMiddleware 和 FlightClientMiddleware 接口来实现。
中间件功能相当有限,但它们可以向请求/响应添加头部。在服务器上,它们可以检查传入的头部并使请求失败;因此,它们可以用于实现自定义身份验证方法。
添加服务¶
服务器可以添加其他 gRPC 服务。例如,要添加 健康检查服务
final HealthStatusManager statusManager = new HealthStatusManager();
final Consumer<NettyServerBuilder> consumer = (builder) -> {
builder.addService(statusManager.getHealthService());
};
final Location location = forGrpcInsecure(LOCALHOST, 5555);
try (
BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
Producer producer = new Producer(a);
FlightServer s = FlightServer.builder(a, location, producer)
.transportHint("grpc.builderConsumer", consumer).build().start();
) {
Channel channel = NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
HealthCheckResponse response = HealthGrpc
.newBlockingStub(channel)
.check(HealthCheckRequest.getDefaultInstance());
System.out.println(response.getStatus());
}
Flight 最佳实践¶
请参阅 C++ 的最佳实践。