DuckDB 嘎嘎叫地使用 Arrow:Apache Arrow 和 DuckDB 之间的零拷贝数据集成


已发布 2021 年 12 月 3 日
作者 Pedro Holanda, Jonathan Keane

简而言之:DuckDB 和 Apache Arrow 之间的零拷贝集成允许使用 SQL 或关系 API 在 Python 和 R 中快速分析大于内存的数据集。

这篇文章是与 DuckDB 博客 合作并在其上交叉发布的。

Apache Arrow 的一部分是为分析库优化的内存数据格式。与 Pandas 和 R 数据帧一样,它使用列式数据模型。但 Arrow 项目不仅仅包含这种格式:Arrow C++ 库(可通过绑定在 Python、R 和 Ruby 中访问)具有允许您高效计算数据集的附加功能。这些附加功能建立在上述内存格式实现之上。数据集可以跨越 Parquet、CSV 或其他格式的多个文件,文件甚至可以位于远程或云存储(如 HDFS 或 Amazon S3)上。Arrow C++ 查询引擎支持查询结果的流式传输,具有复杂数据类型(例如,列表、结构、映射)的有效实现,并且可以执行重要的扫描优化,例如投影和过滤器下推。

DuckDB 是一种新的分析数据管理系统,旨在在其他进程中运行复杂的 SQL 查询。DuckDB 具有 R 和 Python 等语言的绑定。DuckDB 可以直接查询 Arrow 数据集并将查询结果流式传输回 Arrow。这种集成允许用户使用 DuckDB 的 SQL 接口和 API 查询 Arrow 数据,同时利用 DuckDB 的并行矢量化执行引擎,而无需任何额外的数据复制。此外,这种集成充分利用了 Arrow 的谓词和过滤器下推,同时扫描数据集。

这种集成是独一无二的,因为它在 DuckDB 和 Arrow 之间以及反之亦然使用零拷贝数据流,以便您可以将两者组合在一起进行查询。这带来了三个主要好处

  1. **大于内存的分析:**由于两个库都支持流式查询结果,我们能够在不完全从磁盘加载数据的情况下对数据执行操作。相反,我们可以一次执行一批。这使我们能够对大于内存的数据执行查询。
  2. **复杂数据类型:**DuckDB 可以高效地处理可以存储在 Arrow 向量中的复杂数据类型,包括任意嵌套的结构、列表和映射。
  3. **高级优化器:**DuckDB 最先进的优化器可以直接将过滤器和投影下推到 Arrow 扫描中。因此,只会读取相关的列和分区,从而使系统能够利用 Parquet 文件中的分区消除等功能。这显着加快了查询执行速度。

对于只对基准测试感兴趣的人,您可以跳转到下面的基准测试部分

快速浏览

在深入研究集成的细节之前,在本节中,我们将提供一个简单的示例来说明 DuckDB-Arrow 集成是多么强大和易于使用。只需几行代码,您就可以开始查询 Arrow 数据集。假设您想分析臭名昭著的 纽约市出租车数据集,并确定团体小费是否比单人乘客更多或更少。

R

Arrow 和 DuckDB 都支持 dplyr 管道,方便那些更习惯于使用 dplyr 进行数据分析的人。Arrow 包包含两个辅助函数,允许我们在 Arrow 和 DuckDB 之间来回传递数据(to_duckdb()to_arrow())。这在 Arrow 或 DuckDB 中支持某些功能而另一个不支持的情况下尤其有用。例如,如果您发现一个复杂的 dplyr 管道,其中 SQL 转换无法与 DuckDB 一起使用,请在管道前使用 to_arrow() 来使用 Arrow 引擎。或者,如果您有一个尚未在 Arrow 中实现的函数(例如,窗口聚合),请使用 to_duckdb() 来使用 DuckDB 引擎。所有这些操作都不会在您来回传递数据时产生任何(重新)序列化数据的成本!

library(duckdb)
library(arrow)
library(dplyr)

# Open dataset using year,month folder partition
ds <- arrow::open_dataset("nyc-taxi", partitioning = c("year", "month"))

ds %>%
  # Look only at 2015 on, where the number of passenger is positive, the trip distance is
  # greater than a quarter mile, and where the fare amount is positive
  filter(year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0) %>%
  # Pass off to DuckDB
  to_duckdb() %>%
  group_by(passenger_count) %>%
  mutate(tip_pct = tip_amount / fare_amount) %>%
  summarise(
    fare_amount = mean(fare_amount, na.rm = TRUE),
    tip_amount = mean(tip_amount, na.rm = TRUE),
    tip_pct = mean(tip_pct, na.rm = TRUE)
  ) %>%
  arrange(passenger_count) %>%
  collect()

Python

Python 中的工作流程与 R 中一样简单。在本例中,我们使用 DuckDB 的关系 API。

import duckdb
import pyarrow as pa
import pyarrow.dataset as ds

# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# We transform the nyc dataset into a DuckDB relation
nyc = duckdb.arrow(nyc)

# Run same query again
nyc.filter("year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0")
    .aggregate("SELECT AVG(fare_amount), AVG(tip_amount), AVG(tip_amount / fare_amount) as tip_pct","passenger_count").arrow()

DuckDB 和 Arrow:基础知识

在本节中,我们将介绍在 Python 和 R 中读取和输出 Arrow 表所需的一些基本代码示例。

设置

首先,我们需要安装 DuckDB 和 Arrow。下面显示了在 Python 和 R 中安装这两个库的过程。

# Python Install
pip install duckdb
pip install pyarrow
# R Install
install.packages("duckdb")
install.packages("arrow")

要执行本节中的示例,我们需要下载以下自定义 parquet 文件

  • https://github.com/duckdb/duckdb-web/blob/master/_posts/data/integers.parquet?raw=true
  • https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet

Python

在 Python 中有两种方法可以查询 Arrow 中的数据

  1. 通过关系 API

     # Reads Parquet File to an Arrow Table
     arrow_table = pq.read_table('integers.parquet')
    
     # Transforms Arrow Table -> DuckDB Relation
     rel_from_arrow = duckdb.arrow(arrow_table)
    
     # we can run a SQL query on this and print the result
     print(rel_from_arrow.query('arrow_table', 'SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone())
    
     # Transforms DuckDB Relation -> Arrow Table
     arrow_table_from_duckdb = rel_from_arrow.arrow()
    
  2. 通过使用替换扫描并使用 SQL 直接查询对象

     # Reads Parquet File to an Arrow Table
     arrow_table = pq.read_table('integers.parquet')
    
     # Gets Database Connection
     con = duckdb.connect()
    
     # we can run a SQL query on this and print the result
     print(con.execute('SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone())
    
     # Transforms Query Result from DuckDB to Arrow Table
     # We can directly read the arrow object through DuckDB's replacement scans.
     con.execute("SELECT * FROM arrow_table").fetch_arrow_table()
    

可以将 DuckDB 关系和查询结果转换回 Arrow。

R

在 R 中,您可以通过将表注册为视图来与 DuckDB 中的 Arrow 数据进行交互(另一种方法是使用上面所示的 dplyr)。

library(duckdb)
library(arrow)
library(dplyr)

# Reads Parquet File to an Arrow Table
arrow_table <- arrow::read_parquet("integers.parquet", as_data_frame = FALSE)

# Gets Database Connection
con <- dbConnect(duckdb::duckdb())

# Registers arrow table as a DuckDB view
arrow::to_duckdb(arrow_table, table_name = "arrow_table", con = con)

# we can run a SQL query on this and print the result
print(dbGetQuery(con, "SELECT SUM(data) FROM arrow_table WHERE data > 50"))

# Transforms Query Result from DuckDB to Arrow Table
result <- dbSendQuery(con, "SELECT * FROM arrow_table")

从/向 Arrow 流式传输数据

在上一节中,我们描述了如何与 Arrow 表进行交互。但是,Arrow 还允许用户以流式方式与数据进行交互。无论是使用它(例如,从 Arrow 数据集)还是生成它(例如,返回 RecordBatchReader)。当然,DuckDB 能够使用数据集并生成 RecordBatchReaders。此示例使用纽约市出租车数据集,该数据集存储在按年份和月份分区的 Parquet 文件中,我们可以通过 Arrow R 包下载该数据集

arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")

Python

# Reads dataset partitioning it in year/month folder
nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# Gets Database Connection
con = duckdb.connect()

query = con.execute("SELECT * FROM nyc_dataset")
# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader = query.fetch_record_batch()
# Which means we can stream the whole query per batch.
# This retrieves the first batch
chunk = record_batch_reader.read_next_batch()

R

# Reads dataset partitioning it in year/month folder
nyc_dataset = open_dataset("nyc-taxi/", partitioning = c("year", "month"))

# Gets Database Connection
con <- dbConnect(duckdb::duckdb())

# We can use the same function as before to register our arrow dataset
duckdb::duckdb_register_arrow(con, "nyc", nyc_dataset)

res <- dbSendQuery(con, "SELECT * FROM nyc", arrow = TRUE)
# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)

# Which means we can stream the whole query per batch.
# This retrieves the first batch
cur_batch <- record_batch_reader$read_next_batch()

前面的 R 代码以低级细节显示了数据的流式传输方式。我们在 Arrow 包中提供了辅助函数 to_arrow(),它是对此的包装,可以轻松地将此流式传输合并到 dplyr 管道中。1

基准测试比较

在这里,我们通过一个简单的基准测试来演示使用 DuckDB 查询 Arrow 数据集与使用 Pandas 查询 Arrow 数据集之间的性能差异。对于投影和过滤器下推比较,我们将使用 Arrow 表。这是因为 Pandas 无法使用 Arrow 流对象。

对于纽约市出租车基准测试,我们使用了 scilens 钻石配置,对于 TPC-H 基准测试,我们使用了 m1 MacBook Pro。在这两种情况下,都使用了 DuckDB 中的并行性(现在默认情况下启用)。

对于与 Pandas 的比较,请注意 DuckDB 并行运行,而 pandas 仅支持单线程执行。除此之外,应该注意的是,我们正在比较自动优化。DuckDB 的查询优化器可以自动下推过滤器和投影。pandas 不支持这种自动优化,但用户可以通过在 read_parquet() 调用中手动指定它们来手动执行其中一些谓词和过滤器下推。

投影下推

在本例中,我们对 lineitem 表的两列运行一个简单的聚合。

# DuckDB
lineitem = pq.read_table('lineitemsf1.snappy.parquet')
con = duckdb.connect()

# Transforms Query Result from DuckDB to Arrow Table
con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
                FROM
                lineitem;""").fetch_arrow_table()

# Pandas
arrow_table = pq.read_table('lineitemsf1.snappy.parquet')

# Converts an Arrow table to a Dataframe
df = arrow_table.to_pandas()

# Runs aggregation
res =  pd.DataFrame({'sum': [(df.l_extendedprice * df.l_discount).sum()]})

# Creates an Arrow Table from a Dataframe
new_table = pa.Table.from_pandas(res)

名称 时间(秒)
DuckDB 0.19
Pandas 2.13

lineitem 表由 16 列组成,但是,要执行此查询,只需要两列 l_extendedprice 和 * l_discount。由于 DuckDB 可以下推这些列的投影,因此它能够比 Pandas 快一个数量级地执行此查询。

过滤器下推

对于我们的过滤器下推,我们重复上一节中使用的相同聚合,但在另外 4 列上添加过滤器。

# DuckDB
lineitem = pq.read_table('lineitemsf1.snappy.parquet')

# Get database connection
con = duckdb.connect()

# Transforms Query Result from DuckDB to Arrow Table
con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
        FROM
            lineitem
        WHERE
            l_shipdate >= CAST('1994-01-01' AS date)
            AND l_shipdate < CAST('1995-01-01' AS date)
            AND l_discount BETWEEN 0.05
            AND 0.07
            AND l_quantity < 24; """).fetch_arrow_table()

# Pandas
arrow_table = pq.read_table('lineitemsf1.snappy.parquet')

df = arrow_table.to_pandas()
filtered_df = lineitem[
        (lineitem.l_shipdate >= "1994-01-01") &
        (lineitem.l_shipdate < "1995-01-01") &
        (lineitem.l_discount >= 0.05) &
        (lineitem.l_discount <= 0.07) &
        (lineitem.l_quantity < 24)]

res =  pd.DataFrame({'sum': [(filtered_df.l_extendedprice * filtered_df.l_discount).sum()]})
new_table = pa.Table.from_pandas(res)
名称 时间(秒)
DuckDB 0.04
Pandas 2.29

现在 DuckDB 和 Pandas 之间的差异更大,比 Pandas 快两个数量级。同样,由于过滤器和投影都被下推到 Arrow,DuckDB 读取的数据少于 Pandas,而后者无法自动执行此优化。

流式传输

如前所述,DuckDB 能够以流式方式处理 Arrow 数据。在本节中,我们将运行一个简单的基准测试,以展示与完全物化和 Pandas 相比,DuckDB 在速度和内存使用方面的优势。此示例使用完整的纽约市出租车数据集,您可以下载

# DuckDB
# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])

# Get database connection
con = duckdb.connect()

# Run query that selects part of the data
query = con.execute("SELECT total_amount, passenger_count,year FROM nyc where total_amount > 100 and year > 2014")

# Create Record Batch Reader from Query Result.
# "fetch_record_batch()" also accepts an extra parameter related to the desired produced chunk size.
record_batch_reader = query.fetch_record_batch()

# Retrieve all batch chunks
chunk = record_batch_reader.read_next_batch()
while len(chunk) > 0:
    chunk = record_batch_reader.read_next_batch()
# Pandas
# We must exclude one of the columns of the NYC dataset due to an unimplemented cast in Arrow.
working_columns = ["vendor_id","pickup_at","dropoff_at","passenger_count","trip_distance","pickup_longitude",
    "pickup_latitude","store_and_fwd_flag","dropoff_longitude","dropoff_latitude","payment_type",
    "fare_amount","extra","mta_tax","tip_amount","tolls_amount","total_amount","year", "month"]

# Open dataset using year,month folder partition
nyc_dataset = ds.dataset(dir, partitioning=["year", "month"])
# Generate a scanner to skip problematic column
dataset_scanner = nyc_dataset.scanner(columns=working_columns)

# Materialize dataset to an Arrow Table
nyc_table = dataset_scanner.to_table()

# Generate Dataframe from Arow Table
nyc_df = nyc_table.to_pandas()

# Apply Filter
filtered_df = nyc_df[
    (nyc_df.total_amount > 100) &
    (nyc_df.year >2014)]

# Apply Projection
res = filtered_df[["total_amount", "passenger_count","year"]]

# Transform Result back to an Arrow Table
new_table = pa.Table.from_pandas(res)
名称 时间(秒) 内存使用峰值 (GB)
DuckDB 0.05 0.3
Pandas 146.91 248

DuckDB 和 Pandas 之间的时间差异是我们本文探讨的所有集成优势的综合结果。在 DuckDB 中,应用了过滤器下推来执行分区消除(即,我们跳过读取年份 <= 2014 的 Parquet 文件)。过滤器下推也用于消除不相关的行组(即,总金额始终 <= 100 的行组)。由于我们的投影下推,Arrow 只需从 Parquet 文件中读取感兴趣的列,这使得它只需读取 20 列中的 4 列。另一方面,Pandas 无法自动下推任何这些优化,这意味着必须读取整个数据集。这导致查询执行时间相差 4 个数量级。

在上表中,我们还描述了 DuckDB(流式)和 Pandas(完全物化)之间内存使用峰值的比较。在 DuckDB 中,我们只需要将感兴趣的行组加载到内存中。因此,我们的内存使用率很低。由于我们一次只需要在内存中保留一个这样的行组,因此我们的内存使用率也是恒定的。另一方面,Pandas 在执行查询时必须完全物化所有 Parquet 文件。因此,我们看到它的内存消耗持续急剧增加。两种解决方案的内存消耗总差异约为 3 个数量级。

结论和反馈

在这篇博文中,我们主要展示了如何使用 DuckDB 对 Arrow 数据集执行查询。还有其他库也可以使用 Arrow 格式,但它们具有不同的用途和功能。与往常一样,如果您希望将来看到与不同工具进行基准测试的文章,我们很乐意听取您的意见!请随时给我们发送电子邮件或直接在 Hacker News 帖子中分享您的想法。

最后但同样重要的是,如果您在使用我们的集成时遇到任何问题,请在DuckDB 的问题跟踪器Arrow 的问题跟踪器中提出问题,具体取决于哪个库存在问题。

  1. 在 Arrow 6.0.0 中,to_arrow() 目前返回完整表,但在即将发布的 7.0.0 版本中将允许完全流式传输。