Flight SQL 驱动程序

语言:Go 状态:稳定

Flight SQL 驱动程序提供对实现 Arrow Flight SQL 兼容端点的任何数据库的访问。

安装

从 conda-forge 安装 libadbc-driver-flightsql

mamba install libadbc-driver-flightsql

安装 C/C++ 驱动程序,然后使用 Go 驱动程序管理器。需要 CGO。

go get github.com/apache/arrow-adbc/go/adbc/drivermgr

从 conda-forge 安装 adbc-driver-flightsql

mamba install adbc-driver-flightsql

从 PyPI 安装 adbc-driver-flightsql

pip install adbc-driver-flightsql

从 R-multiverse 安装 adbcflightsql

install.packages("adbcflightsql", repos = "https://community.r-multiverse.org")

此外,该驱动程序可以通过驱动程序管理器从 C/C++、C#、GLib、Go、R、Ruby 和 Rust 使用。

用法

要连接到数据库,在构造 AdbcDatabase 时提供“uri”参数。

#include "arrow-adbc/adbc.h"

// Ignoring error handling
struct AdbcDatabase database;
AdbcDatabaseNew(&database, nullptr);
AdbcDatabaseSetOption(&database, "driver", "adbc_driver_flightsql", nullptr);
AdbcDatabaseSetOption(&database, "uri", "grpc://:8080", nullptr);
AdbcDatabaseInit(&database, nullptr);

注意

有关详细示例,请参见 Flight SQL 食谱

from adbc_driver_flightsql import DatabaseOptions
from adbc_driver_flightsql.dbapi import connect

headers = {"foo": "bar"}

with connect(
    "grpc+tls://:8080",
    db_kwargs={
        DatabaseOptions.AUTHORIZATION_HEADER.value: "Bearer <token>",
        DatabaseOptions.TLS_SKIP_VERIFY.value: "true",
        **{
            f"{DatabaseOptions.RPC_CALL_HEADER_PREFIX.value}{k}": v
            for k, v in headers.items()
        },
    }
) as conn:
    pass

食谱来源:example_usage_test.go

 20// Tests that use the SQLite server example.
 21
 22package flightsql_test
 23
 24import (
 25	"context"
 26	"database/sql"
 27	"errors"
 28	"fmt"
 29	"log"
 30
 31	"github.com/apache/arrow-adbc/go/adbc"
 32	drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
 33	"github.com/apache/arrow-go/v18/arrow/array"
 34	"github.com/apache/arrow-go/v18/arrow/flight"
 35	"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
 36	sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
 37	"github.com/apache/arrow-go/v18/arrow/memory"
 38	_ "modernc.org/sqlite"
 39)
 40
 41var headers = map[string]string{"foo": "bar"}
 42
 43func FlightSQLExample(uri string) (err error) {
 44	ctx := context.Background()
 45	options := map[string]string{
 46		adbc.OptionKeyURI: uri,
 47	}
 48
 49	for k, v := range headers {
 50		options[drv.OptionRPCCallHeaderPrefix+k] = v
 51	}
 52
 53	var alloc memory.Allocator
 54	drv := drv.NewDriver(alloc)
 55	db, err := drv.NewDatabase(options)
 56	if err != nil {
 57		return fmt.Errorf("failed to open database: %s\n", err.Error())
 58	}
 59	defer func() {
 60		err = errors.Join(err, db.Close())
 61	}()
 62
 63	cnxn, err := db.Open(ctx)
 64	if err != nil {
 65		return fmt.Errorf("failed to open connection: %s", err.Error())
 66	}
 67	defer func() {
 68		err = errors.Join(err, cnxn.Close())
 69	}()
 70
 71	stmt, err := cnxn.NewStatement()
 72	if err != nil {
 73		return fmt.Errorf("failed to create statement: %s", err.Error())
 74	}
 75	defer func() {
 76		err = errors.Join(err, stmt.Close())
 77	}()
 78
 79	if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
 80		return fmt.Errorf("failed to set query: %s", err.Error())
 81	}
 82
 83	reader, _, err := stmt.ExecuteQuery(ctx)
 84	if err != nil {
 85		return fmt.Errorf("failed to execute query: %s", err.Error())
 86	}
 87	defer reader.Release()
 88
 89	for reader.Next() {
 90		arr, ok := reader.RecordBatch().Column(0).(*array.Int64)
 91		if !ok {
 92			return fmt.Errorf("result data was not int64")
 93		}
 94		for i := 0; i < arr.Len(); i++ {
 95			if arr.IsNull(i) {
 96				fmt.Println("theresult: NULL")
 97			} else {
 98				fmt.Printf("theresult: %d\n", arr.Value(i))
 99			}
100		}
101	}
102
103	return nil
104}
105
106func Example() {
107	// For this example we will spawn the Flight SQL server ourselves.
108
109	// Create a new database that isn't tied to any other databases that
110	// may be in process.
111	db, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
112	if err != nil {
113		log.Fatal(err)
114	}
115	defer func() {
116		err := db.Close()
117		if err != nil {
118			log.Fatal(err)
119		}
120	}()
121
122	srv, err := sqlite.NewSQLiteFlightSQLServer(db)
123	if err != nil {
124		log.Fatal(err)
125	}
126
127	server := flight.NewServerWithMiddleware(nil)
128	server.RegisterFlightService(flightsql.NewFlightServer(srv))
129	err = server.Init("localhost:8080")
130	if err != nil {
131		log.Fatal(err)
132	}
133
134	go func() {
135		if err := server.Serve(); err != nil {
136			log.Fatal(err)
137		}
138	}()
139
140	uri := fmt.Sprintf("grpc://%s", server.Addr().String())
141	if err := FlightSQLExample(uri); err != nil {
142		log.Printf("Error: %s\n", err.Error())
143	}
144
145	server.Shutdown()
146
147	// Output:
148	// theresult: 1
149}
标准输出
theresult: 1

支持的功能

Flight SQL 驱动程序通常支持 ADBC API 规范 1.0.0 中定义的功能,以及一些额外的自定义选项。

警告

Java 驱动程序不支持此处的所有选项。请参阅 问题 #745

认证

驱动程序默认不进行认证。驱动程序实现了几种可选的认证方案

  • 相互 TLS (mTLS):请参阅下面的“客户端选项”。

  • 一种模仿 Arrow Flight SQL JDBC 驱动程序的 HTTP 风格方案。

    AdbcDatabase 上设置选项 usernamepassword。或者,设置选项 adbc.flight.sql.authorization_header 以获得完全控制。

    客户端通过从客户端向服务器发送 authorization 来提供凭据。服务器随后会在第一个请求上响应一个 authorization 头部。此头部的值随后将作为所有未来请求上的 authorization 头部发送回去。

  • OAuth 2.0 认证流程。

    客户端提供 配置 以允许客户端应用程序从授权服务器获取访问令牌。获取到的令牌随后将用于所有未来请求的 authorization 头部。

批量摄取

Flight SQL 没有专门用于将 Arrow 数据批量摄取到给定表的 API。因此,驱动程序目前不实现批量摄取。

客户端选项

用于创建 Flight RPC 客户端的选项可以自定义。

注意

其中许多选项只是封装了 gRPC 选项。有关这些选项功能的更多详细信息,请查阅 gRPC 文档

adbc.flight.sql.client_option.authority

覆盖 gRPC 的 :authority 伪头部。

Python:adbc_driver_flightsql.DatabaseOptions.AUTHORITY

adbc.flight.sql.client_option.mtls_cert_chain

用于 mTLS 的证书链。

Python:adbc_driver_flightsql.DatabaseOptions.MTLS_CERT_CHAIN

adbc.flight.sql.client_option.mtls_private_key

用于 mTLS 的私钥。

Python:adbc_driver_flightsql.DatabaseOptions.MTLS_PRIVATE_KEY

adbc.flight.sql.client_option.tls_override_hostname

覆盖用于验证服务器 TLS 证书的主机名。

Python:adbc_driver_flightsql.DatabaseOptions.TLS_OVERRIDE_HOSTNAME

adbc.flight.sql.client_option.tls_root_certs

覆盖用于验证服务器 TLS 证书的根证书。

Python:adbc_driver_flightsql.DatabaseOptions.TLS_ROOT_CERTS

adbc.flight.sql.client_option.tls_skip_verify

禁用对服务器 TLS 证书的验证。值应为 truefalse

Python:adbc_driver_flightsql.DatabaseOptions.TLS_SKIP_VERIFY

adbc.flight.sql.client_option.with_block

警告

此选项已弃用,因为 gRPC 本身已弃用底层选项。

此选项无效,并将在未来版本中删除。值应为 truefalse

adbc.flight.sql.client_option.with_max_msg_size

接受来自服务器的最大消息大小。由于 Flight 服务倾向于返回较大的响应负载,驱动程序默认为 16 MiB。应为正整数字节数。

Python:adbc_driver_flightsql.DatabaseOptions.WITH_MAX_MSG_SIZE

adbc.flight.sql.authorization_header

直接指定要在所有请求上发送的 authorization 头部的值。

Python:adbc_driver_flightsql.DatabaseOptions.AUTHORIZATION_HEADER

adbc.flight.sql.rpc.with_cookie_middleware

启用或禁用处理和处理服务器返回的“set-cookie”元数据头部以及从客户端发送“Cookie”头部中间件。值应为 truefalse。默认值为 false

Python:adbc_driver_flightsql.DatabaseOptions.WITH_COOKIE_MIDDLEWARE

自定义调用头部

自定义 HTTP 头部可以通过适用于 AdbcDatabaseAdbcConnectionAdbcStatement 的选项附加到请求中。

adbc.flight.sql.rpc.call_header.<HEADER NAME>

将头部 <HEADER NAME> 以给定值添加到传出请求中。

Python:adbc_driver_flightsql.ConnectionOptions.RPC_CALL_HEADER_PREFIX

警告

头部名称必须全部小写。

OAuth 2.0 选项

支持使用 OAuth 2.0 认证流程获取令牌的配置。

adbc.flight.sql.oauth.flow

指定要使用的 OAuth 2.0 流程类型。可能的值:client_credentialstoken_exchange

adbc.flight.sql.oauth.client_id

授权服务器颁发给客户端应用程序的唯一标识符

adbc.flight.sql.oauth.client_secret

与 client_id 关联的密钥。用于向授权服务器验证客户端应用程序

adbc.flight.sql.oauth.token_uri

客户端应用程序从授权服务器请求令牌的端点 URL

adbc.flight.sql.oauth.scope

客户端请求访问的权限列表,以空格分隔(例如 "read.all offline_access"

adbc.flight.sql.oauth.exchange.subject_token

客户端应用程序希望交换的安全令牌

adbc.flight.sql.oauth.exchange.subject_token_type

主题令牌类型的标识符。请查看下面的列表以了解支持的令牌类型。

adbc.flight.sql.oauth.exchange.actor_token

表示执行方身份的安全令牌

adbc.flight.sql.oauth.exchange.actor_token_type

参与者令牌类型的标识符。请查看下面的列表以了解支持的令牌类型。

adbc.flight.sql.oauth.exchange.aud

请求的安全令牌的预期受众

adbc.flight.sql.oauth.exchange.resource

客户端打算使用请求的安全令牌的资源服务器

adbc.flight.sql.oauth.exchange.scope

为新令牌请求的特定权限

adbc.flight.sql.oauth.exchange.requested_token_type

客户端希望接收的令牌类型。请查看下面的列表以了解支持的令牌类型。

支持的令牌类型
  • urn:ietf:params:oauth:token-type:access_token

  • urn:ietf:params:oauth:token-type:refresh_token

  • urn:ietf:params:oauth:token-type:id_token

  • urn:ietf:params:oauth:token-type:saml1

  • urn:ietf:params:oauth:token-type:saml2

  • urn:ietf:params:oauth:token-type:jwt

分布式结果集

驱动程序将以未指定顺序获取服务器返回的所有分区(FlightEndpoints)(请注意,Flight SQL 本身并未定义这些分区的顺序)。如果一个端点没有位置,数据将使用原始服务器连接获取。否则,驱动程序将按顺序尝试给定的每个位置,直到请求成功。如果连接或请求失败,它将尝试下一个位置。

驱动程序目前不缓存或池化这些辅助连接。它也不会重试连接或请求。

所有分区都并行获取。每个分区都有有限数量的批次排队。数据按分区顺序返回给客户端。

一些行为可以在 AdbcStatement 上配置

adbc.rpc.result_queue_size

每个分区排队的批次数量。默认为 5。

Python:adbc_driver_flightsql.StatementOptions.QUEUE_SIZE

增量执行

通过设置 ADBC_STATEMENT_OPTION_INCREMENTAL,您可以使用此驱动程序进行非阻塞执行。这仅更改 AdbcStatementExecutePartitions() 的行为。启用后,ExecutePartitions 将在每次服务器有新分区(在 Flight SQL 术语中,当有新的 FlightEndpoints)时返回,而不是阻塞直到查询完成。

一些行为可以在 AdbcStatement 上配置

adbc.flight.sql.statement.exec.last_flight_info

获取服务返回的最新 FlightInfo 的序列化字节。这是一个用于高级用途的低级选项。它在启用增量执行时最有用,用于检查最新的服务器响应,而无需等待 AdbcStatementExecutePartitions() 返回。

Python:adbc_driver_flightsql.StatementOptions.LAST_FLIGHT_INFO

元数据

驱动程序目前不会在 AdbcConnectionGetObjects() 中填充列约束信息(外键、主键等)。此外,目录过滤器被评估为简单的字符串匹配,而不是 LIKE 样式模式。

分区结果集

Flight SQL 驱动程序支持 ADBC 的分区结果集。当请求时,结果集的每个分区都包含一个序列化的 FlightInfo,其中包含原始响应的一个 FlightEndpoint。希望检查分区的客户端可以通过反序列化 ADBC 分区中包含的 FlightInfo 来实现。(例如,希望在多个工作程序或机器之间分配工作的客户端可能希望利用 ADBC 没有的局部性信息。)

会话

驱动程序通过连接上的选项公开 Flight SQL 会话支持。没有明确的命令来启动新会话;预计服务器本身将管理此功能。(您很可能需要启用上面描述的 Cookie 支持。)没有明确的命令来关闭会话;这总是在连接关闭时发出。

adbc.flight.sql.session.options

将所有选项获取为 JSON blob。

Python:adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTIONS

adbc.flight.sql.session.option.

获取或设置字符串/数字会话选项。

Python:adbc_driver_flightsql.ConnectionOptions.OPTION_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionerase.

擦除会话选项。

Python:adbc_driver_flightsql.ConnectionOptions.OPTION_ERASE_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionbool.

获取或设置布尔会话选项。

Python:adbc_driver_flightsql.ConnectionOptions.OPTION_BOOL_SESSION_OPTION_PREFIX

adbc.flight.sql.session.optionstringlist.

获取或设置字符串列表会话选项。内容应为序列化的 JSON 列表。

Python:adbc_driver_flightsql.ConnectionOptions.OPTION_STRING_LIST_SESSION_OPTION_PREFIX

超时

默认情况下,RPC 调用不使用超时。它们可以通过 AdbcConnection 上的特殊选项进行设置。通常,最好设置超时以避免意外卡住。选项如下

adbc.flight.sql.rpc.timeout_seconds.fetch

用于获取数据的任何 API 调用的超时(以浮点秒为单位)。这对应于 Flight DoGet 调用。

例如,这控制了获取更多数据作为结果集被消耗的底层 Flight 调用的超时。

Python:adbc_driver_flightsql.ConnectionOptions.TIMEOUT_FETCH

adbc.flight.sql.rpc.timeout_seconds.query

用于执行查询的任何 API 调用的超时(以浮点秒为单位)。这对应于 Flight GetFlightInfo 调用。

例如,这控制了实现 AdbcStatementExecuteQuery() 的底层 Flight 调用的超时。

Python:adbc_driver_flightsql.ConnectionOptions.TIMEOUT_QUERY

adbc.flight.sql.rpc.timeout_seconds.update

用于上传数据或执行其他更新的任何 API 调用的超时(以浮点秒为单位)。

例如,这控制了实现批量摄取或事务支持的底层 Flight 调用的超时。

Python:adbc_driver_flightsql.ConnectionOptions.TIMEOUT_UPDATE

AdbcDatabase 上还设置了一个超时

adbc.flight.sql.rpc.timeout_seconds.connect

建立连接的超时(以浮点秒为单位)。默认值为 20 秒。

事务

驱动程序支持事务。它将首先检查服务器的 SqlInfo 以确定是否支持此功能。否则,与事务相关的 ADBC API 将返回 ADBC_STATUS_NOT_IMPLEMENTED