Arrow Flight RPC#

Arrow Flight 是一种 RPC 框架,用于通过网络高效传输 Flight 数据。

另请参阅

Flight 协议文档

Flight 协议的文档,包括如何在概念上使用 Flight。

Flight API 文档

C++ API 文档,列出所有各种客户端和服务器类型。

C++ 食谱

在 C++ 中使用 Arrow Flight 的方法。

编写 Flight 服务#

服务器是 arrow::flight::FlightServerBase 的子类。 要实现单个 RPC,请覆盖此类上的 RPC 方法。

class MyFlightServer : public FlightServerBase {
  Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                     std::unique_ptr<FlightListing>* listings) override {
    std::vector<FlightInfo> flights = ...;
    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
    return Status::OK();
  }
};

每个 RPC 方法始终采用 arrow::flight::ServerCallContext 作为常用参数,并返回一个 arrow::Status 以指示成功或失败。 可以通过 arrow::flight::MakeFlightError() 返回 Flight 特定的错误代码。

除了状态之外还返回值的 RPC 方法将使用 out 参数,如上所示。 通常,有一些辅助类提供这些 out 参数的基本实现。 例如,在上面,arrow::flight::SimpleFlightListing 使用 arrow::flight::FlightInfo 对象的向量作为 ListFlights RPC 的结果。

要启动服务器,请创建一个 arrow::flight::Location 以指定要侦听的位置,并调用 arrow::flight::FlightServerBase::Init()。 这将启动服务器,但不会阻止程序的其余部分。 使用 arrow::flight::FlightServerBase::SetShutdownOnSignals() 以在收到中断信号时停止服务器,然后调用 arrow::flight::FlightServerBase::Serve() 以阻塞直到服务器停止。

std::unique_ptr<arrow::flight::FlightServerBase> server;
// Initialize server
arrow::flight::Location location;
// Listen to all interfaces on a free port
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);

// Start the server
ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

std::cout << "Server listening on localhost:" << server->port() << std::endl;
ARROW_CHECK_OK(server->Serve());

使用 Flight 客户端#

要连接到 Flight 服务,请通过调用 Connect 创建 arrow::flight::FlightClient 的实例。

每个 RPC 方法都返回 arrow::Result 以指示请求的成功/失败,如果请求成功,则返回结果对象。 某些调用是流式调用,因此它们将返回读取器和/或写入器对象; 只有在流完成后才知道最终调用状态。

取消和超时#

发出调用时,客户端可以选择提供 FlightCallOptions。 这允许客户端在调用中设置超时或提供自定义 HTTP 标头,以及其他功能。 此外,客户端 RPC 调用返回的某些对象公开了一个 Cancel 方法,该方法允许提前终止调用。

在服务器端,不需要额外的代码来实现超时。 对于取消,服务器需要手动轮询 ServerCallContext::is_cancelled 以检查客户端是否取消了调用,如果是,则中断服务器当前正在执行的任何处理。

启用 TLS#

可以通过向 FlightServerBase::Init 提供证书和密钥对,在设置服务器时启用 TLS。

在客户端,使用 Location::ForGrpcTls 构造要侦听的 arrow::flight::Location

启用身份验证#

警告

不启用 TLS 的情况下,身份验证是不安全的。

可以通过实现 ServerAuthHandler 并在构造期间将其提供给服务器来启用基于握手的身份验证。

身份验证包含两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行任何所需的协商。 然后,客户端身份验证处理程序提供一个令牌,该令牌将附加到将来的调用。 这是通过使用所需的客户端身份验证实现调用 Authenticate 来完成的。

此后,在每个 RPC 上,客户端处理程序的令牌会自动添加到请求标头中的调用中。 服务器身份验证处理程序验证令牌并提供客户端的身份。 在服务器上,可以从 arrow::flight::ServerCallContext 获取此身份。

自定义中间件#

服务器和客户端支持自定义中间件(或拦截器),这些中间件在每个请求上调用,并且可以以有限的方式修改请求。 这些可以通过对 ServerMiddlewareClientMiddleware 进行子类化来实现,然后在创建客户端或服务器时提供它们。

中间件相当有限,但它们可以向请求/响应添加标头。 在服务器上,他们可以检查传入的标头并使请求失败; 因此,它们可以用于实现自定义身份验证方法。

最佳实践#

gRPC#

当使用默认的 gRPC 传输时,可以通过 arrow::flight::FlightClientOptions::generic_options 将选项传递给它。例如

auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
# Set the period after which a keepalive ping is sent on transport.
generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

另请参阅 gRPC 最佳实践 和可用的 gRPC 键

尽可能重用客户端#

创建和关闭客户端需要在客户端和服务器端进行设置和拆卸,这会占用实际处理 RPC 的时间。尽可能重用客户端以避免这种情况。请注意,客户端是线程安全的,因此可以在多个线程之间共享单个客户端。

不要使用轮询负载均衡#

轮询负载均衡意味着每个客户端都可以与每个服务器建立开放连接,导致开放连接的数量超出预期,并耗尽服务器资源。

调试连接问题#

当在长时间运行的连接上遇到意外断开时,使用 netstat 监控打开的连接数。如果连接数远大于客户端数,则可能导致问题。

对于调试,某些环境变量可以在 gRPC 中启用日志记录。 例如,env GRPC_VERBOSITY=info GRPC_TRACE=http 将打印初始标头(在双方),以便你可以查看 gRPC 是否建立了连接。它还会打印何时发送消息,以便你可以判断连接是否打开。

gRPC 可能不会报告连接错误,直到实际进行调用为止。 因此,要在创建客户端时检测连接错误,应进行某种虚拟 RPC。

内存管理#

Flight 尝试重用 gRPC 所做的分配,以避免冗余的数据复制。 但是,这意味着这些分配可能不会被 Arrow 内存池跟踪,并且内存使用行为(例如,是否将空闲内存返回给系统)取决于 gRPC 使用的分配器(通常是系统分配器)。

一种快速测试方法是:使用调试器附加到进程并调用 malloc_trim,或者在系统池上调用 ReleaseUnused。 如果内存使用量下降,则很可能是 gRPC 或应用程序分配的内存被系统分配器保留。 这可以在特定于平台的方式进行调整; 有关此如何在 Linux/glibc 上工作的示例,请参见 ARROW-16697 中的调查。 可以明确告诉 glibc malloc 转储缓存。

过度流量#

gRPC 将为并发客户端生成最多最大线程配额的线程。 这些线程不一定会清理(Java 术语中的“缓存线程池”)。 glibc malloc 清除一些每个线程的状态,并且默认调整永远不会清除某些工作负载中的缓存。

gRPC 的默认行为允许一个服务器接受来自许多不同客户端的许多连接,但是如果请求执行大量工作(就像在 Flight 下一样),则服务器可能无法跟上。 配置客户端以重试并进行退避(并可能连接到其他节点)将提供更一致的服务质量。

auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
# Set the minimum time between subsequent connection attempts.
generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

限制 DoPut 批处理大小#

您可能希望限制客户端可以通过 DoPut 提交给服务器的最大批处理大小,以防止请求占用服务器上过多的内存。 在客户端,设置 arrow::flight::FlightClientOptions::write_size_limit_bytes。 在服务器端,设置 gRPC 选项 GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。 客户端选项将返回一个错误,可以使用较小的批次重试,而服务器端限制将关闭连接。 同时设置两者可能是明智的,因为前者提供了更好的用户体验,但后者可能是防御不礼貌的客户端所必需的。

关闭无响应连接#

  1. 可以使用 arrow::flight::FlightCallOptions::stop_token 关闭过时的调用。 这需要在建立调用时记录停止令牌。

    StopSource stop_source;
    FlightCallOptions options;
    options.stop_token = stop_source.token();
    stop_source.RequestStop(Status::Cancelled("StopSource"));
    flight_client->DoAction(options, {});
    
  2. 使用调用超时。(这是一般的 gRPC 最佳实践。)

    FlightCallOptions options;
    options.timeout = TimeoutDuration{0.2};
    Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
    
    Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
    
    options = pyarrow.flight.FlightCallOptions(timeout=0.2)
    result = client.do_action(action, options=options)
    
  3. 客户端超时对于长时间运行的流式调用不是很有效,因为可能难以选择整个操作的超时。 相反,通常需要的是每个读取或每个写入超时,以便在操作没有取得进展时操作失败。 这可以通过一个在计时器上调用 Cancel() 的后台线程来实现,主线程在每次操作成功完成后重置计时器。 有关完整的工作示例,请参见 Cookbook。

    注意

    有一个针对每次写入/每次读取超时而不是每次调用超时的长期票证(ARROW-6062),但是这(不容易)可以使用阻塞 gRPC API 实现。