Arrow Flight RPC#

Arrow Flight 是一个用于通过网络高效传输 Flight 数据的 RPC 框架。

另请参阅

Flight 协议文档

Flight 协议的文档,包括如何从概念上使用 Flight。

Flight API 文档

列出所有各种客户端和服务器类型的 C++ API 文档。

C++ 食谱

在 C++ 中使用 Arrow Flight 的食谱。

编写 Flight 服务#

服务器是 arrow::flight::FlightServerBase 的子类。要实现各个 RPC,请覆盖此类上的 RPC 方法。

class MyFlightServer : public FlightServerBase {
  Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                     std::unique_ptr<FlightListing>* listings) override {
    std::vector<FlightInfo> flights = ...;
    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
    return Status::OK();
  }
};

每个 RPC 方法始终接受一个 arrow::flight::ServerCallContext 用于通用参数,并返回一个 arrow::Status 以指示成功或失败。可以通过 arrow::flight::MakeFlightError() 返回特定于 Flight 的错误代码。

除了状态外还返回值的 RPC 方法将使用输出参数,如上所示。通常,有一些辅助类提供这些输出参数的基本实现。例如,上面,arrow::flight::SimpleFlightListing 使用 arrow::flight::FlightInfo 对象的向量作为 ListFlights RPC 的结果。

要启动服务器,请创建一个 arrow::flight::Location 以指定要侦听的位置,并调用 arrow::flight::FlightServerBase::Init()。这将启动服务器,但不会阻塞程序的其余部分。使用 arrow::flight::FlightServerBase::SetShutdownOnSignals() 启用在收到中断信号时停止服务器,然后调用 arrow::flight::FlightServerBase::Serve() 以阻塞直到服务器停止。

std::unique_ptr<arrow::flight::FlightServerBase> server;
// Initialize server
arrow::flight::Location location;
// Listen to all interfaces on a free port
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);

// Start the server
ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

std::cout << "Server listening on localhost:" << server->port() << std::endl;
ARROW_CHECK_OK(server->Serve());

使用 Flight 客户端#

要连接到 Flight 服务,请通过调用 Connect 创建 arrow::flight::FlightClient 的实例。

每个 RPC 方法返回 arrow::Result 以指示请求的成功/失败,以及如果请求成功则指示结果对象。某些调用是流式调用,因此它们将返回读取器和/或写入器对象;最终调用状态直到流完成才已知。

取消和超时#

进行调用时,客户端可以选择提供 FlightCallOptions。这允许客户端在调用上设置超时或提供自定义 HTTP 标头,以及其他功能。此外,客户端 RPC 调用返回的一些对象公开了一个 Cancel 方法,该方法允许提前终止调用。

在服务器端,无需额外的代码来实现超时。对于取消,服务器需要手动轮询 ServerCallContext::is_cancelled 以检查客户端是否已取消调用,如果是,则中断服务器当前正在执行的任何处理。

启用 TLS#

在设置服务器时,可以通过向 FlightServerBase::Init 提供证书和密钥对来启用 TLS。

在客户端,使用 Location::ForGrpcTls 构造要侦听的 arrow::flight::Location

启用身份验证#

警告

在没有启用 TLS 的情况下,身份验证是不安全的。

可以通过实现 ServerAuthHandler 并将其在构造期间提供给服务器来启用基于握手的身份验证。

身份验证包括两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行任何必要的协商。然后,客户端身份验证处理程序提供一个将在未来调用中附加的令牌。这是通过使用所需的客户端身份验证实现调用 Authenticate 来完成的。

此后在每个 RPC 上,客户端处理程序的令牌都会自动添加到请求标头中的调用中。服务器身份验证处理程序验证令牌并提供客户端的身份。在服务器上,可以从 arrow::flight::ServerCallContext 获取此身份。

自定义中间件#

服务器和客户端支持自定义中间件(或拦截器),这些中间件在每次请求时都会被调用,并且可以以有限的方式修改请求。这些可以通过子类化 ServerMiddlewareClientMiddleware 来实现,然后在创建客户端或服务器时提供它们。

中间件相当有限,但它们可以向请求/响应添加标头。在服务器上,它们可以检查传入的标头并使请求失败;因此,它们可以用于实现自定义身份验证方法。

最佳实践#

gRPC#

使用默认 gRPC 传输时,可以通过 arrow::flight::FlightClientOptions::generic_options 将选项传递给它。例如

auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
# Set the period after which a keepalive ping is sent on transport.
generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

另请参阅 最佳 gRPC 实践 和可用的 gRPC 密钥

尽可能重复使用客户端#

创建和关闭客户端需要在客户端和服务器端进行设置和拆卸,这会占用实际处理 RPC 的时间。尽可能重复使用客户端以避免这种情况。请注意,客户端是线程安全的,因此单个客户端可以在多个线程之间共享。

不要使用轮询负载均衡#

轮询负载均衡 意味着每个客户端都可以与每个服务器建立打开的连接,从而导致意外数量的打开连接并耗尽服务器资源。

调试连接问题#

在遇到长期运行连接上的意外断开连接时,使用 netstat 监控打开连接的数量。如果连接数远大于客户端数,则可能会导致问题。

为了进行调试,某些环境变量启用了 gRPC 中的日志记录。例如,env GRPC_VERBOSITY=info GRPC_TRACE=http 将打印初始标头(在双方),以便您可以查看 gRPC 是否建立了连接。它还将在发送消息时打印,因此您可以判断连接是否打开。

gRPC 可能不会报告连接错误,直到实际进行调用。因此,要在创建客户端时检测连接错误,应进行某种虚拟 RPC。

内存管理#

Flight 尝试重复使用 gRPC 进行的分配,以避免冗余数据复制。但是,这意味着这些分配可能不会由 Arrow 内存池跟踪,并且内存使用行为(例如是否将空闲内存返回到系统)取决于 gRPC 使用的分配器(通常是系统分配器)。

一种快速测试方法:使用调试器附加到进程并调用 malloc_trim,或在系统池上调用 ReleaseUnused。如果内存使用量下降,则很可能系统分配器保留了 gRPC 或应用程序分配的内存。这可以通过特定于平台的方式进行调整;请参阅 ARROW-16697 中的调查,了解此功能在 Linux/glibc 上的工作原理示例。可以显式地指示 glibc malloc 转储缓存。

流量过多#

gRPC 将为并发客户端生成多达最大线程配额的线程。这些线程不一定被清理(用 Java 的说法是“缓存线程池”)。glibc malloc 清理一些每个线程的状态,并且默认调整在某些工作负载中永远不会清除缓存。

gRPC 的默认行为允许一台服务器接受来自许多不同客户端的许多连接,但如果请求执行大量工作(因为它们可能在 Flight 下执行),则服务器可能无法跟上。配置客户端以使用退避重试(并可能连接到不同的节点),将提供更一致的服务质量。

auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
# Set the minimum time between subsequent connection attempts.
generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

限制 DoPut 批处理大小#

您可能希望限制客户端通过 DoPut 提交到服务器的最大批处理大小,以防止请求占用服务器过多的内存。在客户端,设置arrow::flight::FlightClientOptions::write_size_limit_bytes。在服务器端,设置 gRPC 选项GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。客户端选项将返回一个错误,可以使用较小的批次重试,而服务器端限制将关闭连接。设置两者可能明智,因为前者提供了更好的用户体验,但后者可能是为了防御不礼貌的客户端而必需的。

关闭无响应的连接#

  1. 可以使用arrow::flight::FlightCallOptions::stop_token关闭过时的调用。这需要在建立调用时记录停止令牌。

    StopSource stop_source;
    FlightCallOptions options;
    options.stop_token = stop_source.token();
    stop_source.RequestStop(Status::Cancelled("StopSource"));
    flight_client->DoAction(options, {});
    
  2. 使用调用超时。(这是 gRPC 的一般最佳实践。)

    FlightCallOptions options;
    options.timeout = TimeoutDuration{0.2};
    Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
    
    Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
    
    options = pyarrow.flight.FlightCallOptions(timeout=0.2)
    result = client.do_action(action, options=options)
    
  3. 客户端超时对于长时间运行的流式调用来说不是很好,因为可能难以为整个操作选择超时。相反,通常需要的是每个读取或每个写入超时,以便如果操作没有取得进展,则该操作失败。这可以通过一个后台线程来实现,该线程在计时器上调用 Cancel(),主线程在每次操作成功完成时重置计时器。有关完整的工作示例,请参阅食谱。

    注意

    有一个长期存在的关于每个写入/每个读取超时而不是每个调用超时的工单(ARROW-6062),但使用阻塞式 gRPC API 无法(轻松)实现这一点。

替代传输#

Arrow Flight 的标准传输是gRPC。C++ 实现还实验性地支持基于UCX的传输。要使用它,在启动服务器或创建客户端时使用协议方案ucx:

UCX 传输#

并非所有 gRPC 传输的功能都受支持。有关详细信息,请参阅Flight RPC。另请注意以下具体注意事项

  • 服务器为每个客户端创建一个独立的 UCP 工作进程。这会消耗更多资源,但提供更高的吞吐量。

  • 客户端为每个 RPC 调用创建一个独立的 UCP 工作进程。同样,这以资源消耗换取性能。这也意味着,与 gRPC 不同,使用单个客户端或多个客户端进行所有调用在本质上是等效的。

  • UCX 传输尝试尽可能避免复制。在某些情况下,它可以直接重用 UCX 分配的缓冲区来支持arrow::Buffer对象,但是,这也会将关联的 UCX 资源的生命周期扩展到 Flight 客户端或服务器对象的生命周期之外。

  • 根据 UCX 本身选择的传输,您可能会发现将UCX_MM_SEG_SIZE从默认值(约 8KB)增加到约 60KB 会提高性能(UCX 将在单个调用中复制更多数据)。