使用 Apache Arrow 加速 PySpark
已发布 2017 年 7 月 26 日
作者 BryanCutler
Bryan Cutler 是 IBM Spark 技术中心 STC 的软件工程师
从 Apache Spark 2.3 版本开始,Apache Arrow 将成为受支持的依赖项,并开始在列式数据传输方面提供更高的性能。如果您是偏爱使用 Python 和 Pandas 的 Spark 用户,这绝对是一个令人兴奋的消息!最初的工作仅限于使用 toPandas() 收集 Spark DataFrame,我将在下文讨论这一点,但目前还有许多额外的改进正在进行中。
优化 Spark 到 Pandas 的转换
以前在 PySpark 中使用 DataFrame.toPandas() 将 Spark DataFrame 转换为 Pandas 的方式效率极低。基本上,它的工作原理是首先将所有行收集到 Spark 驱动程序。然后,每行都会被序列化成 Python 的 pickle 格式,并发送到一个 Python 工作进程。这个子进程会将每行反序列化成一个巨大的元组列表。最后,使用 pandas.DataFrame.from_records() 从该列表中创建 Pandas DataFrame。
这看起来可能像标准流程,但存在两个明显的问题:1) 即使使用 CPickle,Python 序列化也是一个缓慢的过程,2) 使用 from_records 创建 pandas.DataFrame 必须缓慢地遍历纯 Python 数据的列表并将每个值转换为 Pandas 格式。有关详细分析,请参阅此处。
这就是 Arrow 大放异彩并优化这些步骤的地方:1) 数据一旦以 Arrow 内存格式存储,就不再需要序列化/pickle,因为 Arrow 数据可以直接发送到 Python 进程;2) 当 Arrow 数据在 Python 中接收后,pyarrow 可以利用零拷贝方法一次性从整个数据块创建 pandas.DataFrame,而不是处理单独的标量值。此外,到 Arrow 数据的转换可以在 JVM 上完成,然后推回给 Spark 执行器并行执行,从而大大减轻了驱动程序的负载。
自SPARK-13534 合并后,调用 toPandas() 时使用 Arrow 需要通过将 SQLConf "spark.sql.execution.arrow.enabled" 设置为 "true" 来启用。让我们来看一个简单的使用示例。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0-SNAPSHOT
/_/
Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.
In [1]: from pyspark.sql.functions import rand
...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
...: df.printSchema()
...:
root
|-- id: long (nullable = false)
|-- x: double (nullable = false)
In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s
In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")
In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms
Wall time: 737 ms
In [5]: pdf.describe()
Out[5]:
id x
count 4.194304e+06 4.194304e+06
mean 2.097152e+06 4.998996e-01
std 1.210791e+06 2.887247e-01
min 0.000000e+00 8.291929e-07
25% 1.048576e+06 2.498116e-01
50% 2.097152e+06 4.999210e-01
75% 3.145727e+06 7.498380e-01
max 4.194303e+06 9.999996e-01
此示例在我使用 Spark 默认设置的笔记本电脑上本地运行,因此显示的时间不应被视为精确。尽管如此,很明显性能得到了巨大的提升,使用 Arrow 将原本极其缓慢的操作提速到了几乎可以忽略不计。
使用注意事项
在使用此新功能之前,请记住以下几点。在撰写本文时,pyarrow 不会与 pyspark 自动安装,需要手动安装,请参阅安装说明。计划将 pyarrow 添加为 pyspark 依赖项,以便 > pip install pyspark 也会安装 pyarrow。
目前,控制 SQLConf 默认是禁用的。可以像上面的示例中那样以编程方式启用它,或者通过在 SPARK_HOME/conf/spark-defaults.conf 中添加行 "spark.sql.execution.arrow.enabled=true" 来启用。
此外,并非所有 Spark 数据类型目前都受支持,仅限于基本类型。类型支持的扩展正在进行中,预计也将包含在 Spark 2.3 版本中。
未来改进
如前所述,这是使用 Arrow 使 Spark Python 用户的生活更轻松的第一步。一些令人兴奋的正在进行中的举措包括允许矢量化 UDF 评估(SPARK-21190,SPARK-21404),以及使用 Pandas DataFrame 对分组数据应用函数的能力(SPARK-20396)。正如 Arrow 帮助将 Spark 转换为 Pandas 一样,在从现有的 Pandas DataFrame 创建 Spark DataFrame 时,它也可以朝相反的方向工作(SPARK-20791)。敬请期待更多!
协作者
达到这个第一个里程碑是 Apache Arrow 和 Spark 社区共同努力的结果。感谢Wes McKinney、Li Jin、Holden Karau、Reynold Xin、Wenchen Fan、Shane Knapp 以及许多其他推动这一努力向前发展的人们的辛勤工作。