Skyhook:通过 Apache Arrow 将计算引入存储


发布 2022 年 1 月 31 日
作者 Jayjeet Chakraborty, Carlos Maltzahn, David Li, Tom Drabas

CPU、内存、存储和网络带宽每年都在改进,但它们之间的改进维度却日益不同。处理器速度更快了,但其内存带宽却没有跟上;同时,云计算导致存储与应用程序通过网络链路分离。这种不同的演进意味着我们需要重新思考在何处以及何时执行计算,以最好地利用我们可用的资源。

例如,当查询 Ceph 或 Amazon S3 等存储系统上的数据集时,所有数据过滤工作都由客户端完成。数据必须通过网络传输,然后客户端需要花费宝贵的 CPU 周期进行解码,最终却因为过滤而将其丢弃。虽然 Apache Parquet 等格式可以实现一些优化,但从根本上说,所有责任都在客户端。同时,尽管存储系统拥有自己的计算能力,但它却被降级为只提供“哑字节”。

得益于加州大学圣克鲁斯分校的开源软件研究中心 (CROSS),Apache Arrow 7.0.0 包含了 Skyhook,这是一个 Arrow Datasets 扩展,它通过利用存储层来减少客户端资源利用率,从而解决了这个问题。我们将探讨 Skyhook 周围的发展以及 Skyhook 的工作原理。

引入可编程存储

Skyhook 是可编程存储的一个例子:从存储系统公开更高级别的功能供客户端构建。这使我们能够更好地利用这些系统中的现有资源(硬件和开发工作),减少每个客户端的常见操作实现负担,并使这些操作能够与存储层一起扩展。

历史上,Apache Hadoop 等大数据系统一直尝试将计算和存储并置以提高效率。最近,云和分布式计算为了灵活性和可伸缩性将计算和存储分离,但以性能为代价。可编程存储在这些目标之间取得了平衡,允许某些操作紧邻数据运行,同时仍在更高级别上保持数据和计算分离。

特别是,Skyhook 建立在 Ceph 之上,Ceph 是一个分布式存储系统,可以扩展到 EB 级数据,同时可靠且灵活。通过其对象类 SDK,Ceph 通过允许定义具有自定义功能的新对象类型的扩展来实现可编程存储。

Skyhook 架构

让我们看看 Skyhook 如何应用这些想法。总的来说,这个想法很简单:客户端应该能够要求 Ceph 执行基本操作,例如解码文件、过滤数据和选择列。这样,工作就可以利用现有的存储集群资源完成,这意味着它既靠近数据,又可以随着集群规模的扩大而扩展。此外,这减少了通过网络传输的数据,当然也减少了客户端的工作负载。

在存储系统端,Skyhook 使用 Ceph 对象类 SDK 来定义对以 Parquet 或 Feather 格式存储的数据进行扫描操作。为了实现这些操作,Skyhook 首先在 Ceph 的对象存储层中实现一个文件系统 shim,然后在该 shim 之上使用 Arrow Datasets 库现有的过滤和投影功能。

然后,Skyhook 在 Arrow Datasets 层中定义了一个自定义的“文件格式”。对这些文件的查询被转换为直接向 Ceph 发送使用这些新操作的请求,绕过传统的 POSIX 文件系统层。在解码、过滤和投影之后,Ceph 将 Arrow 记录批次直接发送到客户端,最大程度地减少了编码/解码的 CPU 开销——这是 Arrow 带来的另一个优化。记录批次使用 Arrow 的压缩支持来进一步节省带宽。

Skyhook Architecture
Skyhook 扩展了 Ceph 和 Arrow Datasets,将查询下推到 Ceph,从而减少了客户端工作负载和网络流量。(图源自 [“SkyhookDM 现已成为 Apache Arrow 的一部分!”][medium]。)

Skyhook 还优化了 Parquet 文件的存储方式。Parquet 文件由一系列行组组成,每个行组包含文件中的一部分行。在存储此类文件时,Skyhook 会填充或拆分它们,以便每个行组都作为自己的 Ceph 对象存储。通过以这种方式条带化或拆分文件,我们可以在 Ceph 节点上以子文件粒度并行扫描,以进一步提高性能。

应用

在基准测试中,Skyhook 具有最小的存储端 CPU 开销,并且几乎消除了客户端 CPU 使用率。扩展存储集群会相应地降低查询延迟。对于使用 Arrow Datasets API 的 Dask 等系统,这意味着只需切换到 Skyhook 文件格式,我们就可以加快数据集扫描速度,减少需要传输的数据量,并释放 CPU 资源用于计算。

In benchmarks, Skyhook reduces client CPU usage while minimally impacting storage cluster CPU usage.
Skyhook 释放了客户端 CPU 以执行有用的工作,同时最大程度地减少了存储机器所做的工作。客户端仍需解压缩 Skyhook 发送的 LZ4 压缩记录批次。(请注意,存储集群图是累积的。)

当然,Skyhook 背后的思想也适用于 Apache Arrow 以外的其他相邻系统。例如,Apache Iceberg 和 Delta Lake 等“湖仓一体”系统也建立在分布式存储系统之上,并且可以自然地从 Skyhook 中受益以卸载计算。此外,像 DuckDB 这样的内存式基于 SQL 的查询引擎,与 Apache Arrow 无缝集成,可以通过卸载部分 SQL 查询从 Skyhook 中受益。

总结和致谢

Skyhook 在 Arrow 7.0.0 中可用,它基于可编程存储系统的研究。通过将过滤器和投影下推到存储层,我们可以通过释放客户端宝贵的 CPU 资源、减少通过网络发送的数据量以及更好地利用 Ceph 等系统的可伸缩性来加快数据集扫描。要开始使用,只需构建 Arrow 并启用 Skyhook,将 Skyhook 对象类扩展部署到 Ceph(请参阅公告文章中的“用法”),然后使用 SkyhookFileFormat 构建 Arrow 数据集。这里显示了一个小代码示例。

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// https://apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
#include <arrow/table.h>
#include <skyhook/client/file_skyhook.h>

#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>

namespace cp = arrow::compute;
namespace ds = arrow::dataset;
namespace fs = arrow::fs;

// Demonstrate reading a dataset via Skyhook.
arrow::Status ScanDataset() {
  // Configure SkyhookFileFormat to connect to our Ceph cluster.
  std::string ceph_config_path = "/etc/ceph/ceph.conf";
  std::string ceph_data_pool = "cephfs_data";
  std::string ceph_user_name = "client.admin";
  std::string ceph_cluster_name = "ceph";
  std::string ceph_cls_name = "skyhook";
  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
                                              ceph_user_name, ceph_cluster_name,
                                              ceph_cls_name);
  ARROW_ASSIGN_OR_RAISE(auto format,
                        skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));

  // Create the filesystem.
  std::string root;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri("file:///mnt/cephfs/nyc", &root));

  // Create our dataset.
  fs::FileSelector selector;
  selector.base_dir = root;
  selector.recursive = true;

  ds::FileSystemFactoryOptions options;
  options.partitioning = std::make_shared<ds::HivePartitioning>(
      arrow::schema({arrow::field("payment_type", arrow::int32()),
                     arrow::field("VendorID", arrow::int32())}));
  ARROW_ASSIGN_OR_RAISE(auto factory,
                        ds::FileSystemDatasetFactory::Make(fs, std::move(selector),
                                                           std::move(format), options));

  ds::InspectOptions inspect_options;
  ds::FinishOptions finish_options;
  ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect(inspect_options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish(finish_options));

  // Scan the dataset.
  auto filter = cp::greater(cp::field_ref("payment_type"), cp::literal(2));
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scanner_builder->Filter(filter));
  ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(true));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());

  std::cout << "Got " << table->num_rows() << " rows" << std::endl;
  return arrow::Status::OK();
}

int main(int, char**) {
  auto status = ScanDataset();
  if (!status.ok()) {
    std::cerr << status.message() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}

我们要感谢 Ivo Jimenez、Jeff LeFevre、Michael Sevilla 和 Noah Watkins 对此项目的贡献。

这项工作部分由国家科学基金会根据合作协议 OAC-1836650、美国能源部 ASCR DE-NA0003525 (FWP 20-023266) 和开源软件研究中心 (cross.ucsc.edu) 支持。

有关更多信息,请参阅这些论文和文章