跳至内容

Apache Arrow 允许你高效地处理单文件和多文件数据集,即使数据集太大而无法加载到内存中。借助 Arrow 数据集对象,你可以使用熟悉的 dplyr 语法分析这种数据。本文介绍了数据集,并展示了如何使用 dplyr 和 arrow 分析它们:我们将从确保两个包都被加载开始

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

示例:纽约出租车数据

Arrow 数据集对象的主要动机是允许用户分析极大的数据集。例如,考虑 纽约市出租车行程记录数据,它被广泛用于大数据练习和比赛中。为了展示 Apache Arrow 的功能,我们在一个公共 Amazon S3 存储桶中托管了此数据的 Parquet 格式版本:在完整形式中,我们的数据版本是一个非常大的表格,大约有 17 亿行和 24 列,其中每行对应于 2009 年到 2022 年之间的一次出租车行程。此版本纽约出租车数据的 数据字典 也可用。

这个多文件数据集由 158 个不同的 Parquet 文件组成,每个文件对应一个月的 数据。单个文件的大小通常在 400-500MB 左右,整个数据集的大小约为 70GB。它不是一个小数据集 - 下载速度很慢,而且在典型的机器上无法容纳在内存中 🙂 - 所以我们还托管了一个“微型”版本的纽约出租车数据,它以完全相同的方式格式化,但只包含原始数据集中每千个条目中的一条(即,单个文件的大小小于 1MB,“微型”数据集只有 70MB)

如果你在 arrow 中启用了 Amazon S3 支持(大多数用户都是这样;如果你需要解决此问题,请参阅本文末尾的链接),你可以使用以下命令连接到存储在 S3 上的“微型出租车数据”副本

bucket <- s3_bucket("voltrondata-labs-datasets/nyc-taxi-tiny")

或者,你可以使用以下命令连接到存储在 Google Cloud Storage (GCS) 上的数据副本

bucket <- gs_bucket("voltrondata-labs-datasets/nyc-taxi-tiny", anonymous = TRUE)

如果你要使用完整数据集,请在上面的代码中将 nyc-taxi-tiny 替换为 nyc-taxi。除了大小之外 - 以及随之而来的时间、带宽使用量和 CPU 周期的成本 - 两个版本的 数据没有区别:你可以使用微型出租车数据测试你的代码,然后检查它使用完整数据集时的扩展情况。

要将存储在 bucket 中的数据集的本地副本复制到名为 "nyc-taxi" 的文件夹中,请使用 copy_files() 函数

copy_files(from = bucket, to = "nyc-taxi")

为了本文的目的,我们假设纽约出租车数据集(完整数据或微型版本)已下载到本地,并且存在于 "nyc-taxi" 目录中。

打开数据集

该过程的第一步是创建一个指向数据目录的数据集对象

ds <- open_dataset("nyc-taxi")

需要注意的是,当我们这样做时,数据值不会加载到内存中。相反,Arrow 会扫描数据目录以查找相关文件,解析文件路径以查找“Hive 风格的划分”(见下文),并读取数据文件的标题以构建一个包含描述数据结构的元数据的模式。有关模式的更多信息,请参阅 元数据文章

由此自然而然地引出了两个问题:open_dataset() 寻找什么类型的文件,它期望在文件路径中找到什么结构?让我们从查看文件类型开始。

默认情况下,open_dataset() 会查找 Parquet 文件,但你可以使用 format 参数覆盖此设置。例如,如果数据被编码为 CSV 文件,我们可以设置 format = "csv" 来连接到数据。Arrow 数据集接口支持几种文件格式,包括

  • "parquet"(默认值)
  • "feather""ipc""arrow" 的别名;因为 Feather 版本 2 是 Arrow 文件格式)
  • "csv"(逗号分隔文件)和 "tsv"(制表符分隔文件)
  • "text"(通用文本分隔文件 - 使用 delimiter 参数指定要使用的分隔符)

在文本文件的情况下,你可以将以下解析选项传递给 open_dataset() 以确保文件被正确读取

  • delim
  • quote
  • escape_double
  • escape_backslash
  • skip_empty_rows

在处理文本文件时,另一种方法是使用 open_delim_dataset()open_csv_dataset()open_tsv_dataset()。这些函数是 open_dataset() 的包装器,但具有与 read_csv_arrow()read_delim_arrow()read_tsv_arrow() 相似的参数,以便于在打开单个文件的功能和打开数据集的功能之间切换。

例如

ds <- open_csv_dataset("nyc-taxi/csv/")

有关这些参数以及一般情况下解析分隔文本文件的更多信息,请参阅 read_delim_arrow()open_delim_dataset() 的帮助文档。

接下来,open_dataset() 期望在文件路径中找到什么信息?默认情况下,数据集接口会在文件中查找 Hive 风格的划分结构,其中文件夹使用“键=值”约定命名,文件夹中的数据文件包含键具有相关值的 数据子集。例如,在纽约出租车数据中,文件路径如下所示

year=2009/month=1/part-0.parquet
year=2009/month=2/part-0.parquet
...

由此,open_dataset() 推断出第一个列出的 Parquet 文件包含 2009 年 1 月的数据。从这个意义上说,Hive 风格的划分是自描述的:文件夹名称明确说明了数据集是如何在文件之间分割的。

有时目录划分不是自描述的;也就是说,它不包含字段名称。例如,假设纽约出租车数据使用以下文件路径

2009/01/part-0.parquet
2009/02/part-0.parquet
...

在这种情况下,open_dataset() 需要一些提示来了解如何使用文件路径。在这种情况下,你可以向 partitioning 参数提供 c("year", "month"),表示第一个路径段给出 year 的值,第二个段是 month2009/01/part-0.parquet 中的每一行都具有 year 为 2009 和 month 为 1 的值,即使这些列可能不存在于文件中。换句话说,我们将这样打开数据

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))

无论哪种方式,当你查看数据集时,你都会发现除了每个文件中存在的列之外,还有 yearmonth 列。这些列本身并不存在于文件中:它们是从划分结构中推断出来的。

ds
## 
## FileSystemDataset with 158 Parquet files
## vendor_name: string
## pickup_datetime: timestamp[ms]
## dropoff_datetime: timestamp[ms]
## passenger_count: int64
## trip_distance: double
## pickup_longitude: double
## pickup_latitude: double
## rate_code: string
## store_and_fwd: string
## dropoff_longitude: double
## dropoff_latitude: double
## payment_type: string
## fare_amount: double
## extra: double
## mta_tax: double
## tip_amount: double
## tolls_amount: double
## total_amount: double
## improvement_surcharge: double
## congestion_surcharge: double
## pickup_location_id: int64
## dropoff_location_id: int64
## year: int32
## month: int32

查询数据集

现在我们有一个引用我们数据的 Dataset 对象,我们可以构建 dplyr 风格的查询。这是可能的,因为 arrow 提供了一个后端,允许用户使用 dplyr 动词操作表格 Arrow 数据。以下是一个示例:假设你对最长出租车行程中的小费行为感到好奇。让我们找到 2015 年车费超过 100 美元的行程的中位数小费百分比,并按乘客数量细分

system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  collect() %>%
  print())
## 
## # A tibble: 10 x 3
##    passenger_count median_tip_pct      n
##              <int>          <dbl>  <int>
##  1               1           16.6 143087
##  2               2           16.2  34418
##  3               5           16.7   5806
##  4               4           11.4   4771
##  5               6           16.7   3338
##  6               3           14.6   8922
##  7               0           10.1    380
##  8               8           16.7     32
##  9               9           16.7     42
## 10               7           16.7     11
## 
##    user  system elapsed
##   4.436   1.012   1.402

你刚刚从包含大约 20 亿行的 Dataset 中选择了一个子集,计算了一个新列,并对其进行了聚合。所有这些都在一台现代笔记本电脑上完成,只需要几秒钟。这是怎么工作的?

arrow 可以如此迅速地完成此任务有三个原因

首先,arrow 采用了一种延迟计算方法来处理查询:当在 Dataset 上调用 dplyr 动词时,它们会记录其操作,但不会在数据上执行这些操作,直到你运行 collect()。我们可以通过使用与之前相同的代码并省略最后一步来看到这一点

ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  )
## 
## FileSystemDataset (query)
## passenger_count: int64
## median_tip_pct: double
## n: int32
## 
## See $.data for the source Arrow object

此版本的代码立即返回输出并显示你所做的操作,而无需从文件中加载数据。由于这些查询的计算被推迟,你可以构建一个查询,将查询选择缩小到一个小的子集,而无需生成可能很大的中间数据集。

其次,所有工作都向下推送到单个数据文件,以及根据文件格式,文件中的数据块。因此,你可以通过从每个文件收集较小的切片来从一个更大的数据集中选择一个数据子集:你不需要将整个数据集加载到内存中才能从中切片。

第三,由于划分,你可以完全忽略一些文件。在这个例子中,通过过滤 year == 2015,所有与其他年份对应的文件都会立即被排除:你不需要加载它们来发现没有行与过滤器匹配。对于 Parquet 文件 - 它们包含包含组内数据的统计信息的组 - 你可能会避免扫描整个数据块,因为它们没有 total_amount > 100 的行。

关于查询数据集还需要注意的一点是:假设你尝试在 Arrow Dataset 上调用不支持的 dplyr 动词或未实现的函数。在这种情况下,arrow 包会引发错误。但是,对于 Arrow 表对象上的 dplyr 查询(这些对象已经在内存中),该包会在处理该 dplyr 动词之前自动调用 collect()。要详细了解 dplyr 后端,请参阅 数据整理文章

批处理(实验性)

有时你希望在整个 Dataset 上运行 R 代码,但该 Dataset 比内存大得多。你可以在 Dataset 查询上使用 map_batches 以分批处理它。

注意map_batches 是实验性的,不建议用于生产环境。

例如,要随机抽样一个 Dataset,请使用 map_batches 从每个批次中抽取一定比例的行

sampled_data <- ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  collect()

str(sampled_data)
## 
## tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)
##  $ tip_amount     : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...
##  $ total_amount   : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...
##  $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...
##  $ tip_pct        : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...

此函数还可以用于通过计算每个批次的局部结果,然后聚合这些局部结果来聚合 Dataset 上的汇总统计信息。扩展上面的示例,你可以将模型拟合到样本数据,然后使用 map_batches 在整个 Dataset 上计算 MSE。

model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)

ds %>%
  filter(year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = tip_amount / total_amount) %>%
  map_batches(function(batch) {
    batch %>%
      as.data.frame() %>%
      mutate(pred_tip_pct = predict(model, newdata = .)) %>%
      filter(!is.nan(tip_pct)) %>%
      summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) %>%
      as_record_batch()
  }) %>%
  summarize(mse = sum(sse_partial) / sum(n_partial)) %>%
  pull(mse)
## 
## [1] 0.1304284

数据集选项

你可以通过几种方法控制 Dataset 的创建,以适应特殊用例。

使用目录中的文件

如果你要处理一个单文件或一组不在同一个目录中的文件,你可以向 open_dataset() 提供一个文件路径或多个文件路径的向量。例如,如果你有一个太大的单个 CSV 文件而无法读入内存,这将非常有用。你可以将文件路径传递给 open_dataset(),使用 group_by() 将 Dataset 分割成可管理的块,然后使用 write_dataset() 将每个块写入单独的 Parquet 文件 - 所有这些都无需将完整的 CSV 文件读入 R。

显式声明列名和数据类型

你可以指定 open_dataset()schema 参数来声明列及其数据类型。如果你有存储模式不同的数据文件(例如,一列在一个文件中可能是 int32,而在另一个文件中可能是 int8),并且你希望确保生成的 Dataset 具有特定的类型,这将非常有用。

需要明确的是,即使在这个混合整数类型的示例中,也无需指定模式,因为 Dataset 构造函数会协调这些差异。模式规范只是让你声明你想要的结果是什么。

显式声明划分格式

类似地,你可以在 open_dataset()partitioning 参数中提供一个模式,以声明定义划分的虚拟列的类型。在纽约出租车数据示例中,如果你想将 month 保留为字符串而不是整数,这将非常有用。

使用多个数据源

Datasets 的另一个特点是它们可以由多个数据源组成。也就是说,您可能在一个位置有一个分区 Parquet 文件目录,而在另一个位置,有一些未分区的文件。或者,您可以指向一个包含 Parquet 数据的 S3 存储桶和本地文件系统上的 CSV 目录,并将它们作为一个 Dataset 查询。要创建多源 Dataset,请向 open_dataset() 提供 Dataset 列表,而不是文件路径,或者使用类似于 big_dataset <- c(ds1, ds2) 的命令将它们连接起来。

写入 Datasets

如您所见,通过将大型 Dataset 存储在高效的二进制列式格式(如 Parquet 或 Feather)中并根据用于过滤的常用列进行分区,可以使查询速度非常快。但是,数据并不总是以这种方式存储。有时,您可能从一个巨大的 CSV 开始。分析数据的第一步是清理并将其重塑为更易用的形式。

write_dataset() 函数允许您获取 Dataset 或其他表格数据对象(Arrow 表或记录批次,或 R 数据框),并将它们写入不同的文件格式,并将其分区到多个文件中。

假设您有一个 NYC Taxi 数据的 CSV 版本

ds <- open_dataset("nyc-taxi/csv/", format = "csv")

您可以将其写入新位置并将文件转换为 Feather 格式,方法是对其调用 write_dataset()

write_dataset(ds, "nyc-taxi/feather", format = "feather")

接下来,假设 payment_type 列是您经常过滤的列,因此您希望按该变量对数据进行分区。通过这样做,您确保像 payment_type == "Cash" 这样的过滤器只接触 payment_type 始终为 "Cash" 的文件子集。

表达您想要分区的列的一种自然方法是使用 group_by() 方法

ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

这将把文件写入看起来像这样的目录树

system("tree nyc-taxi/feather")
## feather
## ├── payment_type=1
## │   └── part-18.arrow
## ├── payment_type=2
## │   └── part-19.arrow
## ...
## └── payment_type=UNK
##     └── part-17.arrow
##
## 18 directories, 23 files

注意目录名称是 payment_type=Cash 等等:这就是上面描述的 Hive 风格分区。这意味着当您对该目录调用 open_dataset() 时,您不必声明分区是什么,因为它们可以从文件路径中读取。(要改为为分区段写入裸值,例如 Cash 而不是 payment_type=Cash,请使用 hive_style = FALSE 调用 write_dataset()。)

也许,payment_type == "Cash" 是您唯一关心的数据,您只想丢弃其余数据并获得一个更小的工作集。为此,您可以在写入时 filter() 它们

ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")

写入 Datasets 时,您还可以做另一件事,那就是选择列子集或重新排序它们。假设您从不关心 vendor_id,并且作为字符串列,它在您读取它时会占用大量空间,因此让我们删除它

ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")

请注意,虽然您可以选择列子集,但目前您无法在写入 Dataset 时重命名列。

分区性能注意事项

对 Datasets 进行分区有两个影响性能的方面:它会增加文件数量,还会在文件周围创建目录结构。这两者都有优点和缺点。根据您的 Dataset 配置和大小,成本可能会超过收益。

由于分区将 Dataset 分割成多个文件,因此可以并行读取和写入分区 Dataset。但是,每个额外的文件都会在处理中增加一些文件系统交互的开销。它还会增加 Dataset 的总大小,因为每个文件都有一些共享的元数据。例如,每个 Parquet 文件都包含架构和组级统计信息。分区数量是文件数量的下限。如果您按日期对包含一年的数据的 Dataset 进行分区,那么您将至少有 365 个文件。如果您用另外一个具有 1,000 个唯一值的维度进一步分区,那么您将最多有 365,000 个文件。这种细粒度的分区通常会导致大部分由元数据组成的小文件。

分区 Dataset 会创建嵌套的文件夹结构,这些结构允许我们修剪在扫描中加载哪些文件。但是,这会增加在 Dataset 中发现文件的开销,因为我们需要递归地“列出目录”才能找到数据文件。过细的分区会导致问题:按日期对包含一年的数据的 Dataset 进行分区将需要 365 次列表调用才能找到所有文件;添加另一个基数为 1,000 的列将使调用次数达到 365,365 次。

最优的分区布局将取决于您的数据、访问模式以及哪些系统将读取数据。大多数系统(包括 Arrow)应该可以在各种文件大小和分区布局中工作,但有一些极端情况应该避免。以下指南可以帮助避免一些已知的最坏情况

  • 避免文件大小小于 20MB 且大于 2GB。
  • 避免分区布局具有超过 10,000 个不同的分区。

对于具有文件内组概念的文件格式,例如 Parquet,类似的指南也适用。行组可以在读取时提供并行性并允许根据统计信息跳过数据,但非常小的组会导致元数据成为文件大小的重要组成部分。在大多数情况下,Arrow 的文件写入器提供了合理的分组大小默认值。

事务 / ACID 保证

Dataset API 不提供任何事务支持或任何 ACID 保证。这会影响读取和写入。并发读取是可以的。并发写入或与读取并发的写入可能会产生意外行为。可以使用各种方法来避免对相同文件进行操作,例如为每个写入器使用唯一的基文件名模板、使用临时目录存放新文件,或者单独存储文件列表,而不是依赖于目录发现。

在写入过程中意外终止进程会导致系统处于不一致状态。写入调用通常会在将要写入的字节完全传递到操作系统页面缓存后立即返回。即使写入操作已完成,如果在写入调用后立即出现突然的电源故障,也有可能丢失文件的一部分。

大多数文件格式都有在文件末尾写入的魔数。这意味着可以安全地检测到部分文件写入并将其丢弃。CSV 文件格式没有任何这样的概念,部分写入的 CSV 文件可能被检测为有效。

进一步阅读