Apache Arrow 允许您高效地处理单个文件和多文件数据集,即使这些数据集太大而无法加载到内存中。借助 Arrow Dataset 对象,您可以使用熟悉的 dplyr 语法分析此类数据。本文介绍了数据集,并向您展示了如何使用 dplyr 和 arrow 分析它们:我们将首先确保两个包都已加载。
示例:纽约市出租车数据
Arrow Datasets 对象的主要动机是允许用户分析极其庞大的数据集。例如,考虑在大数据练习和竞赛中广泛使用的 纽约市出租车行程记录数据。为了展示 Apache Arrow 的功能,我们在公共 Amazon S3 存储桶中托管了此数据的 Parquet 格式版本:其完整形式是,我们的数据集版本是一个非常大的表,包含约 17 亿行和 24 列,其中每行对应 2009 年至 2022 年间的单次出租车行程。此版本纽约市出租车数据的数据字典也可用。
这个多文件数据集由 158 个独立的 Parquet 文件组成,每个文件对应一个月的数据。单个文件通常大小约为 400-500MB,而完整数据集大小约为 70GB。它不是一个小数据集——下载速度慢,并且无法在典型机器的内存中容纳 🙂 ——因此我们也托管了一个“微型”版本的纽约市出租车数据,其格式完全相同,但只包含原始数据集中每千个条目中的一个(即,单个文件大小小于 1MB,“微型”数据集仅为 70MB)。
如果您的 arrow 中启用了 Amazon S3 支持(大多数用户都是如此;如果您需要解决此问题,请参阅本文末尾的链接),您可以使用此命令连接到存储在 S3 上的“微型出租车数据”副本:
bucket <- s3_bucket("voltrondata-labs-datasets/nyc-taxi-tiny")或者,您可以使用以下命令连接到 Google Cloud Storage (GCS) 上的数据副本:
bucket <- gs_bucket("voltrondata-labs-datasets/nyc-taxi-tiny", anonymous = TRUE)如果您想使用完整数据集,请在上述代码中将 nyc-taxi-tiny 替换为 nyc-taxi。除了大小(以及随之而来的时间、带宽使用和 CPU 周期的成本)之外,这两个版本的数据没有区别:您可以使用微型出租车数据测试您的代码,然后检查它在使用完整数据集时的扩展性。
要将存储在 bucket 中的数据集本地复制到名为 "nyc-taxi" 的文件夹,请使用 copy_files() 函数:
copy_files(from = bucket, to = "nyc-taxi")为本文目的,我们假设纽约市出租车数据集(完整数据或微型版本)已下载到本地,并存在于 "nyc-taxi" 目录中。
打开数据集
该过程的第一步是创建一个指向数据目录的 Dataset 对象:
ds <- open_dataset("nyc-taxi")重要的是要注意,当我们这样做时,数据值不会加载到内存中。相反,Arrow 会扫描数据目录以查找相关文件,解析文件路径以查找“Hive 风格分区”(见下文),并读取数据文件的头部以构建一个包含描述数据结构的元数据的 Schema。有关 Schemas 的更多信息,请参阅元数据文章。
由此自然引出两个问题:open_dataset() 查找哪种文件,以及它期望在文件路径中找到什么结构?让我们从文件类型开始。
默认情况下,open_dataset() 查找 Parquet 文件,但您可以使用 format 参数覆盖此设置。例如,如果数据编码为 CSV 文件,我们可以设置 format = "csv" 来连接数据。Arrow Dataset 接口支持多种文件格式,包括:
-
"parquet"(默认) -
"feather"或"ipc"("arrow"的别名;因为 Feather version 2 是 Arrow 文件格式) -
"csv"(逗号分隔文件)和"tsv"(制表符分隔文件) -
"text"(通用文本分隔文件 - 使用delimiter参数指定要使用的分隔符)
在文本文件的情况下,您可以将以下解析选项传递给 open_dataset() 以确保文件被正确读取:
delimquoteescape_doubleescape_backslashskip_empty_rows
处理文本文件时,另一种方法是使用 open_delim_dataset()、open_csv_dataset() 或 open_tsv_dataset()。这些函数是 open_dataset() 的包装器,但其参数与 read_csv_arrow()、read_delim_arrow() 和 read_tsv_arrow() 镜像,以便于在打开单个文件的函数和打开数据集的函数之间轻松切换。
例如:
ds <- open_csv_dataset("nyc-taxi/csv/")有关这些参数以及一般解析分隔文本文件的更多信息,请参阅 read_delim_arrow() 和 open_delim_dataset() 的帮助文档。
接下来,open_dataset() 期望在文件路径中找到什么信息?默认情况下,Dataset 接口查找 Hive 风格的分区结构,其中文件夹使用“键=值”约定命名,并且文件夹中的数据文件包含键具有相关值的数据子集。例如,在纽约市出租车数据中,文件路径看起来像这样:
year=2009/month=1/part-0.parquet
year=2009/month=2/part-0.parquet
...
由此,open_dataset() 推断出第一个列出的 Parquet 文件包含 2009 年 1 月的数据。从这个意义上说,Hive 风格分区是自描述的:文件夹名称明确说明了数据集是如何在文件中拆分的。
有时目录分区不是自描述的;也就是说,它不包含字段名称。例如,假设纽约市出租车数据使用这样的文件路径:
2009/01/part-0.parquet
2009/02/part-0.parquet
...
在这种情况下,open_dataset() 将需要一些提示来知道如何使用文件路径。在这种情况下,您可以将 c("year", "month") 提供给 partitioning 参数,表示第一个路径段给出 year 的值,第二个段是 month。2009/01/part-0.parquet 中的每一行都将 year 的值设为 2009,将 month 的值设为 1,即使这些列可能不存在于文件中。换句话说,我们将这样打开数据:
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))无论哪种方式,当您查看 Dataset 时,您会发现除了每个文件中存在的列之外,还有 year 和 month 列。这些列不存在于文件本身中:它们是从分区结构中推断出来的。
ds##
## FileSystemDataset with 158 Parquet files
## vendor_name: string
## pickup_datetime: timestamp[ms]
## dropoff_datetime: timestamp[ms]
## passenger_count: int64
## trip_distance: double
## pickup_longitude: double
## pickup_latitude: double
## rate_code: string
## store_and_fwd: string
## dropoff_longitude: double
## dropoff_latitude: double
## payment_type: string
## fare_amount: double
## extra: double
## mta_tax: double
## tip_amount: double
## tolls_amount: double
## total_amount: double
## improvement_surcharge: double
## congestion_surcharge: double
## pickup_location_id: int64
## dropoff_location_id: int64
## year: int32
## month: int32
查询数据集
现在我们有了一个引用我们数据的 Dataset 对象,我们可以构建 dplyr 风格的查询。这是可能的,因为 arrow 提供了一个后端,允许用户使用 dplyr 动词操作表格 Arrow 数据。这里有一个例子:假设您对最长出租车行程的小费行为感到好奇。让我们找出 2015 年票价超过 100 美元的行程的中位数小费百分比,按乘客数量细分:
system.time(ds |>
filter(total_amount > 100, year == 2015) |>
select(tip_amount, total_amount, passenger_count) |>
mutate(tip_pct = 100 * tip_amount / total_amount) |>
group_by(passenger_count) |>
summarise(
median_tip_pct = median(tip_pct),
n = n()
) |>
collect() |>
print())##
## # A tibble: 10 x 3
## passenger_count median_tip_pct n
## <int> <dbl> <int>
## 1 1 16.6 143087
## 2 2 16.2 34418
## 3 5 16.7 5806
## 4 4 11.4 4771
## 5 6 16.7 3338
## 6 3 14.6 8922
## 7 0 10.1 380
## 8 8 16.7 32
## 9 9 16.7 42
## 10 7 16.7 11
##
## user system elapsed
## 4.436 1.012 1.402
您刚刚从一个包含约 20 亿行的 Dataset 中选择了一个子集,计算了一个新列并对其进行了聚合。所有这些都在现代笔记本电脑上几秒钟内完成。这是如何工作的?
arrow 能够如此快速地完成此任务有三个原因:
首先,arrow 对查询采用惰性评估方法:当对 Dataset 调用 dplyr 动词时,它们会记录其操作,但直到您运行 collect() 才会对数据评估这些操作。我们可以通过使用与之前相同的代码并省略最后一步来看到这一点:
ds |>
filter(total_amount > 100, year == 2015) |>
select(tip_amount, total_amount, passenger_count) |>
mutate(tip_pct = 100 * tip_amount / total_amount) |>
group_by(passenger_count) |>
summarise(
median_tip_pct = median(tip_pct),
n = n()
)##
## FileSystemDataset (query)
## passenger_count: int64
## median_tip_pct: double
## n: int32
##
## See $.data for the source Arrow object
此版本的代码会立即返回输出并显示您所做的操作,而无需从文件中加载数据。由于这些查询的评估是延迟的,您可以构建一个查询以选择一个小子集,而无需生成可能很大的中间数据集。
其次,所有工作都被下推到单个数据文件,并且根据文件格式,下推到文件中的数据块。因此,您可以通过从每个文件中收集较小的切片来从更大的数据集中选择数据子集:您无需将整个数据集加载到内存中即可从中切片。
第三,由于分区,您可以完全忽略一些文件。在此示例中,通过过滤 year == 2015,所有对应于其他年份的文件都立即被排除:您无需加载它们即可发现没有行匹配过滤器。对于 Parquet 文件(其中包含带有组内数据统计信息的行组),您可能可以避免扫描整个数据块,因为它们没有 total_amount > 100 的行。
关于查询数据集的最后一件事。假设您尝试在 Arrow Dataset 上的查询中调用不受支持的 dplyr 动词或未实现的函数。在这种情况下,arrow 包会引发错误。但是,对于 Arrow Table 对象(已在内存中)上的 dplyr 查询,包会自动在处理该 dplyr 动词之前调用 collect()。要了解有关 dplyr 后端的更多信息,请参阅数据整理文章。
批处理(实验性)
有时您想对整个数据集运行 R 代码,但该数据集远大于内存。您可以使用 map_batches 对数据集查询进行逐批处理。
注意:map_batches 是实验性的,不建议用于生产环境。
例如,要随机采样数据集,请使用 map_batches 从每个批次中采样一定比例的行:
sampled_data <- ds |>
filter(year == 2015) |>
select(tip_amount, total_amount, passenger_count) |>
map_batches(~ as_record_batch(sample_frac(as.data.frame(.), 1e-4))) |>
mutate(tip_pct = tip_amount / total_amount) |>
collect()
str(sampled_data)##
## tibble [10,918 <U+00D7> 4] (S3: tbl_df/tbl/data.frame)
## $ tip_amount : num [1:10918] 3 0 4 1 1 6 0 1.35 0 5.9 ...
## $ total_amount : num [1:10918] 18.8 13.3 20.3 15.8 13.3 ...
## $ passenger_count: int [1:10918] 3 2 1 1 1 1 1 1 1 3 ...
## $ tip_pct : num [1:10918] 0.1596 0 0.197 0.0633 0.0752 ...
此函数还可以用于通过计算每个批次的部分结果,然后聚合这些部分结果来聚合数据集的汇总统计数据。扩展上述示例,您可以将模型拟合到样本数据,然后使用 map_batches 在完整数据集上计算 MSE。
model <- lm(tip_pct ~ total_amount + passenger_count, data = sampled_data)
ds |>
filter(year == 2015) |>
select(tip_amount, total_amount, passenger_count) |>
mutate(tip_pct = tip_amount / total_amount) |>
map_batches(function(batch) {
batch |>
as.data.frame() |>
mutate(pred_tip_pct = predict(model, newdata = .)) |>
filter(!is.nan(tip_pct)) |>
summarize(sse_partial = sum((pred_tip_pct - tip_pct)^2), n_partial = n()) |>
as_record_batch()
}) |>
summarize(mse = sum(sse_partial) / sum(n_partial)) |>
pull(mse)##
## [1] 0.1304284
数据集选项
有几种方法可以控制数据集创建以适应特殊用例。
处理目录中的文件
如果您正在处理单个文件或一组不都在同一目录中的文件,您可以向 open_dataset() 提供文件路径或多个文件路径的向量。这很有用,例如,如果您有一个 CSV 文件太大而无法读入内存。您可以将文件路径传递给 open_dataset(),使用 group_by() 将数据集分成可管理的块,然后使用 write_dataset() 将每个块写入单独的 Parquet 文件——所有这些都无需将完整的 CSV 文件读入 R。
明确声明列名和数据类型
您可以指定 open_dataset() 的 schema 参数来声明列及其数据类型。这在您拥有具有不同存储模式(例如,一列可以是 int32 而另一列可以是 int8)的数据文件时很有用,并且您希望确保生成的数据集具有特定类型。
需要明确的是,即使在这种混合整数类型的示例中,也没有必要指定模式,因为 Dataset 构造函数将协调这些差异。模式规范只是让您声明您想要的结果。
显式声明分区格式
同样,您可以在 open_dataset() 的 partitioning 参数中提供一个 Schema,以声明定义分区的虚拟列的类型。这在纽约市出租车数据示例中很有用,如果您想将 month 保留为字符串而不是整数。
使用多个数据源
数据集的另一个特点是它们可以由多个数据源组成。也就是说,您可能在一个位置有一个分区 Parquet 文件的目录,而在另一个目录中,有未分区的文件。或者,您可以指向一个 S3 存储桶中的 Parquet 数据和本地文件系统上一个目录中的 CSV 文件,并将它们作为单个数据集一起查询。要创建多源数据集,请向 open_dataset() 提供数据集列表而不是文件路径,或者使用 big_dataset <- c(ds1, ds2) 之类的命令将它们连接起来。
写入数据集
正如您所看到的,通过以高效的二进制列式格式(如 Parquet 或 Feather)存储和基于常用过滤列进行分区,查询大型数据集可以变得非常快。然而,数据并不总是以这种方式存储。有时您可能会从一个巨大的 CSV 开始。分析数据的第一步是清理并将其重塑为更可用的形式。
write_dataset() 函数允许您获取一个 Dataset 或另一个表格数据对象——Arrow Table 或 RecordBatch,或 R 数据框——并将其写入不同的文件格式,分区为多个文件。
假设您有 CSV 格式的纽约市出租车数据版本:
ds <- open_dataset("nyc-taxi/csv/", format = "csv")您可以通过调用 write_dataset() 将其写入新位置并将其转换为 Feather 格式:
write_dataset(ds, "nyc-taxi/feather", format = "feather")接下来,我们假设 payment_type 列是您经常筛选的,因此您希望按该变量对数据进行分区。通过这样做,您可以确保像 payment_type == "Cash" 这样的筛选器只会触及 payment_type 始终为 "Cash" 的文件子集。
表达您要分区的列的一种自然方式是使用 group_by() 方法:
ds |>
group_by(payment_type) |>
write_dataset("nyc-taxi/feather", format = "feather")这会将文件写入如下所示的目录树:
system("tree nyc-taxi/feather")## feather
## ├── payment_type=1
## │ └── part-18.arrow
## ├── payment_type=2
## │ └── part-19.arrow
## ...
## └── payment_type=UNK
## └── part-17.arrow
##
## 18 directories, 23 files
请注意,目录名称是 payment_type=Cash 等:这是上面描述的 Hive 风格分区。这意味着当您对此目录调用 open_dataset() 时,您无需声明分区是什么,因为它们可以从文件路径中读取。(要改为写入分区段的裸值,即 Cash 而不是 payment_type=Cash,请使用 hive_style = FALSE 调用 write_dataset()。)
然而,也许您只关心 payment_type == "Cash" 的数据,并且只想丢弃其余数据并拥有一个较小的工作集。为此,您可以在写入时使用 filter() 将它们过滤掉:
ds |>
filter(payment_type == "Cash") |>
write_dataset("nyc-taxi/feather", format = "feather")写入数据集时,您可以做的另一件事是选择列的子集或重新排列它们。假设您从不关心 vendor_id,而且作为字符串列,它在读取时会占用大量空间,所以让我们删除它:
ds |>
group_by(payment_type) |>
select(-vendor_id) |>
write_dataset("nyc-taxi/feather", format = "feather")请注意,虽然您可以选择列的子集,但目前在写入数据集时无法重命名列。
分区性能考量
分区数据集有两个方面会影响性能:它增加了文件数量,并围绕文件创建了目录结构。这两者既有好处也有成本。根据配置和数据集的大小,成本可能超过好处。
由于分区将数据集分解为多个文件,因此可以并行读取和写入分区数据集。然而,每个附加文件都会在文件系统交互的处理中增加一点开销。它还会增加整个数据集的大小,因为每个文件都包含一些共享元数据。例如,每个 parquet 文件都包含模式和组级统计信息。分区数量是文件数量的下限。如果您按日期将一个数据集分区为一年数据,您将至少有 365 个文件。如果您再按另一个具有 1,000 个唯一值的维度进行分区,您将拥有多达 365,000 个文件。这种细粒度分区通常会导致文件很小,主要由元数据组成。
分区数据集会创建嵌套的文件夹结构,这允许我们在扫描时修剪要加载的文件。然而,这增加了发现数据集中文件的开销,因为我们需要递归地“列出目录”以查找数据文件。过于精细的分区可能会在这里导致问题:按日期对一年数据进行分区需要 365 次列表调用才能找到所有文件;再添加一个基数 1,000 的列将使其变为 365,365 次调用。
最优的分区布局将取决于您的数据、访问模式以及将读取数据的系统。大多数系统,包括 Arrow,都应适用于各种文件大小和分区布局,但有些极端情况您应该避免。这些指南可以帮助避免一些已知的最坏情况:
- 避免文件小于 20MB 和大于 2GB。
- 避免分区布局具有超过 10,000 个不同的分区。
对于文件中具有组概念的文件格式(例如 Parquet),也适用类似的准则。行组可以在读取时提供并行性,并允许根据统计信息跳过数据,但非常小的组可能会导致元数据在文件大小中占很大比例。Arrow 的文件写入器在大多数情况下提供了合理的组大小默认值。
事务/ACID 保证
Dataset API 不提供事务支持或任何 ACID 保证。这会影响读取和写入。并发读取没有问题。并发写入或与读取同时进行的写入可能会出现意外行为。可以使用各种方法来避免操作相同文件,例如为每个写入器使用唯一的基名模板、用于新文件的临时目录,或独立存储文件列表而不是依赖目录发现。
在写入过程中意外终止进程可能会使系统处于不一致状态。写入调用通常在要写入的字节完全传递到操作系统页面缓存后立即返回。即使写入操作已完成,如果写入调用后立即发生突然断电,文件的一部分仍可能丢失。
大多数文件格式都有在末尾写入的“魔数”(magic numbers)。这意味着可以安全地检测并丢弃部分写入的文件。CSV 文件格式没有这样的概念,并且部分写入的 CSV 文件可能会被检测为有效。