DuckDB与Arrow的协作:Apache Arrow和DuckDB之间的零拷贝数据集成
已发布 2021年12月03日
作者 Pedro Holanda, Jonathan Keane
TLDR:DuckDB和Apache Arrow之间的零拷贝集成允许使用SQL或关系API在Python和R中快速分析大于内存的数据集。
这篇文章是与DuckDB博客合作发布的,并且交叉发布在上面。
Apache Arrow的一部分是专为分析库优化的内存数据格式。 像Pandas和R Dataframes一样,它使用列式数据模型。 但是Arrow项目包含的不仅仅是格式:Arrow C ++库,可以通过绑定在Python,R和Ruby中使用,它具有其他功能,使您可以有效地计算数据集。 这些附加功能位于上述内存格式的实现之上。 数据集可能跨越Parquet,CSV或其他格式的多个文件,并且文件甚至可能位于远程或云存储(如HDFS或Amazon S3)上。 Arrow C ++查询引擎支持查询结果的流式传输,具有复杂数据类型(例如,列表,结构,地图)的高效实现,并且可以执行重要的扫描优化,例如投影和过滤器下推。
DuckDB是一种新的分析数据管理系统,旨在在其他进程中运行复杂的SQL查询。 DuckDB具有R和Python的绑定以及其他绑定。 DuckDB可以直接查询Arrow数据集并将查询结果流回Arrow。 这种集成允许用户使用DuckDB的SQL接口和API查询Arrow数据,同时利用DuckDB的并行向量化执行引擎,而无需任何额外的数据复制。 此外,这种集成充分利用了Arrow在扫描数据集时的谓词和过滤器下推。
这种集成是独一无二的,因为它使用DuckDB和Arrow之间以及反之亦然的零拷贝数据流式传输,因此您可以将两者组合在一起构成一个查询。 这导致了三个主要好处
- 大于内存的分析: 由于这两个库都支持流式查询结果,因此我们能够在不从磁盘完全加载数据的情况下执行数据。 相反,我们可以一次执行一个批处理。 这使我们可以在大于内存的数据上执行查询。
- 复杂数据类型: DuckDB可以有效地处理可以存储在Arrow向量中的复杂数据类型,包括任意嵌套的结构,列表和映射。
- 高级优化器: DuckDB的最新优化器可以将过滤器和投影直接下推到Arrow扫描中。 结果,只会读取相关的列和分区,从而使系统可以例如利用Parquet文件中的分区消除。 这大大加快了查询执行速度。
对于那些仅对基准测试感兴趣的人,您可以跳到下面的基准测试部分。
快速入门
在深入研究集成的细节之前,在本节中,我们将提供一个快速的示例,说明DuckDB-Arrow集成功能有多强大和易于使用。 仅需几行代码,您就可以开始查询Arrow数据集。 假设您想分析臭名昭著的纽约市出租车数据集,并确定群体的小费是多还是少于单人乘客。
R
Arrow和DuckDB都支持dplyr管道,以方便人们使用dplyr进行数据分析。 Arrow软件包包括两个辅助函数,使我们可以在Arrow和DuckDB之间来回传递数据(to_duckdb()
和to_arrow()
)。 在Arrow或DuckDB中支持某些功能,但在另一种功能中不支持的情况下,这特别有用。 例如,如果您发现一个复杂的dplyr管道,其中SQL转换不适用于DuckDB,请在管道之前使用to_arrow()
来使用Arrow引擎。 或者,如果您有一个在Arrow中尚未实现的功能(例如,窗口聚合),请使用to_duckdb()
来使用DuckDB引擎。 所有这些都不会为来回传递数据而付出(重新)序列化的任何成本!
library(duckdb)
library(arrow)
library(dplyr)
# Open dataset using year,month folder partition
ds <- arrow::open_dataset("nyc-taxi", partitioning = c("year", "month"))
ds %>%
# Look only at 2015 on, where the number of passenger is positive, the trip distance is
# greater than a quarter mile, and where the fare amount is positive
filter(year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0) %>%
# Pass off to DuckDB
to_duckdb() %>%
group_by(passenger_count) %>%
mutate(tip_pct = tip_amount / fare_amount) %>%
summarise(
fare_amount = mean(fare_amount, na.rm = TRUE),
tip_amount = mean(tip_amount, na.rm = TRUE),
tip_pct = mean(tip_pct, na.rm = TRUE)
) %>%
arrange(passenger_count) %>%
collect()
Python
Python中的工作流程与R中的一样简单。 在此示例中,我们使用DuckDB的关系API。
import duckdb
import pyarrow as pa
import pyarrow.dataset as ds
# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
# We transform the nyc dataset into a DuckDB relation
nyc = duckdb.arrow(nyc)
# Run same query again
nyc.filter("year > 2014 & passenger_count > 0 & trip_distance > 0.25 & fare_amount > 0")
.aggregate("SELECT AVG(fare_amount), AVG(tip_amount), AVG(tip_amount / fare_amount) as tip_pct","passenger_count").arrow()
DuckDB和Arrow:基础知识
在本节中,我们将看一些基本示例,这些示例说明了在Python和R中读取和输出Arrow表所需的代码。
设置
首先,我们需要安装DuckDB和Arrow。 下面显示了Python和R中两个库的安装过程。
# Python Install
pip install duckdb
pip install pyarrow
# R Install
install.packages("duckdb")
install.packages("arrow")
要执行本节中的示例,我们需要下载以下自定义parquet文件
- https://github.com/duckdb/duckdb-web/blob/master/_posts/data/integers.parquet?raw=true
- https://github.com/cwida/duckdb-data/releases/download/v1.0/lineitemsf1.snappy.parquet
Python
在Python中,有两种方法可以查询Arrow中的数据
-
通过关系API
# Reads Parquet File to an Arrow Table arrow_table = pq.read_table('integers.parquet') # Transforms Arrow Table -> DuckDB Relation rel_from_arrow = duckdb.arrow(arrow_table) # we can run a SQL query on this and print the result print(rel_from_arrow.query('arrow_table', 'SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone()) # Transforms DuckDB Relation -> Arrow Table arrow_table_from_duckdb = rel_from_arrow.arrow()
-
通过使用替换扫描并使用SQL直接查询对象
# Reads Parquet File to an Arrow Table arrow_table = pq.read_table('integers.parquet') # Gets Database Connection con = duckdb.connect() # we can run a SQL query on this and print the result print(con.execute('SELECT SUM(data) FROM arrow_table WHERE data > 50').fetchone()) # Transforms Query Result from DuckDB to Arrow Table # We can directly read the arrow object through DuckDB's replacement scans. con.execute("SELECT * FROM arrow_table").fetch_arrow_table()
可以将DuckDB关系和查询结果转换回Arrow。
R
在R中,您可以通过将表注册为视图(另一种方法是使用dplyr,如上所示)与DuckDB中的Arrow数据进行交互。
library(duckdb)
library(arrow)
library(dplyr)
# Reads Parquet File to an Arrow Table
arrow_table <- arrow::read_parquet("integers.parquet", as_data_frame = FALSE)
# Gets Database Connection
con <- dbConnect(duckdb::duckdb())
# Registers arrow table as a DuckDB view
arrow::to_duckdb(arrow_table, table_name = "arrow_table", con = con)
# we can run a SQL query on this and print the result
print(dbGetQuery(con, "SELECT SUM(data) FROM arrow_table WHERE data > 50"))
# Transforms Query Result from DuckDB to Arrow Table
result <- dbSendQuery(con, "SELECT * FROM arrow_table")
从/到Arrow流式传输数据
在上一节中,我们描述了如何与Arrow表进行交互。 但是,Arrow还允许用户以流式方式与数据进行交互。 可以使用它(例如,从Arrow Dataset)或产生它(例如,返回RecordBatchReader)。 当然,DuckDB能够使用数据集并生成RecordBatchReader。 此示例使用NYC Taxi Dataset,该数据集存储在按年份和月份划分的Parquet文件中,我们可以通过Arrow R软件包下载该文件
arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
Python
# Reads dataset partitioning it in year/month folder
nyc_dataset = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
# Gets Database Connection
con = duckdb.connect()
query = con.execute("SELECT * FROM nyc_dataset")
# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader = query.fetch_record_batch()
# Which means we can stream the whole query per batch.
# This retrieves the first batch
chunk = record_batch_reader.read_next_batch()
R
# Reads dataset partitioning it in year/month folder
nyc_dataset = open_dataset("nyc-taxi/", partitioning = c("year", "month"))
# Gets Database Connection
con <- dbConnect(duckdb::duckdb())
# We can use the same function as before to register our arrow dataset
duckdb::duckdb_register_arrow(con, "nyc", nyc_dataset)
res <- dbSendQuery(con, "SELECT * FROM nyc", arrow = TRUE)
# DuckDB's queries can now produce a Record Batch Reader
record_batch_reader <- duckdb::duckdb_fetch_record_batch(res)
# Which means we can stream the whole query per batch.
# This retrieves the first batch
cur_batch <- record_batch_reader$read_next_batch()
上面的R代码以低级别详细信息显示了数据的流式传输方式。 我们在Arrow包中提供了帮助程序to_arrow()
,它是此包装的包装器,使您可以轻松地将其合并到dplyr管道中。 1
基准比较
在这里,我们用一个简单的基准测试来演示使用DuckDB查询Arrow数据集与使用Pandas查询Arrow数据集之间的性能差异。 对于投影和过滤器下推比较,我们将使用Arrow表。 这是因为Pandas无法使用Arrow流对象。
对于NYC Taxi基准测试,我们使用了scilens diamonds配置,对于TPC-H基准测试,我们使用了m1 MacBook Pro。 在这两种情况下,都使用了DuckDB中的并行性(现在默认情况下处于打开状态)。
对于与Pandas的比较,请注意,DuckDB并行运行,而pandas仅支持单线程执行。 除此之外,还应注意我们正在比较自动优化。 DuckDB的查询优化器可以自动下推过滤器和投影。 pandas不支持此自动优化,但是用户可以通过在read_parquet()
调用中手动指定它们来手动执行某些谓词和过滤器下推。
投影下推
在此示例中,我们在lineitem表的两列上运行一个简单的聚合。
# DuckDB
lineitem = pq.read_table('lineitemsf1.snappy.parquet')
con = duckdb.connect()
# Transforms Query Result from DuckDB to Arrow Table
con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
FROM
lineitem;""").fetch_arrow_table()
# Pandas
arrow_table = pq.read_table('lineitemsf1.snappy.parquet')
# Converts an Arrow table to a Dataframe
df = arrow_table.to_pandas()
# Runs aggregation
res = pd.DataFrame({'sum': [(df.l_extendedprice * df.l_discount).sum()]})
# Creates an Arrow Table from a Dataframe
new_table = pa.Table.from_pandas(res)
名称 | 时间(秒) |
---|---|
DuckDB | 0.19 |
Pandas | 2.13 |
lineitem表由16列组成,但是,要执行此查询,仅需要两列l_extendedprice
和* l_discount
。 由于DuckDB可以下推这些列的投影,因此它可以比Pandas快一个数量级来执行此查询。
过滤器下推
对于我们的过滤器下推,我们重复上一节中使用的相同聚合,但是在其他4列上添加过滤器。
# DuckDB
lineitem = pq.read_table('lineitemsf1.snappy.parquet')
# Get database connection
con = duckdb.connect()
# Transforms Query Result from DuckDB to Arrow Table
con.execute("""SELECT sum(l_extendedprice * l_discount) AS revenue
FROM
lineitem
WHERE
l_shipdate >= CAST('1994-01-01' AS date)
AND l_shipdate < CAST('1995-01-01' AS date)
AND l_discount BETWEEN 0.05
AND 0.07
AND l_quantity < 24; """).fetch_arrow_table()
# Pandas
arrow_table = pq.read_table('lineitemsf1.snappy.parquet')
df = arrow_table.to_pandas()
filtered_df = lineitem[
(lineitem.l_shipdate >= "1994-01-01") &
(lineitem.l_shipdate < "1995-01-01") &
(lineitem.l_discount >= 0.05) &
(lineitem.l_discount <= 0.07) &
(lineitem.l_quantity < 24)]
res = pd.DataFrame({'sum': [(filtered_df.l_extendedprice * filtered_df.l_discount).sum()]})
new_table = pa.Table.from_pandas(res)
名称 | 时间(秒) |
---|---|
DuckDB | 0.04 |
Pandas | 2.29 |
现在,DuckDB和Pandas之间的差异更加明显,比Pandas快两个数量级。 同样,由于过滤器和投影都已下推到Arrow,因此DuckDB读取的数据少于Pandas,后者无法自动执行此优化。
流式传输
如前所述,DuckDB能够以流式方式使用和生成Arrow数据。 在本节中,我们运行一个简单的基准测试,以展示将其与完全物化和Pandas进行比较时在速度和内存使用方面的优势。 此示例使用完整的纽约市出租车数据集,您可以下载它
# DuckDB
# Open dataset using year,month folder partition
nyc = ds.dataset('nyc-taxi/', partitioning=["year", "month"])
# Get database connection
con = duckdb.connect()
# Run query that selects part of the data
query = con.execute("SELECT total_amount, passenger_count,year FROM nyc where total_amount > 100 and year > 2014")
# Create Record Batch Reader from Query Result.
# "fetch_record_batch()" also accepts an extra parameter related to the desired produced chunk size.
record_batch_reader = query.fetch_record_batch()
# Retrieve all batch chunks
chunk = record_batch_reader.read_next_batch()
while len(chunk) > 0:
chunk = record_batch_reader.read_next_batch()
# Pandas
# We must exclude one of the columns of the NYC dataset due to an unimplemented cast in Arrow.
working_columns = ["vendor_id","pickup_at","dropoff_at","passenger_count","trip_distance","pickup_longitude",
"pickup_latitude","store_and_fwd_flag","dropoff_longitude","dropoff_latitude","payment_type",
"fare_amount","extra","mta_tax","tip_amount","tolls_amount","total_amount","year", "month"]
# Open dataset using year,month folder partition
nyc_dataset = ds.dataset(dir, partitioning=["year", "month"])
# Generate a scanner to skip problematic column
dataset_scanner = nyc_dataset.scanner(columns=working_columns)
# Materialize dataset to an Arrow Table
nyc_table = dataset_scanner.to_table()
# Generate Dataframe from Arow Table
nyc_df = nyc_table.to_pandas()
# Apply Filter
filtered_df = nyc_df[
(nyc_df.total_amount > 100) &
(nyc_df.year >2014)]
# Apply Projection
res = filtered_df[["total_amount", "passenger_count","year"]]
# Transform Result back to an Arrow Table
new_table = pa.Table.from_pandas(res)
名称 | 时间(秒) | 峰值内存使用量(GB) |
---|---|---|
DuckDB | 0.05 | 0.3 |
Pandas | 146.91 | 248 |
DuckDB和Pandas之间的时间差异是我们在本文中探讨的所有集成优势的结合。 在DuckDB中,应用过滤器下推来执行分区消除(即,我们跳过读取年份<= 2014的Parquet文件)。 过滤器下推还用于消除无关的row_groups(即,总金额始终<= 100的行组)。 由于我们的投影下推,Arrow仅必须从Parquet文件中读取感兴趣的列,这使其只能读取20列中的4列。 另一方面,Pandas无法自动下推任何这些优化,这意味着必须读取完整的数据集。 这导致查询执行时间相差4个数量级。
在上表中,我们还展示了 DuckDB (流式处理) 和 Pandas (完全物化) 之间峰值内存使用量的比较。 在 DuckDB 中,我们只需要将感兴趣的行组加载到内存中。 因此,我们的内存使用量很低。 我们的内存使用量也是恒定的,因为我们每次只需要在内存中保留其中一个行组。 另一方面,Pandas 在执行查询时必须完全物化所有 Parquet 文件。 因此,我们看到它的内存消耗持续急剧增加。 这两个解决方案的内存消耗总差异约为 3 个数量级。
结论和反馈
在这篇博文中,我们主要展示了如何使用 DuckDB 在 Arrow 数据集上执行查询。 还有其他一些库也可以使用 Arrow 格式,但它们的目的和功能各不相同。 像往常一样,如果您想在以后的文章中看到针对不同工具的基准测试,我们非常乐意听到您的声音! 请随时给我们发送电子邮件,或直接在 Hacker News 帖子中分享您的想法。
最后但同样重要的是,如果您在使用我们的集成时遇到任何问题,请在 DuckDB 的 issue tracker 或 Arrow 的 issue tracker 中打开一个 issue,具体取决于哪个库出现问题。
-
在 Arrow 6.0.0 中,
to_arrow()
目前返回整个表,但在我们即将发布的 7.0.0 版本中将允许完全流式传输。 ↩