Arrow Flight RPC#

Arrow Flight 是一个 RPC 框架,用于通过网络高效传输 Flight 数据。

另请参阅

Flight 协议文档

Flight 协议的文档,包括如何在概念上使用 Flight。

Flight API 文档

列出所有各种客户端和服务器类型的 C++ API 文档。

C++ 教程

在 C++ 中使用 Arrow Flight 的方法。

编写 Flight 服务#

服务器是 arrow::flight::FlightServerBase 的子类。要实现单个 RPC,请覆盖此类上的 RPC 方法。

class MyFlightServer : public FlightServerBase {
  Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
                     std::unique_ptr<FlightListing>* listings) override {
    std::vector<FlightInfo> flights = ...;
    *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
    return Status::OK();
  }
};

每个 RPC 方法总是接受一个 arrow::flight::ServerCallContext 用于公共参数,并返回一个 arrow::Status 来指示成功或失败。可以通过 arrow::flight::MakeFlightError() 返回 Flight 特定的错误代码。

除了状态之外还返回值的 RPC 方法将使用输出参数,如上所示。通常,有一些辅助类提供这些输出参数的基本实现。例如,上面,arrow::flight::SimpleFlightListing 使用 arrow::flight::FlightInfo 对象的向量作为 ListFlights RPC 的结果。

要启动服务器,请创建一个 arrow::flight::Location 来指定侦听位置,并调用 arrow::flight::FlightServerBase::Init()。这将启动服务器,但不会阻塞程序的其余部分。 使用 arrow::flight::FlightServerBase::SetShutdownOnSignals() 来启用在收到中断信号时停止服务器,然后调用 arrow::flight::FlightServerBase::Serve() 阻塞直到服务器停止。

std::unique_ptr<arrow::flight::FlightServerBase> server;
// Initialize server
arrow::flight::Location location;
// Listen to all interfaces on a free port
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);

// Start the server
ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

std::cout << "Server listening on localhost:" << server->port() << std::endl;
ARROW_CHECK_OK(server->Serve());

使用 Flight 客户端#

要连接到 Flight 服务,请通过调用 Connect 创建 arrow::flight::FlightClient 的实例。

每个 RPC 方法都返回 arrow::Result 以指示请求的成功/失败,以及请求成功的结果对象。某些调用是流式调用,因此它们将返回读取器和/或写入器对象;最终调用状态在流完成之前是未知的。

取消和超时#

进行调用时,客户端可以选择提供 FlightCallOptions。这允许客户端设置调用超时或提供自定义 HTTP 标头等功能。此外,客户端 RPC 调用返回的一些对象公开了一个 Cancel 方法,该方法允许提前终止调用。

在服务器端,不需要额外的代码来实现超时。 对于取消,服务器需要手动轮询 ServerCallContext::is_cancelled 以检查客户端是否已取消调用,如果是,则中断服务器当前正在执行的任何处理。

启用 TLS#

通过向 FlightServerBase::Init 提供证书和密钥对,可以在设置服务器时启用 TLS。

在客户端,使用 Location::ForGrpcTls 构造要侦听的 arrow::flight::Location

启用身份验证#

警告

未启用 TLS 的情况下,身份验证是不安全的。

可以通过实现 ServerAuthHandler 并在构造期间将其提供给服务器来启用基于握手的身份验证。

身份验证包括两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行任何需要的协商。然后,客户端身份验证处理程序提供一个将附加到未来调用的令牌。这是通过使用所需的客户端身份验证实现调用 Authenticate 来完成的。

在之后的每个 RPC 上,客户端处理程序的令牌都会自动添加到请求标头中的调用中。服务器身份验证处理程序验证令牌并提供客户端的身份。在服务器上,可以从 arrow::flight::ServerCallContext 获取此身份。

自定义中间件#

服务器和客户端支持在每个请求上调用的自定义中间件(或拦截器),并且可以以有限的方式修改请求。这些可以通过继承 ServerMiddlewareClientMiddleware 来实现,然后在创建客户端或服务器时提供它们。

中间件的功能相当有限,但它们可以向请求/响应添加标头。在服务器上,它们可以检查传入的标头并使请求失败;因此,它们可以用于实现自定义身份验证方法。

最佳实践#

gRPC#

使用默认的 gRPC 传输时,可以通过 arrow::flight::FlightClientOptions::generic_options 将选项传递给它。例如

auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
# Set the period after which a keepalive ping is sent on transport.
generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

另请参阅 gRPC 最佳实践 和可用的 gRPC 密钥

尽可能重用客户端#

创建和关闭客户端需要在客户端和服务器端进行设置和拆除,这会占用实际处理 RPC 的时间。 尽可能重用客户端以避免这种情况。 请注意,客户端是线程安全的,因此可以在多个线程之间共享单个客户端。

不要使用轮询负载均衡#

轮询负载均衡 意味着每个客户端都可以与每个服务器建立开放连接,从而导致意外数量的开放连接并耗尽服务器资源。

调试连接问题#

当长时间运行的连接出现意外断开时,请使用 netstat 监控打开的连接数。 如果连接数远大于客户端数,则可能会导致问题。

为了进行调试,某些环境变量可以启用 gRPC 中的日志记录。 例如,env GRPC_VERBOSITY=info GRPC_TRACE=http 将打印初始标头(在双方),以便您可以查看 gRPC 是否建立了连接。 它还会在发送消息时打印,因此您可以判断连接是否打开。

gRPC 可能在实际进行调用之前不会报告连接错误。 因此,为了在创建客户端时检测连接错误,应该进行某种虚拟 RPC。

内存管理#

Flight 尝试重用 gRPC 进行的分配,以避免冗余数据副本。 但是,这意味着 Arrow 内存池可能无法跟踪这些分配,并且内存使用行为(例如是否将空闲内存返回给系统)取决于 gRPC 使用的分配器(通常是系统分配器)。

一种快速测试方法:使用调试器附加到进程并调用 malloc_trim,或在系统池上调用 ReleaseUnused。 如果内存使用量下降,则可能是 gRPC 或应用程序分配了系统分配器持有的内存。 这可以通过特定于平台的方式进行调整;有关它如何在 Linux/glibc 上工作的示例,请参阅 ARROW-16697 中的调查。 可以明确告知 glibc malloc 转储缓存。

流量过大#

gRPC 将为并发客户端生成最多最大线程配额的线程。 这些线程不一定会被清理(Java 术语中的“缓存线程池”)。 glibc malloc 清除一些每线程状态,并且默认调整在某些工作负载中永远不会清除缓存。

gRPC 的默认行为允许一台服务器接受来自许多不同客户端的许多连接,但如果请求执行大量工作(就像它们在 Flight 下可能执行的那样),服务器可能无法跟上。 将客户端配置为使用退避重试(并可能连接到不同的节点),将提供更一致的服务质量。

auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
# Set the minimum time between subsequent connection attempts.
generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

限制 DoPut 批处理大小#

您可能希望限制客户端可以通过 DoPut 提交到服务器的最大批处理大小,以防止请求占用服务器上的太多内存。 在客户端,设置 arrow::flight::FlightClientOptions::write_size_limit_bytes。 在服务器端,设置 gRPC 选项 GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。 客户端选项将返回一个错误,可以使用较小的批次重试该错误,而服务器端限制将关闭连接。 同时设置两者是明智的,因为前者提供了更好的用户体验,而后者可能是抵御不礼貌客户端所必需的。

关闭无响应的连接#

  1. 可以使用 arrow::flight::FlightCallOptions::stop_token 关闭过时的调用。 这需要在调用建立时记录停止令牌。

    StopSource stop_source;
    FlightCallOptions options;
    options.stop_token = stop_source.token();
    stop_source.RequestStop(Status::Cancelled("StopSource"));
    flight_client->DoAction(options, {});
    
  2. 使用调用超时。(这是一般的 gRPC 最佳实践。)

    FlightCallOptions options;
    options.timeout = TimeoutDuration{0.2};
    Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
    
    Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
    
    options = pyarrow.flight.FlightCallOptions(timeout=0.2)
    result = client.do_action(action, options=options)
    
  3. 客户端超时不适用于长时间运行的流式调用,在这种情况下,可能很难为整个操作选择超时。 相反,通常需要的是每次读取或每次写入超时,以便操作在没有进展时失败。 这可以通过使用后台线程在计时器上调用 Cancel() 来实现,主线程在每次操作成功完成时重置计时器。 有关完整的示例,请参阅 Cookbook。

    注意

    长期以来,一直有一个关于每次写入/每次读取超时而不是每次调用超时的票证 (ARROW-6062),但这(很容易)无法使用阻塞 gRPC API 实现。