跳至内容

即使数据集太大而无法加载到内存中,Apache Arrow 也能让您有效地处理单个和多个文件的数据集。借助 Arrow Dataset 对象,您可以使用熟悉的 dplyr 语法分析此类数据。本文介绍数据集并展示如何使用 dplyr 和 arrow 分析它们:我们将首先确保加载了这两个软件包

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

示例:纽约市出租车数据

Arrow 的 Datasets 对象的主要动机是允许用户分析极其庞大的数据集。例如,考虑 纽约市出租车行程记录数据,该数据广泛用于大数据练习和竞赛。为了演示 Apache Arrow 的功能,我们在公共 Amazon S3 存储桶中托管了 Parquet 格式的此数据版本:完整形式下,我们的数据集版本是一个非常大的表,包含大约 17 亿行和 24 列,其中每一行对应于 2009 年到 2022 年之间的单次出租车行程。还提供了此版本的纽约市出租车数据的 数据字典

这个多文件数据集由 158 个不同的 Parquet 文件组成,每个文件对应于一个月的数据。单个文件通常约为 400-500MB 大小,整个数据集约为 70GB 大小。它不是一个小数据集 – 下载速度很慢,并且无法容纳在典型机器的内存中 🙂 – 因此我们还托管了一个“微型”版本的纽约市出租车数据,其格式完全相同,但仅包含原始数据集中每千个条目中的一个(即,单个文件小于 1MB 大小,“微型”数据集仅为 70MB)

如果在 arrow 中启用了 Amazon S3 支持(对于大多数用户而言为 true;如果您需要对此进行故障排除,请参阅本文末尾的链接),则可以使用此命令连接到存储在 S3 上的“微型出租车数据”的副本

bucket <- s3_bucket("voltrondata-labs-datasets/nyc-taxi-tiny")

或者,您可以使用以下命令连接到 Google Cloud Storage (GCS) 上的数据副本

bucket <- gs_bucket("voltrondata-labs-datasets/nyc-taxi-tiny", anonymous = TRUE)

如果要使用完整的数据集,请在上面的代码中将 nyc-taxi-tiny 替换为 nyc-taxi。除了大小(以及由此产生的在时间、带宽使用和 CPU 周期方面的成本)之外,这两个版本的数据没有任何区别:您可以使用微型出租车数据测试您的代码,然后检查它如何使用完整的数据集进行缩放。

要将存储在 bucket 中的数据集的本地副本复制到名为 "nyc-taxi" 的文件夹中,请使用 copy_files() 函数

copy_files(from = bucket, to = "nyc-taxi")

出于本文的目的,我们假设纽约市出租车数据集(完整数据或微型版本)已在本地下载并存在于 "nyc-taxi" 目录中。

打开数据集

此过程的第一步是创建一个指向数据目录的 Dataset 对象

ds <- open_dataset("nyc-taxi")

重要的是要注意,当我们这样做时,数据值不会加载到内存中。相反,Arrow 会扫描数据目录以查找相关文件,解析文件路径以查找“Hive 样式分区”(请参见下文),并读取数据文件的标头以构造一个 Schema,该 Schema 包含描述数据结构的元数据。有关 Schemas 的更多信息,请参见元数据文章

由此自然会产生两个问题:open_dataset() 寻找什么类型的文件,以及它期望在文件路径中找到什么结构?让我们首先看一下文件类型。

默认情况下,open_dataset() 查找 Parquet 文件,但您可以使用 format 参数覆盖此设置。例如,如果数据编码为 CSV 文件,我们可以设置 format = "csv" 以连接到数据。Arrow Dataset 接口支持多种文件格式,包括

  • "parquet"(默认)
  • "feather""ipc""arrow" 的别名;由于 Feather version 2 是 Arrow 文件格式)
  • "csv"(逗号分隔文件)和 "tsv"(制表符分隔文件)
  • "text"(通用文本分隔文件 - 使用 delimiter 参数指定要使用的分隔符)

对于文本文件,您可以将以下解析选项传递给 open_dataset(),以确保文件被正确读取

  • delim
  • quote
  • escape_double
  • escape_backslash
  • skip_empty_rows

使用文本文件时的另一种选择是使用 open_delim_dataset()open_csv_dataset()open_tsv_dataset()。这些函数是 open_dataset() 的包装器,但具有与 read_csv_arrow()read_delim_arrow()read_tsv_arrow() 相同的参数,以便在打开单个文件的函数和打开数据集的函数之间轻松切换。

例如

ds <- open_csv_dataset("nyc-taxi/csv/")

有关这些参数以及通常解析分隔文本文件的更多信息,请参见 read_delim_arrow()open_delim_dataset() 的帮助文档。

接下来,open_dataset() 期望在文件路径中找到什么信息? 默认情况下,Dataset 接口查找 Hive 样式的分区结构,其中文件夹使用“key=value”约定命名,并且文件夹中的数据文件包含键具有相关值的数据子集。例如,在纽约市出租车数据中,文件路径如下所示

year=2009/month=1/part-0.parquet
year=2009/month=2/part-0.parquet
...

由此,open_dataset() 推断出第一个列出的 Parquet 文件包含 2009 年 1 月的数据。从这个意义上讲,hive 样式分区是自我描述的:文件夹名称明确说明了数据集是如何跨文件拆分的。

有时目录分区不是自我描述的;也就是说,它不包含字段名称。例如,假设纽约市出租车数据使用如下文件路径

2009/01/part-0.parquet
2009/02/part-0.parquet
...

在这种情况下,open_dataset() 需要一些提示来了解如何使用文件路径。在这种情况下,您可以将 c("year", "month") 提供给 partitioning 参数,说明第一个路径段给出了 year 的值,第二个段是 month2009/01/part-0.parquet 中的每一行对于 year 的值为 2009,对于 month 的值为 1,即使这些列可能不存在于文件中。换句话说,我们可以这样打开数据

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

无论哪种方式,当您查看数据集时,您都可以看到除了每个文件中存在的列之外,还有 yearmonth 列。这些列不存在于文件本身中:它们是从分区结构推断出来的。

ds
## 
## FileSystemDataset with 158 Parquet files
## vendor_name: string
## pickup_datetime: timestamp[ms]
## dropoff_datetime: timestamp[ms]
## passenger_count: int64
## trip_distance: double
## pickup_longitude: double
## pickup_latitude: double
## rate_code: string
## store_and_fwd: string
## dropoff_longitude: double
## dropoff_latitude: double
## payment_type: string
## fare_amount: double
## extra: double
## mta_tax: double
## tip_amount: double
## tolls_amount: double
## total_amount: double
## improvement_surcharge: double
## congestion_surcharge: double
## pickup_location_id: int64
## dropoff_location_id: int64
## year: int32
## month: int32

查询数据集

现在我们有了一个引用我们数据的 Dataset 对象,我们可以构建 dplyr 样式的查询。这是可能的,因为 arrow 提供了一个后端,允许用户使用 dplyr 动词来操作表格 Arrow 数据。这是一个示例:假设您对最长出租车行程中的小费行为感到好奇。让我们找到 2015 年票价大于 100 美元的行程的中位数小费百分比,按乘客数量细分

system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  collect() %>%
  print())
## 
## # A tibble: 10 x 3
##    passenger_count median_tip_pct      n
##              <int>          <dbl>  <int>
##  1               1           16.6 143087
##  2               2           16.2  34418
##  3               5           16.7   5806
##  4               4           11.4   4771
##  5               6           16.7   3338
##  6               3           14.6   8922
##  7               0           10.1    380
##  8               8           16.7     32
##  9               9           16.7     42
## 10               7           16.7     11
## 
##    user  system elapsed
##   4.436   1.012   1.402

您刚刚从包含大约 20 亿行的数据集中选择了一个子集,计算了一个新列并对其进行了聚合。所有这些都在现代笔记本电脑上几秒钟内完成。这是如何运作的?

arrow 如此快速地完成此任务有三个原因

首先,arrow 对查询采用延迟评估方法:当在 Dataset 上调用 dplyr 动词时,它们会记录其操作,但在运行 collect() 之前不会在数据上评估这些操作。我们可以通过采用与以前相同的代码并省略最后一步来看到这一点

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  )
## 
## FileSystemDataset (query)
## passenger_count: int64
## median_tip_pct: double
## n: int32
## 
## See $.data for the source Arrow object

此版本的代码会立即返回输出并显示您所做的操作,而无需从文件中加载数据。由于这些查询的评估被延迟,因此您可以构建一个查询,该查询选择到一个小型的子集,而无需生成可能很大的中间数据集。

其次,所有工作都会下推到各个数据文件以及文件中的数据块(具体取决于文件格式)。因此,您可以通过从每个文件中收集较小的切片来从更大的数据集中选择一个子集:您不必将整个数据集加载到内存中来从中进行切片。

第三,由于分区,您可以完全忽略某些文件。在此示例中,通过过滤 year == 2015,将立即排除与所有其他年份相对应的文件:您不必加载它们即可发现没有行与过滤器匹配。对于 Parquet 文件(包含具有组内包含的数据统计信息的行组),可能会有一些您可以避免扫描的整个数据块,因为它们没有 total_amount > 100 的行。

关于查询 Datasets 的最后一件事要注意。假设您尝试在 Arrow Dataset 上调用不受支持的 dplyr 动词或未实现的函数。在这种情况下,arrow 包会引发错误。但是,对于 Arrow Table 对象(已在内存中)上的 dplyr 查询,该包会在处理该 dplyr 动词之前自动调用 collect()。要了解有关 dplyr 后端的更多信息,请参见数据整理文章

批处理(实验性)

有时您想在整个 Dataset 上运行 R 代码,但该 Dataset 远大于内存。您可以在 Dataset 查询上使用 map_batches 以逐批处理它。

注意map_batches 是实验性的,不建议用于生产环境。

例如,要随机抽样一个 Dataset,请使用 map_batches 从每个批次中抽样一定百分比的行

sampled_data <- ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  collect()

str(sampled_data)
## 
## tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)
##  $ tip_amount     : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...
##  $ total_amount   : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...
##  $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...
##  $ tip_pct        : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...

此函数还可用于通过计算每个批次的局部结果,然后聚合这些局部结果来聚合 Dataset 上的摘要统计信息。扩展上面的示例,您可以将模型拟合到样本数据,然后使用 map_batches 来计算完整 Dataset 上的 MSE。

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)

ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  map_batches(function(batch) {
    batch %>%
      as.data.frame() %>%
      mutate(pred_tip_pct = predict(model, newdata = .)) %>%
      filter(!is.nan(tip_pct)) %>%
      summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) %>%
      as_record_batch()
  }) %>%
  summarize(mse = sum(sse_partial) / sum(n_partial)) %>%
  pull(mse)
## 
## [1] 0.1304284

数据集选项

您可以通过几种方法来控制 Dataset 创建以适应特殊用例。

处理目录中的文件

如果您正在处理单个文件或一组不在同一目录中的文件,您可以向 open_dataset() 提供文件路径或多个文件路径的向量。例如,如果您有一个 CSV 文件太大而无法读入内存,这将非常有用。您可以将文件路径传递给 open_dataset(),使用 group_by() 将数据集划分为可管理的块,然后使用 write_dataset() 将每个块写入单独的 Parquet 文件——所有这些都无需将完整的 CSV 文件读入 R。

显式声明列名和数据类型

您可以指定 schema 参数给 open_dataset() 来声明列及其数据类型。如果您有具有不同存储模式的数据文件(例如,一列可能是 int32,而另一列可能是 int8),并且您希望确保生成的数据集具有特定类型,这将非常有用。

需要明确的是,即使在这个混合整数类型的例子中,也没有必要指定 schema,因为 Dataset 构造函数会协调这些差异。schema 规范只是让您声明您希望结果是什么。

显式声明分区格式

类似地,您可以在 open_dataset()partitioning 参数中提供一个 Schema,以便声明定义分区的虚拟列的类型。在 NYC 出租车数据示例中,如果您希望将 month 保留为字符串而不是整数,这将非常有用。

使用多个数据源

Datasets 的另一个特性是它们可以由多个数据源组成。也就是说,您可能在一个位置有一个分区 Parquet 文件目录,而在另一个目录中有未分区的 Parquet 文件。或者,您可以指向 S3 bucket 中的 Parquet 数据和本地文件系统上的 CSV 目录,并将它们作为一个 Dataset 一起查询。要创建多源 Dataset,请将 Dataset 列表提供给 open_dataset() 而不是文件路径,或者使用 big_dataset <- c(ds1, ds2) 之类的命令将它们连接起来。

写入数据集

正如您所看到的,通过存储在高效的二进制列式格式(如 Parquet 或 Feather)中,并基于常用于过滤的列进行分区,可以使查询大型数据集变得非常快速。然而,数据并不总是以这种方式存储的。有时您可能从一个巨大的 CSV 文件开始。分析数据的第一步是清理并将其重塑为更易于使用的形式。

write_dataset() 函数允许您获取 Dataset 或另一个表格数据对象——Arrow Table 或 RecordBatch,或 R 数据框——并将其写入不同的文件格式,并将其分区为多个文件。

假设您有一个 CSV 格式的 NYC 出租车数据版本

ds <- open_dataset("nyc-taxi/csv/", format = "csv")

您可以通过调用 write_dataset() 将其写入新位置,并将文件转换为 Feather 格式

write_dataset(ds, "nyc-taxi/feather", format = "feather")

接下来,假设 payment_type 列是您经常用于过滤的内容,因此您希望按该变量对数据进行分区。这样做可以确保类似 payment_type == "Cash" 的过滤器只会触及 payment_type 始终为 "Cash" 的文件子集。

表达要对其进行分区的列的一种自然方法是使用 group_by() 方法

ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

这将把文件写入看起来像这样的目录树

system("tree nyc-taxi/feather")
## feather
## ├── payment_type=1
## │   └── part-18.arrow
## ├── payment_type=2
## │   └── part-19.arrow
## ...
## └── payment_type=UNK
##     └── part-17.arrow
##
## 18 directories, 23 files

请注意,目录名称是 payment_type=Cash 等:这是上面描述的 Hive 风格的分区。这意味着当您在此目录上调用 open_dataset() 时,您不必声明分区是什么,因为可以从文件路径读取它们。(要改为写入分区段的裸值,即 Cash 而不是 payment_type=Cash,请使用 hive_style = FALSE 调用 write_dataset()。)

也许,payment_type == "Cash" 是您唯一关心的数据,您只想删除其余数据并拥有更小的工作集。为此,您可以在写入时 filter() 它们

ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")

在写入数据集时,您可以做的另一件事是选择列的子集或重新排序它们。假设您从不在意 vendor_id,并且作为一个字符串列,当您读取它时,它会占用大量空间,所以让我们删除它

ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

请注意,虽然您可以选择列的子集,但您目前无法在写入 Dataset 时重命名列。

分区性能注意事项

分区数据集有两个影响性能的方面:它增加了文件数量,并围绕文件创建了一个目录结构。这两者都有好处和坏处。根据配置和数据集的大小,坏处可能超过好处。

由于分区将数据集拆分为多个文件,因此可以并行读取和写入分区的数据集。但是,每个额外的文件都会增加文件系统交互处理中的一些开销。它还会增加整体数据集大小,因为每个文件都有一些共享元数据。例如,每个 parquet 文件都包含 schema 和组级别统计信息。分区的数量是文件数量的下限。如果您按日期对数据集进行分区,并有一年的数据,您将至少有 365 个文件。如果您进一步按另一个具有 1,000 个唯一值的维度进行分区,您将最多有 365,000 个文件。这种精细的分区通常会导致小文件,这些文件主要由元数据组成。

分区数据集创建嵌套的文件夹结构,这些结构允许我们修剪扫描中加载的文件。但是,这会增加在数据集中发现文件的开销,因为我们需要递归地“列出目录”才能找到数据文件。过于精细的分区可能会导致此处出现问题:按日期对数据集进行分区,并有一年的数据,将需要 365 个列表调用才能找到所有文件;添加另一个基数为 1,000 的列将使调用次数变为 365,365。

最佳分区布局将取决于您的数据、访问模式以及哪些系统将读取数据。大多数系统(包括 Arrow)都应跨一系列文件大小和分区布局工作,但您应该避免一些极端情况。这些指南可以帮助您避免一些已知的最坏情况

  • 避免小于 20MB 和大于 2GB 的文件。
  • 避免分区布局超过 10,000 个不同的分区。

对于在文件中具有组概念的文件格式(例如 Parquet),也适用类似的指南。行组可以在读取时提供并行性,并允许基于统计信息跳过数据,但非常小的组可能导致元数据成为文件大小的重要部分。在大多数情况下,Arrow 的文件写入器为组大小调整提供了合理的默认值。

事务/ ACID 保证

Dataset API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取没问题。并发写入或与读取同时发生的写入可能会产生意外行为。可以使用各种方法来避免操作相同的文件,例如为每个写入器使用唯一的 basename 模板、用于新文件的临时目录或单独存储文件列表,而不是依赖目录发现。

在写入过程中意外终止进程可能会使系统处于不一致状态。写入调用通常在要写入的字节已完全传递到 OS 页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即发生突然断电,也可能丢失文件的一部分。

大多数文件格式都有魔数,它们写在末尾。这意味着可以安全地检测和丢弃部分文件写入。CSV 文件格式没有任何这样的概念,并且部分写入的 CSV 文件可能会被检测为有效。

进一步阅读