使用 Apache Arrow 加速 R 和 Apache Spark


已发布 2019 年 1 月 25 日
作者 Javier Luraschi

Javier LuraschiRStudio 的一名软件工程师

在 R 中使用 Apache Arrow 支持 Apache Spark 目前正在 sparklyrSparkR 项目中积极开发中。本文探讨了在 R 中使用 Apache Spark、Arrow 和 sparklyr 时早期但很有希望的性能提升。

设置

由于这项工作正在积极开发中,请从 GitHub 安装 sparklyrarrow,如下所示

devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.12.0")
devtools::install_github("rstudio/sparklyr", ref = "apache-arrow-0.12.0")

在此基准测试中,我们将使用 dplyr,但使用 DBIsparklyr 中的 Spark DataFrames 预计也会有类似的改进。 本地 Spark 连接和包含 1000 万个数字行的 dataframe 初始化如下

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local", config = list("sparklyr.shell.driver-memory" = "6g"))
data <- data.frame(y = runif(10^7, 0, 1))

复制

目前,使用 sparklyr 将数据复制到 Spark 是通过将数据从 R 持久化到磁盘并从 Spark 读取数据来执行的。 这旨在用于小型数据集,因为有更好的工具可以将数据传输到分布式存储系统。 尽管如此,许多用户都要求支持以更快的速度将更多数据传输到 Spark。

使用带有 sparklyrarrow,我们可以将数据直接从 R 传输到 Spark,而无需在 R 中序列化这些数据或将其持久化到磁盘中。

以下示例使用带有和不带有 arrowsparklyr 将 1000 万行从 R 复制到 Spark,使用 arrow 可以提高近 16 倍。

此基准测试使用 microbenchmark R 包,该包多次运行代码,提供总执行时间的统计信息,并绘制每次执行时间以了解每次迭代的分布。

microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    sparklyr_df <<- copy_to(sc, data, overwrite = T)
    count(sparklyr_df) %>% collect()
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    sparklyr_df <<- copy_to(sc, data, overwrite = T)
    count(sparklyr_df) %>% collect()
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
 Unit: seconds
      expr       min        lq       mean    median         uq       max neval
  arrow_on  3.011515  4.250025   7.257739  7.273011   8.974331  14.23325    10
 arrow_off 50.051947 68.523081 119.946947 71.898908 138.743419 390.44028    10
Copying data with R into Spark with and without Arrow

收集

同样,带有 sparklyrarrow 现在可以避免在将数据从 Spark 收集到 R 时在 R 中反序列化数据。这些改进不如复制数据那么显着,因为 sparklyr 已经以列格式收集数据。

以下基准测试将 1000 万行从 Spark 收集到 R 中,并表明 arrow 可以带来 3 倍的改进。

microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    collect(sparklyr_df)
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    collect(sparklyr_df)
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr      min        lq      mean    median        uq       max neval
  arrow_on 4.520593  5.609812  6.154509  5.928099  6.217447  9.432221    10
 arrow_off 7.882841 13.358113 16.670708 16.127704 21.051382 24.373331    10
Collecting data with R from Spark with and without Arrow

转换

如今,使用 R 函数对数据进行自定义转换是在 sparklyr 中执行的,方法是通过套接字连接将行格式的数据从 Spark 移动到 R 进程,以行格式传输数据效率低下,因为需要对每一行反序列化多种数据类型,然后将数据转换为列格式(R 最初设计为使用列数据),一旦 R 完成此计算,数据将再次转换为行格式,逐行序列化,然后通过套接字连接发送回 Spark。

通过在 sparklyr 中添加对 arrow 的支持,它使 Spark 能够在 Spark 中并行执行行格式到列格式的转换。 然后通过套接字传输数据,但不进行自定义序列化。 所有 R 进程需要做的就是将这些数据从套接字复制到其堆中,对其进行转换,然后将其复制回套接字连接。

以下示例使用和不使用 arrow 转换 10 万行,arrow 使使用 R 函数进行转换的速度提高了近 41 倍。

microbenchmark::microbenchmark(
  setup = library(arrow),
  arrow_on = {
    sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
  },
  arrow_off = {
    if ("arrow" %in% .packages()) detach("package:arrow")
    sample_n(sparklyr_df, 10^5) %>% spark_apply(~ .x / 2) %>% count()
  },
  times = 10
) %T>% print() %>% ggplot2::autoplot()
Unit: seconds
      expr        min         lq       mean     median         uq        max neval
  arrow_on   3.881293   4.038376   5.136604   4.772739   5.759082   7.873711    10
 arrow_off 178.605733 183.654887 213.296238 227.182018 233.601885 238.877341    10
Transforming data with R in Spark with and without Arrow

其他基准测试和微调参数可以在 sparklyr /rstudio/sparklyr/pull/1611SparkR /apache/spark/pull/22954 下找到。期待将此功能带给 Spark、Arrow 和 R 社区。