介绍 Apache Arrow Flight SQL:加速数据库访问


发布日期 2022年2月16日
作者 José Almeida, James Duong, Vinicius Fraga, Juscelino Junior, David Li, Kyle Porter, Rafael Telles

我们希望介绍 Flight SQL,这是一个由 Apache Arrow 社区开发的新型客户端-服务器协议,用于与 SQL 数据库交互,它利用了 Arrow 内存列式格式和 Flight RPC 框架。

Flight SQL 的目标是提供与现有 API(如 JDBC 和 ODBC)大致相似的功能,包括执行查询;创建预处理语句;以及获取有关支持的 SQL 方言、可用类型、已定义表等的元数据。然而,通过构建在 Apache Arrow 之上,Flight SQL 使客户端能够轻松地与 Arrow 原生数据库进行通信,而无需转换数据。通过使用 Flight,它提供了一种高效的线格式实现,支持加密和身份验证等开箱即用功能,同时允许进一步的优化,如并行数据访问。

虽然它可以直接用于数据库访问,但它不是 JDBC/ODBC 的直接替代品。相反,Flight SQL 充当了一种具体的线协议/驱动实现,可以支持 JDBC/ODBC 驱动,并减轻数据库的实现负担。

Illustration of where Flight SQL sits in the stack. JDBC and ODBC drivers can wrap Flight SQL, or an Arrow-native application can directly use the Flight SQL libraries. Flight SQL in turn talks over Arrow Flight to a database exposing a Flight SQL endpoint.

动机

虽然 JDBCODBC 等标准已经很好地服务了用户数十年,但对于希望使用 Apache Arrow 或通用列式数据的数据库和客户端来说,它们存在不足。像 JDBC 或 PEP 249 这样的基于行的 API 在这种情况下需要转置数据,而对于本身就是列式的数据库来说,这意味着数据必须转置两次——一次是为了以行的形式呈现给 API,一次是为了将其重新转换为列式供消费者使用。同时,虽然像 ODBC 这样的 API 确实提供了对结果缓冲区的批量访问,但这些数据仍必须复制到 Arrow 数组中,以便与更广泛的 Arrow 生态系统一起使用,例如由 Turbodbc 等项目实现的功能。Flight SQL 的目标是消除这些中间步骤。

Flight SQL 意味着数据库服务器可以实现一个从一开始就围绕 Apache Arrow 和列式数据设计的标准接口。就像 Arrow 提供了一种标准的内存格式一样,Flight SQL 使开发人员无需设计和实现全新的线协议。如前所述,Flight 已经实现了诸如线上传输加密和请求认证等功能,数据库无需重新实现这些功能。

对于客户端而言,Flight SQL 提供了对查询结果的批量访问,而无需从其他 API 或格式转换数据。此外,通过将实现线协议的工作推送到 Flight 和 Flight SQL 库中,可以减少每种客户端语言或驱动需要编写的代码。而且,通过底层使用 Flight,客户端和服务器可以协作实现诸如并行数据访问等优化,这是 Flight 本身最初的目标之一。数据库可以向 Flight SQL 客户端返回多个“端点”,然后客户端可以从所有这些端点并行拉取数据,从而使数据库后端能够横向扩展。

Flight SQL 基础

Flight SQL 充分利用了 Flight RPC 框架及其可扩展性,通过 Protobuf 定义了额外的请求/响应消息。我们将简要介绍一下 Flight SQL 协议,但 C++ 和 Java 已经实现了管理大部分工作的客户端。完整的协议可以在 GitHub 上找到。

大多数请求遵循以下模式

  1. 客户端使用定义的 Protobuf 消息之一构造请求。
  2. 客户端通过 GetSchema RPC 方法(用于获取响应的 schema)或 GetFlightInfo RPC 方法(用于执行请求)发送请求。
  3. 客户端向 GetFlightInfo 返回的端点发出请求以获取响应。

Flight SQL 定义了用于查询数据库元数据、执行查询或操作预处理语句的方法。

元数据请求

  • CommandGetCatalogs: 列出数据库中的 catalog。
  • CommandGetCrossReference: 列出引用特定其他表的外部键列。
  • CommandGetDbSchemas: 列出 catalog 中的 schema。
  • CommandGetExportedKeys: 列出引用某表的外部键。
  • CommandGetImportedKeys: 列出某表的外部键。
  • CommandGetPrimaryKeys: 列出某表的主键。
  • CommandGetSqlInfo: 获取有关数据库本身及其支持的 SQL 方言的信息。
  • CommandGetTables: 列出 catalog/schema 中的表。
  • CommandGetTableTypes: 列出支持的表类型(例如 table、view、system table)。

查询

  • CommandStatementQuery: 执行一次性 SQL 查询。
  • CommandStatementUpdate: 执行一次性 SQL 更新查询。

预处理语句

  • ActionClosePreparedStatementRequest: 关闭预处理语句。
  • ActionCreatePreparedStatementRequest: 创建新的预处理语句。
  • CommandPreparedStatementQuery: 执行预处理语句。
  • CommandPreparedStatementUpdate: 执行更新数据的预处理语句。

例如,列出所有表

Sequence diagram showing how to use CommandGetTables. First, the client calls the GetFlightInfo RPC method with a serialized CommandGetTables message as the argument. The server returns a FlightInfo message containing a Ticket message. The client then calls the DoGet RPC method with the Ticket as the argument, and gets back a stream of Arrow record batches containing the tables in the database.

执行查询

Sequence diagram showing how to use CommandStatementQuery. First, the client calls the GetFlightInfo RPC method with a serialized CommandStatementQuery message as the argument. This message contains the SQL query. The server returns a FlightInfo message containing a Ticket message. The client then calls the DoGet RPC method with the Ticket as the argument, and gets back a stream of Arrow record batches containing the query results.

创建并执行预处理语句以插入行

Sequence diagram showing how to use ActionCreatePreparedStatementResult. First, the client calls the DoAction RPC method with a serialized ActionCreatePreparedStatementResult message as the argument. This message contains the SQL query. The server returns a serialized ActionCreatePreparedStatementResult message containing an opaque handle for the prepared statement. The client then calls the DoPut RPC method with a CommandPreparedStatementUpdate message, containing the opaque handle, as the argument, and uploads a stream of Arrow record batches containing query parameters. The server responds with a serialized DoPutUpdateResult message containing the number of affected rows. Finally, the client calls DoAction again with ActionClosePreparedStatementRequest to clean up the prepared statement.

入门

请注意,虽然 Flight SQL 作为 Apache Arrow 7.0.0 的一部分发布,但它仍在开发中,详细文档即将推出。然而,C++ 和 Java 中已提供了实现,其中包含可用的低级客户端以及可实现的服务器骨架。

对于感兴趣的人,源代码中提供了一个封装 Apache Derby 的服务器实现一个封装 SQLite 的服务器实现。此外,还提供了一个演示客户端的简单 CLI。最后,我们可以看一个执行查询和获取结果的简短示例。

flight::FlightCallOptions call_options;

// Execute the query, getting a FlightInfo describing how to fetch the results
std::cout << "Executing query: '" << FLAGS_query << "'" << std::endl;
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<flight::FlightInfo> flight_info,
                      client->Execute(call_options, FLAGS_query));

// Fetch each partition sequentially (though this can be done in parallel)
for (const flight::FlightEndpoint& endpoint : flight_info->endpoints()) {
  // Here we assume each partition is on the same server we originally queried, but this
  // isn't true in general: the server may split the query results between multiple
  // other servers, which we would have to connect to.

  // The "ticket" in the endpoint is opaque to the client. The server uses it to
  // identify which part of the query results to return.
  ARROW_ASSIGN_OR_RAISE(auto stream, client->DoGet(call_options, endpoint.ticket));
  // Read all results into an Arrow Table, though we can iteratively process record
  // batches as they arrive as well
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(stream->ReadAll(&table));
  std::cout << "Read one partition:" << std::endl;
  std::cout << table->ToString() << std::endl;
}

完整源代码可在 GitHub 上获取

后续计划与参与方式

与 PyODBC 等现有库相比,Arrow Flight 的速度已快达 20 倍(约 00:21:00)。Flight SQL 将把这些性能优势打包成一个标准接口,供客户端和数据库实现。

预计将进行进一步的协议改进和扩展。其中一些工作是为了使在 Flight SQL 之上实现 JDBC 等 API 成为可能;目前正在积极开发 JDBC 驱动。虽然这会再次引入数据转换的开销,但它意味着数据库通过实现 Flight SQL,可以同时服务于 Arrow 原生客户端和传统客户端。未来的其他改进可能包括 Python 绑定、ODBC 驱动等。

对于有兴趣参与的人,无论是作为贡献者还是采用者,请通过邮件列表联系,或加入 GitHub 上的讨论。