Arrow Flight RPC#
Arrow Flight 是一个用于在网络上高效传输 Flight 数据的 RPC 框架。
另请参阅
- Flight 协议文档
Flight 协议的文档,包括如何概念性地使用 Flight。
- Flight API 文档
列出所有各种客户端和服务器类型的 C++ API 文档。
- C++ 指南
在 C++ 中使用 Arrow Flight 的方案。
编写 Flight 服务#
服务器是 arrow::flight::FlightServerBase 的子类。要实现各个 RPC,请覆盖此类中的 RPC 方法。
class MyFlightServer : public FlightServerBase {
Status ListFlights(const ServerCallContext& context, const Criteria* criteria,
std::unique_ptr<FlightListing>* listings) override {
std::vector<FlightInfo> flights = ...;
*listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights));
return Status::OK();
}
};
每个 RPC 方法总是接受一个用于通用参数的 arrow::flight::ServerCallContext,并返回一个指示成功或失败的 arrow::Status。Flight 特定的错误代码可以通过 arrow::flight::MakeFlightError() 返回。
除了状态外还需要返回值的 RPC 方法将使用输出参数,如上所示。通常,有一些辅助类提供这些输出参数的基本实现。例如,上面提到的 arrow::flight::SimpleFlightListing 使用 arrow::flight::FlightInfo 对象的向量作为 ListFlights RPC 的结果。
要启动服务器,请创建一个 arrow::flight::Location 来指定监听位置,并调用 arrow::flight::FlightServerBase::Init()。这将启动服务器,但不会阻塞程序的其余部分。使用 arrow::flight::FlightServerBase::SetShutdownOnSignals() 可以在接收到中断信号时启用服务器停止功能,然后调用 arrow::flight::FlightServerBase::Serve() 进行阻塞,直到服务器停止。
std::unique_ptr<arrow::flight::FlightServerBase> server;
// Initialize server
arrow::flight::Location location;
// Listen to all interfaces on a free port
ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location));
arrow::flight::FlightServerOptions options(location);
// Start the server
ARROW_CHECK_OK(server->Init(options));
// Exit with a clean error code (0) on SIGTERM
ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));
std::cout << "Server listening on localhost:" << server->port() << std::endl;
ARROW_CHECK_OK(server->Serve());
使用 Flight 客户端#
要连接到 Flight 服务,请通过调用 Connect 创建一个 arrow::flight::FlightClient 实例。
每个 RPC 方法都返回一个 arrow::Result 以指示请求的成功/失败,如果请求成功,则包含结果对象。有些调用是流式调用,因此它们会返回读取器和/或写入器对象;只有在流完成时才知道最终的调用状态。
取消和超时#
进行调用时,客户端可以选择提供 FlightCallOptions。这允许客户端设置调用超时或提供自定义 HTTP 标头等功能。此外,客户端 RPC 调用返回的一些对象公开了一个 Cancel 方法,允许提前终止调用。
在服务器端,不需要额外的代码来实现超时。对于取消,服务器需要手动轮询 ServerCallContext::is_cancelled 来检查客户端是否已取消调用,如果是,则中断服务器当前正在进行的任何处理。
启用 TLS#
在设置服务器时,可以通过向 FlightServerBase::Init 提供证书和密钥对来启用 TLS。
在客户端,使用 Location::ForGrpcTls 来构建用于监听的 arrow::flight::Location。
启用身份验证#
警告
如果没有启用 TLS,身份验证是不安全的。
可以通过实现 ServerAuthHandler 并在构建期间将其提供给服务器来启用基于握手的身份验证。
身份验证包括两个部分:在初始客户端连接时,服务器和客户端身份验证实现可以执行所需的任何协商。然后,客户端身份验证处理程序提供一个将被附加到后续调用的令牌。这是通过使用所需的客户端身份验证实现调用 Authenticate 来完成的。
在此之后的每次 RPC 中,客户端处理程序的令牌会自动添加到请求标头中的调用中。服务器身份验证处理程序验证该令牌并提供客户端的身份。在服务器上,此身份可以从 arrow::flight::ServerCallContext 中获取。
自定义中间件#
服务器和客户端支持在每个请求上调用的自定义中间件(或拦截器),并且可以以有限的方式修改请求。这些可以通过子类化 ServerMiddleware 和 ClientMiddleware,然后在创建客户端或服务器时提供它们来实现。
中间件功能相当有限,但它们可以向请求/响应添加头部。在服务器上,它们可以检查传入的头部并使请求失败;因此,它们可以用于实现自定义身份验证方法。
最佳实践#
gRPC#
使用默认的 gRPC 传输时,可以通过 arrow::flight::FlightClientOptions::generic_options 将选项传递给它。例如
auto options = FlightClientOptions::Defaults();
// Set the period after which a keepalive ping is sent on transport.
options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);
# Set the period after which a keepalive ping is sent on transport.
generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
尽可能重用客户端#
创建和关闭客户端需要在客户端和服务器端进行设置和拆卸,这会占用处理 RPC 的资源。尽可能重用客户端以避免这种情况。请注意,客户端是线程安全的,因此单个客户端可以在多个线程之间共享。
不要使用轮询负载均衡#
轮询负载均衡意味着每个客户端都可以与每个服务器建立开放连接,从而导致意外数量的开放连接并耗尽服务器资源。
调试连接问题#
当在长时间运行的连接上遇到意外断开连接时,请使用 netstat 监控开放连接的数量。如果连接数量远大于客户端数量,可能会导致问题。
为了调试,某些环境变量可以启用 gRPC 中的日志记录。例如,env GRPC_VERBOSITY=info GRPC_TRACE=http 将打印初始标头(在双方),以便您可以查看 gRPC 是否建立了连接。它还会在发送消息时打印,以便您可以判断连接是否打开。
gRPC 可能在实际发出调用之前不会报告连接错误。因此,要在创建客户端时检测连接错误,应该进行某种虚拟 RPC。
内存管理#
Flight 尝试重用 gRPC 所做的分配,以避免冗余数据拷贝。然而,经验表明,此类数据经常未对齐。某些用例可能要求数据具有特定于数据类型的对齐方式(例如,Int32 数组的数据缓冲区要在 4 字节边界上对齐),这可以通过将 arrow::ipc::IpcReadOptions::ensure_alignment 设置为 arrow::ipc::Alignment::kDataTypeSpecificAlignment 来强制执行。这使用 arrow::ipc::IpcReadOptions::memory_pool 来分配具有对齐地址的内存,但仅针对未对齐的数据。然而,这会为您通过 Flight 接收到的数据创建数据副本。
除非如上所述拷贝 gRPC 数据,否则 gRPC 所做的分配可能不会被 Arrow 内存池跟踪,并且内存使用行为(例如是否将空闲内存返回给系统)取决于 gRPC 使用的分配器(通常是系统分配器)。
一种快速测试方法:使用调试器附加到进程并调用 malloc_trim,或在系统池上调用 ReleaseUnused。如果内存使用量下降,则很可能是系统分配器保留了由 gRPC 或应用程序分配的内存。这可以通过特定于平台的方式进行调整;请参阅 ARROW-16697 中的调查,了解这在 Linux/glibc 上是如何工作的示例。可以显式地告知 glibc malloc 转储缓存。
流量过大#
对于并发客户端,gRPC 将生成最多达到线程配额的线程。这些线程不一定会清理(Java 术语中的“缓存线程池”)。glibc malloc 会清除一些每个线程的状态,并且默认调整在某些工作负载下从不清除缓存。
gRPC 的默认行为允许一台服务器接受来自许多不同客户端的许多连接,但如果请求做了大量工作(在 Flight 下可能如此),服务器可能无法跟上。将客户端配置为进行退避重试(并可能连接到不同的节点),将提供更一致的服务质量。
auto options = FlightClientOptions::Defaults();
// Set the minimum time between subsequent connection attempts.
options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);
# Set the minimum time between subsequent connection attempts.
generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)
限制 DoPut 批处理大小#
您可能希望限制客户端通过 DoPut 提交给服务器的最大批处理大小,以防止请求占用服务器上过多的内存。在客户端,设置 arrow::flight::FlightClientOptions::write_size_limit_bytes。在服务器端,设置 gRPC 选项 GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH。客户端选项将返回一个可以尝试用更小批次重试的错误,而服务器端限制将关闭连接。设置两者可能是明智的,因为前者提供更好的用户体验,但后者对于抵御不礼貌的客户端可能是必要的。
关闭无响应的连接#
可以使用
arrow::flight::FlightCallOptions::stop_token关闭陈旧的调用。这需要在建立调用时记录停止令牌。StopSource stop_source; FlightCallOptions options; options.stop_token = stop_source.token(); stop_source.RequestStop(Status::Cancelled("StopSource")); flight_client->DoAction(options, {});
使用调用超时。(这是 gRPC 的一般最佳实践。)
FlightCallOptions options; options.timeout = TimeoutDuration{0.2}; Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
options = pyarrow.flight.FlightCallOptions(timeout=0.2) result = client.do_action(action, options=options)
客户端超时对于长时间运行的流式调用来说不太好,因为很难为整个操作选择一个超时。相反,通常需要的是单次读取或单次写入超时,以便如果操作没有进展,则操作失败。这可以通过一个后台线程来实现,该线程在计时器上调用 Cancel(),主线程在每次操作成功完成时重置计时器。有关完整的示例,请参阅 Cookbook。
注意
有一个关于单次写入/单次读取超时而不是单次调用超时的长期存在的工单(ARROW-6062),但使用阻塞 gRPC API 无法(轻松)实现这一点。