使用 ADBC 在 DuckDB 中实现快速流式插入
已发布 2025 年 3 月 10 日
作者 Loïc Alleyne (loicalleyne)
TL;DR(太长不看)
DuckDB 正在迅速成为数据从业人员工具箱中必不可少的一部分,在数据工程、机器学习和本地分析中找到了用例。 在许多情况下,DuckDB 已被用于查询和处理已由另一个进程保存到存储(基于文件或外部数据库)的数据。 Arrow 数据库连接 API 能够使用 DuckDB 作为引擎进行高吞吐量的数据处理。
它是如何开始的
我所在的公司是领先的数字户外营销平台,包括程序化广告技术堆栈。 多年来,我的技术运营团队一直在使用实时程序化拍卖系统以 Apache Avro 格式发出的日志。 随着时间的推移,我们使用这些数据构建了一个完整的运营和分析后端。 Avro 文件是基于行的,这对于大规模分析来说并不理想,实际上非常痛苦。 以至于我开发并贡献了一个 Avro 读取器功能到 Apache Arrow Go 库,以便能够将 Avro 文件转换为 parquet。 这个数据管道现在运行顺利,每天将数百 GB 的数据从 Avro 转换为 Parquet。
由于“计算机科学中的任何问题都可以通过另一层间接解决”,原始系统已经像洋葱一样长出了层,并开始发出其他日志,这次是 Apache Parquet 格式…

正如我们在史莱克中学到的那样,洋葱就像食人魔:它们是绿色的,它们有层次,它们让你哭泣,所以这种欢欣鼓舞是短暂的,因为选择用来发出 parquet 文件的机制效率很低
- 新的洋葱层(嗯……系统组件)将 Protobuf 编码的消息发送到 Kafka 主题
- 带有 S3 sink connector 的 Kafka Connect 集群消耗主题并将 parquet 文件保存到对象存储
由于数据的火灾,集群大小随着时间的推移增长到 > 25 个节点,并且每小时产生数千个小的 Parquet 文件(13 MB 或更小)。 这导致查询延迟不断增加,在某些情况下,由于查询超时(也称为 小文件问题)而破坏了我们的工具。 更不用说在我们的数据仓库中对原始数据运行聚合既不快速也不便宜。
DuckDB 来救援……我想
我曾使用 DuckDB 处理和分析 Parquet 数据,所以我知道它可以非常快速地完成这项工作。 然后我在 LinkedIn 上看到了这篇文章(使用 Kafka 和 DuckDB 进行实时分析),其中有人构建了一个系统,用于使用 Go 中的 DuckDB 进行近实时分析。
幻灯片列出了 DuckDB 的局限性
海报的解决方案在应用程序层批量处理数据,设法将摄取量扩大 100 倍至约 20k 个插入/秒,并指出他们认为使用 DuckDB Appender API 可能会将此速度提高 10 倍。 所以,可能大约 200k 个插入/秒。 Yayyyyy…

然后我注意到幻灯片中的数据模式是扁平的,只有 4 个字段(与具有深度嵌套的列表和结构的 OpenRTB 模式相比);然后查看我们的监控仪表板,我意识到在高峰期我们的系统每秒发出 >250k 个事件。 [提示悲伤的长号]
毫不气馁(并且对设置/运行/维护 Spark 集群的想法不太感兴趣),我怀疑 Apache Arrow 的柱状内存表示可能仍然使 DuckDB 可行,因为它具有 Arrow API;获取 Parquet 文件就像运行 COPY...TO (format parquet)
一样容易。
使用在 Github 问题中找到的模式,我编写了一个 POC,使用 github.com/marcboeker/go-duckdb 连接到数据库,检索 Arrow,创建 Arrow Reader,在 reader 上注册视图,然后从视图运行 INSERT 语句。
这感觉有点像兔子从帽子里拉出自己,但没关系,它在我的笔记本电脑上管理了大约 74k 到 110k 行/秒。
为了确保这确实是正确的解决方案,我还尝试了 DuckDB 的 Appender API(在撰写本文时,这是快速插入的官方推荐方法),并且在我的笔记本电脑上管理了…大约 63k 行/秒。 好的,但是…meh。
新的希望
在 Gopher Slack 上的讨论中,Matthew Topol aka zeroshade 建议使用 ADBC 及其更简单的 API。 你问 Matt Topol 是谁? 实际上 写了关于 Apache Arrow 的书的人,就是他(使用 Apache Arrow 的内存分析:加速数据分析以高效处理平面和分层数据结构 第 2 版)。 它是使用 Arrow 的极好的资源和指南。
顺便说一句,如果您喜欢首字母缩略词来记住书名,那就是 IMAAA:ADAFEPOFAHDS2E。
但我离题了。 Matt 也是 Apache Arrow PMC 的成员,是 Apache Iceberg Go 实现的主要贡献者,并且通常是一个友善、乐于助人的人。
ADBC
ADBC 是
-
一套用不同语言(C/C++、Go 和 Java,还有更多语言即将推出)编写的 抽象 API,用于处理数据库和 Arrow 数据。
例如,ADBC 中查询的结果集都作为 Arrow 数据的流返回,而不是逐行返回。
-
一套用不同语言(C/C++、C#/.NET、Go、Java、Python 和 Ruby)编写的 API 实现,针对不同的数据库(例如 PostgreSQL、SQLite、DuckDB、任何支持 Flight SQL 的数据库)。
回到绘图板,我创建了 Quacfka,这是一个使用 ADBC 构建的 Go 库,并将我的系统分为 3 个工作池,由通道连接
- Kafka 客户端消耗主题消息并将字节写入消息通道
- 使用 Bufarrow 库的反序列化 Protobuf 数据并将其附加到 Arrow 数组的处理例程,将 Arrow 记录写入记录通道
- DuckDB 插入器将 Arrow 记录绑定到 ADBC 语句并执行插入
我首先按顺序运行这些程序以确定每个程序运行的速度
2025/01/23 23:39:27 kafka read start with 8 readers
2025/01/23 23:39:41 read 15728642 kafka records in 14.385530 secs @1093365.498477 messages/sec
2025/01/23 23:39:41 deserialize []byte to proto, convert to arrow records with 32 goroutines start
2025/01/23 23:40:04 deserialize to arrow done - 15728642 records in 22.283532 secs @ 705841.509812 messages/sec
2025/01/23 23:40:04 ADBC IngestCreateAppend start with 32 connections
2025/01/23 23:40:25 duck ADBC insert 15728642 records in 21.145649535 secs @ 743824.007783 rows/sec
确定了这种架构后,我开始并发运行 worker,检测系统,分析我的代码以识别性能问题,并调整设置以最大化吞吐量。 我认为有足够的性能空间来允许飞行中聚合。
一个问题:尽管 DuckDB 具有出色的 轻量级压缩,但来自此来源的插入使文件大小以 ~8GB/分钟 的速度增加。 暂停插入以导出 Parquet 文件并释放存储会将总体吞吐量降低到不可接受的水平。 我决定根据文件大小阈值实施数据库文件的轮换。
DuckDB 能够查询磁盘或对象存储上的 Hive 分区 parquet,因此可以通过运行指向 parquet 文件最终位置的单独查询服务器,将分析部分与数据提取管道分离。
通过迭代,我创建了多个 API,试图使飞行中聚合足够高效,以使总体吞吐量保持在我 250k 行/秒的目标之上。
前两个要么遇到数据局部性问题,要么没有经过足够的优化
- CustomArrows:在每个 Arrow 记录上运行以创建一个新记录以与原始记录一起插入的函数
- DuckRunner:在轮换之前在数据库文件上运行一系列查询
推理如果取消嵌套 Arrow 记录数组中深度嵌套的数据会导致数据局部性问题
- Normalizer:在反序列化函数中使用的一个 Bufarrow API,用于标准化消息数据并将其附加到另一个 Arrow 记录,插入到单独的表中
这种方法允许吞吐量恢复到几乎与没有 Normalizer 一样高的水平 - 平面数据处理和插入速度要快得多。
哦,我们到了一半……靠祈祷活着
接下来,我尝试并发连接到多个数据库。 砰! 段错误。 DuckDB 并发模型并非以这种方式 设计 的。 从一个进程中,只能打开单个数据库(内存中或文件),然后可以将其他数据库文件 附加 到中心数据库的目录。
在已经决定轮换数据库文件之后,我决定制作一个单独的程序 (Runner) 来处理轮换的数据库文件,对规范化数据运行聚合并将表转储到 parquet。 这意味着在两者之间建立 RPC 连接,并弄清楚一种反压机制以避免 磁盘已满
事件。
但是,同时运行这两个程序会导致内存压力问题,更不用说大大降低了吞吐量。 将 VM 升级到具有更多 vCPU 和内存的 VM 仅略有帮助,显然存在一些资源争用。
自 Go 1.5 起,默认的 GOMAXPROCS
值是可用 CPU 核心的数量。如果将其降低以“沙盒化”提取过程,并同时在 Runner 中设置 DuckDB 线程数会怎么样? 实际上,这样做效果很好,提高了整体吞吐量。Runner 运行 COPY...TO...parquet
查询,遍历 parquet 输出文件夹,将文件上传到对象存储,并删除已上传的文件。 在 Quafka-Service 中平衡 DuckDB 文件轮换大小阈值,使 Runner 能够跟上进度,并避免磁盘上积压大量 DB 文件。
结果

注意:两次运行的 GOMAXPROCS
都设置为 24(DuckDB 插入例程的数量)。
提取原始数据(14 个字段,其中一个深度嵌套的 LIST.STRUCT.LIST 字段)+ 规范化数据
num_cpu: 60
runtime_os: linux
kafka_clients: 5
kafka_queue_cap: 983040
processor_routines: 32
arrow_queue_cap: 4
duckdb_threshold_mb: 4200
duckdb_connections: 24
normalizer_fields: 10
start_time: 2025-02-24T21:06:23Z
end_time: 2025-02-24T21:11:23Z
records: 123_686_901.00
norm_records: 122_212_452.00
data_transferred: 146.53 GB
duration: 4m59.585s
records_per_second: 398_271.90
total_rows_per_second: 806_210.41
transfer_rate: 500.86 MB/second
duckdb_files: 9
duckdb_files_MB: 38429
file_avg_duration: 33.579s
如果我们仅插入扁平的规范化数据,可以达到多少行/秒?(注意:原始记录仍然会被处理,只是不插入)
num_cpu: 60
runtime_os: linux
kafka_clients: 10
kafka_queue_cap: 1228800
processor_routines: 32
arrow_queue_cap: 4
duckdb_threshold_mb: 4200
duckdb_connections: 24
normalizer_fields: 10
start_time: 2025-02-25T19:04:33Z
end_time: 2025-02-25T19:09:36Z
records: 231_852_772.00
norm_records: 363_247_327.00
data_transferred: 285.76 GB
duration: 5m3.059s
records_per_second: 0.00
total_rows_per_second: 1_198_601.39
transfer_rate: 965.54 MB/second
duckdb_files: 5
duckdb_files_MB: 20056
file_avg_duration: 58.975s
部署后,parquet 文件的数量从每小时约 3000 个小文件降至每小时 < 20 个文件。告别小文件!
挑战/学习
- DuckDB 插入是瓶颈;网络速度、Protobuf 反序列化、构建 Arrow Record 不是。
- 为了最快地插入 DuckDB,Arrow Record Batch 应至少包含 122880 行(与 DuckDB 存储行组大小对齐)。
- DuckDB 不允许您在同一进程中同时打开多个数据库(导致段错误)。 DuckDB 设计为在进程中仅运行一次,中心数据库的目录能够添加与其他数据库的连接。
- 解决方法
- 使用单独的进程来写入和读取多个数据库文件。
- 打开单个 DuckDB 数据库并使用 ATTACH 连接其他 DB 文件。
- 解决方法
- 插入扁平数据比嵌套数据快得多。
ADBC 为 DuckDB 提供了真正的高吞吐量数据提取 API,释放了使用 DuckDB 处理流数据的各种用例,使其成为数据从业者更有用的工具。