跳转到目录

Apache Arrow 允许您高效地处理单文件和多文件数据集,即使数据集太大而无法加载到内存中。借助 Arrow Dataset 对象,您可以使用熟悉的 dplyr 语法分析此类数据。本文介绍了 Datasets 并向您展示了如何使用 dplyr 和 arrow 分析它们:我们将首先确保加载这两个包

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

示例:纽约市出租车数据

Arrow 的 Datasets 对象的主要动机是允许用户分析超大型数据集。例如,考虑广泛用于大数据练习和竞赛的 纽约市出租车行程记录数据。为了展示 Apache Arrow 的功能,我们在公共 Amazon S3 存储桶中托管了此数据的 Parquet 格式版本:完整形式下,我们的数据版本是一个非常大的表,包含约 17 亿行和 24 列,其中每一行对应于 2009 年至 2022 年间的单次出租车行程。此版本纽约市出租车数据的数据字典 也可用。

这个多文件数据集由 158 个不同的 Parquet 文件组成,每个文件对应一个月的数据。单个文件的大小通常约为 400-500MB,整个数据集的大小约为 70GB。它不是一个小数据集——下载速度慢,并且在典型机器上无法放入内存 🙂——因此我们还托管了一个“微型”版本的纽约市出租车数据,其格式完全相同,但仅包含原始数据集中每千个条目中的一个(即,单个文件大小小于 1MB,“微型”数据集只有 70MB)

如果您在 arrow 中启用了 Amazon S3 支持(大多数用户都是如此;如果您需要对此进行故障排除,请参阅本文末尾的链接),则可以使用以下命令连接到存储在 S3 上的“微型出租车数据”副本

bucket <- s3_bucket("voltrondata-labs-datasets/nyc-taxi-tiny")

或者,您可以使用以下命令连接到 Google Cloud Storage (GCS) 上的数据副本

bucket <- gs_bucket("voltrondata-labs-datasets/nyc-taxi-tiny", anonymous = TRUE)

如果要使用完整数据集,请将上面代码中的 nyc-taxi-tiny 替换为 nyc-taxi。除了大小(以及随之而来的时间、带宽使用和 CPU 周期成本)之外,这两个版本的数据没有区别:您可以使用微型出租车数据测试您的代码,然后使用完整数据集检查其扩展方式。

要将存储在 bucket 中的数据集的本地副本复制到名为 "nyc-taxi" 的文件夹中,请使用 copy_files() 函数

copy_files(from = bucket, to = "nyc-taxi")

出于本文的目的,我们假设纽约市出租车数据集(完整版本或微型版本)已下载到本地并存在于 "nyc-taxi" 目录中。

打开数据集

该过程的第一步是创建一个指向数据目录的 Dataset 对象

ds <- open_dataset("nyc-taxi")

需要注意的是,当我们这样做时,数据值不会加载到内存中。相反,Arrow 扫描数据目录以查找相关文件,解析文件路径以查找“Hive 样式分区”(见下文),并读取数据文件的标头以构造包含描述数据结构元数据的 Schema。有关 Schemas 的更多信息,请参阅元数据文章

由此自然产生两个问题:open_dataset() 查找什么样的文件,以及它期望在文件路径中找到什么结构?让我们首先看一下文件类型。

默认情况下,open_dataset() 查找 Parquet 文件,但您可以使用 format 参数覆盖此设置。例如,如果数据编码为 CSV 文件,我们可以设置 format = "csv" 来连接数据。Arrow Dataset 接口支持多种文件格式,包括

  • "parquet"(默认)
  • "feather""ipc""arrow" 的别名;因为 Feather 版本 2 是 Arrow 文件格式)
  • "csv"(逗号分隔文件)和 "tsv"(制表符分隔文件)
  • "text"(通用文本分隔文件 - 使用 delimiter 参数指定要使用的分隔符)

对于文本文件,您可以将以下解析选项传递给 open_dataset() 以确保正确读取文件

  • delim(分隔符)
  • quote(引号)
  • escape_double(双重转义)
  • escape_backslash(反斜杠转义)
  • skip_empty_rows(跳过空行)

处理文本文件时的另一种方法是使用 open_delim_dataset()open_csv_dataset()open_tsv_dataset()。这些函数是 open_dataset() 的包装器,但其参数与 read_csv_arrow()read_delim_arrow()read_tsv_arrow() 镜像,以便在用于打开单个文件的函数和用于打开数据集的函数之间轻松切换。

例如

ds <- open_csv_dataset("nyc-taxi/csv/")

有关这些参数以及通常解析分隔文本文件的更多信息,请参阅 read_delim_arrow()open_delim_dataset() 的帮助文档。

接下来,open_dataset() 期望在文件路径中找到什么信息?默认情况下,Dataset 接口查找 Hive 样式的分区结构,其中文件夹使用“键=值”约定命名,文件夹中的数据文件包含键具有相关值的 数据子集。例如,在纽约市出租车数据中,文件路径如下所示

year=2009/month=1/part-0.parquet
year=2009/month=2/part-0.parquet
...

由此,open_dataset() 推断第一个列出的 Parquet 文件包含 2009 年 1 月的数据。从这个意义上说,hive 样式分区是自描述的:文件夹名称明确说明了 Dataset 如何在文件中拆分。

有时目录分区不是自描述的;也就是说,它不包含字段名称。例如,假设纽约市出租车数据使用如下文件路径

2009/01/part-0.parquet
2009/02/part-0.parquet
...

在这种情况下,open_dataset() 需要一些关于如何使用文件路径的提示。在这种情况下,您可以向 partitioning 参数提供 c("year", "month"),表示第一个路径段给出 year 的值,第二个段是 month2009/01/part-0.parquet 中的每一行都有一个 year 值为 2009 且 month 值为 1,即使这些列可能不存在于文件中。换句话说,我们将像这样打开数据

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

无论哪种方式,当您查看 Dataset 时,您会看到除了每个文件中存在的列之外,还有列 yearmonth。这些列本身不存在于文件中:它们是从分区结构推断出来的。

ds
## 
## FileSystemDataset with 158 Parquet files
## vendor_name: string
## pickup_datetime: timestamp[ms]
## dropoff_datetime: timestamp[ms]
## passenger_count: int64
## trip_distance: double
## pickup_longitude: double
## pickup_latitude: double
## rate_code: string
## store_and_fwd: string
## dropoff_longitude: double
## dropoff_latitude: double
## payment_type: string
## fare_amount: double
## extra: double
## mta_tax: double
## tip_amount: double
## tolls_amount: double
## total_amount: double
## improvement_surcharge: double
## congestion_surcharge: double
## pickup_location_id: int64
## dropoff_location_id: int64
## year: int32
## month: int32

查询数据集

现在我们有了一个引用我们数据的 Dataset 对象,我们可以构造 dplyr 样式的查询。这是可能的,因为 arrow 提供了一个后端,允许用户使用 dplyr 动词操作表格 Arrow 数据。这是一个例子:假设您对最长出租车行程中的小费行为感到好奇。让我们找出 2015 年车费超过 100 美元的行程的平均小费百分比,按乘客人数细分

system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  collect() %>%
  print())
## 
## # A tibble: 10 x 3
##    passenger_count median_tip_pct      n
##              <int>          <dbl>  <int>
##  1               1           16.6 143087
##  2               2           16.2  34418
##  3               5           16.7   5806
##  4               4           11.4   4771
##  5               6           16.7   3338
##  6               3           14.6   8922
##  7               0           10.1    380
##  8               8           16.7     32
##  9               9           16.7     42
## 10               7           16.7     11
## 
##    user  system elapsed
##   4.436   1.012   1.402

您刚刚从包含大约 20 亿行的数据集中选择了一个子集,计算了一个新列并对其进行了聚合。所有这些都在现代笔记本电脑上几秒钟内完成。这是如何工作的?

arrow 可以如此快速地完成这项任务有三个原因

首先,arrow 采用延迟评估方法来查询:当在 Dataset 上调用 dplyr 动词时,它们会记录其操作,但在您运行 collect() 之前不会对数据评估这些操作。我们可以通过采用与之前相同的代码并省略最后一步来看到这一点

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  )
## 
## FileSystemDataset (query)
## passenger_count: int64
## median_tip_pct: double
## n: int32
## 
## See $.data for the source Arrow object

此版本的代码会立即返回输出并显示您所做的操作,而无需从文件中加载数据。由于这些查询的评估被延迟,您可以构建一个查询,该查询选择到一个小子集,而不会生成可能很大的中间数据集。

其次,所有工作都被下推到单个数据文件,并且取决于文件格式,文件中的数据块。因此,您可以通过从每个文件中收集较小的切片来从更大的数据集中选择数据的子集:您不必将整个数据集加载到内存中即可从中切片。

第三,由于分区,您可以完全忽略某些文件。在此示例中,通过过滤 year == 2015,所有对应于其他年份的文件都会立即被排除:您不必加载它们即可发现没有行匹配过滤器。对于包含具有组内数据统计信息的行的 Parquet 文件,您可能可以避免扫描整个数据块,因为它们没有 total_amount > 100 的行。

关于查询数据集的最后一点说明。假设您尝试在 Arrow 数据集上调用不受支持的 dplyr 动词或未实现的函数。在这种情况下,arrow 包会引发错误。但是,对于 Arrow Table 对象(已在内存中)上的 dplyr 查询,该包会在处理该 dplyr 动词之前自动调用 collect()。要了解有关 dplyr 后端的更多信息,请参阅数据整理文章

批处理(实验性)

有时您想在整个 Dataset 上运行 R 代码,但该 Dataset 比内存大得多。您可以在 Dataset 查询上使用 map_batches 来逐批处理它。

**注意**:map_batches 是实验性的,不建议用于生产用途。

例如,要随机抽样 Dataset,请使用 map_batches 从每批中抽取一定百分比的行

sampled_data <- ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  collect()

str(sampled_data)
## 
## tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)
##  $ tip_amount     : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...
##  $ total_amount   : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...
##  $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...
##  $ tip_pct        : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...

此函数还可用于通过计算每批的部分结果然后聚合这些部分结果来聚合 Dataset 上的汇总统计信息。扩展上面的示例,您可以将模型拟合到样本数据,然后使用 map_batches 计算完整 Dataset 上的 MSE。

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)

ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  map_batches(function(batch) {
    batch %>%
      as.data.frame() %>%
      mutate(pred_tip_pct = predict(model, newdata = .)) %>%
      filter(!is.nan(tip_pct)) %>%
      summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) %>%
      as_record_batch()
  }) %>%
  summarize(mse = sum(sse_partial) / sum(n_partial)) %>%
  pull(mse)
## 
## [1] 0.1304284

数据集选项

您可以通过几种方式控制 Dataset 的创建以适应特殊用例。

使用目录中的文件

如果您正在处理单个文件或一组不在同一目录中的文件,您可以向 open_dataset() 提供一个文件路径或多个文件路径的向量。例如,如果您有一个太大而无法读入内存的 CSV 文件,这将非常有用。您可以将文件路径传递给 open_dataset(),使用 group_by() 将数据集划分为可管理的块,然后使用 write_dataset() 将每个块写入单独的 Parquet 文件——所有这些都不需要将完整的 CSV 文件读入 R。

显式声明列名和数据类型

您可以指定 open_dataset()schema 参数来声明列及其数据类型。如果您有具有不同存储模式的数据文件(例如,一列在其中一个文件中可能是 int32,而在另一个文件中是 int8),并且您希望确保生成的数据集具有特定类型,这将非常有用。

需要明确的是,即使在这个混合整数类型的示例中,也没有必要指定模式,因为数据集构造函数会协调这些差异。模式规范只是让您可以声明所需的结果。

显式声明分区格式

类似地,您可以在 open_dataset()partitioning 参数中提供一个模式,以声明定义分区的虚拟列的类型。在纽约出租车数据示例中,如果您希望将 month 保留为字符串而不是整数,这将非常有用。

使用多个数据源

数据集的另一个特性是它们可以由多个数据源组成。也就是说,您可能在一个位置有一个分区 Parquet 文件的目录,而在另一个目录中,文件尚未分区。或者,您可以指向 S3 存储桶中的 Parquet 数据和本地文件系统上的 CSV 目录,并将它们一起查询为单个数据集。要创建多源数据集,请向 open_dataset() 提供数据集列表而不是文件路径,或者使用 big_dataset <- c(ds1, ds2) 之类的命令将它们连接起来。

写入数据集

如您所见,通过以高效的二进制列式格式(如 Parquet 或 Feather)存储数据并基于常用作过滤的列进行分区,可以使查询大型数据集变得非常快。但是,数据并不总是以这种方式存储的。有时您可能从一个巨大的 CSV 文件开始。分析数据的第一步是清理并将其重塑为更易于使用的形式。

write_dataset() 函数允许您获取数据集或其他表格数据对象(Arrow 表格或 RecordBatch,或 R 数据框),并将其写入不同的文件格式,并分区到多个文件中。

假设您有一个 CSV 版本的纽约出租车数据

ds <- open_dataset("nyc-taxi/csv/", format = "csv")

您可以通过对其调用 write_dataset() 将其写入新位置并将文件转换为 Feather 格式

write_dataset(ds, "nyc-taxi/feather", format = "feather")

接下来,假设 payment_type 列是您经常过滤的列,因此您希望按该变量对数据进行分区。通过这样做,您可以确保像 payment_type == "Cash" 这样的过滤器只会触及 payment_type 始终为 "Cash" 的文件的子集。

表达您想要分区的列的一种自然方式是使用 group_by() 方法

ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

这会将文件写入如下所示的目录树

system("tree nyc-taxi/feather")
## feather
## ├── payment_type=1
## │   └── part-18.arrow
## ├── payment_type=2
## │   └── part-19.arrow
## ...
## └── payment_type=UNK
##     └── part-17.arrow
##
## 18 directories, 23 files

请注意,目录名称是 payment_type=Cash 等:这是上面描述的 Hive 样式分区。这意味着当您在此目录上调用 open_dataset() 时,您不必声明分区是什么,因为它们可以从文件路径中读取。(要改为为分区段写入裸值,即 Cash 而不是 payment_type=Cash,请使用 hive_style = FALSE 调用 write_dataset()。)

但是,也许 payment_type == "Cash" 是您唯一关心的数据,您只想删除其余数据并拥有一个较小的工作集。为此,您可以在写入时将它们 filter()

ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")

写入数据集时,您可以做的另一件事是选择列的子集或重新排序它们。假设您从不关心 vendor_id,并且作为一个字符串列,当您读入它时,它会占用大量空间,所以让我们删除它

ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

请注意,虽然您可以选择列的子集,但当前无法在写入数据集时重命名列。

分区性能注意事项

对数据集进行分区有两个方面会影响性能:它会增加文件数量,并会在文件周围创建目录结构。这两者都有好处也有成本。根据配置和数据集的大小,成本可能超过收益。

由于分区将数据集拆分为多个文件,因此可以使用并行方式读取和写入分区数据集。但是,每个附加文件都会在文件系统交互处理中增加少量开销。它还会增加整体数据集大小,因为每个文件都有一些共享元数据。例如,每个 parquet 文件都包含模式和组级别统计信息。分区数是文件数的下限。如果按日期对一年数据的数据集进行分区,您将至少有 365 个文件。如果进一步按另一个维度进行分区,该维度具有 1,000 个唯一值,则您将最多拥有 365,000 个文件。这种精细的分区通常会导致小文件主要由元数据组成。

分区数据集会创建嵌套文件夹结构,这些结构允许我们修剪在扫描中加载的文件。但是,这会增加发现数据集中文件的开销,因为我们需要递归地“列出目录”才能找到数据文件。过于精细的分区可能会导致此处出现问题:按日期对一年数据的数据集进行分区将需要 365 次列表调用才能找到所有文件;添加另一个基数为 1,000 的列将使调用次数达到 365,365 次。

最佳分区布局将取决于您的数据、访问模式以及将读取数据的系统。大多数系统(包括 Arrow)都应适用于各种文件大小和分区布局,但您应避免出现极端情况。以下准则可以帮助避免一些已知的最悪情况

  • 避免小于 20MB 和大于 2GB 的文件。
  • 避免使用超过 10,000 个不同分区的布局。

对于文件中具有组概念的文件格式(例如 Parquet),适用类似的准则。行组可以在读取时提供并行性,并允许基于统计信息跳过数据,但非常小的组会导致元数据占据文件大小的很大一部分。在大多数情况下,Arrow 的文件编写器会为组大小提供合理的默认值。

事务 / ACID 保证

Dataset API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取是可以的。并发写入或与读取同时进行的写入可能会产生意外行为。可以使用各种方法来避免对相同文件进行操作,例如为每个写入器使用唯一的基名模板、为新文件使用临时目录或单独存储文件列表而不是依赖目录发现。

在写入过程中意外终止进程可能会使系统处于不一致状态。写入调用通常在要写入的字节已完全传递到操作系统页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生电源损耗,则文件的一部分仍有可能丢失。

大多数文件格式都有写在末尾的魔术数字。这意味着可以安全地检测和丢弃部分文件写入。CSV 文件格式没有任何此类概念,部分写入的 CSV 文件可能会被检测为有效。

延伸阅读