Arrow Flight

本节包含一些使用 Arrow Flight 的示例(recipes)。Arrow Flight 是一个专门用于表格数据集的 RPC 库。有关 Flight 的更多信息,请参见 Arrow Flight RPC 格式文档(format/Flight)。

使用 Arrow Flight 的简单 Parquet 存储服务

我们将实现一个服务,该服务为表格数据提供键值存储,使用 Flight 处理上传/请求,并使用 Parquet 存储实际数据。

首先,我们将实现服务本身。为简单起见,我们不会使用 Datasets API,而是直接使用 Parquet API。

Parquet 存储服务,服务器实现
  1class ParquetStorageService : public arrow::flight::FlightServerBase {
  2 public:
  3  const arrow::flight::ActionType kActionDropDataset{"drop_dataset", "Delete a dataset."};
  4
  5  explicit ParquetStorageService(std::shared_ptr<arrow::fs::FileSystem> root)
  6      : root_(std::move(root)) {}
  7
  8  arrow::Status ListFlights(
  9      const arrow::flight::ServerCallContext&, const arrow::flight::Criteria*,
 10      std::unique_ptr<arrow::flight::FlightListing>* listings) override {
 11    arrow::fs::FileSelector selector;
 12    selector.base_dir = "/";
 13    ARROW_ASSIGN_OR_RAISE(auto listing, root_->GetFileInfo(selector));
 14
 15    std::vector<arrow::flight::FlightInfo> flights;
 16    for (const auto& file_info : listing) {
 17      if (!file_info.IsFile() || file_info.extension() != "parquet") continue;
 18      ARROW_ASSIGN_OR_RAISE(auto info, MakeFlightInfo(file_info));
 19      flights.push_back(std::move(info));
 20    }
 21
 22    *listings = std::unique_ptr<arrow::flight::FlightListing>(
 23        new arrow::flight::SimpleFlightListing(std::move(flights)));
 24    return arrow::Status::OK();
 25  }
 26
 27  arrow::Status GetFlightInfo(const arrow::flight::ServerCallContext&,
 28                              const arrow::flight::FlightDescriptor& descriptor,
 29                              std::unique_ptr<arrow::flight::FlightInfo>* info) override {
 30    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(descriptor));
 31    ARROW_ASSIGN_OR_RAISE(auto flight_info, MakeFlightInfo(file_info));
 32    *info = std::unique_ptr<arrow::flight::FlightInfo>(
 33        new arrow::flight::FlightInfo(std::move(flight_info)));
 34    return arrow::Status::OK();
 35  }
 36
 37  arrow::Status DoPut(const arrow::flight::ServerCallContext&,
 38                      std::unique_ptr<arrow::flight::FlightMessageReader> reader,
 39                      std::unique_ptr<arrow::flight::FlightMetadataWriter>) override {
 40    ARROW_ASSIGN_OR_RAISE(auto file_info, FileInfoFromDescriptor(reader->descriptor()));
 41    ARROW_ASSIGN_OR_RAISE(auto sink, root_->OpenOutputStream(file_info.path()));
 42    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, reader->ToTable());
 43
 44    ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(),
 45                                                   sink, /*chunk_size=*/65536));
 46    return arrow::Status::OK();
 47  }
 48
 49  arrow::Status DoGet(const arrow::flight::ServerCallContext&,
 50                      const arrow::flight::Ticket& request,
 51                      std::unique_ptr<arrow::flight::FlightDataStream>* stream) override {
 52    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(request.ticket));
 53    std::unique_ptr<parquet::arrow::FileReader> reader;
 54    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
 55                                                 arrow::default_memory_pool(), &reader));
 56
 57    std::shared_ptr<arrow::Table> table;
 58    ARROW_RETURN_NOT_OK(reader->ReadTable(&table));
 59    // Note that we can't directly pass TableBatchReader to
 60    // RecordBatchStream because TableBatchReader keeps a non-owning
 61    // reference to the underlying Table, which would then get freed
 62    // when we exit this function
 63    std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
 64    arrow::TableBatchReader batch_reader(*table);
 65    ARROW_ASSIGN_OR_RAISE(batches, batch_reader.ToRecordBatches());
 66
 67    ARROW_ASSIGN_OR_RAISE(auto owning_reader, arrow::RecordBatchReader::Make(
 68                                                  std::move(batches), table->schema()));
 69    *stream = std::unique_ptr<arrow::flight::FlightDataStream>(
 70        new arrow::flight::RecordBatchStream(owning_reader));
 71
 72    return arrow::Status::OK();
 73  }
 74
 75  arrow::Status ListActions(const arrow::flight::ServerCallContext&,
 76                            std::vector<arrow::flight::ActionType>* actions) override {
 77    *actions = {kActionDropDataset};
 78    return arrow::Status::OK();
 79  }
 80
 81  arrow::Status DoAction(const arrow::flight::ServerCallContext&,
 82                         const arrow::flight::Action& action,
 83                         std::unique_ptr<arrow::flight::ResultStream>* result) override {
 84    if (action.type == kActionDropDataset.type) {
 85      *result = std::unique_ptr<arrow::flight::ResultStream>(
 86          new arrow::flight::SimpleResultStream({}));
 87      return DoActionDropDataset(action.body->ToString());
 88    }
 89    return arrow::Status::NotImplemented("Unknown action type: ", action.type);
 90  }
 91
 92 private:
 93  arrow::Result<arrow::flight::FlightInfo> MakeFlightInfo(
 94      const arrow::fs::FileInfo& file_info) {
 95    ARROW_ASSIGN_OR_RAISE(auto input, root_->OpenInputFile(file_info));
 96    std::unique_ptr<parquet::arrow::FileReader> reader;
 97    ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(std::move(input),
 98                                                 arrow::default_memory_pool(), &reader));
 99
100    std::shared_ptr<arrow::Schema> schema;
101    ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));
102
103    auto descriptor = arrow::flight::FlightDescriptor::Path({file_info.base_name()});
104
105    arrow::flight::FlightEndpoint endpoint;
106    endpoint.ticket.ticket = file_info.base_name();
107    arrow::flight::Location location;
108    ARROW_ASSIGN_OR_RAISE(location,
109        arrow::flight::Location::ForGrpcTcp("localhost", port()));
110    endpoint.locations.push_back(location);
111
112    int64_t total_records = reader->parquet_reader()->metadata()->num_rows();
113    int64_t total_bytes = file_info.size();
114
115    return arrow::flight::FlightInfo::Make(*schema, descriptor, {endpoint}, total_records,
116                                           total_bytes);
117  }
118
119  arrow::Result<arrow::fs::FileInfo> FileInfoFromDescriptor(
120      const arrow::flight::FlightDescriptor& descriptor) {
121    if (descriptor.type != arrow::flight::FlightDescriptor::PATH) {
122      return arrow::Status::Invalid("Must provide PATH-type FlightDescriptor");
123    } else if (descriptor.path.size() != 1) {
124      return arrow::Status::Invalid(
125          "Must provide PATH-type FlightDescriptor with one path component");
126    }
127    return root_->GetFileInfo(descriptor.path[0]);
128  }
129
130  arrow::Status DoActionDropDataset(const std::string& key) {
131    return root_->DeleteFile(key);
132  }
133
134  std::shared_ptr<arrow::fs::FileSystem> root_;
135};  // end ParquetStorageService

首先,我们将启动我们的服务器

// Prepare an empty ./flight_datasets/ directory on the local filesystem and
// expose it to the service as the root of a sub-tree filesystem.
auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
ARROW_RETURN_NOT_OK(fs->CreateDir("./flight_datasets/"));
ARROW_RETURN_NOT_OK(fs->DeleteDirContents("./flight_datasets/"));
auto root = std::make_shared<arrow::fs::SubTreeFileSystem>("./flight_datasets/", fs);

// Port 0 is requested here; the port actually in use is read back from
// server->port() after Init().
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
    arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0));

// Create the service and start it with the options above.
arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
    new ParquetStorageService(std::move(root)));
ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
代码输出
Listening on port 34553

然后我们可以创建一个客户端并连接到服务器

// Build the client-side location from the port the server reports.
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
    arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

// Connect a Flight client to that location.
std::unique_ptr<arrow::flight::FlightClient> client;
ARROW_ASSIGN_OR_RAISE(client, arrow::flight::FlightClient::Connect(location));
rout << "Connected to " << location.ToString() << std::endl;
代码输出
Connected to grpc+tcp://localhost:34553

首先,我们将创建一个表并上传它,该表将由服务器存储在 Parquet 文件中。

// Open example data file to upload
ARROW_ASSIGN_OR_RAISE(std::string airquality_path,
                      FindTestDataFile("airquality.parquet"));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::io::RandomAccessFile> input,
                      fs->OpenInputFile(airquality_path));
std::unique_ptr<parquet::arrow::FileReader> reader;
ARROW_RETURN_NOT_OK(
    parquet::arrow::OpenFile(std::move(input), arrow::default_memory_pool(), &reader));

// The descriptor's single path component names the dataset; the schema is
// read from the Parquet file and passed to DoPut below.
auto descriptor = arrow::flight::FlightDescriptor::Path({"airquality.parquet"});
std::shared_ptr<arrow::Schema> schema;
ARROW_RETURN_NOT_OK(reader->GetSchema(&schema));

// Start the RPC call
std::unique_ptr<arrow::flight::FlightStreamWriter> writer;
std::unique_ptr<arrow::flight::FlightMetadataReader> metadata_reader;
ARROW_ASSIGN_OR_RAISE(auto put_stream, client->DoPut(descriptor, schema));
writer = std::move(put_stream.writer);
metadata_reader = std::move(put_stream.reader);

// Upload data
std::shared_ptr<arrow::RecordBatchReader> batch_reader;
// Select every row group of the file: indices 0 .. num_row_groups()-1.
std::vector<int> row_groups(reader->num_row_groups());
std::iota(row_groups.begin(), row_groups.end(), 0);
ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader(row_groups, &batch_reader));
int64_t batches = 0;
while (true) {
  ARROW_ASSIGN_OR_RAISE(auto batch, batch_reader->Next());
  // A null batch ends the loop.
  if (!batch) break;
  ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
  batches++;
}

// Close the writer once every batch has been written.
ARROW_RETURN_NOT_OK(writer->Close());
rout << "Wrote " << batches << " batches" << std::endl;
代码输出
Wrote 1 batches

完成后,我们可以检索该数据集的元数据

// Ask the server for the metadata of the dataset identified by `descriptor`.
std::unique_ptr<arrow::flight::FlightInfo> flight_info;
ARROW_ASSIGN_OR_RAISE(flight_info, client->GetFlightInfo(descriptor));
rout << flight_info->descriptor().ToString() << std::endl;
rout << "=== Schema ===" << std::endl;
// Obtain the schema from the FlightInfo and print it.
std::shared_ptr<arrow::Schema> info_schema;
arrow::ipc::DictionaryMemo dictionary_memo;
ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
rout << info_schema->ToString() << std::endl;
rout << "==============" << std::endl;
代码输出
<FlightDescriptor path='airquality.parquet'>
=== Schema ===
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
==============

并获取数据

// Fetch the data using the ticket of the first endpoint in the FlightInfo.
std::unique_ptr<arrow::flight::FlightStreamReader> stream;
ARROW_ASSIGN_OR_RAISE(stream, client->DoGet(flight_info->endpoints()[0].ticket));
// Collect the stream into a Table.
std::shared_ptr<arrow::Table> table;
ARROW_ASSIGN_OR_RAISE(table, stream->ToTable());
// Print with window=2; the sample output shows two leading and two trailing
// values per column.
arrow::PrettyPrintOptions print_options(/*indent=*/0, /*window=*/2);
ARROW_RETURN_NOT_OK(arrow::PrettyPrint(*table, print_options, &rout));
代码输出
Ozone: int32
Solar.R: int32
Wind: double
Temp: int32
Month: int32
Day: int32
----
Ozone:
  [
    [
      41,
      36,
      ...
      18,
      20
    ]
  ]
Solar.R:
  [
    [
      190,
      118,
      ...
      131,
      223
    ]
  ]
Wind:
  [
    [
      7.4,
      8,
      ...
      8,
      11.5
    ]
  ]
Temp:
  [
    [
      67,
      72,
      ...
      76,
      68
    ]
  ]
Month:
  [
    [
      5,
      5,
      ...
      9,
      9
    ]
  ]
Day:
  [
    [
      1,
      2,
      ...
      29,
      30
    ]
  ]

然后,我们将删除数据集

// Invoke the "drop_dataset" action; the action body carries the name of the
// dataset to delete.
arrow::flight::Action action{"drop_dataset",
                             arrow::Buffer::FromString("airquality.parquet")};
std::unique_ptr<arrow::flight::ResultStream> results;
ARROW_ASSIGN_OR_RAISE(results, client->DoAction(action));
rout << "Deleted dataset" << std::endl;
代码输出
Deleted dataset

并确认它已被删除

// List every flight the server reports and print each descriptor and schema.
std::unique_ptr<arrow::flight::FlightListing> listing;
ARROW_ASSIGN_OR_RAISE(listing, client->ListFlights());
while (true) {
  std::unique_ptr<arrow::flight::FlightInfo> flight_info;
  ARROW_ASSIGN_OR_RAISE(flight_info, listing->Next());
  // A null FlightInfo ends the loop.
  if (!flight_info) break;
  rout << flight_info->descriptor().ToString() << std::endl;
  rout << "=== Schema ===" << std::endl;
  std::shared_ptr<arrow::Schema> info_schema;
  arrow::ipc::DictionaryMemo dictionary_memo;
  ARROW_ASSIGN_OR_RAISE(info_schema, flight_info->GetSchema(&dictionary_memo));
  rout << info_schema->ToString() << std::endl;
  rout << "==============" << std::endl;
}
rout << "End of listing" << std::endl;
代码输出
End of listing

最后,我们将停止服务器

// Stop the Flight server; a failure is propagated to the caller.
ARROW_RETURN_NOT_OK(server->Shutdown());
rout << "Server shut down successfully" << std::endl;
代码输出
Server shut down successfully

设置 gRPC 客户端选项

gRPC 客户端的选项可以使用 generic_options 字段传递给 arrow::flight::FlightClientOptions。在 gRPC API 文档 中列出了可用的客户端选项。

例如,您可以使用以下方法更改发送的最大消息长度:

// Start from the default client options and append a gRPC option to
// generic_options.
auto client_options = arrow::flight::FlightClientOptions::Defaults();
// Set a very low limit at the gRPC layer to fail all calls
client_options.generic_options.emplace_back(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, 2);

// Build the location of the server to connect to.
arrow::flight::Location location;
ARROW_ASSIGN_OR_RAISE(location,
    arrow::flight::Location::ForGrpcTcp("localhost", server->port()));

std::unique_ptr<arrow::flight::FlightClient> client;
// pass client_options into Connect()
ARROW_ASSIGN_OR_RAISE(client,
    arrow::flight::FlightClient::Connect(location, client_options));
rout << "Connected to " << location.ToString() << std::endl;
代码输出
Connected to grpc+tcp://localhost:41329

带有其他 gRPC 端点的 Flight 服务

如果您使用的是 gRPC 后端,则可以将其他 gRPC 端点添加到 Flight 服务器。虽然 Flight 客户端无法识别这些端点,但一般的 gRPC 客户端可以。

注意

如果静态链接 Arrow Flight,则 Protobuf 和 gRPC 也必须静态链接;动态链接时同理。更多内容请参见 https://arrow.apache.org/docs/cpp/build_system.html#a-note-on-linking 。

创建服务器

要创建 gRPC 服务,首先使用 protobuf 定义服务。

Hello world protobuf 规范
 1syntax = "proto3";
 2
 3service HelloWorldService {
 4  rpc SayHello(HelloRequest) returns (HelloResponse);
 5}
 6
 7message HelloRequest {
 8  string name = 1;
 9}
10
11message HelloResponse {
12  string reply = 1;
13}

接下来,您需要编译它以提供 protobuf 和 gRPC 生成的文件。有关详细信息,请参见 gRPC 的 生成客户端和服务器代码 文档。

然后为 gRPC 服务编写实现

Hello world gRPC 服务实现
 1class HelloWorldServiceImpl : public HelloWorldService::Service {
 2  grpc::Status SayHello(grpc::ServerContext*, const HelloRequest* request,
 3                        HelloResponse* reply) override {
 4    const std::string& name = request->name();
 5    if (name.empty()) {
 6      return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Must provide a name!");
 7    }
 8    reply->set_reply("Hello, " + name);
 9    return grpc::Status::OK;
10  }
11};  // end HelloWorldServiceImpl

最后,使用 builder_hook 钩子在 arrow::flight::FlightServerOptions 上注册额外的 gRPC 服务。

// Listen on a fixed port (5000), which the plain gRPC client targets.
arrow::flight::Location server_location;
ARROW_ASSIGN_OR_RAISE(server_location,
    arrow::flight::Location::ForGrpcTcp("0.0.0.0", 5000));

arrow::flight::FlightServerOptions options(server_location);
auto server = std::unique_ptr<arrow::flight::FlightServerBase>(
    new ParquetStorageService(std::move(root)));

// Create hello world service
HelloWorldServiceImpl grpc_service;

// Use builder_hook to register grpc service
// The hook captures grpc_service by reference and registers its address, so
// the object must still be alive when the hook runs.
// NOTE(review): presumably it must also outlive the running server - confirm
// against the gRPC ServerBuilder::RegisterService documentation.
options.builder_hook = [&](void* raw_builder) {
  auto* builder = reinterpret_cast<grpc::ServerBuilder*>(raw_builder);
  builder->RegisterService(&grpc_service);
};

ARROW_RETURN_NOT_OK(server->Init(options));
rout << "Listening on port " << server->port() << std::endl;
代码输出
Listening on port 5000

创建客户端

Flight 客户端实现不知道任何自定义 gRPC 服务,因此要调用它们,您需要创建一个普通的 gRPC 客户端。对于 Hello World 服务,我们使用 HelloWorldService 存根,该存根由编译的 gRPC 定义提供。

// Dial the server through "localhost" rather than "0.0.0.0": the latter is a
// bind-only wildcard address and is not a portable connection target. This
// also matches how the Flight client snippets address the server.
auto client_channel =
    grpc::CreateChannel("localhost:5000", grpc::InsecureChannelCredentials());

// Stub generated from the HelloWorldService definition.
auto stub = HelloWorldService::NewStub(client_channel);

// Issue one SayHello call for the name "Arrow User".
grpc::ClientContext context;
HelloRequest request;
request.set_name("Arrow User");
HelloResponse response;
grpc::Status status = stub->SayHello(&context, request, &response);
// Translate a gRPC failure into an Arrow IOError for the caller.
if (!status.ok()) {
  return arrow::Status::IOError(status.error_message());
}
rout << response.reply();
代码输出
Hello, Arrow User