Flight SQL 方案

其中一些方案是针对由 SQLite 支持的演示 Flight SQL 服务编写的。您可以按照以下方式自行运行

$ go install github.com/apache/arrow-go/v${ARROW_MAJOR_VERSION}/arrow/flight/flightsql/example/cmd/sqlite_flightsql_server@latest
$ sqlite_flightsql_server -host 0.0.0.0 -port 8080

其他方案使用开源版本的 Dremio

$ docker run -p 9047:9047 -p 31010:31010 -p 45678:45678 dremio/dremio-oss

如果您已经检出了 ADBC 存储库并安装了 Docker Compose,则可以使用我们的配置来运行这两项服务

$ docker compose up --detach --wait dremio dremio-init flightsql-sqlite-test

连接到未加密的 Flight SQL 服务

方案来源: flightsql_sqlite_connect.py

要连接到未加密的 Flight SQL 服务,只需提供 URI 即可。

22import os
23
24import adbc_driver_flightsql.dbapi
25
26uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]
27conn = adbc_driver_flightsql.dbapi.connect(uri)

然后我们就可以执行一个简单的查询。

31with conn.cursor() as cur:
32    cur.execute("SELECT 1")
33
34    assert cur.fetchone() == (1,)
35
36conn.close()

使用用户名和密码连接到 Flight SQL 服务

方案来源: flightsql_dremio_connect.py

Dremio 需要用户名和密码。要使用身份验证连接到 Flight SQL 服务,请在连接时提供这些选项。

25import os
26
27import adbc_driver_flightsql.dbapi
28import adbc_driver_manager
29
30uri = os.environ["ADBC_DREMIO_FLIGHTSQL_URI"]
31username = os.environ["ADBC_DREMIO_FLIGHTSQL_USER"]
32password = os.environ["ADBC_DREMIO_FLIGHTSQL_PASS"]
33conn = adbc_driver_flightsql.dbapi.connect(
34    uri,
35    db_kwargs={
36        adbc_driver_manager.DatabaseOptions.USERNAME.value: username,
37        adbc_driver_manager.DatabaseOptions.PASSWORD.value: password,
38    },
39)

然后我们就可以执行一个简单的查询。

43with conn.cursor() as cur:
44    cur.execute("SELECT 1")
45
46    assert cur.fetchone() == (1,)
47
48conn.close()

设置超时和其他选项

方案来源: flightsql_sqlite_options.py

Flight SQL 驱动程序支持多种选项。

22import os
23
24import adbc_driver_flightsql.dbapi
25from adbc_driver_flightsql import ConnectionOptions, DatabaseOptions
26
27uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]

我们可以启用 Cookie 支持,某些服务器实现需要此功能。

29conn = adbc_driver_flightsql.dbapi.connect(
30    uri,
31    db_kwargs={DatabaseOptions.WITH_COOKIE_MIDDLEWARE.value: "true"},
32)

其他选项在连接或语句上设置。

例如,我们可以向所有传出请求添加自定义标头。

37custom_header = f"{ConnectionOptions.RPC_CALL_HEADER_PREFIX.value}x-custom-header"
38conn.adbc_connection.set_options(**{custom_header: "value"})

我们还可以设置超时。这些是以浮点秒为单位的。

41conn.adbc_connection.set_options(
42    **{
43        ConnectionOptions.TIMEOUT_FETCH.value: 30.0,
44        ConnectionOptions.TIMEOUT_QUERY.value: 30.0,
45        ConnectionOptions.TIMEOUT_UPDATE.value: 30.0,
46    }
47)

这些选项将适用于我们创建的所有游标。

51with conn.cursor() as cur:
52    cur.execute("SELECT 1")
53
54    assert cur.fetchone() == (1,)
55
56conn.close()

设置最大 gRPC 消息大小

方案来源: flightsql_sqlite_max_msg_size.py

默认情况下,Flight SQL 驱动程序会限制传入/传出消息的大小。如果超过这些限制,您可能会看到如下错误

INTERNAL: [FlightSQL] grpc: received message larger than max

可以调整这些限制以避免此问题。

27import os
28
29import adbc_driver_flightsql.dbapi
30from adbc_driver_flightsql import DatabaseOptions
31
32uri = os.environ["ADBC_SQLITE_FLIGHTSQL_URI"]

此查询每批生成约 16 MiB,这将触发默认限制。

37query = """
38WITH RECURSIVE generate_series(value) AS (
39  SELECT 1
40  UNION ALL
41  SELECT value + 1 FROM generate_series
42   WHERE value + 1 <= 2048
43)
44SELECT printf('%.*c', 16384, 'x') FROM generate_series
45"""

当我们执行该查询时,我们将收到一个错误。

49conn = adbc_driver_flightsql.dbapi.connect(uri)
50with conn.cursor() as cur:
51    cur.execute(query)
52
53    try:
54        cur.fetchallarrow()
55    except adbc_driver_flightsql.dbapi.InternalError:
56        # This exception is expected.
57        pass
58    else:
59        assert False, "Did not raise expected exception"
60
61conn.close()

相反,我们可以在连接时更改限制。

65conn = adbc_driver_flightsql.dbapi.connect(
66    uri,
67    db_kwargs={DatabaseOptions.WITH_MAX_MSG_SIZE.value: "2147483647"},
68)
69with conn.cursor() as cur:
70    cur.execute(query)
71
72    assert len(cur.fetchallarrow()) == 2048
73
74conn.close()

使用 OAuth 2.0 客户端凭据连接

方案来源: flightsql_oauth_client_credentials.py

Flight SQL 驱动程序支持 OAuth 2.0 身份验证。此示例展示了如何使用客户端凭据流(RFC 6749)进行连接,这适用于无需用户交互的机器对机器身份验证。

24import os
25
26import adbc_driver_flightsql.dbapi
27from adbc_driver_flightsql import DatabaseOptions, OAuthFlowType
28
29uri = os.environ["ADBC_TEST_FLIGHTSQL_URI"]
30token_uri = os.environ["ADBC_OAUTH_TOKEN_URI"]
31client_id = os.environ["ADBC_OAUTH_CLIENT_ID"]
32client_secret = os.environ["ADBC_OAUTH_CLIENT_SECRET"]

使用 OAuth 2.0 客户端凭据流连接。驱动程序将自动获取并刷新访问令牌。

37db_kwargs = {
38    DatabaseOptions.OAUTH_FLOW.value: OAuthFlowType.CLIENT_CREDENTIALS.value,
39    DatabaseOptions.OAUTH_TOKEN_URI.value: token_uri,
40    DatabaseOptions.OAUTH_CLIENT_ID.value: client_id,
41    DatabaseOptions.OAUTH_CLIENT_SECRET.value: client_secret,

可选地,请求特定的作用域 (scopes)

43    # DatabaseOptions.OAUTH_SCOPE.value: "dremio.all",
44}

为了使用自签名证书进行测试,请跳过 TLS 验证。在生产环境中,您应该提供适当的 TLS 证书。

48if os.environ.get("ADBC_OAUTH_SKIP_VERIFY", "true").lower() in ("1", "true"):
49    db_kwargs[DatabaseOptions.TLS_SKIP_VERIFY.value] = "true"
50
51conn = adbc_driver_flightsql.dbapi.connect(uri, db_kwargs=db_kwargs)

然后我们可以照常执行查询。

55with conn.cursor() as cur:
56    cur.execute("SELECT 1")
57
58    result = cur.fetchone()
59    print(result)
60
61conn.close()

使用 OAuth 2.0 令牌交换连接

方案来源: flightsql_oauth_token_exchange.py

Flight SQL 驱动程序支持 OAuth 2.0 令牌交换 (RFC 8693)。这允许将现有令牌(例如,来自身份提供商的 JWT)交换为可用于访问 Flight SQL 服务的新令牌。

24import os
25
26import adbc_driver_flightsql.dbapi
27from adbc_driver_flightsql import DatabaseOptions, OAuthFlowType, OAuthTokenType
28
29uri = os.environ["ADBC_TEST_FLIGHTSQL_URI"]
30token_uri = os.environ["ADBC_OAUTH_TOKEN_URI"]

这通常是来自您身份提供商的 JWT 或其他令牌

32subject_token = os.environ["ADBC_OAUTH_SUBJECT_TOKEN"]

为了使用自签名证书进行测试,请跳过 TLS 验证。在生产环境中,您应该提供适当的 TLS 证书。

36db_kwargs = {}
37if os.environ.get("ADBC_OAUTH_SKIP_VERIFY", "true").lower() in ("1", "true"):
38    db_kwargs[DatabaseOptions.TLS_SKIP_VERIFY.value] = "true"

使用 OAuth 2.0 令牌交换流连接。驱动程序会将主体令牌 (subject token) 交换为访问令牌。

43db_kwargs.update(
44    {
45        DatabaseOptions.OAUTH_FLOW.value: OAuthFlowType.TOKEN_EXCHANGE.value,
46        DatabaseOptions.OAUTH_TOKEN_URI.value: token_uri,
47        DatabaseOptions.OAUTH_EXCHANGE_SUBJECT_TOKEN.value: subject_token,

指定正在交换的主体令牌的类型

49        DatabaseOptions.OAUTH_EXCHANGE_SUBJECT_TOKEN_TYPE.value: (
50            OAuthTokenType.JWT.value
51        ),

可选地,指定您想要接收的令牌类型

53        # DatabaseOptions.OAUTH_EXCHANGE_REQUESTED_TOKEN_TYPE.value:
54        #   OAuthTokenType.ACCESS_TOKEN.value,

可选地,指定预期的受众 (audience)

56        # DatabaseOptions.OAUTH_EXCHANGE_AUD.value: "my-service",
57    }
58)
59
60conn = adbc_driver_flightsql.dbapi.connect(uri, db_kwargs=db_kwargs)

然后我们可以照常执行查询。

64with conn.cursor() as cur:
65    cur.execute("SELECT 1")
66
67    result = cur.fetchone()
68    print(result)
69
70conn.close()