客户端#
如何连接#
您必须指定 PostgreSQL 用户和 PostgreSQL 数据库才能连接 Apache Arrow Flight SQL 端点。
用户名和密码必须通过 Handshake 调用传递。请注意,目前仅支持基本身份验证,mTLS(双向 TLS)尚未实现。如果您对 mTLS 感兴趣,请参阅相关问题:apache/arrow-flight-sql-postgresql#79。
您需要使用标头来指定 PostgreSQL 数据库。PostgreSQL 数据库的标头名称为 x-flight-sql-database。
您需要使用 arrow::flight::FlightClient::AuthenticateBasicToken() 进行身份验证。
您需要将 x-flight-sql-database 标头添加到 arrow::flight::FlightCallOptions::headers 以指定数据库。
以下示例使用 PGUSER(未设置时回退到 USER)和 PGPASSWORD 环境变量,类似于 libpq。AuthenticateBasicToken() 成功时返回 Bearer 令牌,因此示例将返回的 Bearer 令牌作为请求标头,用于后续请求。
以下示例使用 PGDATABASE(未设置时回退到 PGUSER/USER)环境变量,类似于 libpq。
1#include <cstdlib>
2#include <fstream>
3#include <iostream>
4
5#include <arrow/flight/sql/client.h>
6
7namespace {
8std::string
9getenv(const char* name)
10{
11 auto value = std::getenv(name);
12 if (value)
13 {
14 return std::string(value);
15 }
16 else
17 {
18 return std::string("");
19 }
20}
21
22arrow::Status
23run()
24{
25 auto uri = getenv("PGFLIGHTSQLURI");
26 if (uri.empty())
27 {
28 auto host = getenv("PGHOST");
29 if (host.empty())
30 {
31 host = "localhost";
32 }
33 auto sslmode = getenv("PGSSLMODE");
34 if (sslmode == "require" || sslmode == "verify-ca" || sslmode == "verify-full")
35 {
36 uri = std::string("grpc+tls://") + host + ":15432";
37 }
38 else
39 {
40 uri = std::string("grpc://") + host + ":15432";
41 }
42 }
43 ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::Parse(uri));
44 arrow::flight::FlightClientOptions client_options;
45 auto sslrootcert = getenv("PGSSLROOTCERT");
46 if (sslrootcert.empty())
47 {
48 auto home = getenv("HOME");
49 if (!home.empty())
50 {
51 sslrootcert = home + "/.postgresql/root.crt";
52 }
53 }
54 if (!sslrootcert.empty())
55 {
56 std::ifstream input(sslrootcert);
57 if (input)
58 {
59 client_options.tls_root_certs =
60 std::string(std::istreambuf_iterator<char>{input}, {});
61 }
62 }
63 ARROW_ASSIGN_OR_RAISE(auto client,
64 arrow::flight::FlightClient::Connect(location, client_options));
65 auto user = getenv("PGUSER");
66 if (user.empty())
67 {
68 user = getenv("USER");
69 }
70 auto password = getenv("PGPASSWORD");
71 if (password.empty())
72 {
73 password = "";
74 }
75 arrow::flight::FlightCallOptions call_options;
76 auto database = getenv("PGDATABASE");
77 if (database.empty())
78 {
79 database = user;
80 }
81 call_options.headers.emplace_back("x-flight-sql-database", database);
82 ARROW_ASSIGN_OR_RAISE(auto bearer_token,
83 client->AuthenticateBasicToken(call_options, user, password));
84 const auto& bearer_name = bearer_token.first;
85 const auto& bearer_value = bearer_token.second;
86 if (!bearer_name.empty() && !bearer_value.empty())
87 {
88 call_options.headers.emplace_back(bearer_name, bearer_value);
89 }
90 return client->Close();
91}
92}; // namespace
93
94int
95main(int argc, char** argv)
96{
97 auto status = run();
98 if (status.ok())
99 {
100 std::cout << "Authenticated!" << std::endl;
101 return EXIT_SUCCESS;
102 }
103 else
104 {
105 std::cerr << status.ToString() << std::endl;
106 return EXIT_FAILURE;
107 }
108}
您需要设置 username 和 password 选项以进行身份验证。另请参阅“身份验证”文档。
您需要设置 adbc.flight.sql.rpc.call_header.x-flight-sql-database 选项以指定数据库。另请参阅“自定义调用标头”文档。
1#include <stdio.h>
2#include <stdlib.h>
3#include <string.h>
4
5#include <adbc.h>
6
7#ifndef ADBC_ERROR_INIT
8# define ADBC_ERROR_INIT \
9 { \
10 0 \
11 }
12#endif
13
14static AdbcStatusCode
15database_init(struct AdbcDatabase* database, struct AdbcError* error)
16{
17 AdbcStatusCode code;
18 code = AdbcDatabaseSetOption(database, "driver", "adbc_driver_flightsql", error);
19 if (code != ADBC_STATUS_OK)
20 {
21 return code;
22 }
23 const char* uri = getenv("PGFLIGHTSQLURI");
24 char uri_buffer[4096];
25 if (!uri)
26 {
27 const char* host = getenv("PGHOST");
28 if (!host)
29 {
30 host = "localhost";
31 }
32 const char* sslmode = getenv("PGSSLMODE");
33 if (sslmode &&
34 ((strcmp(sslmode, "require") == 0) || (strcmp(sslmode, "verify-ca") == 0) ||
35 (strcmp(sslmode, "verify-full") == 0)))
36 {
37 snprintf(uri_buffer, sizeof(uri_buffer), "grpc+tls://%s:15432", host);
38 }
39 else
40 {
41 snprintf(uri_buffer, sizeof(uri_buffer), "grpc://%s:15432", host);
42 }
43 uri = uri_buffer;
44 }
45 code = AdbcDatabaseSetOption(database, "uri", uri, error);
46 if (code != ADBC_STATUS_OK)
47 {
48 return code;
49 }
50 const char* sslrootcert = getenv("PGSSLROOTCERT");
51 char sslrootcert_buffer[4096];
52 if (!sslrootcert)
53 {
54 const char* home = getenv("HOME");
55 if (home)
56 {
57 snprintf(sslrootcert_buffer,
58 sizeof(sslrootcert_buffer),
59 "%s/.postgresql/root.crt",
60 home);
61 sslrootcert = sslrootcert_buffer;
62 }
63 }
64 if (sslrootcert)
65 {
66 FILE* input = fopen(sslrootcert, "r");
67 if (input)
68 {
69 char sslrootcert_data[40960];
70 size_t read_size =
71 fread(sslrootcert_data, 1, sizeof(sslrootcert_data), input);
72 fclose(input);
73 if (read_size < sizeof(sslrootcert_data))
74 {
75 code =
76 AdbcDatabaseSetOption(database,
77 "adbc.flight.sql.client_option.tls_root_certs",
78 sslrootcert_data,
79 error);
80 if (code != ADBC_STATUS_OK)
81 {
82 return code;
83 }
84 }
85 }
86 }
87 const char* user = getenv("PGUSER");
88 if (!user)
89 {
90 user = getenv("USER");
91 }
92 if (user)
93 {
94 code = AdbcDatabaseSetOption(database, "username", user, error);
95 if (code != ADBC_STATUS_OK)
96 {
97 return code;
98 }
99 }
100 const char* password = getenv("PGPASSWORD");
101 if (password)
102 {
103 code = AdbcDatabaseSetOption(database, "password", password, error);
104 if (code != ADBC_STATUS_OK)
105 {
106 return code;
107 }
108 }
109 const char* database_name = getenv("PGDATABASE");
110 if (!database_name)
111 {
112 database_name = user;
113 }
114 if (database_name)
115 {
116 code =
117 AdbcDatabaseSetOption(database,
118 "adbc.flight.sql.rpc.call_header.x-flight-sql-database",
119 database_name,
120 error);
121 if (code != ADBC_STATUS_OK)
122 {
123 return code;
124 }
125 }
126 return AdbcDatabaseInit(database, error);
127}
128
129static AdbcStatusCode
130run(struct AdbcError* error)
131{
132 AdbcStatusCode code;
133 struct AdbcDatabase database = {0};
134 code = AdbcDatabaseNew(&database, error);
135 if (code != ADBC_STATUS_OK)
136 {
137 return code;
138 }
139 code = database_init(&database, error);
140 if (code == ADBC_STATUS_OK)
141 {
142 struct AdbcConnection connection = {0};
143 code = AdbcConnectionNew(&connection, error);
144 if (code == ADBC_STATUS_OK)
145 {
146 code = AdbcConnectionInit(&connection, &database, error);
147 AdbcConnectionRelease(&connection, error);
148 }
149 }
150 AdbcDatabaseRelease(&database, error);
151 return code;
152}
153
154int
155main(int argc, char** argv)
156{
157 struct AdbcError error = ADBC_ERROR_INIT;
158 AdbcStatusCode code = run(&error);
159 if (code == ADBC_STATUS_OK)
160 {
161 printf("Authenticated!\n");
162 return EXIT_SUCCESS;
163 }
164 else
165 {
166 fprintf(stderr, "%s\n", error.message);
167 error.release(&error);
168 return EXIT_FAILURE;
169 }
170}
如何查询#
您可以使用临时 SQL 语句或准备好的 SQL 语句进行查询。
临时 SQL 语句#
您需要使用 arrow::flight::sql::FlightSqlClient::Execute() 执行查询。
您需要使用 arrow::flight::sql::FlightSqlClient::DoGet() 获取结果。
1arrow::Status
2run()
3{
4 arrow::flight::FlightCallOptions call_options;
5 ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
6 ARROW_ASSIGN_OR_RAISE(
7 auto info,
8 sql_client->Execute(call_options, "SELECT 1 AS number, 'hello' AS string"));
9 for (const auto& endpoint : info->endpoints())
10 {
11 ARROW_ASSIGN_OR_RAISE(auto reader,
12 sql_client->DoGet(call_options, endpoint.ticket));
13 while (true)
14 {
15 ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
16 if (!chunk.data)
17 {
18 break;
19 }
20 std::cout << chunk.data->ToString() << std::endl;
21 }
22 }
23 return sql_client->Close();
24}
待办事项
准备好的 SQL 语句#
您需要使用 arrow::flight::sql::FlightSqlClient::Prepare() 准备查询。
您需要使用 arrow::flight::sql::PreparedStatement::SetParameters() 设置参数。
您需要使用 arrow::flight::sql::PreparedStatement::Execute() 执行带有参数的准备好的语句。
您需要使用 arrow::flight::sql::FlightSqlClient::DoGet() 获取结果。
1arrow::Status
2run()
3{
4 arrow::flight::FlightCallOptions call_options;
5 ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
6 ARROW_ASSIGN_OR_RAISE(auto statement,
7 sql_client->Prepare(call_options,
8 "SELECT i "
9 " FROM generate_series(1, 100) "
10 " AS series (i) "
11 " WHERE i < $1"));
12 auto schema = arrow::schema({arrow::field("i", arrow::int32())});
13 ARROW_ASSIGN_OR_RAISE(
14 auto record_batch_builder,
15 arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
16 auto i_builder = record_batch_builder->GetFieldAs<arrow::Int32Builder>(0);
17 ARROW_RETURN_NOT_OK(i_builder->Append(10));
18 ARROW_ASSIGN_OR_RAISE(auto record_batch, record_batch_builder->Flush());
19 ARROW_RETURN_NOT_OK(statement->SetParameters(record_batch));
20 ARROW_ASSIGN_OR_RAISE(auto info, statement->Execute());
21 for (const auto& endpoint : info->endpoints())
22 {
23 ARROW_ASSIGN_OR_RAISE(auto reader,
24 sql_client->DoGet(call_options, endpoint.ticket));
25 while (true)
26 {
27 ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
28 if (!chunk.data)
29 {
30 break;
31 }
32 std::cout << chunk.data->ToString() << std::endl;
33 }
34 }
35 ARROW_RETURN_NOT_OK(statement->Close());
36 return sql_client->Close();
37}
待办事项