Skyhook:使用 Apache Arrow 将计算引入存储
已发布 2022 年 1 月 31 日
作者 Jayjeet Chakraborty、Carlos Maltzahn、David Li、Tom Drabas
CPU、内存、存储和网络带宽每年都在提升,但它们改进的维度越来越不同。处理器速度更快,但其内存带宽却没有跟上;与此同时,云计算导致存储与应用程序在网络链路中分离。这种不同的演变意味着我们需要重新思考在何处以及何时执行计算,才能更好地利用我们可用的资源。
例如,当在 Ceph 或 Amazon S3 等存储系统上查询数据集时,过滤数据的所有工作都由客户端完成。数据必须通过网络传输,然后客户端必须花费宝贵的 CPU 周期对其进行解码,最终却因为过滤器而将其丢弃。虽然 Apache Parquet 等格式支持一些优化,但从根本上说,责任全在客户端。同时,即使存储系统拥有自己的计算能力,它也仅仅沦为提供“哑字节”。
感谢加州大学圣克鲁兹分校的开源软件研究中心 (CROSS),Apache Arrow 7.0.0 包含 Skyhook,这是一个 Arrow Datasets 扩展,它通过使用存储层来减少客户端资源利用率,从而解决了这个问题。我们将研究 Skyhook 周围的发展以及 Skyhook 的工作原理。
引入可编程存储
Skyhook 是可编程存储的一个例子:从存储系统中公开更高级别的功能供客户端构建。这使我们能够更好地利用此类系统中的现有资源(硬件和开发工作),减少每个客户端常见操作的实施负担,并使此类操作能够随着存储层进行扩展。
历史上,像 Apache Hadoop 这样的大数据系统试图将计算和存储放在一起以提高效率。最近,云计算和分布式计算为了灵活性和可扩展性而将计算和存储分离,但这会降低性能。可编程存储在这些目标之间取得了平衡,允许某些操作在数据旁边运行,同时在更高级别上保持数据和计算分离。
特别是,Skyhook 构建在 Ceph 之上,这是一个分布式存储系统,可扩展到 EB 级的数据,同时保持可靠性和灵活性。凭借其对象类 SDK,Ceph 通过允许定义具有自定义功能的新对象类型的扩展来实现可编程存储。
Skyhook 架构
让我们看看 Skyhook 如何应用这些想法。总体思路很简单:客户端应该能够要求 Ceph 执行基本操作,如解码文件、过滤数据和选择列。这样,工作就可以使用现有的存储集群资源来完成,这意味着它既靠近数据,又可以随着集群规模进行扩展。此外,这减少了通过网络传输的数据量,当然也减少了客户端的工作负载。
在存储系统方面,Skyhook 使用 Ceph 对象类 SDK 定义对以 Parquet 或 Feather 格式存储的数据的扫描操作。为了实现这些操作,Skyhook 首先在 Ceph 的对象存储层中实现文件系统填充程序,然后在该填充程序之上使用 Arrow Datasets 库的现有过滤和投影功能。
然后,Skyhook 在 Arrow Datasets 层中定义自定义“文件格式”。对此类文件的查询将使用这些新操作转换为对 Ceph 的直接请求,从而绕过传统的 POSIX 文件系统层。解码、过滤和投影后,Ceph 将 Arrow 记录批次直接发送到客户端,从而最大限度地减少编码/解码的 CPU 开销——这是 Arrow 实现的另一项优化。记录批次使用 Arrow 的压缩支持来进一步节省带宽。

Skyhook 扩展了 Ceph 和 Arrow Datasets,将查询下推到 Ceph,从而减少客户端工作负载和网络流量。(图片来自 “SkyhookDM 现在是 Apache Arrow 的一部分!”。)
Skyhook 还优化了 Parquet 文件的存储方式。Parquet 文件由一系列行组组成,每个行组都包含文件中的一部分行。存储此类文件时,Skyhook 会对其进行填充或拆分,以便将每个行组存储为其自己的 Ceph 对象。通过以这种方式对文件进行条带化或拆分,我们可以跨 Ceph 节点并行化子文件粒度的扫描,以进一步提高性能。
应用
在基准测试中,Skyhook 的存储端 CPU 开销最小,并且几乎消除了客户端 CPU 使用率。扩展存储集群会相应地减少查询延迟。对于使用 Arrow Datasets API 的系统(如 Dask),这意味着只需切换到 Skyhook 文件格式,我们就可以加速数据集扫描,减少需要传输的数据量,并释放 CPU 资源用于计算。

当然,Skyhook 背后的理念适用于 Apache Arrow 附近及其他系统。例如,像 Apache Iceberg 和 Delta Lake 这样的“湖仓”系统也建立在分布式存储系统之上,并且可以自然地受益于 Skyhook 来卸载计算。此外,与 Apache Arrow 无缝集成的基于内存的 SQL 查询引擎(如 DuckDB)可以通过卸载部分 SQL 查询来受益于 Skyhook。
总结和致谢
Arrow 7.0.0 中提供的 Skyhook 建立在对可编程存储系统的研究之上。通过将过滤器和投影推送到存储层,我们可以通过释放客户端上宝贵的 CPU 资源、减少通过网络发送的数据量以及更好地利用 Ceph 等系统的可扩展性来加速数据集扫描。要开始使用,只需启用 Skyhook 构建 Arrow,将 Skyhook 对象类扩展部署到 Ceph(请参阅公告帖子中的“用法”),然后使用 SkyhookFileFormat
构造 Arrow 数据集。这里显示了一个小的代码示例。
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// https://apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <arrow/compute/api.h>
#include <arrow/dataset/api.h>
#include <arrow/filesystem/api.h>
#include <arrow/table.h>
#include <skyhook/client/file_skyhook.h>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
namespace cp = arrow::compute;
namespace ds = arrow::dataset;
namespace fs = arrow::fs;
// Demonstrate reading a dataset via Skyhook.
arrow::Status ScanDataset() {
// Configure SkyhookFileFormat to connect to our Ceph cluster.
std::string ceph_config_path = "/etc/ceph/ceph.conf";
std::string ceph_data_pool = "cephfs_data";
std::string ceph_user_name = "client.admin";
std::string ceph_cluster_name = "ceph";
std::string ceph_cls_name = "skyhook";
std::shared_ptr<skyhook::RadosConnCtx> rados_ctx =
std::make_shared<skyhook::RadosConnCtx>(ceph_config_path, ceph_data_pool,
ceph_user_name, ceph_cluster_name,
ceph_cls_name);
ARROW_ASSIGN_OR_RAISE(auto format,
skyhook::SkyhookFileFormat::Make(rados_ctx, "parquet"));
// Create the filesystem.
std::string root;
ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri("file:///mnt/cephfs/nyc", &root));
// Create our dataset.
fs::FileSelector selector;
selector.base_dir = root;
selector.recursive = true;
ds::FileSystemFactoryOptions options;
options.partitioning = std::make_shared<ds::HivePartitioning>(
arrow::schema({arrow::field("payment_type", arrow::int32()),
arrow::field("VendorID", arrow::int32())}));
ARROW_ASSIGN_OR_RAISE(auto factory,
ds::FileSystemDatasetFactory::Make(fs, std::move(selector),
std::move(format), options));
ds::InspectOptions inspect_options;
ds::FinishOptions finish_options;
ARROW_ASSIGN_OR_RAISE(auto schema, factory->Inspect(inspect_options));
ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish(finish_options));
// Scan the dataset.
auto filter = cp::greater(cp::field_ref("payment_type"), cp::literal(2));
ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
ARROW_RETURN_NOT_OK(scanner_builder->Filter(filter));
ARROW_RETURN_NOT_OK(scanner_builder->UseThreads(true));
ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
std::cout << "Got " << table->num_rows() << " rows" << std::endl;
return arrow::Status::OK();
}
int main(int, char**) {
auto status = ScanDataset();
if (!status.ok()) {
std::cerr << status.message() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
我们要感谢 Ivo Jimenez、Jeff LeFevre、Michael Sevilla 和 Noah Watkins 对这个项目的贡献。
这项工作部分得到了国家科学基金会在合作协议 OAC-1836650、美国能源部 ASCR DE-NA0003525 (FWP 20-023266) 和开源软件研究中心 (cross.ucsc.edu) 的支持。
有关更多信息,请参阅以下论文和文章
- SkyhookDM:使用可编程存储在 Ceph 中进行数据处理。(USENIX ;login: 2020 年夏季刊,第 45 卷,第 2 期)
- SkyhookDM 现在是 Apache Arrow 的一部分!(Medium)
- 迈向 Arrow 原生存储系统。 (arXiv.org)