使用 Apache Arrow 加速 PySpark


已发布 2017年7月26日
作者 BryanCutler

Bryan Cutler 是 IBM Spark 技术中心 STC 的软件工程师

Apache Spark 2.3 版本开始,Apache Arrow 将成为一个受支持的依赖项,并开始通过柱状数据传输提供更高的性能。如果您是一个喜欢用 Python 和 Pandas 工作的 Spark 用户,这绝对是一件令人兴奋的事情!最初的工作仅限于使用 toPandas() 收集 Spark DataFrame,我将在下面讨论,但是目前还有许多其他的改进正在 进行中

优化 Spark 转换为 Pandas

以前使用 DataFrame.toPandas() 在 PySpark 中将 Spark DataFrame 转换为 Pandas 的方法效率非常低。 基本上,它的工作方式是首先将所有行收集到 Spark 驱动程序。 接下来,每一行都将被序列化为 Python 的 pickle 格式并发送到 Python worker 进程。 此子进程将每一行解封到巨大的元组列表中。 最后,使用 pandas.DataFrame.from_records() 从列表中创建一个 Pandas DataFrame。

所有这些可能看起来像是标准程序,但存在 2 个明显的的问题:1)即使使用 CPickle,Python 序列化也是一个缓慢的过程,并且 2)使用 from_records 创建 pandas.DataFrame 必须缓慢地遍历纯 Python 数据列表并将每个值转换为 Pandas 格式。 有关详细分析,请参见此处

这是 Arrow 真正发挥作用来帮助优化这些步骤的地方:1)一旦数据采用 Arrow 内存格式,就不再需要序列化/pickle,因为 Arrow 数据可以直接发送到 Python 进程,2)当 Python 收到 Arrow 数据后,pyarrow 可以利用零拷贝方法一次性从整个数据块创建 pandas.DataFrame,而不是处理单个标量值。 此外,到 Arrow 数据的转换可以在 JVM 上完成,并推送回 Spark 执行器以并行执行,从而大大减轻了驱动程序的负载。

由于合并了 SPARK-13534,因此调用 toPandas() 时使用 Arrow 需要通过将 SQLConf “spark.sql.execution.arrow.enabled” 设置为 “true” 来启用。 让我们看一个简单的用法示例。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.functions import rand
   ...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
   ...: df.printSchema()
   ...: 
root
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)


In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s

In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms                                 
Wall time: 737 ms

In [5]: pdf.describe()
Out[5]: 
                 id             x
count  4.194304e+06  4.194304e+06
mean   2.097152e+06  4.998996e-01
std    1.210791e+06  2.887247e-01
min    0.000000e+00  8.291929e-07
25%    1.048576e+06  2.498116e-01
50%    2.097152e+06  4.999210e-01
75%    3.145727e+06  7.498380e-01
max    4.194303e+06  9.999996e-01

此示例是在我的笔记本电脑上使用 Spark 默认值在本地运行的,因此显示的时间不应精确地被采用。 即使如此,很明显性能得到了巨大的提升,并且使用 Arrow 使原本极其缓慢的事情加速到几乎不被注意到。

使用说明

在使用此新功能之前,请记住以下几点。 在撰写本文时,pyarrow 不会与 pyspark 自动安装,需要手动安装,请参见安装说明。 计划将 pyarrow 添加为 pyspark 依赖项,以便 > pip install pyspark 也会安装 pyarrow。

当前,控制 SQLConf 默认情况下处于禁用状态。 可以像上面的示例中那样以编程方式启用它,或者将行“spark.sql.execution.arrow.enabled=true”添加到 SPARK_HOME/conf/spark-defaults.conf

此外,并非所有 Spark 数据类型当前都受支持,并且仅限于原始类型。 扩展的类型支持正在开发中,预计也会在 Spark 2.3 版本中提供。

未来的改进

如前所述,这只是使用 Arrow 使 Spark Python 用户的生活更轻松的第一步。 正在进行的一些令人兴奋的举措是允许矢量化 UDF 评估(SPARK-21190SPARK-21404),以及使用 Pandas DataFrame 在分组数据上应用函数的能力(SPARK-20396)。 正如 Arrow 帮助将 Spark 转换为 Pandas 一样,当从现有的 Pandas DataFrame 创建 Spark DataFrame 时,它也可以在另一个方向上工作(SPARK-20791)。 敬请关注更多!

合作者

达到第一个里程碑是 Apache Arrow 和 Spark 社区共同努力的结果。 感谢 Wes McKinney, Li Jin, Holden Karau, Reynold Xin, Wenchen Fan, Shane Knapp 和许多其他帮助推动这项工作前进的人的辛勤工作。