Flight SQL 驱动程序¶
语言:Go 状态:稳定
Flight SQL 驱动程序提供对实现 Arrow Flight SQL 兼容端点的任何数据库的访问。
安装¶
从 conda-forge 安装 libadbc-driver-flightsql
mamba install libadbc-driver-flightsql
安装 C/C++ 驱动程序,然后使用 Go 驱动程序管理器。需要 CGO。
go get github.com/apache/arrow-adbc/go/adbc/drivermgr
从 conda-forge 安装 adbc-driver-flightsql
mamba install adbc-driver-flightsql
从 PyPI 安装 adbc-driver-flightsql
pip install adbc-driver-flightsql
从 R-multiverse 安装 adbcflightsql
install.packages("adbcflightsql", repos = "https://community.r-multiverse.org")
此外,该驱动程序可以通过驱动程序管理器从 C/C++、C#、GLib、Go、R、Ruby 和 Rust 使用。
用法¶
要连接到数据库,在构造 AdbcDatabase 时提供“uri”参数。
#include "arrow-adbc/adbc.h"
// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);
注意
有关详细示例,请参阅 Flight SQL 食谱。
from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect
headers = {"foo": "bar"}
with connect(
"grpc+tls://:8080",
db_kwargs={
DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
**{
f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
for k, v in headers.items()
},
}
) as conn:
pass
食谱来源: example_usage_test.go
20// Tests that use the SQLite server example.
21
22package flightsql_test
23
24import (
25 "context"
26 "database/sql"
27 "errors"
28 "fmt"
29 "log"
30
31 "github.com/apache/arrow-adbc/go/adbc"
32 drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
33 "github.com/apache/arrow-go/v18/arrow/array"
34 "github.com/apache/arrow-go/v18/arrow/flight"
35 "github.com/apache/arrow-go/v18/arrow/flight/flightsql"
36 sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
37 "github.com/apache/arrow-go/v18/arrow/memory"
38 _ "modernc.org/sqlite"
39)
40
41var headers = map[string]string{"foo": "bar"}
42
43func FlightSQLExample(uri string) (err error) {
44 ctx := context.Background()
45 options := map[string]string{
46 adbc.OptionKeyURI: uri,
47 }
48
49 for k, v := range headers {
50 options[drv.OptionRPCCallHeaderPrefix+k] = v
51 }
52
53 var alloc memory.Allocator
54 drv := drv.NewDriver(alloc)
55 db, err := drv.NewDatabase(options)
56 if err != nil {
57 return fmt.Errorf("failed to open database: %s\n", err.Error())
58 }
59 defer func() {
60 err = errors.Join(err, db.Close())
61 }()
62
63 cnxn, err := db.Open(ctx)
64 if err != nil {
65 return fmt.Errorf("failed to open connection: %s", err.Error())
66 }
67 defer func() {
68 err = errors.Join(err, cnxn.Close())
69 }()
70
71 stmt, err := cnxn.NewStatement()
72 if err != nil {
73 return fmt.Errorf("failed to create statement: %s", err.Error())
74 }
75 defer func() {
76 err = errors.Join(err, stmt.Close())
77 }()
78
79 if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
80 return fmt.Errorf("failed to set query: %s", err.Error())
81 }
82
83 reader, _, err := stmt.ExecuteQuery(ctx)
84 if err != nil {
85 return fmt.Errorf("failed to execute query: %s", err.Error())
86 }
87 defer reader.Release()
88
89 for reader.Next() {
90 arr, ok := reader.RecordBatch().Column(0).(*array.Int64)
91 if !ok {
92 return fmt.Errorf("result data was not int64")
93 }
94 for i := 0; i < arr.Len(); i++ {
95 if arr.IsNull(i) {
96 fmt.Println("theresult: NULL")
97 } else {
98 fmt.Printf("theresult: %d\n", arr.Value(i))
99 }
100 }
101 }
102
103 return nil
104}
105
106func Example() {
107 // For this example we will spawn the Flight SQL server ourselves.
108
109 // Create a new database that isn't tied to any other databases that
110 // may be in process.
111 db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
112 if err != nil {
113 log.Fatal(err)
114 }
115 defer func() {
116 err := db.Close()
117 if err != nil {
118 log.Fatal(err)
119 }
120 }()
121
122 srv, err := sqlite.NewSQLiteFlightSQLServer(db)
123 if err != nil {
124 log.Fatal(err)
125 }
126
127 server := flight.NewServerWithMiddleware(nil)
128 server.RegisterFlightService(flightsql.NewFlightServer(srv))
129 err = server.Init("localhost:8080")
130 if err != nil {
131 log.Fatal(err)
132 }
133
134 go func() {
135 if err := server.Serve(); err != nil {
136 log.Fatal(err)
137 }
138 }()
139
140 uri := fmt.Sprintf("grpc://%s", server.Addr().String())
141 if err := FlightSQLExample(uri); err != nil {
142 log.Printf("Error: %s\n", err.Error())
143 }
144
145 server.Shutdown()
146
147 // Output:
148 // theresult: 1
149}
theresult: 1
支持的功能¶
Flight SQL 驱动程序通常支持 ADBC API 规范 1.0.0 中定义的功能,以及一些额外的自定义选项。
警告
Java 驱动程序不支持此处的所有选项。请参阅 问题 #745。
认证¶
驱动程序默认不进行认证。驱动程序实现了几种可选的认证方案
相互 TLS (mTLS):请参阅下面的“客户端选项”。
一种模仿 Arrow Flight SQL JDBC 驱动程序的 HTTP 风格方案。
在
AdbcDatabase上设置选项username和password。或者,设置选项adbc.flight.sql.authorization_header以完全控制。客户端通过发送一个
authorization从客户端到服务器来提供凭据。服务器随后在第一个请求上以authorization标头响应。此标头的值将在所有后续请求中作为authorization标头发送回去。OAuth 2.0 认证流程。
客户端提供 配置 以允许客户端应用程序从授权服务器获取访问令牌。获取到的令牌随后将在所有后续请求中用于
authorization标头。
批量摄取¶
Flight SQL 没有专门用于将 Arrow 数据批量摄取到给定表的 API。因此,驱动程序目前没有实现批量摄取。
客户端选项¶
用于创建 Flight RPC 客户端的选项可以自定义。
注意
其中许多选项只是包装了 gRPC 选项。有关这些选项功能的更多详细信息,请查阅 gRPC 文档。
adbc.flight.sql.client_option.authority覆盖 gRPC 的
:authority伪标头。adbc.flight.sql.client_option.mtls_cert_chain用于 mTLS 的证书链。
Python:
adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAINadbc.flight.sql.client_option.mtls_private_key用于 mTLS 的私钥。
Python:
adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEYadbc.flight.sql.client_option.tls_override_hostname覆盖用于验证服务器 TLS 证书的主机名。
Python:
adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAMEadbc.flight.sql.client_option.tls_root_certs覆盖用于验证服务器 TLS 证书的根证书。
Python:
adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTSadbc.flight.sql.client_option.tls_skip_verify禁用服务器 TLS 证书的验证。值应为
true或false。Python:
adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFYadbc.flight.sql.client_option.with_block警告
此选项已弃用,因为 gRPC 本身已弃用底层选项。
此选项无效,并将在未来版本中移除。值应为
true或false。adbc.flight.sql.client_option.with_max_msg_size接受来自服务器的最大消息大小。驱动程序默认设置为 16 MiB,因为 Flight 服务倾向于返回较大的响应负载。应为正整数字节数。
Python:
adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZEadbc.flight.sql.authorization_header直接指定要在所有请求中发送的
authorization标头的值。Python:
adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADERadbc.flight.sql.rpc.with_cookie_middleware启用或禁用处理和处理从服务器返回的“set-cookie”元数据标头并从客户端发送回“Cookie”标头的中间件。值应为
true或false。默认值为false。Python:
adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE
自定义调用标头¶
自定义 HTTP 标头可以通过适用于 AdbcDatabase、AdbcConnection 和 AdbcStatement 的选项附加到请求中。
adbc.flight.sql.rpc.call_header.<HEADER NAME>将标头
<HEADER NAME>添加到传出请求中,并附带给定值。Python:
adbc_driver_flightsql.ConnectionOptions.RPC_CALL_HEADER_PREFIX警告
标头名称必须全部小写。
OAuth 2.0 选项¶
支持使用 OAuth 2.0 认证流程获取令牌的配置。
adbc.flight.sql.oauth.flow指定要使用的 OAuth 2.0 流程类型。可能的值:
client_credentials、token_exchangePython:
adbc_driver_flightsql.DatabaseOptions.OAUTH_FLOW、adbc_driver_flightsql.OAuthFlowTypeadbc.flight.sql.oauth.client_id授权服务器向客户端应用程序颁发的唯一标识符
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_CLIENT_IDadbc.flight.sql.oauth.client_secret与 client_id 相关联的机密。用于向授权服务器认证客户端应用程序
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_CLIENT_SECRETadbc.flight.sql.oauth.token_uri客户端应用程序从授权服务器请求令牌的端点 URL
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_TOKEN_URIadbc.flight.sql.oauth.scope以空格分隔的权限列表,客户端正在请求访问这些权限(例如
"read.all offline_access")adbc.flight.sql.oauth.exchange.subject_token客户端应用程序希望交换的安全令牌
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_SUBJECT_TOKENadbc.flight.sql.oauth.exchange.subject_token_type主体令牌类型的标识符。查看下面的列表以获取支持的令牌类型。
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE、adbc_driver_flightsql.OAuthTokenTypeadbc.flight.sql.oauth.exchange.actor_token表示行动方身份的安全令牌
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_ACTOR_TOKENadbc.flight.sql.oauth.exchange.actor_token_type行动方令牌类型的标识符。查看下面的列表以获取支持的令牌类型。
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_ACTOR_TOKEN_TYPE、adbc_driver_flightsql.OAuthTokenTypeadbc.flight.sql.oauth.exchange.aud请求的安全令牌的预期受众
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_AUDadbc.flight.sql.oauth.exchange.resource客户端打算使用请求的安全令牌的资源服务器
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_RESOURCEadbc.flight.sql.oauth.exchange.scope为新令牌请求的特定权限
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_SCOPEadbc.flight.sql.oauth.exchange.requested_token_type客户端希望交换接收的令牌类型。查看下面的列表以获取支持的令牌类型。
Python:
adbc_driver_flightsql.DatabaseOptions.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE、adbc_driver_flightsql.OAuthTokenType- 支持的令牌类型
urn:ietf:params:oauth:token-type:access_tokenurn:ietf:params:oauth:token-type:refresh_tokenurn:ietf:params:oauth:token-type:id_tokenurn:ietf:params:oauth:token-type:saml1urn:ietf:params:oauth:token-type:saml2urn:ietf:params:oauth:token-type:jwt
分布式结果集¶
驱动程序将以未指定的顺序获取服务器返回的所有分区(FlightEndpoints)(请注意,Flight SQL 本身并未定义这些分区的顺序)。如果一个端点没有位置,则将使用原始服务器连接获取数据。否则,驱动程序将按顺序尝试给定的每个位置,直到请求成功。如果连接或请求失败,它将尝试下一个位置。
驱动程序目前不缓存或池化这些辅助连接。它也不会重试连接或请求。
所有分区并行获取。每个分区都会排队有限数量的批次。数据按分区顺序返回给客户端。
某些行为可以在 AdbcStatement 上配置
adbc.rpc.result_queue_size每个分区排队的批次数量。默认为 5。
增量执行¶
通过设置 ADBC_STATEMENT_OPTION_INCREMENTAL,您可以对此驱动程序使用非阻塞执行。这仅更改 AdbcStatementExecutePartitions() 的行为。启用后,ExecutePartitions 将在每次服务器返回新分区(在 Flight SQL 术语中,当有新的 FlightEndpoints 时)时返回,而不是阻塞直到查询完成。
某些行为可以在 AdbcStatement 上配置
adbc.flight.sql.statement.exec.last_flight_info获取服务返回的最新
FlightInfo的序列化字节。这是一个用于高级用法的低级选项。当启用增量执行时,检查最新的服务器响应而无需等待AdbcStatementExecutePartitions()返回时,它最为有用。Python:
adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO
元数据¶
驱动程序目前不会在 AdbcConnectionGetObjects() 中填充列约束信息(外键、主键等)。此外,目录过滤器被评估为简单的字符串匹配,而不是 LIKE 样式模式。
分区结果集¶
Flight SQL 驱动程序支持 ADBC 的分区结果集。当请求时,结果集的每个分区都包含一个序列化的 FlightInfo,其中包含原始响应中的一个 FlightEndpoints。希望自省分区的客户端可以通过反序列化 ADBC 分区中包含的 FlightInfo 来实现。(例如,希望在多个工作程序或机器上分发工作的客户端可能希望利用 ADBC 没有的本地性信息。)
会话¶
驱动程序通过连接上的选项公开 Flight SQL 会话支持。没有明确的命令来启动新会话;预计服务器本身会管理它。(您很可能需要如上所述启用 cookie 支持。)没有明确的命令来关闭会话;这总是在关闭连接时发出。
adbc.flight.sql.session.options将所有选项获取为 JSON blob。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONSadbc.flight.sql.session.option.获取或设置字符串/数字会话选项。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIXadbc.flight.sql.session.optionerase.擦除会话选项。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIXadbc.flight.sql.session.optionbool.获取或设置布尔会话选项。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIXadbc.flight.sql.session.optionstringlist.获取或设置字符串列表会话选项。内容应为序列化的 JSON 列表。
Python:
adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX
超时¶
默认情况下,RPC 调用不使用超时。它们可以通过 AdbcConnection 上的特殊选项设置。通常,设置超时以避免意外卡住是最佳实践。选项如下
adbc.flight.sql.rpc.timeout_seconds.fetch任何获取数据的 API 调用(以浮点秒为单位)的超时时间。这对应于 Flight
DoGet调用。例如,这控制了随着结果集的消耗而获取更多数据的底层 Flight 调用的超时时间。
Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCHadbc.flight.sql.rpc.timeout_seconds.query任何执行查询的 API 调用(以浮点秒为单位)的超时时间。这对应于 Flight
GetFlightInfo调用。例如,这控制了实现
AdbcStatementExecuteQuery()的底层 Flight 调用的超时时间。Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERYadbc.flight.sql.rpc.timeout_seconds.update任何上传数据或执行其他更新的 API 调用(以浮点秒为单位)的超时时间。
例如,这控制了实现批量摄取或事务支持的底层 Flight 调用的超时时间。
Python:
adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE
还有一个超时设置在 AdbcDatabase 上
adbc.flight.sql.rpc.timeout_seconds.connect建立连接的超时时间(以浮点秒为单位)。默认值为 20 秒。
事务¶
驱动程序支持事务。它将首先检查服务器的 SqlInfo 以确定是否支持事务。否则,与事务相关的 ADBC API 将返回 ADBC_STATUS_NOT_IMPLEMENTED。