Skyhook:通过 Apache Arrow 将计算引入存储


已发布 2022 年 1 月 31 日
作者 Jayjeet Chakraborty, Carlos Maltzahn, David Li, Tom Drabas

CPU、内存、存储和网络带宽每年都在变得更好,但它们在不同的维度上改进得越来越多。处理器速度更快,但其内存带宽却没有跟上;与此同时,云计算导致存储通过网络链路与应用程序分离。这种不同的演变意味着我们需要重新思考在何处以及何时执行计算,才能最好地利用可用的资源。

例如,当查询 Ceph 或 Amazon S3 等存储系统上的数据集时,所有过滤数据的工作都由客户端完成。数据必须通过网络传输,然后客户端必须花费宝贵的 CPU 周期来解码它,最终由于过滤器而将其丢弃。虽然像 Apache Parquet 这样的格式可以实现一些优化,但从根本上讲,责任完全在于客户端。与此同时,即使存储系统有自己的计算能力,它也只能提供“哑字节”。

感谢加州大学圣克鲁兹分校的开源软件研究中心 (CROSS),Apache Arrow 7.0.0 包含了 Skyhook,这是一个 Arrow Datasets 扩展,它通过使用存储层来减少客户端资源利用率来解决这个问题。我们将研究围绕 Skyhook 的发展以及 Skyhook 的工作方式。

介绍可编程存储

Skyhook 是可编程存储的一个例子:从存储系统公开更高级别的功能,供客户端构建。这使我们能够更好地利用现有资源(包括硬件和开发工作),减少每个客户端的常见操作的实施负担,并使这些操作能够随存储层扩展。

从历史上看,像 Apache Hadoop 这样的大数据系统一直试图将计算和存储放在一起以提高效率。最近,云计算和分布式计算已经分离了计算和存储,以实现灵活性和可伸缩性,但这需要付出性能代价。可编程存储在这些目标之间取得了平衡,允许某些操作在数据旁边运行,同时仍然在更高级别上保持数据和计算的分离。

特别是,Skyhook 构建于 Ceph 之上,Ceph 是一个分布式存储系统,可扩展到 EB 级数据,同时具有可靠性和灵活性。借助其对象类 SDK,Ceph 通过允许定义具有自定义功能的新对象类型的扩展来实现可编程存储。

Skyhook 架构

让我们看看 Skyhook 如何应用这些想法。总的来说,这个想法很简单:客户端应该能够要求 Ceph 执行基本操作,如解码文件、过滤数据和选择列。这样,这项工作就可以利用现有的存储集群资源来完成,这意味着它既与数据相邻,又可以随着集群规模进行扩展。此外,这减少了通过网络传输的数据,当然也减少了客户端的工作负载。

在存储系统端,Skyhook 使用 Ceph 对象类 SDK 来定义对以 Parquet 或 Feather 格式存储的数据的扫描操作。为了实现这些操作,Skyhook 首先在 Ceph 的对象存储层中实现一个文件系统 shim,然后在该 shim 之上使用 Arrow Datasets 库的现有过滤和投影功能。

然后,Skyhook 在 Arrow Datasets 层中定义一个自定义“文件格式”。针对此类文件的查询会转换为使用这些新操作直接向 Ceph 发出的请求,从而绕过传统的 POSIX 文件系统层。经过解码、过滤和投影后,Ceph 将 Arrow 记录批处理直接发送到客户端,最大限度地减少了编码/解码的 CPU 开销——Arrow 实现了另一个优化。记录批处理使用 Arrow 的压缩支持来进一步节省带宽。

Skyhook Architecture

Skyhook 扩展了 Ceph 和 Arrow Datasets,将查询下推到 Ceph,从而减少客户端工作负载和网络流量。(图表来自 “SkyhookDM is now a part of Apache Arrow!”。)

Skyhook 还优化了 Parquet 文件的存储方式。 Parquet 文件由一系列行组组成,每个行组包含文件中的一部分行。在存储此类文件时,Skyhook 会填充或拆分它们,以便每个行组都存储为其自己的 Ceph 对象。通过以这种方式条带化或拆分文件,我们可以在 Ceph 节点上以子文件粒度并行扫描,以进一步提高性能。

应用

在基准测试中,Skyhook 具有最小的存储端 CPU 开销,并且几乎消除了客户端 CPU 使用率。扩展存储集群会相应地降低查询延迟。对于像 Dask 这样使用 Arrow Datasets API 的系统,这意味着只需切换到 Skyhook 文件格式,我们就可以加速数据集扫描,减少需要传输的数据量,并释放 CPU 资源用于计算。

In benchmarks, Skyhook reduces client CPU usage while minimally impacting storage cluster CPU usage.
Skyhook 释放了客户端 CPU 来执行有用的工作,同时对存储机器完成的工作影响最小。客户端仍然在解压缩 Skyhook 发送的 LZ4 压缩记录批处理中完成一些工作。(请注意,存储集群图是累积的。)

当然,Skyhook 背后的思想适用于 Apache Arrow 之外的其他系统。例如,像 Apache Iceberg 和 Delta Lake 这样的“湖仓一体”系统也构建在分布式存储系统之上,并且可以自然地受益于 Skyhook 来卸载计算。此外,基于内存的 SQL 查询引擎(如 DuckDB)与 Apache Arrow 无缝集成,可以通过卸载 SQL 查询的部分来受益于 Skyhook。

总结和致谢

Skyhook 在 Arrow 7.0.0 中可用,它建立在对可编程存储系统的研究之上。通过将过滤器和投影推送到存储层,我们可以通过释放客户端上宝贵的 CPU 资源、减少通过网络发送的数据量以及更好地利用 Ceph 等系统的可伸缩性来加速数据集扫描。要开始使用,只需构建 Arrow 并启用 Skyhook,将 Skyhook 对象类扩展部署到 Ceph(请参阅 公告帖子中的“用法”),然后使用 SkyhookFileFormat 来构建 Arrow 数据集。 这里显示了一个小代码示例。

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// https://apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
#include <arrow/table.h>
#include <skyhook/client/file_skyhook.h>

#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>

namespace cp = arrow::compute;
namespace ds = arrow::dataset;
namespace fs = arrow::fs;

// Demonstrate reading a dataset via Skyhook.
arrow::Status ScanDataset() {
  // Configure SkyhookFileFormat to connect to our Ceph cluster.
  std::string ceph_config_path = "/etc/ceph/ceph.conf";
  std::string ceph_data_pool = "cephfs_data";
  std::string ceph_user_name = "client.admin";
  std::string ceph_cluster_name = "ceph";
  std::string ceph_cls_name = "skyhook";
  std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
      std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
                                              ceph_user_name, ceph_cluster_name,
                                              ceph_cls_name);
  ARROW_ASSIGN_OR_RAISE(auto format,
                        skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));

  // Create the filesystem.
  std::string root;
  ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri("file:///mnt/cephfs/nyc", &root));

  // Create our dataset.
  fs::FileSelector selector;
  selector.base_dir = root;
  selector.recursive = true;

  ds::FileSystemFactoryOptions options;
  options.partitioning = std::make_shared<ds::HivePartitioning>(
      arrow::schema({arrow::field("payment_type", arrow::int32()),
                     arrow::field("VendorID", arrow::int32())}));
  ARROW_ASSIGN_OR_RAISE(auto factory,
                        ds::FileSystemDatasetFactory::Make(fs, std::move(selector),
                                                           std::move(format), options));

  ds::InspectOptions inspect_options;
  ds::FinishOptions finish_options;
  ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect(inspect_options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish(finish_options));

  // Scan the dataset.
  auto filter = cp::greater(cp::field_ref("payment_type"), cp::literal(2));
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scanner_builder->Filter(filter));
  ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(true));
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());

  std::cout << "Got " << table->num_rows() << " rows" << std::endl;
  return arrow::Status::OK();
}

int main(int, char**) {
  auto status = ScanDataset();
  if (!status.ok()) {
    std::cerr << status.message() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}

我们要感谢 Ivo Jimenez、Jeff LeFevre、Michael Sevilla 和 Noah Watkins 对该项目的贡献。

这项工作部分由国家科学基金会根据合作协议 OAC-1836650、美国能源部 ASCR DE-NA0003525 (FWP 20-023266) 以及开源软件研究中心 (cross.ucsc.edu) 资助。

有关更多信息,请参阅以下论文和文章