跳过内容

Apache Arrow 让您可以高效地处理单个或多个文件组成的数据集,即使数据集大到无法完全加载到内存中。借助 Arrow Dataset 对象,您可以使用熟悉的 dplyr 语法来分析此类数据。本文将介绍数据集(Datasets),并展示如何使用 dplyr 和 arrow 对其进行分析:我们首先确保两个包都已加载。

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

示例:纽约市出租车数据

Arrow 数据集对象的主要动机是允许用户分析极大的数据集。举个例子,考虑在大数据练习和竞赛中广泛使用的纽约市出租车行程记录数据。为了演示 Apache Arrow 的功能,我们在公共 Amazon S3 存储桶中托管了此数据的 Parquet 格式版本:在完整形式下,我们的数据集版本是一个包含约 17 亿行和 24 列的超大表格,其中每一行对应 2009 年至 2022 年间某次出租车行程。此版本纽约市出租车数据的数据字典也可供参考。

这个多文件数据集由 158 个独立的 Parquet 文件组成,每个文件对应一个月的数据。单个文件的大小通常在 400-500MB 左右,整个数据集大小约为 70GB。这不算是一个小数据集——下载速度慢,且无法放入普通机器的内存中 🙂——因此我们还托管了一个“微型”版纽约市出租车数据,其格式完全相同,但仅包含原始数据集中每千分之一的条目(即每个文件大小小于 1MB,整个“微型”数据集仅 70MB)。

如果您在 arrow 中启用了 Amazon S3 支持(大多数用户默认启用;如果需要排查此问题,请参阅本文末尾的链接),您可以使用此命令连接到存储在 S3 上的“微型出租车数据”副本:

bucket <- s3_bucket("arrow-datasets/nyc-taxi-tiny")

或者,您可以使用以下命令连接到 Google Cloud Storage (GCS) 上的数据副本:

bucket <- gs_bucket("arrow-datasets/nyc-taxi-tiny", anonymous = TRUE)

如果您想使用完整的数据集,请将上述代码中的 nyc-taxi-tiny 替换为 nyc-taxi。除了大小——以及随之而来的时间、带宽使用和 CPU 周期的成本——这两个版本的数据没有任何区别:您可以先使用微型出租车数据测试代码,然后再检查它在完整数据集上的扩展表现。

要将存储在 bucket 中的数据集本地副本下载到名为 "nyc-taxi" 的文件夹中,请使用 copy_files() 函数:

copy_files(from = bucket, to = "nyc-taxi")

在本文中,我们假设纽约市出租车数据集(无论是完整数据还是微型版本)已下载到本地,并存在于 "nyc-taxi" 目录中。

打开数据集

该过程的第一步是创建一个指向数据目录的 Dataset 对象:

ds <- open_dataset("nyc-taxi")

需要注意的是,当我们这样做时,数据值并不会被加载到内存中。相反,Arrow 会扫描数据目录以查找相关文件,解析文件路径以寻找“Hive 风格分区”(见下文),并读取数据文件的头部以构建一个包含描述数据结构的元数据的 Schema。有关 Schema 的更多信息,请参阅元数据文章

由此自然会引出两个问题:open_dataset() 查找哪种类型的文件,以及它期望在文件路径中找到什么结构?让我们先看看文件类型。

默认情况下,open_dataset() 会查找 Parquet 文件,但您可以使用 format 参数覆盖此设置。例如,如果数据被编码为 CSV 文件,我们可以设置 format = "csv" 来连接数据。Arrow Dataset 接口支持多种文件格式,包括:

  • "parquet"(默认值)
  • "feather""ipc"(均为 "arrow" 的别名;因为 Feather 第 2 版即为 Arrow 文件格式)
  • "csv"(逗号分隔文件)和 "tsv"(制表符分隔文件)
  • "text"(通用文本分隔文件 - 使用 delimiter 参数指定具体分隔符)

对于文本文件,您可以将以下解析选项传递给 open_dataset(),以确保文件被正确读取:

  • delim
  • quote
  • escape_double
  • escape_backslash
  • skip_empty_rows

处理文本文件的另一种方法是使用 open_delim_dataset()open_csv_dataset()open_tsv_dataset()。这些函数是 open_dataset() 的封装,但其参数与 read_csv_arrow()read_delim_arrow()read_tsv_arrow() 对应,以便于在处理单个文件的函数和处理数据集的函数之间轻松切换。

例如:

ds <- open_csv_dataset("nyc-taxi/csv/")

有关这些参数以及一般性解析分隔文本文件的更多信息,请参阅 read_delim_arrow()open_delim_dataset() 的帮助文档。

接下来,open_dataset() 期望在文件路径中找到什么信息?默认情况下,Dataset 接口会寻找 Hive 风格的分区结构,其中文件夹使用“键=值”约定命名,文件夹中的数据文件包含该键具有相关值的数据子集。例如,在纽约市出租车数据中,文件路径如下所示:

year=2009/month=1/part-0.parquet
year=2009/month=2/part-0.parquet
...

由此,open_dataset() 推断列出的第一个 Parquet 文件包含 2009 年 1 月的数据。从这个意义上说,Hive 风格的分区是自描述的:文件夹名称明确说明了数据集是如何跨文件拆分的。

有时目录分区不是自描述的,也就是说,它不包含字段名称。例如,假设纽约市出租车数据使用如下文件路径:

2009/01/part-0.parquet
2009/02/part-0.parquet
...

在这种情况下,open_dataset() 需要一些关于如何使用文件路径的提示。此例中,您可以将 c("year", "month") 提供给 partitioning 参数,表示路径的第一个分段提供 year 的值,第二个分段是 month2009/01/part-0.parquet 中的每一行在 year 列的值都为 2009,在 month 列的值都为 1,即使这些列在文件中本身并不存在。换句话说,我们将这样打开数据:

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

无论哪种方式,当您查看数据集时,可以看到除了每个文件中都存在的列之外,还有 yearmonth 列。这些列在文件本身中并不存在:它们是从分区结构中推断出来的。

ds
## 
## FileSystemDataset with 158 Parquet files
## vendor_name: string
## pickup_datetime: timestamp[ms]
## dropoff_datetime: timestamp[ms]
## passenger_count: int64
## trip_distance: double
## pickup_longitude: double
## pickup_latitude: double
## rate_code: string
## store_and_fwd: string
## dropoff_longitude: double
## dropoff_latitude: double
## payment_type: string
## fare_amount: double
## extra: double
## mta_tax: double
## tip_amount: double
## tolls_amount: double
## total_amount: double
## improvement_surcharge: double
## congestion_surcharge: double
## pickup_location_id: int64
## dropoff_location_id: int64
## year: int32
## month: int32

查询数据集

现在我们有了指向数据的 Dataset 对象,可以构建 dplyr 风格的查询。这是可能的,因为 arrow 提供了一个后端,允许用户使用 dplyr 动词来操作表格化的 Arrow 数据。举个例子:假设您对最长出租车行程中的小费行为感到好奇。让我们找出 2015 年车费超过 100 美元的行程的中位数小费百分比,并按乘客人数进行细分:

system.time(ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = 100 * tip_amount / total_amount) |>
  group_by(passenger_count) |>
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) |>
  collect() |>
  print())
## 
## # A tibble: 10 x 3
##    passenger_count median_tip_pct      n
##              <int>          <dbl>  <int>
##  1               1           16.6 143087
##  2               2           16.2  34418
##  3               5           16.7   5806
##  4               4           11.4   4771
##  5               6           16.7   3338
##  6               3           14.6   8922
##  7               0           10.1    380
##  8               8           16.7     32
##  9               9           16.7     42
## 10               7           16.7     11
## 
##    user  system elapsed
##   4.436   1.012   1.402

您刚刚从一个包含约 20 亿行的数据集中选择了一个子集,计算了一个新列,并对其进行了汇总。所有这些在现代笔记本电脑上仅需几秒钟即可完成。这是如何实现的?

arrow 之所以能如此快速地完成此任务,有三个原因:

首先,arrow 对查询采用惰性计算(lazy evaluation)方法:当对 Dataset 调用 dplyr 动词时,它们会记录操作,但在您运行 collect() 之前不会对数据进行评估。我们可以通过使用与之前相同的代码但不执行最后一步来看到这一点:

ds |>
  filter(total_amount > 100, year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = 100 * tip_amount / total_amount) |>
  group_by(passenger_count) |>
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  )
## 
## FileSystemDataset (query)
## passenger_count: int64
## median_tip_pct: double
## n: int32
## 
## See $.data for the source Arrow object

此版本的代码会立即返回输出并显示您所做的操作,而无需从文件中加载数据。因为这些查询的评估是延迟的,您可以构建一个选择到较小子集的查询,而无需生成可能很大的中间数据集。

其次,所有工作都被下推(push down)到各个数据文件,并根据文件格式下推到文件内的数据块。因此,您可以通过从每个文件中收集较小的数据片,从更大的数据集中选择数据子集:您不必为了进行切片而将整个数据集加载到内存中。

第三,由于分区,您可以完全忽略某些文件。在此示例中,通过过滤 year == 2015,所有对应于其他年份的文件会被立即排除:您不必为了确定没有行匹配过滤器而加载它们。对于包含行组(row groups)及该组内数据统计信息的 Parquet 文件,可能有整块数据可以避免扫描,因为它们中没有 total_amount > 100 的行。

关于查询数据集,最后要注意的一点是:如果您尝试在 Arrow 数据集上的查询中调用不受支持的 dplyr 动词或未实现的功能,arrow 包会引发错误。然而,对于 Arrow Table 对象(已经在内存中)的 dplyr 查询,该包会在处理该 dplyr 动词之前自动调用 collect()。要了解有关 dplyr 后端的更多信息,请参阅数据整理文章

批处理(实验性)

有时您想对整个数据集运行 R 代码,但该数据集远大于内存。您可以使用 Dataset 查询上的 map_batches 来逐批处理它。

注意map_batches 是实验性的,不建议在生产环境中使用。

作为一个例子,要随机采样一个数据集,使用 map_batches 对每一批中的一部分行进行采样:

sampled_data <- ds |>
  filter(year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) |>
  mutate(tip_pct = tip_amount / total_amount) |>
  collect()

str(sampled_data)
## 
## tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)
##  $ tip_amount     : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...
##  $ total_amount   : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...
##  $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...
##  $ tip_pct        : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...

此函数还可用于通过计算每一批的部分结果,然后聚合这些部分结果来对数据集进行汇总统计。扩展上面的示例,您可以将模型拟合到样本数据,然后使用 map_batches 计算完整数据集上的 MSE。

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)

ds |>
  filter(year == 2015) |>
  select(tip_amount, total_amount, passenger_count) |>
  mutate(tip_pct = tip_amount / total_amount) |>
  map_batches(function(batch) {
    batch |>
      as.data.frame() |>
      mutate(pred_tip_pct = predict(model, newdata = .)) |>
      filter(!is.nan(tip_pct)) |>
      summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) |>
      as_record_batch()
  }) |>
  summarize(mse = sum(sse_partial) / sum(n_partial)) |>
  pull(mse)
## 
## [1] 0.1304284

数据集选项

您可以通过几种方式控制数据集的创建,以适应特殊用例。

处理目录中的文件

如果您正在处理单个文件或一组不在同一目录下的文件,可以将文件路径或多个文件路径的向量提供给 open_dataset()。例如,如果您有一个大到无法加载到内存中的单个 CSV 文件,这非常有用。您可以将该文件路径传递给 open_dataset(),使用 group_by() 将数据集分区为可管理的块,然后使用 write_dataset() 将每个块写入单独的 Parquet 文件——所有这些都无需将完整的 CSV 文件读取到 R 中。

显式声明列名和数据类型

您可以指定 open_dataset()schema 参数来声明列及其数据类型。如果您的数据文件具有不同的存储模式(例如,一列在一个文件中可能是 int32,在另一个文件中是 int8),并且您希望确保生成的数据集具有特定类型,那么这很有用。

需要明确的是,即使在这个整数类型混合的示例中,也不一定要指定模式,因为数据集构造函数会协调这些差异。模式规范只是让您声明希望结果是什么样子。

显式声明分区格式

同样,您可以在 open_dataset()partitioning 参数中提供 Schema,以声明定义分区的虚拟列的类型。在纽约市出租车数据示例中,如果您想将 month 保留为字符串而不是整数,这将非常有用。

处理多个数据源

数据集的另一个特性是它们可以由多个数据源组成。也就是说,您可能在一个位置有一个分区的 Parquet 文件目录,在另一个目录中有未分区的文件。或者,您可以指向 S3 存储桶中的 Parquet 数据和本地文件系统上的 CSV 目录,并将它们作为一个数据集一起查询。要创建多源数据集,请将数据集列表提供给 open_dataset() 而不是文件路径,或者使用 big_dataset <- c(ds1, ds2) 之类的命令连接它们。

写入数据集

如您所见,通过使用 Parquet 或 Feather 等高效的二进制列式格式存储,并基于常用过滤列进行分区,查询大型数据集可以变得非常快。然而,数据并不总是以这种方式存储。有时您可能从一个巨大的 CSV 开始。分析数据的第一步是清理它并将其重塑为更可用的形式。

write_dataset() 函数允许您获取一个数据集或其他表格数据对象——Arrow Table 或 RecordBatch,或 R 数据框——并将其写入不同的文件格式,并分区为多个文件。

假设您有 CSV 格式的纽约市出租车数据:

ds <- open_dataset("nyc-taxi/csv/", format = "csv")

您可以通过在上面调用 write_dataset() 将其写入新位置并将文件转换为 Feather 格式:

write_dataset(ds, "nyc-taxi/feather", format = "feather")

接下来,让我们假设 payment_type 列是您经常过滤的字段,因此您希望按该变量对数据进行分区。通过这样做,您可以确保像 payment_type == "Cash" 这样的过滤操作仅触及 payment_type 始终为 "Cash" 的文件子集。

表达您想要分区的列的一种自然方法是使用 group_by() 方法:

ds |>
  group_by(payment_type) |>
  write_dataset("nyc-taxi/feather", format = "feather")

这将把文件写入如下目录树:

system("tree nyc-taxi/feather")
## feather
## ├── payment_type=1
## │   └── part-18.arrow
## ├── payment_type=2
## │   └── part-19.arrow
## ...
## └── payment_type=UNK
##     └── part-17.arrow
##
## 18 directories, 23 files

请注意,目录名称为 payment_type=Cash 等:这就是上述的 Hive 风格分区。这意味着当您在此目录上调用 open_dataset() 时,无需声明分区是什么,因为它们可以从文件路径中读取。(如果想要为分区段写入原始值,即 Cash 而不是 payment_type=Cash,请在调用 write_dataset() 时设置 hive_style = FALSE。)

也许 payment_type == "Cash" 是您唯一关心的数据,您只想丢弃其余部分并获得一个较小的工作集。为此,您可以在写入时使用 filter() 将其过滤掉:

ds |>
  filter(payment_type == "Cash") |>
  write_dataset("nyc-taxi/feather", format = "feather")

写入数据集时可以做的另一件事是选择列的子集或对其重新排序。假设您从不关心 vendor_id,并且作为字符串列,它在读取时会占用大量空间,所以让我们删除它:

ds |>
  group_by(payment_type) |>
  select(-vendor_id) |>
  write_dataset("nyc-taxi/feather", format = "feather")

请注意,虽然您可以选择列的子集,但目前在写入数据集时无法重命名列。

分区性能考虑因素

分区数据集有两个影响性能的方面:它增加了文件数量,并围绕文件创建了目录结构。两者都有好处也有成本。根据配置和数据集的大小,成本可能会超过收益。

因为分区将数据集拆分为多个文件,所以分区数据集可以并行读取和写入。但是,每个额外的文件都会在处理文件系统交互时增加一点开销。它还会增加整个数据集的大小,因为每个文件都有一些共享的元数据。例如,每个 parquet 文件都包含模式和组级统计信息。分区的数量是文件数量的下限。如果您按年份按日期对数据集进行分区,您将至少有 365 个文件。如果您进一步按另一个具有 1,000 个唯一值的维度进行分区,您将拥有多达 365,000 个文件。这种细粒度的分区通常会导致大量主要由元数据组成的小文件。

分区数据集创建嵌套的文件夹结构,这允许我们修剪扫描中加载的文件。然而,这增加了发现数据集中的文件的开销,因为我们需要递归地“列出目录”以查找数据文件。过细的分区会导致问题:按日期为一年的数据对数据集进行分区需要 365 次列表调用才能找到所有文件;再添加一列基数为 1,000 的列将使其变为 365,365 次调用。

最优化分区布局将取决于您的数据、访问模式以及将要读取数据的系统。大多数系统,包括 Arrow,应该都能在各种文件大小和分区布局下工作,但您应该避免一些极端情况。以下准则可以帮助避免一些已知的最坏情况:

  • 避免文件小于 20MB 和大于 2GB。
  • 避免分区布局中包含超过 10,000 个不同分区。

对于文件中包含组概念的文件格式,例如 Parquet,也适用类似的准则。行组可以在读取时提供并行性,并允许根据统计数据跳过数据,但非常小的组可能会导致元数据在文件大小中占据很大一部分。Arrow 的文件写入器在大多数情况下都为组大小提供了合理的默认值。

事务/ACID 保证

Dataset API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取没问题。并发写入或与读取并行的写入可能会产生意外行为。可以使用各种方法来避免对相同文件进行操作,例如为每个写入器使用唯一的基本名称模板、为新文件使用临时目录,或单独存储文件列表,而不是依赖于目录发现。

在写入过程中意外终止进程可能导致系统处于不一致状态。写入调用通常在要写入的字节完全交付给操作系统页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然断电,文件的一部分仍可能丢失。

大多数文件格式都有在末尾写入的“魔数”。这意味着可以安全地检测和丢弃部分文件写入。CSV 文件格式没有这样的概念,部分写入的 CSV 文件可能会被检测为有效文件。

进一步阅读