客户端#

如何连接#

您必须指定 PostgreSQL 用户和 PostgreSQL 数据库才能连接 Apache Arrow Flight SQL 端点。

用户名和密码必须通过 Handshake 调用传递。请注意,目前仅支持基本身份验证。mTLS(双向 TLS)尚未实现。如果您对 mTLS 感兴趣,请查看相关问题:apache/arrow-flight-sql-postgresql#79

您需要使用标头来指定 PostgreSQL 数据库。PostgreSQL 数据库的标头名称为 x-flight-sql-database。

您需要使用 arrow::flight::FlightClient::AuthenticateBasicToken() 进行身份验证。

您需要将 x-flight-sql-database 标头添加到 arrow::flight::FlightCallOptions::headers 以指定数据库。

以下示例使用 PGUSER(回退到 USER)和 PGPASSWORD 环境变量,类似于 libpq。AuthenticateBasicToken() 成功时返回 Bearer 令牌。因此,示例使用返回的 Bearer 令牌作为请求标头,用于后续请求。

以下示例使用 PGDATABASE(回退到 PGUSER/USER)环境变量,类似于 libpq。

  1#include <cstdlib>
  2#include <fstream>
  3#include <iostream>
  4
  5#include <arrow/flight/sql/client.h>
  6
  7namespace {
  8std::string
  9getenv(const char* name)
 10{
 11    auto value = std::getenv(name);
 12    if (value)
 13    {
 14        return std::string(value);
 15    }
 16    else
 17    {
 18        return std::string("");
 19    }
 20}
 21
 22arrow::Status
 23run()
 24{
 25    auto uri = getenv("PGFLIGHTSQLURI");
 26    if (uri.empty())
 27    {
 28        auto host = getenv("PGHOST");
 29        if (host.empty())
 30        {
 31            host = "localhost";
 32        }
 33        auto sslmode = getenv("PGSSLMODE");
 34        if (sslmode == "require" || sslmode == "verify-ca" || sslmode == "verify-full")
 35        {
 36            uri = std::string("grpc+tls://") + host + ":15432";
 37        }
 38        else
 39        {
 40            uri = std::string("grpc://") + host + ":15432";
 41        }
 42    }
 43    ARROW_ASSIGN_OR_RAISE(auto location, arrow::flight::Location::Parse(uri));
 44    arrow::flight::FlightClientOptions client_options;
 45    auto sslrootcert = getenv("PGSSLROOTCERT");
 46    if (sslrootcert.empty())
 47    {
 48        auto home = getenv("HOME");
 49        if (!home.empty())
 50        {
 51            sslrootcert = home + "/.postgresql/root.crt";
 52        }
 53    }
 54    if (!sslrootcert.empty())
 55    {
 56        std::ifstream input(sslrootcert);
 57        if (input)
 58        {
 59            client_options.tls_root_certs =
 60                std::string(std::istreambuf_iterator<char>{input}, {});
 61        }
 62    }
 63    ARROW_ASSIGN_OR_RAISE(auto client,
 64                          arrow::flight::FlightClient::Connect(location, client_options));
 65    auto user = getenv("PGUSER");
 66    if (user.empty())
 67    {
 68        user = getenv("USER");
 69    }
 70    auto password = getenv("PGPASSWORD");
 71    if (password.empty())
 72    {
 73        password = "";
 74    }
 75    arrow::flight::FlightCallOptions call_options;
 76    auto database = getenv("PGDATABASE");
 77    if (database.empty())
 78    {
 79        database = user;
 80    }
 81    call_options.headers.emplace_back("x-flight-sql-database", database);
 82    ARROW_ASSIGN_OR_RAISE(auto bearer_token,
 83                          client->AuthenticateBasicToken(call_options, user, password));
 84    const auto& bearer_name = bearer_token.first;
 85    const auto& bearer_value = bearer_token.second;
 86    if (!bearer_name.empty() && !bearer_value.empty())
 87    {
 88        call_options.headers.emplace_back(bearer_name, bearer_value);
 89    }
 90    return client->Close();
 91}
 92};  // namespace
 93
 94int
 95main(int argc, char** argv)
 96{
 97    auto status = run();
 98    if (status.ok())
 99    {
100        std::cout << "Authenticated!" << std::endl;
101        return EXIT_SUCCESS;
102    }
103    else
104    {
105        std::cerr << status.ToString() << std::endl;
106        return EXIT_FAILURE;
107    }
108}

您需要设置 username 和 password 选项以进行身份验证。另请参阅 身份验证 文档。

您需要设置 adbc.flight.sql.rpc.call_header.x-flight-sql-database 选项以指定数据库。另请参阅 自定义调用标头 文档。

  1#include <stdio.h>
  2#include <stdlib.h>
  3#include <string.h>
  4
  5#include <adbc.h>
  6
  7#ifndef ADBC_ERROR_INIT
  8#   define ADBC_ERROR_INIT \
  9        {                   \
 10            0               \
 11        }
 12#endif
 13
 14static AdbcStatusCode
 15database_init(struct AdbcDatabase* database, struct AdbcError* error)
 16{
 17    AdbcStatusCode code;
 18    code = AdbcDatabaseSetOption(database, "driver", "adbc_driver_flightsql", error);
 19    if (code != ADBC_STATUS_OK)
 20    {
 21        return code;
 22    }
 23    const char* uri = getenv("PGFLIGHTSQLURI");
 24    char uri_buffer[4096];
 25    if (!uri)
 26    {
 27        const char* host = getenv("PGHOST");
 28        if (!host)
 29        {
 30            host = "localhost";
 31        }
 32        const char* sslmode = getenv("PGSSLMODE");
 33        if (sslmode &&
 34            ((strcmp(sslmode, "require") == 0) || (strcmp(sslmode, "verify-ca") == 0) ||
 35             (strcmp(sslmode, "verify-full") == 0)))
 36        {
 37            snprintf(uri_buffer, sizeof(uri_buffer), "grpc+tls://%s:15432", host);
 38        }
 39        else
 40        {
 41            snprintf(uri_buffer, sizeof(uri_buffer), "grpc://%s:15432", host);
 42        }
 43        uri = uri_buffer;
 44    }
 45    code = AdbcDatabaseSetOption(database, "uri", uri, error);
 46    if (code != ADBC_STATUS_OK)
 47    {
 48        return code;
 49    }
 50    const char* sslrootcert = getenv("PGSSLROOTCERT");
 51    char sslrootcert_buffer[4096];
 52    if (!sslrootcert)
 53    {
 54        const char* home = getenv("HOME");
 55        if (home)
 56        {
 57            snprintf(sslrootcert_buffer,
 58                     sizeof(sslrootcert_buffer),
 59                     "%s/.postgresql/root.crt",
 60                     home);
 61            sslrootcert = sslrootcert_buffer;
 62        }
 63    }
 64    if (sslrootcert)
 65    {
 66        FILE* input = fopen(sslrootcert, "r");
 67        if (input)
 68        {
 69            char sslrootcert_data[40960];
 70            size_t read_size =
 71                fread(sslrootcert_data, 1, sizeof(sslrootcert_data), input);
 72            fclose(input);
 73            if (read_size < sizeof(sslrootcert_data))
 74            {
 75                code =
 76                    AdbcDatabaseSetOption(database,
 77                                          "adbc.flight.sql.client_option.tls_root_certs",
 78                                          sslrootcert_data,
 79                                          error);
 80                if (code != ADBC_STATUS_OK)
 81                {
 82                    return code;
 83                }
 84            }
 85        }
 86    }
 87    const char* user = getenv("PGUSER");
 88    if (!user)
 89    {
 90        user = getenv("USER");
 91    }
 92    if (user)
 93    {
 94        code = AdbcDatabaseSetOption(database, "username", user, error);
 95        if (code != ADBC_STATUS_OK)
 96        {
 97            return code;
 98        }
 99    }
100    const char* password = getenv("PGPASSWORD");
101    if (password)
102    {
103        code = AdbcDatabaseSetOption(database, "password", password, error);
104        if (code != ADBC_STATUS_OK)
105        {
106            return code;
107        }
108    }
109    const char* database_name = getenv("PGDATABASE");
110    if (!database_name)
111    {
112        database_name = user;
113    }
114    if (database_name)
115    {
116        code =
117            AdbcDatabaseSetOption(database,
118                                  "adbc.flight.sql.rpc.call_header.x-flight-sql-database",
119                                  database_name,
120                                  error);
121        if (code != ADBC_STATUS_OK)
122        {
123            return code;
124        }
125    }
126    return AdbcDatabaseInit(database, error);
127}
128
129static AdbcStatusCode
130run(struct AdbcError* error)
131{
132    AdbcStatusCode code;
133    struct AdbcDatabase database = {0};
134    code = AdbcDatabaseNew(&database, error);
135    if (code != ADBC_STATUS_OK)
136    {
137        return code;
138    }
139    code = database_init(&database, error);
140    if (code == ADBC_STATUS_OK)
141    {
142        struct AdbcConnection connection = {0};
143        code = AdbcConnectionNew(&connection, error);
144        if (code == ADBC_STATUS_OK)
145        {
146            code = AdbcConnectionInit(&connection, &database, error);
147            AdbcConnectionRelease(&connection, error);
148        }
149    }
150    AdbcDatabaseRelease(&database, error);
151    return code;
152}
153
154int
155main(int argc, char** argv)
156{
157    struct AdbcError error = ADBC_ERROR_INIT;
158    AdbcStatusCode code = run(&error);
159    if (code == ADBC_STATUS_OK)
160    {
161        printf("Authenticated!\n");
162        return EXIT_SUCCESS;
163    }
164    else
165    {
166        fprintf(stderr, "%s\n", error.message);
167        error.release(&error);
168        return EXIT_FAILURE;
169    }
170}

如何查询#

您可以使用临时 SQL 语句或准备好的 SQL 语句进行查询。

临时 SQL 语句#

您需要使用 arrow::flight::sql::FlightSqlClient::Execute() 执行查询。

您需要使用 arrow::flight::sql::FlightSqlClient::DoGet() 获取结果。

 1arrow::Status
 2run()
 3{
 4    arrow::flight::FlightCallOptions call_options;
 5    ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
 6    ARROW_ASSIGN_OR_RAISE(
 7        auto info,
 8        sql_client->Execute(call_options, "SELECT 1 AS number, 'hello' AS string"));
 9    for (const auto& endpoint : info->endpoints())
10    {
11        ARROW_ASSIGN_OR_RAISE(auto reader,
12                              sql_client->DoGet(call_options, endpoint.ticket));
13        while (true)
14        {
15            ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
16            if (!chunk.data)
17            {
18                break;
19            }
20            std::cout << chunk.data->ToString() << std::endl;
21        }
22    }
23    return sql_client->Close();
24}

待办事项

准备好的 SQL 语句#

您需要使用 arrow::flight::sql::FlightSqlClient::Prepare() 准备查询。

您需要使用 arrow::flight::sql::PreparedStatement::SetParameters() 设置参数。

您需要使用 arrow::flight::sql::PreparedStatement::Execute() 执行带有参数的准备好的语句。

您需要使用 arrow::flight::sql::FlightSqlClient::DoGet() 获取结果。

 1arrow::Status
 2run()
 3{
 4    arrow::flight::FlightCallOptions call_options;
 5    ARROW_ASSIGN_OR_RAISE(auto sql_client, connect(call_options));
 6    ARROW_ASSIGN_OR_RAISE(auto statement,
 7                          sql_client->Prepare(call_options,
 8                                              "SELECT i "
 9                                              "  FROM generate_series(1, 100) "
10                                              "       AS series (i) "
11                                              "  WHERE i < $1"));
12    auto schema = arrow::schema({arrow::field("i", arrow::int32())});
13    ARROW_ASSIGN_OR_RAISE(
14        auto record_batch_builder,
15        arrow::RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));
16    auto i_builder = record_batch_builder->GetFieldAs<arrow::Int32Builder>(0);
17    ARROW_RETURN_NOT_OK(i_builder->Append(10));
18    ARROW_ASSIGN_OR_RAISE(auto record_batch, record_batch_builder->Flush());
19    ARROW_RETURN_NOT_OK(statement->SetParameters(record_batch));
20    ARROW_ASSIGN_OR_RAISE(auto info, statement->Execute());
21    for (const auto& endpoint : info->endpoints())
22    {
23        ARROW_ASSIGN_OR_RAISE(auto reader,
24                              sql_client->DoGet(call_options, endpoint.ticket));
25        while (true)
26        {
27            ARROW_ASSIGN_OR_RAISE(auto chunk, reader->Next());
28            if (!chunk.data)
29            {
30                break;
31            }
32            std::cout << chunk.data->ToString() << std::endl;
33        }
34    }
35    ARROW_RETURN_NOT_OK(statement->Close());
36    return sql_client->Close();
37}

待办事项