Arrow Flight RPC#

Arrow Flight 是一个用于通过网络高效传输 Arrow 数据的 RPC 框架。

另请参阅

Flight 协议文档

Flight 协议的文档,包括如何在概念上使用 Flight。

Java 食谱

在 Java 中使用 Arrow Flight 的方法。

编写 Flight 服务#

Flight 服务器实现了 FlightProducer 接口。为了方便起见,它们可以继承 NoOpFlightProducer 类,该类提供了所有 RPC 方法的默认实现。

public class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

每个 RPC 方法总是接受一个 CallContext 用于通用参数。要指示失败,如果存在“侦听器”,则将异常传递给它,否则引发异常。

// Server
@Override
public void listFlights(CallContext context, Criteria criteria, StreamListener<FlightInfo> listener) {
    // ...
    listener.onError(
        CallStatus.UNAUTHENTICATED.withDescription(
            "Custom UNAUTHENTICATED description message.").toRuntimeException());
    // ...
}

// Client
try{
    Iterable<FlightInfo> flightInfosBefore = flightClient.listFlights(Criteria.ALL);
    // ...
} catch (FlightRuntimeException e){
    // Catch UNAUTHENTICATED exception
}

要启动服务器,请创建一个 Location 来指定侦听位置,然后使用生产者实例创建一个 FlightServer。这将启动服务器,但不会阻塞程序的其余部分。调用 FlightServer.awaitTermination 将阻塞,直到服务器停止。

class TutorialFlightProducer implements FlightProducer {
    @Override
    // Override methods or use NoOpFlightProducer for only methods needed
}

Location location = Location.forGrpcInsecure("0.0.0.0", 0);
try(
    BufferAllocator allocator = new RootAllocator();
    FlightServer server = FlightServer.builder(
            allocator,
            location,
            new TutorialFlightProducer()
    ).build();
){
    server.start();
    System.out.println("Server listening on port " + server.getPort());
    server.awaitTermination();
} catch (Exception e) {
    e.printStackTrace();
}
Server listening on port 58104

使用 Flight 客户端#

要连接到 Flight 服务,请使用位置创建一个 FlightClient

Location location = Location.forGrpcInsecure("0.0.0.0", 58104);

try(BufferAllocator allocator = new RootAllocator();
    FlightClient client = FlightClient.builder(allocator, location).build()){
    // ... Consume operations exposed by Flight server
} catch (Exception e) {
    e.printStackTrace();
}

取消和超时#

进行调用时,客户端可以选择提供 CallOptions。这允许客户端设置调用的超时。此外,客户端 RPC 调用返回的一些对象公开了一种取消方法,允许提前终止调用。

Location location = Location.forGrpcInsecure("0.0.0.0", 58609);

try(BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){

    Iterator<Result> resultIterator = tutorialFlightClient.doAction(
            new Action("test-timeout"),
            CallOptions.timeout(2, TimeUnit.SECONDS)
    );
} catch (Exception e) {
    e.printStackTrace();
}

在服务器端,超时是透明的。 对于取消,服务器需要手动轮询 setOnCancelHandlerisCancelled 以检查客户端是否已取消调用,如果是,则中断服务器当前正在执行的任何处理。

// Client
Location location = Location.forGrpcInsecure("0.0.0.0", 58609);
try(BufferAllocator allocator = new RootAllocator();
    FlightClient tutorialFlightClient = FlightClient.builder(allocator, location).build()){
    try(FlightStream flightStream = flightClient.getStream(new Ticket(new byte[]{}))) {
        // ...
        flightStream.cancel("tutorial-cancel", new Exception("Testing cancellation option!"));
    }
} catch (Exception e) {
    e.printStackTrace();
}
// Server
@Override
public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    // ...
    listener.setOnCancelHandler(()->{
                // Implement logic to handle cancellation option
            });
}

启用 TLS#

通过向 FlightServer.Builder.useTls 提供证书和密钥对,可以在设置服务器时启用 TLS。

在客户端,使用 Location.forGrpcTls 为客户端创建 Location。

启用身份验证#

警告

不启用 TLS 的身份验证是不安全的。

可以通过实现 ServerAuthHandler 来启用基于握手的身份验证。身份验证包括两部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行任何所需的协商。然后,客户端身份验证处理程序提供一个令牌,该令牌将附加到将来的调用中。

客户端通过 ClientAuthHandler.authenticate 发送要验证的数据。服务器通过 ServerAuthHandler.authenticate 验证接收到的数据。

自定义中间件#

服务器和客户端支持在每个请求上调用的自定义中间件(或拦截器),并且可以以有限的方式修改请求。这些可以通过实现 FlightServerMiddlewareFlightClientMiddleware 接口来实现。

中间件的功能相当有限,但它们可以向请求/响应添加标头。在服务器上,它们可以检查传入的标头并使请求失败;因此,它们可用于实现自定义身份验证方法。

添加服务#

服务器可以添加其他 gRPC 服务。例如,要添加 运行状况检查服务

final HealthStatusManager statusManager = new HealthStatusManager();
final Consumer<NettyServerBuilder> consumer = (builder) -> {
  builder.addService(statusManager.getHealthService());
};
final Location location = forGrpcInsecure(LOCALHOST, 5555);
try (
    BufferAllocator a = new RootAllocator(Long.MAX_VALUE);
    Producer producer = new Producer(a);
    FlightServer s = FlightServer.builder(a, location, producer)
        .transportHint("grpc.builderConsumer", consumer).build().start();
) {
  Channel channel = NettyChannelBuilder.forAddress(location.toSocketAddress()).usePlaintext().build();
  HealthCheckResponse response = HealthGrpc
          .newBlockingStub(channel)
          .check(HealthCheckRequest.getDefaultInstance());

  System.out.println(response.getStatus());
}

Flight 最佳实践#