Arrow Flight RPC#

Arrow Flight 是一个 RPC 框架,用于通过网络高效传输 Flight 数据。

另请参阅

Flight 协议文档

Flight 协议的文档,包括如何概念性地使用 Flight。

Flight API 文档

C++ API 文档,列出了所有各种客户端和服务器类型。

C++ 指南

在 C++ 中使用 Arrow Flight 的方法。

编写 Flight 服务#

服务器是 arrow::flight::FlightServerBase 的子类。要实现各个 RPC,请重写此类的 RPC 方法。

class MyFlightServer : public FlightServerBase {
  Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                     std::unique_ptr<FlightListing>* listings) override {
    std::vector<FlightInfo> flights = ...;
    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
    return Status::OK();
  }
};

每个 RPC 方法都始终采用 arrow::flight::ServerCallContext 用于通用参数,并返回 arrow::Status 以指示成功或失败。可以通过 arrow::flight::MakeFlightError() 返回 Flight 特定的错误代码。

除了状态之外还返回值的 RPC 方法将使用输出参数,如上所示。通常,有一些辅助类提供这些输出参数的基本实现。例如,上面,arrow::flight::SimpleFlightListing 使用 arrow::flight::FlightInfo 对象的向量作为 ListFlights RPC 的结果。

要启动服务器,请创建 arrow::flight::Location 以指定要侦听的位置,并调用 arrow::flight::FlightServerBase::Init()。这将启动服务器,但不会阻塞程序的其余部分。使用 arrow::flight::FlightServerBase::SetShutdownOnSignals() 启用在收到中断信号时停止服务器,然后调用 arrow::flight::FlightServerBase::Serve() 以阻塞直到服务器停止。

std::unique_ptr<arrow::flight::FlightServerBase> server;
// Initialize server
arrow::flight::Location location;
// Listen to all interfaces on a free port
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);

// Start the server
ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

std::cout << "Server listening on localhost:" << server->port() << std::endl;
ARROW_CHECK_OK(server->Serve());

使用 Flight 客户端#

要连接到 Flight 服务,请通过调用 Connect 创建 arrow::flight::FlightClient 实例。

每个 RPC 方法都返回 arrow::Result 以指示请求的成功/失败,以及如果请求成功则返回结果对象。有些调用是流式调用,因此它们将返回读取器和/或写入器对象;直到流完成才可知最终调用状态。

取消和超时#

在进行调用时,客户端可以选择提供 FlightCallOptions。这允许客户端设置调用的超时或提供自定义 HTTP 标头,以及其他功能。此外,客户端 RPC 调用返回的一些对象公开了一个 Cancel 方法,该方法允许提前终止调用。

在服务器端,无需额外的代码即可实现超时。对于取消,服务器需要手动轮询 ServerCallContext::is_cancelled 以检查客户端是否已取消调用,如果是,则中断服务器当前正在进行的任何处理。

启用 TLS#

通过向 FlightServerBase::Init 提供证书和密钥对,可以在设置服务器时启用 TLS。

在客户端,使用 Location::ForGrpcTls 构造 arrow::flight::Location 以侦听。

启用身份验证#

警告

如果没有启用 TLS,身份验证是不安全的。

可以通过实现 ServerAuthHandler 并在构建服务器时提供此项来启用基于握手的身份验证。

身份验证包括两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行所需的任何协商。然后客户端身份验证处理程序提供一个令牌,该令牌将附加到未来的调用。这是通过使用所需的客户端身份验证实现调用 Authenticate 来完成的。

此后的每个 RPC,客户端处理程序的令牌都会自动添加到请求头中的调用中。服务器身份验证处理程序验证令牌并提供客户端的身份。在服务器上,此身份可以从 arrow::flight::ServerCallContext 获取。

自定义中间件#

服务器和客户端支持自定义中间件(或拦截器),这些中间件在每个请求上调用,并可以以有限的方式修改请求。可以通过子类化 ServerMiddlewareClientMiddleware,然后在创建客户端或服务器时提供它们来实施。

中间件功能相当有限,但它们可以向请求/响应添加头部。在服务器上,它们可以检查传入的头部并使请求失败;因此,它们可以用于实现自定义身份验证方法。

最佳实践#

gRPC#

使用默认 gRPC 传输时,可以通过 arrow::flight::FlightClientOptions::generic_options 将选项传递给它。例如:

auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
# Set the period after which a keepalive ping is sent on transport.
generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

另请参阅 gRPC 最佳实践 和可用的 gRPC 键

尽可能重用客户端#

创建和关闭客户端需要在客户端和服务器端进行设置和拆卸,这会占用实际处理 RPC 的时间。尽可能重用客户端以避免这种情况。请注意,客户端是线程安全的,因此单个客户端可以在多个线程之间共享。

不要轮询负载均衡#

轮询负载均衡意味着每个客户端都可以与每个服务器建立开放连接,导致意外数量的开放连接并耗尽服务器资源。

调试连接问题#

在长时间运行的连接上遇到意外断开时,使用 netstat 监视开放连接的数量。如果连接数量远大于客户端数量,则可能会导致问题。

为了调试,某些环境变量可以启用 gRPC 中的日志记录。例如,env GRPC_VERBOSITY=info GRPC_TRACE=http 将打印初始头(在两端),以便您可以查看 gRPC 是否建立了连接。它还会打印消息发送时间,以便您可以判断连接是否打开。

gRPC 可能不会报告连接错误,直到实际进行调用。因此,为了在创建客户端时检测连接错误,应该进行某种虚拟 RPC。

内存管理#

Flight 尝试重用 gRPC 所做的分配,以避免冗余数据复制。然而,经验表明,这些数据经常未对齐。某些用例可能要求数据具有特定于数据类型的对齐方式(例如,Int32 数组的数据缓冲区对齐到 4 字节边界),这可以通过将 arrow::ipc::IpcReadOptions::ensure_alignment 设置为 arrow::ipc::Alignment::kDataTypeSpecificAlignment 来强制执行。这使用 arrow::ipc::IpcReadOptions::memory_pool 分配具有对齐地址的内存,但仅适用于未对齐的数据。但是,这会创建通过 Flight 接收的数据的副本。

除非如上所述复制 gRPC 数据,否则 gRPC 所做的分配可能不会被 Arrow 内存池跟踪,并且内存使用行为(例如是否将可用内存返回给系统)取决于 gRPC 使用的分配器(通常是系统分配器)。

一种快速测试方法:使用调试器附加到进程并调用 malloc_trim,或者在系统池上调用 ReleaseUnused。如果内存使用量下降,那么很可能 gRPC 或应用程序分配了内存,而系统分配器一直保留着这些内存。这可以通过平台特定的方式进行调整;请参阅 ARROW-16697 中的调查,了解这在 Linux/glibc 上如何工作的示例。可以明确告诉 glibc malloc 转储缓存。

过度流量#

gRPC 将为并发客户端生成最多线程配额的线程。这些线程不一定被清理(用 Java 术语来说是“缓存线程池”)。glibc malloc 清除了一些每个线程的状态,并且默认调整在某些工作负载中从不清除缓存。

gRPC 的默认行为允许一个服务器接受来自许多不同客户端的许多连接,但如果请求做了大量工作(如在 Flight 下可能发生的情况),服务器可能无法跟上。将客户端配置为回退重试(并可能连接到不同的节点),将提供更一致的服务质量。

auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
# Set the minimum time between subsequent connection attempts.
generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

限制 DoPut 批处理大小#

您可能希望限制客户端通过 DoPut 提交到服务器的最大批处理大小,以防止请求在服务器上占用过多内存。在客户端,设置 arrow::flight::FlightClientOptions::write_size_limit_bytes。在服务器端,设置 gRPC 选项 GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。客户端选项将返回一个错误,可以使用较小的批处理重试,而服务器端限制将关闭连接。同时设置两者是明智的,因为前者提供更好的用户体验,但后者可能需要用来防御不礼貌的客户端。

关闭无响应的连接#

  1. 可以使用 arrow::flight::FlightCallOptions::stop_token 关闭陈旧调用。这需要在调用建立时记录停止令牌。

    StopSource stop_source;
    FlightCallOptions options;
    options.stop_token = stop_source.token();
    stop_source.RequestStop(Status::Cancelled("StopSource"));
    flight_client->DoAction(options, {});
    
  2. 使用调用超时。(这是 gRPC 的一般最佳实践。)

    FlightCallOptions options;
    options.timeout = TimeoutDuration{0.2};
    Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
    
    Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
    
    options = pyarrow.flight.FlightCallOptions(timeout=0.2)
    result = client.do_action(action, options=options)
    
  3. 客户端超时对于长时间运行的流式调用不太好,在这种情况下可能很难为整个操作选择超时。相反,通常需要的是每次读取或每次写入超时,以便如果操作没有进展,则操作失败。这可以通过一个后台线程来实现,该线程在计时器上调用 Cancel(),主线程在每次操作成功完成时重置计时器。有关完整示例,请参阅 Cookbook。

    注意

    对于每次写入/每次读取超时而不是每次调用超时,有一个长期存在的问题单(ARROW-6062),但这使用阻塞 gRPC API 无法(轻易)实现。