跳至内容

处理存储在云存储系统(如 Amazon Simple Storage Service (S3) 和 Google Cloud Storage (GCS) 中的数据是一项非常常见的任务。为此,Arrow C++ 库提供了一套工具,旨在让操作云存储变得像操作本地文件系统一样简单。

为了实现这一点,Arrow C++ 库包含一个通用的文件系统接口,而 arrow 包将该接口公开给 R 用户。例如,如果您希望,可以创建一个 LocalFileSystem 对象,它允许您以通常的方式与本地文件系统交互:复制、移动和删除文件、获取有关文件和文件夹的信息,等等(有关详细信息,请参见 help("FileSystem", package = "arrow"))。通常情况下,您可能不需要此功能,因为您已经拥有处理本地文件系统的工具,但在远程文件系统的情况下,此接口变得更加有用。目前,S3FileSystem 类提供了针对 Amazon S3 的特定实现,GcsFileSystem 类提供了针对 Google Cloud Storage 的特定实现。

本文概述了使用 Arrow 工具包处理 S3 和 GCS 数据的方法。

Linux 上的 S3 和 GCS 支持

在开始之前,请确保您的 arrow 安装支持 S3 和/或 GCS。对于大多数用户来说,默认情况下将启用支持,因为 CRAN 上托管的 Windows 和 macOS 二进制软件包包含 S3 和 GCS 支持。您可以通过辅助函数检查支持是否已启用

如果这些函数返回 TRUE,则表示已启用相关支持。

在某些情况下,您可能会发现系统未启用支持。这种情况最常见于从源代码安装 arrow 在 Linux 上时发生。在这种情况下,S3 和 GCS 支持并非始终默认启用,并且涉及其他系统要求。有关如何解决此问题的详细信息,请参见 安装文章

连接到云存储

处理文件系统的一种方法是创建 ?FileSystem 对象。?S3FileSystem 对象可以通过 s3_bucket() 函数创建,该函数会自动检测存储桶的 AWS 区域。类似地,?GcsFileSystem 对象可以通过 gs_bucket() 函数创建。生成的 FileSystem 将将路径视为相对于存储桶的路径(因此,例如,在列出目录时,您不需要添加存储桶路径前缀)。

使用 FileSystem 对象,您可以使用 $path() 方法指向其中的特定文件,并将结果传递给文件读取器和写入器(read_parquet()write_feather() 等)。

在现实世界分析中,用户使用云存储的原因通常是为了访问大型数据集。数据集文章 中讨论了这种情况的一个例子,但新用户可能更喜欢使用更小的数据集来学习 arrow 云存储接口的工作原理。为此,本文中的示例依赖于一个多文件 Parquet 数据集,该数据集存储了通过 ggplot2 包提供的 diamonds 数据的副本,在 help("diamonds", package = "ggplot2") 中有记录。该数据集的云存储版本包含 5 个 Parquet 文件,总大小不到 1MB。

diamonds 数据集托管在 S3 和 GCS 上,存储在一个名为 voltrondata-labs-datasets 的存储桶中。要创建引用该存储桶的 S3FileSystem 对象,请使用以下命令

bucket <- s3_bucket("voltrondata-labs-datasets")

要对 GCS 版本的数据执行此操作,命令如下所示

bucket <- gs_bucket("voltrondata-labs-datasets", anonymous = TRUE)

请注意,如果未配置凭据,则 GCS 需要 anonymous = TRUE

在该存储桶中,有一个名为 diamonds 的文件夹。我们可以调用 bucket$ls("diamonds") 来列出存储在此文件夹中的文件,或者调用 bucket$ls("diamonds", recursive = TRUE) 来递归搜索子文件夹。请注意,在 GCS 上,您应始终设置 recursive = TRUE,因为目录通常不会出现在结果中。

以下是列出存储在 GCS 存储桶中的文件时得到的结果

bucket$ls("diamonds", recursive = TRUE)
## [1] "diamonds/cut=Fair/part-0.parquet"     
## [2] "diamonds/cut=Good/part-0.parquet"     
## [3] "diamonds/cut=Ideal/part-0.parquet"    
## [4] "diamonds/cut=Premium/part-0.parquet"  
## [5] "diamonds/cut=Very Good/part-0.parquet"

这里有 5 个 Parquet 文件,每个文件对应 diamonds 数据集中每个“切工”类别。我们可以通过调用 bucket$path() 来指定特定文件的路径

parquet_good <- bucket$path("diamonds/cut=Good/part-0.parquet")

我们可以使用 read_parquet() 直接从该路径读取到 R 中

diamonds_good <- read_parquet(parquet_good)
diamonds_good
## # A tibble: 4,906 × 9
##    carat color clarity depth table price     x     y     z
##    <dbl> <ord> <ord>   <dbl> <dbl> <int> <dbl> <dbl> <dbl>
##  1  0.23 E     VS1      56.9    65   327  4.05  4.07  2.31
##  2  0.31 J     SI2      63.3    58   335  4.34  4.35  2.75
##  3  0.3  J     SI1      64      55   339  4.25  4.28  2.73
##  4  0.3  J     SI1      63.4    54   351  4.23  4.29  2.7 
##  5  0.3  J     SI1      63.8    56   351  4.23  4.26  2.71
##  6  0.3  I     SI2      63.3    56   351  4.26  4.3   2.71
##  7  0.23 F     VS1      58.2    59   402  4.06  4.08  2.37
##  8  0.23 E     VS1      64.1    59   402  3.83  3.85  2.46
##  9  0.31 H     SI1      64      54   402  4.29  4.31  2.75
## 10  0.26 D     VS2      65.2    56   403  3.99  4.02  2.61
## # … with 4,896 more rows
## # ℹ Use `print(n = ...)` to see more rows

请注意,与本地文件相比,这种读取速度会更慢。

直接使用 URI 连接

在大多数使用情况下,在 arrow 中连接到云存储的最简单、最自然的方法是使用 s3_bucket()gs_bucket() 返回的 FileSystem 对象,特别是在需要执行多个文件操作时。但是,在某些情况下,您可能希望通过指定 URI 来直接下载文件。arrow 允许这样做,并且 read_parquet()write_feather()open_dataset() 等函数都将接受托管在 S3 或 GCS 上的云资源的 URI。S3 URI 的格式如下所示

s3://[access_key:secret_key@]bucket/path[?region=]

对于 GCS,URI 格式如下所示

gs://[access_key:secret_key@]bucket/path
gs://anonymous@bucket/path

例如,我们之前在本文中下载的存储“良好切工”钻石的 Parquet 文件在 S3 和 CGS 上均可用。相关的 URI 如下所示

uri <- "s3://voltrondata-labs-datasets/diamonds/cut=Good/part-0.parquet"
uri <- "gs://anonymous@voltrondata-labs-datasets/diamonds/cut=Good/part-0.parquet"

请注意,对于公共存储桶,GCS 需要“anonymous”。无论使用哪个版本,您都可以将该 URI 传递给 read_parquet(),就好像该文件存储在本地一样

df <- read_parquet(uri)

URI 接受查询参数(? 后的部分)中的其他选项,这些选项将向下传递以配置底层文件系统。它们由 & 分隔。例如,

s3://voltrondata-labs-datasets/?endpoint_override=https%3A%2F%2Fstorage.googleapis.com&allow_bucket_creation=true

等效于

bucket <- S3FileSystem$create(
  endpoint_override="https://storage.googleapis.com",
  allow_bucket_creation=TRUE
)
bucket$path("voltrondata-labs-datasets/")

两者都告诉 S3FileSystem 对象允许创建新的存储桶,并与 Google Storage 而不是 S3 通信。后者有效,因为 GCS 实现了与 S3 兼容的 API(请参见下面的 模拟 S3 的文件系统),但如果您希望获得更好的 GCS 支持,您应参考 GcsFileSystem,但使用以 gs:// 开头的 URI。

另请注意,URI 中的参数需要进行 百分比编码,这就是为什么 :// 写成 %3A%2F%2F 的原因。

对于 S3,仅以下选项可以作为查询参数包含在 URI 中:regionschemeendpoint_overrideaccess_keysecret_keyallow_bucket_creationallow_bucket_deletion。对于 GCS,支持的参数是 schemeendpoint_overrideretry_limit_seconds

在 GCS 中,一个有用的选项是 retry_limit_seconds,它设置了请求在返回错误之前可能花费在重试上的秒数。当前默认值为 15 分钟,因此在许多交互式环境中,设置一个较低的值会很有用

gs://anonymous@voltrondata-labs-datasets/diamonds/?retry_limit_seconds=10

身份验证

S3 身份验证

要访问私有的 S3 存储桶,您通常需要两个秘密参数:access_key(类似于用户 ID)和 secret_key(类似于令牌或密码)。传递这些凭据有几种方法

  • 将它们包含在 URI 中,例如 s3://access_key:secret_key@bucket-name/path/to/file。如果您的秘密包含“/”之类的特殊字符,请确保对它们进行 URL 编码(例如,URLencode("123/456", reserved = TRUE))。

  • 将它们作为 access_keysecret_key 传递给 S3FileSystem$create()s3_bucket()

  • 将它们分别设置为名为 AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY 的环境变量。

  • 根据 AWS 文档,在 ~/.aws/credentials 文件中定义它们。

  • 通过将 role_arn 标识符传递给 S3FileSystem$create()s3_bucket(),使用 访问角色 进行临时访问。

GCS 身份验证

使用 GCS 进行身份验证的最简单方法是运行 gcloud 命令来设置应用程序默认凭据

gcloud auth application-default login

要手动配置凭据,您可以传递 access_tokenexpiration(用于使用在其他地方生成的临时令牌),或者传递 json_credentials(用于引用已下载的凭据文件)。

如果您未配置凭据,则要访问公共存储桶,您必须在 URI 中传递 anonymous = TRUE 或将 anonymous 作为用户传递

bucket <- gs_bucket("voltrondata-labs-datasets", anonymous = TRUE)
fs <- GcsFileSystem$create(anonymous = TRUE)
df <- read_parquet("gs://anonymous@voltrondata-labs-datasets/diamonds/cut=Good/part-0.parquet")

使用代理服务器

如果您需要使用代理服务器来连接到 S3 存储桶,则可以将 http://user:password@host:port 形式的 URI 提供给 proxy_options。例如,可以使用以下方式使用在端口 1316 上运行的本地代理服务器

bucket <- s3_bucket(
  bucket = "voltrondata-labs-datasets", 
  proxy_options = "https://127.0.0.1:1316"
)

模拟 S3 的文件系统

S3FileSystem 机制使您能够处理任何提供与 S3 兼容接口的文件系统。例如,MinIO 是一款对象存储服务器,它模拟 S3 API。如果您要以默认设置在本地运行 minio server,则可以使用 arrow 通过 S3FileSystem 连接到它,如下所示

minio <- S3FileSystem$create(
  access_key = "minioadmin",
  secret_key = "minioadmin",
  scheme = "http",
  endpoint_override = "localhost:9000"
)

或者,作为 URI,它将是

s3://minioadmin:minioadmin@?scheme=http&endpoint_override=localhost%3A9000

(请注意 endpoint_override: 的 URL 编码)。

除了其他应用之外,这对于在远程 S3 存储桶上运行之前在本地测试代码非常有用。

禁用环境变量

如上所述,可以使用环境变量来配置访问。但是,如果您希望通过 URI 或其他方法传递连接详细信息,但同时也定义了现有的 AWS 环境变量,则这些变量可能会干扰您的会话。例如,您可能会看到类似以下的错误消息

Error: IOError: When resolving region for bucket 'analysis': AWS Error [code 99]: curlCode: 6, Couldn't resolve host name 

可以使用 Sys.unsetenv() 取消设置这些环境变量,例如

Sys.unsetenv("AWS_DEFAULT_REGION")
Sys.unsetenv("AWS_S3_ENDPOINT")

默认情况下,AWS SDK 会尝试检索有关用户配置的元数据,这在通过 URI 传递连接详细信息时(例如,访问 MINIO 存储桶时)会导致冲突。要禁用 AWS 环境变量的使用,可以将环境变量 AWS_EC2_METADATA_DISABLED 设置为 TRUE

Sys.setenv(AWS_EC2_METADATA_DISABLED = TRUE)

进一步阅读