Flight SQL 驱动程序¶
适用于: C/C++、GLib/Ruby、Go、Java、Python、R
Flight SQL 驱动程序提供对任何实现 Arrow Flight SQL 兼容端点的数据库的访问。
安装¶
对于 conda-forge 用户
mamba install libadbc-driver-flightsql
go get github.com/apache/arrow-adbc/go/adbc
添加对 org.apache.arrow.adbc:adbc-driver-flight-sql
的依赖。
对于 Maven 用户
<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
</dependency>
# For conda-forge
mamba install adbc-driver-flightsql
# For pip
pip install adbc_driver_flightsql
# install.packages("pak")
pak::pak("apache/arrow-adbc/r/adbcflightsql")
用法¶
要连接到数据库,请在构造 AdbcDatabase
时提供“uri”参数。
#include "arrow-adbc/adbc.h"
// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://localhost:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);
注意
有关详细示例,请参阅Flight SQL 秘籍。
from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect
headers = {"foo": "bar"}
with connect(
"grpc+tls://localhost:8080",
db_kwargs={
DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
**{
f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
for k, v in headers.items()
},
}
) as conn:
pass
秘籍来源: example_usage_test.go
20// Tests that use the SQLite server example.
21
22package flightsql_test
23
24import (
25 "context"
26 "database/sql"
27 "fmt"
28 "log"
29
30 "github.com/apache/arrow-adbc/go/adbc"
31 drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
32 "github.com/apache/arrow-go/v18/arrow/array"
33 "github.com/apache/arrow-go/v18/arrow/flight"
34 "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
35 sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
36 "github.com/apache/arrow-go/v18/arrow/memory"
37 _ "modernc.org/sqlite"
38)
39
40var headers = map[string]string{"foo": "bar"}
41
42func FlightSQLExample(uri string) error {
43 ctx := context.Background()
44 options := map[string]string{
45 adbc.OptionKeyURI: uri,
46 }
47
48 for k, v := range headers {
49 options[drv.OptionRPCCallHeaderPrefix+k] = v
50 }
51
52 var alloc memory.Allocator
53 drv := drv.NewDriver(alloc)
54 db, err := drv.NewDatabase(options)
55 if err != nil {
56 return fmt.Errorf("failed to open database: %s\n", err.Error())
57 }
58 defer db.Close()
59
60 cnxn, err := db.Open(ctx)
61 if err != nil {
62 return fmt.Errorf("failed to open connection: %s", err.Error())
63 }
64 defer cnxn.Close()
65
66 stmt, err := cnxn.NewStatement()
67 if err != nil {
68 return fmt.Errorf("failed to create statement: %s", err.Error())
69 }
70 defer stmt.Close()
71
72 if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
73 return fmt.Errorf("failed to set query: %s", err.Error())
74 }
75
76 reader, _, err := stmt.ExecuteQuery(ctx)
77 if err != nil {
78 return fmt.Errorf("failed to execute query: %s", err.Error())
79 }
80 defer reader.Release()
81
82 for reader.Next() {
83 arr, ok := reader.Record().Column(0).(*array.Int64)
84 if !ok {
85 return fmt.Errorf("result data was not int64")
86 }
87 for i := 0; i < arr.Len(); i++ {
88 if arr.IsNull(i) {
89 fmt.Println("theresult: NULL")
90 } else {
91 fmt.Printf("theresult: %d\n", arr.Value(i))
92 }
93 }
94 }
95
96 return nil
97}
98
99func Example() {
100 // For this example we will spawn the Flight SQL server ourselves.
101
102 // Create a new database that isn't tied to any other databases that
103 // may be in process.
104 db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
105 if err != nil {
106 log.Fatal(err)
107 }
108 defer db.Close()
109
110 srv, err := sqlite.NewSQLiteFlightSQLServer(db)
111 if err != nil {
112 log.Fatal(err)
113 }
114
115 server := flight.NewServerWithMiddleware(nil)
116 server.RegisterFlightService(flightsql.NewFlightServer(srv))
117 err = server.Init("localhost:8080")
118 if err != nil {
119 log.Fatal(err)
120 }
121
122 go func() {
123 if err := server.Serve(); err != nil {
124 log.Fatal(err)
125 }
126 }()
127
128 uri := fmt.Sprintf("grpc://%s", server.Addr().String())
129 if err := FlightSQLExample(uri); err != nil {
130 log.Printf("Error: %s\n", err.Error())
131 }
132
133 server.Shutdown()
134
135 // Output:
136 // theresult: 1
137}
theresult: 1
支持的特性¶
Flight SQL 驱动程序通常支持 ADBC API 规范 1.0.0 中定义的特性,以及一些额外的自定义选项。
警告
Java 驱动程序不支持此处的所有选项。请参阅 issue #745。
身份验证¶
默认情况下,驱动程序不进行身份验证。该驱动程序实现了一些可选的身份验证方案
相互 TLS (mTLS):请参阅下面的“客户端选项”。
模仿 Arrow Flight SQL JDBC 驱动程序的 HTTP 风格方案。
在
AdbcDatabase
上设置选项username
和password
。或者,设置选项adbc.flight.sql.authorization_header
以进行完全控制。客户端提供凭据,从客户端向服务器发送一个
authorization
。然后,服务器在第一个请求上以authorization
标头响应。然后,此标头的值将作为authorization
标头发送回所有未来的请求。
批量摄取¶
Flight SQL 没有用于将 Arrow 数据批量摄取到给定表中的专用 API。因此,该驱动程序目前不实现批量摄取。
客户端选项¶
可以自定义用于创建 Flight RPC 客户端的选项。
注意
其中许多选项只是包装了一个 gRPC 选项。有关这些选项作用的更多详细信息,请查阅 gRPC 文档。
adbc.flight.sql.client_option.authority
覆盖 gRPC 的
:authority
伪标头。adbc.flight.sql.client_option.mtls_cert_chain
用于 mTLS 的证书链。
Python:
adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN
adbc.flight.sql.client_option.mtls_private_key
用于 mTLS 的私钥。
Python:
adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY
adbc.flight.sql.client_option.tls_override_hostname
覆盖用于验证服务器 TLS 证书的主机名。
Python:
adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME
adbc.flight.sql.client_option.tls_root_certs
覆盖用于验证服务器 TLS 证书的根证书。
Python:
adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS
adbc.flight.sql.client_option.tls_skip_verify
禁用服务器 TLS 证书的验证。值应为
true
或false
。Python:
adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY
adbc.flight.sql.client_option.with_block
警告
此选项已弃用,因为 gRPC 本身已弃用底层选项。
此选项无效,将在未来的版本中删除。值应为
true
或false
。adbc.flight.sql.client_option.with_max_msg_size
从服务器接受的最大消息大小。由于 Flight 服务倾向于返回较大的响应负载,因此驱动程序默认为 16 MiB。应为正整数的字节数。
Python:
adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE
adbc.flight.sql.authorization_header
直接指定在所有请求上发送的
authorization
标头的值。Python:
adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER
adbc.flight.sql.rpc.with_cookie_middleware
启用或禁用处理和处理从服务器返回的“set-cookie”元数据标头,并将“Cookie”标头从客户端发送回来的中间件。值应为
true
或false
。默认为false
。Python:
adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
自定义调用标头¶
可以通过应用于 AdbcDatabase
、AdbcConnection
和 AdbcStatement
的选项将自定义 HTTP 标头附加到请求。
adbc.flight.sql.rpc.call_header.<HEADER NAME>
将标头
<HEADER NAME>
添加到具有给定值的传出请求。警告
标头名称必须全部为小写。
分布式结果集¶
驱动程序将获取服务器返回的所有分区 (FlightEndpoints),顺序不指定(请注意,Flight SQL 本身未定义这些分区的排序)。如果一个 endpoint 没有 locations,数据将使用原始服务器连接获取。否则,驱动程序将按顺序尝试给定的每个 location,直到请求成功。如果连接或请求失败,它将尝试下一个 location。
驱动程序目前不缓存或池化这些辅助连接。它也不会重试连接或请求。
所有分区都是并行获取的。每个分区都会排队有限数量的批次。数据按分区顺序返回给客户端。
某些行为可以在 AdbcStatement
上配置
adbc.rpc.result_queue_size
每个分区要排队的批次数。默认为 5。
增量执行¶
通过设置 ADBC_STATEMENT_OPTION_INCREMENTAL
,您可以使用此驱动程序的非阻塞执行。这仅更改 AdbcStatementExecutePartitions()
的行为。启用后,ExecutePartitions 每次从服务器获得新分区时(用 Flight SQL 术语来说,当有新的 FlightEndpoints 时)都会返回,而不是阻塞直到查询完成。
某些行为可以在 AdbcStatement
上配置
adbc.flight.sql.statement.exec.last_flight_info
获取服务返回的最新
FlightInfo
的序列化字节。这是一个用于高级用法的基础选项。当启用增量执行时,它对于检查最新服务器响应而不等待AdbcStatementExecutePartitions()
返回时最有用。Python:
adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO
元数据¶
驱动程序当前不会在 AdbcConnectionGetObjects()
中填充列约束信息(外键、主键等)。此外,目录过滤器被评估为简单的字符串匹配,而不是 LIKE
样式的模式。
分区结果集¶
Flight SQL 驱动程序支持 ADBC 的分区结果集。当被请求时,结果集的每个分区都包含一个序列化的 FlightInfo,其中包含原始响应的 FlightEndpoints 之一。可能希望内省分区的客户端可以通过从 ADBC 分区反序列化包含的 FlightInfo 来做到这一点。(例如,希望在多个工作节点或机器上分配工作的客户端可能希望尝试利用 ADBC 没有的本地信息。)
会话¶
驱动程序通过连接上的选项公开 Flight SQL 会话支持。没有启动新会话的显式命令;预计服务器本身会管理此操作。(您很可能需要启用如上所述的 cookie 支持。)没有关闭会话的显式命令;这总是在关闭连接时发出。
adbc.flight.sql.session.options
将所有选项作为 JSON blob 获取。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS
adbc.flight.sql.session.option.
获取或设置字符串/数值会话选项。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX
adbc.flight.sql.session.optionerase.
擦除会话选项。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX
adbc.flight.sql.session.optionbool.
获取或设置布尔会话选项。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX
adbc.flight.sql.session.optionstringlist.
获取或设置字符串列表会话选项。内容应该是序列化的 JSON 列表。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
超时¶
默认情况下,超时不用于 RPC 调用。它们可以通过 AdbcConnection
上的特殊选项进行设置。一般来说,最佳实践是设置超时以避免意外卡住。选项如下
adbc.flight.sql.rpc.timeout_seconds.fetch
任何获取数据的 API 调用的超时(以浮点秒为单位)。这对应于 Flight
DoGet
调用。例如,这控制着底层 Flight 调用的超时,这些调用会在使用结果集时获取更多数据。
Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH
adbc.flight.sql.rpc.timeout_seconds.query
任何执行查询的 API 调用的超时(以浮点秒为单位)。这对应于 Flight
GetFlightInfo
调用。例如,这控制着实现
AdbcStatementExecuteQuery()
的底层 Flight 调用的超时。Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY
adbc.flight.sql.rpc.timeout_seconds.update
任何上传数据或执行其他更新的 API 调用的超时(以浮点秒为单位)。
例如,这控制着实现批量摄取或事务支持的底层 Flight 调用的超时。
Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE
还有一个在 AdbcDatabase
上设置的超时
adbc.flight.sql.rpc.timeout_seconds.connect
建立连接的超时(以浮点秒为单位)。 默认值为 20 秒。
事务¶
驱动程序支持事务。它将首先检查服务器的 SqlInfo 以确定是否支持此功能。否则,与事务相关的 ADBC API 将返回 ADBC_STATUS_NOT_IMPLEMENTED
。