使用 Apache Arrow 加速 R 和 Apache Spark
发布日期 2019 年 1 月 25 日
作者 Javier Luraschi
Javier Luraschi 是 RStudio 的一名软件工程师
目前,sparklyr 和 SparkR 项目正在积极开发对 Apache Spark 中 R 语言的支持。本文探讨了在使用 R 结合 Apache Spark、Arrow 和 sparklyr 时所取得的初步但极具前景的性能改进。
设置
由于这项工作尚在积极开发中,请按照以下方式从 GitHub 安装 sparklyr 和 arrow
devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.12.0")
devtools::install_github("rstudio/sparklyr", ref = "apache-arrow-0.12.0")
在本基准测试中,我们将使用 dplyr,但使用 DBI 或 sparklyr 中的 Spark DataFrames 也能获得类似的改进。本地 Spark 连接和包含 1000 万个数值行的数据帧初始化方式如下
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", config = list("sparklyr.shell.driver-memory" = "6g"))
data <- data.frame(y = runif(10^7, 0, 1))
复制数据
目前,使用 sparklyr 将数据复制到 Spark 的方式是通过将数据从 R 持久化到磁盘,然后再由 Spark 读取。这种方式原本适用于小型数据集,因为已有更好的工具可以将数据传输到分布式存储系统中。尽管如此,许多用户仍要求支持以更快的速度将更多数据传输到 Spark 中。
通过在 sparklyr 中使用 arrow,我们可以直接将数据从 R 传输到 Spark,而无需在 R 中进行序列化或持久化到磁盘。
以下示例展示了使用 sparklyr(启用与未启用 arrow)将 1000 万行数据从 R 复制到 Spark 的过程,使用 arrow 带来了近 16 倍的性能提升。
此基准测试使用了 microbenchmark R 包,该包会多次运行代码,提供总执行时间的统计数据,并绘制每次执行时间以了解各次迭代的分布情况。
microbenchmark::microbenchmark(
setup = library(arrow),
arrow_on = {
sparklyr_df <<- copy_to(sc, data, overwrite = T)
count(sparklyr_df) %>% collect()
},
arrow_off = {
if ("arrow" %in% .packages()) detach("package:arrow")
sparklyr_df <<- copy_to(sc, data, overwrite = T)
count(sparklyr_df) %>% collect()
},
times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
expr min lq mean median uq max neval
arrow_on 3.011515 4.250025 7.257739 7.273011 8.974331 14.23325 10
arrow_off 50.051947 68.523081 119.946947 71.898908 138.743419 390.44028 10
收集数据
同样,arrow 与 sparklyr 的结合现在可以在将数据从 Spark 收集到 R 时避免在 R 中进行反序列化。这些改进不如复制数据那样显著,因为 sparklyr 本身已经在以列式格式收集数据。
以下基准测试演示了将 1000 万行数据从 Spark 收集到 R 的过程,结果显示 arrow 可以带来 3 倍的性能提升。
microbenchmark::microbenchmark(
setup = library(arrow),
arrow_on = {
collect(sparklyr_df)
},
arrow_off = {
if ("arrow" %in% .packages()) detach("package:arrow")
collect(sparklyr_df)
},
times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
expr min lq mean median uq max neval
arrow_on 4.520593 5.609812 6.154509 5.928099 6.217447 9.432221 10
arrow_off 7.882841 13.358113 16.670708 16.127704 21.051382 24.373331 10
数据转换
目前,在 sparklyr 中使用 R 函数进行自定义数据转换时,会将数据以行格式从 Spark 通过套接字连接移动到 R 进程中。由于每一行都需要反序列化多种数据类型,因此以行格式传输数据效率较低。随后,数据被转换为列格式(R 最初设计就是使用列式数据)。当 R 完成计算后,数据再次被转换为行格式,逐行序列化,然后通过套接字连接发回 Spark。
通过在 sparklyr 中添加对 arrow 的支持,Spark 可以并行执行从行格式到列格式的转换。数据随后通过套接字传输,但无需进行任何自定义序列化。R 进程只需将数据从套接字复制到其堆内存中,进行转换,然后再复制回套接字连接即可。
以下示例展示了在启用与未启用 arrow 的情况下转换 10 万行数据的过程,arrow 使得使用 R 函数的转换速度提高了近 41 倍。
microbenchmark::microbenchmark(
setup = library(arrow),
arrow_on = {
sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
},
arrow_off = {
if ("arrow" %in% .packages()) detach("package:arrow")
sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
},
times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
expr min lq mean median uq max neval
arrow_on 3.881293 4.038376 5.136604 4.772739 5.759082 7.873711 10
arrow_off 178.605733 183.654887 213.296238 227.182018 233.601885 238.877341 10
更多基准测试和微调参数可参见 sparklyr 的 /rstudio/sparklyr/pull/1611 以及 SparkR 的 /apache/spark/pull/22954。期待将此功能带给 Spark、Arrow 和 R 社区。