开发者指南#

本页更详细地介绍了 Acero 的设计。它讨论了如何创建自定义执行节点,并描述了 Acero 设计和实现背后的一些理念。最后,它概述了如何使用新行为扩展 Acero,以及如何将这种新行为上游到核心 Arrow 存储库中。

理解 ExecNode#

ExecNode 是一个抽象类,具有多个纯虚方法来控制节点的运行方式

ExecNode::StartProducing()#

此方法在计划开始时调用一次。大多数节点忽略此方法(任何必要的初始化都应在构造函数或 Init 中进行)。但是,源节点通常会提供自定义实现。源节点应安排启动读取和提供数据所需的任何任务。源节点通常是计划中任务的主要创建者。

注意

ExecPlan 基于推送模型运行。源通常是基于拉取的。例如,您的源可能是一个迭代器。源节点通常会安排任务以从源中提取一个项目,并将该项目通过 InputReceived 推送到源的输出节点中。

示例#

  • table_source 节点中,输入表被分成多个批次。为每个批次创建一个任务,该任务调用节点输出上的 InputReceived

  • scan 节点中,创建一个任务以开始从数据集列出片段。每个列出任务然后创建任务以异步地从片段读取批次。当批次完全读取后,继续安排一个新任务到执行计划中。此任务调用扫描节点输出上的 InputReceived

ExecNode::InputReceived()#

此方法在计划执行期间被多次调用。它是节点之间传递数据的方式。输入节点将在其输出上调用 InputReceived。Acero 的执行模型是基于推送的。每个节点通过调用 InputReceived 并传入一批数据,将数据推送到其输出中。

InputReceived 方法通常是节点实际工作发生的地方。例如,project 节点将执行其表达式并创建一个新的扩展输出批次。然后,它将在其输出上调用 InputReceived。永远不会在源节点上调用 InputReceived。Sink 节点永远不会调用 InputReceived。所有其他节点都将经历这两种情况。

一些节点(通常称为“管道中断者”)必须累积输入才能生成任何输出。例如,排序节点必须累积所有输入才能对数据进行排序并生成输出。在这些节点中,InputReceived 方法通常会将数据放入某种累积队列中。如果节点没有足够的数据来操作,那么它将不会调用 InputReceived。这将是当前任务的结束。

示例#

  • project 节点运行其表达式,使用接收到的批次作为表达式的输入。从输入批次和表达式的结果创建一个新批次。新批次被赋予与输入批次相同的顺序索引,然后节点在其输出上调用 InputReceived

  • order_by 节点将批次插入累积队列中。如果这是最后一个批次,则节点将对累积队列中的所有内容进行排序。然后,该节点将对排序结果中的每个批次在其输出上调用 InputReceived。将为每个批次分配一个新的批次索引。请注意,此最终输出步骤也可能由于调用 InputFinished(如下所述)而发生。

ExecNode::InputFinished()#

每个输入将调用此方法一次。节点一旦知道将发送到该输出的批次数,就会在其输出上调用 InputFinished。通常,这发生在节点完成工作时。例如,扫描节点一旦完成读取文件,就会调用 InputFinished。但是,如果它知道(可能来自文件元数据)将创建多少批次,则可以更早地调用它。

一些节点将使用此信号来触发某些处理。例如,排序节点需要等到它收到所有输入才能对数据进行排序。它依赖于 InputFinished 调用来知道这种情况已经发生。

即使节点在完成时没有进行任何特殊处理(例如,project 节点或 filter 节点不需要进行任何流结束处理),该节点仍将在其输出上调用 InputFinished。

警告

InputFinished 调用可能在最后一次调用 InputReceived 之前到达。事实上,它甚至可以在任何 InputReceived 调用开始之前发送出去。例如,表源节点始终准确地知道它将产生多少批次。它可以选择在调用 InputReceived 之前调用 InputFinished。如果节点需要进行“流结束”处理,那么它通常使用 AtomicCounter,这是一个辅助类,用于确定何时所有数据都已到达。

示例#

  • order_by 检查以查看它是否已经收到所有批次。如果它已经收到,那么它将执行 InputReceived 示例中描述的排序步骤。在它开始发送输出数据之前,它会检查它有多少输出批次(批次大小可能作为累积或排序的一部分而更改),并在节点输出上调用 InputFinished

  • fetch 节点在调用 InputReceived 期间意识到它已经收到了它被要求的所有行。它立即在其输出上调用 InputFinished(即使它自己的 InputFinished 方法尚未被调用)

ExecNode::PauseProducing() / ExecNode::ResumeProducing()#

这些方法控制反压。一些节点可能需要暂停其输入以避免累积太多数据。例如,当用户使用 RecordBatchReader 消费计划时,我们使用 SinkNode。SinkNode 将数据放置在 RecordBatchReader 从中提取的队列中(这是从推送模型到拉取模型的转换)。如果用户读取 RecordBatchReader 的速度很慢,那么这个队列可能会开始填满。对于另一个例子,我们可以考虑写入节点。此节点将数据写入文件系统。如果写入速度很慢,那么数据可能会在写入节点处累积。因此,写入节点将需要应用反压。

当一个节点意识到它需要应用一些反压时,它将在其输入上调用 PauseProducing。一旦节点有足够的空间继续,它将在其输入上调用 ResumeProducing。例如,SinkNode 会在其队列太满时暂停。随着用户继续从 RecordBatchReader 读取,我们可以预期队列会慢慢耗尽。一旦队列耗尽得足够多,那么 SinkNode 就可以调用 ResumeProducing。

源节点通常需要为 PauseProducing 和 ResumeProducing 提供特殊行为。例如,从文件读取的扫描节点可以暂停读取文件。但是,一些源节点可能无法以任何有意义的方式暂停。表格源节点暂停没有多大意义,因为它的数据已经在内存中。

既不是源节点也不是 sink 节点的节点仍应转发反压信号。例如,当在 project 节点上调用 PauseProducing 时,它应该在其输入上调用 PauseProducing。如果一个节点有多个输入,那么它应该将信号转发到每个输入。

示例#

  • write 节点在其 InputReceived 方法中,将一个批次添加到数据集写入器的队列中。如果数据集写入器已满,它将返回一个未完成的 future,当有更多空间时,该 future 将完成。write 节点然后调用其输入上的 PauseProducing。然后,它将一个延续添加到 future,该延续将调用其输入上的 ResumeProducing

  • scan 节点使用 AsyncTaskScheduler 来跟踪它调度的所有任务。该调度器受到限制,以限制 scan 节点允许执行的并发 I/O 量。当调用 PauseProducing 时,节点将暂停调度器。这意味着在限制之后排队的任何任务都不会被提交。但是,任何正在进行的 I/O 将继续(反压不能立即生效)。当调用 ResumeProducing 时,scan 节点将取消暂停调度器。

ExecNode::StopProducing() #

当计划需要提前结束时,调用 StopProducing。发生这种情况可能是因为用户取消了计划,也可能是因为发生了错误。大多数节点不需要在此处执行任何操作。不期望或要求节点发送任何剩余数据。任何调度任务的节点(例如,源节点)应停止生成新数据。

除了计划范围内的取消之外,如果一个节点已确定它已收到它需要的所有数据,则该节点可以在其输入上调用此方法。但是,由于并行性,一个节点在停止其输入后,可能仍然会收到一些对 InputReceived 的调用。

如果使用了任何外部资源,则清理应作为此调用的一部分进行。

示例 #

  • asofjoin 节点有一个专用处理线程,该线程使用队列与主 Acero 线程通信。当调用 StopProducing 时,该节点将毒丸插入队列。这告诉处理线程立即停止。一旦处理线程停止,它将标记其外部任务(如下所述)为已完成,这允许计划完成。

  • fetch 节点,在 InputReceived 中,可能会确定它已拥有它需要的所有数据。然后,它可以在其输入上调用 StopProducing

初始化/构造/析构 #

简单的初始化逻辑(不会出错)可以在构造函数中完成。如果初始化逻辑可能返回无效状态,则可以在 exec 节点的工厂方法或 Init 方法中完成。对于简单的验证,首选工厂方法。如果初始化可能会进行昂贵的分配或其他资源消耗,则首选 Init 方法。Init 将始终在调用 StartProducing 之前被调用。初始化也可以在 StartProducing 中完成,但请记住,其他节点可能已经在那时启动。

此外,还有一个 Validate 方法可以被重载以提供自定义验证。此方法通常在 Init 之前但在所有输入和输出添加之后被调用。

最终确定今天在析构函数中发生。今天有一些例子表明这可能很慢。例如,在写入节点中,如果在计划期间发生错误,那么我们可能会在此处关闭一些打开的文件。如果存在重要的最终确定,它是异步的或者可能触发错误,那么我们可以向 ExecNode 生命周期引入一个 Finalize 方法。尚未完成,只是因为尚未需要。

总结 #

ExecNode 生命周期 #

方法名称

这是何时调用的……

节点何时调用此方法……

StartProducing

计划正在启动

不适用

InputReceived

从输入接收到数据

将数据发送到输出

InputFinished

输入知道有多少批次

节点可以告诉其输出有多少批次

StopProducing

计划被中止或输出有足够的数据

节点拥有它需要的所有数据

扩展 Acero #

Acero 实例化一个单例 ExecFactoryRegistry,它在名称和 exec 节点工厂(从选项创建 ExecNode 的方法)之间进行映射。要创建一个新的 ExecNode,您可以向此注册表注册该节点,并且您的节点现在可以被 Acero 使用。如果您希望能够将此节点与 Substrait 计划一起使用,您还需要配置 Substrait 注册表,以便它知道如何将 Substrait 映射到您的自定义节点。

这意味着您可以创建新的节点并将其添加到 Acero,而无需从源代码重新编译 Acero。

调度和并行性 #

数据引擎可以通过多种方式利用多个计算资源(例如,多个核心)。在我们深入了解 Acero 的调度细节之前,我们将介绍一些高级主题。

计划的并行执行 #

用户可能希望并发执行多个计划,他们可以这样做。但是,Acero 没有计划间调度的概念。每个计划都将尝试最大限度地利用其计算资源,并且可能会存在 CPU、内存和磁盘资源的争用。如果计划使用默认的 CPU 和 I/O 线程池,这将得到一定程度的缓解,因为它们将共享同一个线程池。

本地分布式计划 #

解决多线程问题的一种常见方法是将输入拆分为多个分区,然后为每个分区创建一个计划,然后以某种方式合并这些计划的结果。例如,假设您有 20 个文件和 10 个核心,并且您想要读取和排序所有数据。您可以为每 2 个文件创建一个计划来读取和排序这些文件。然后,您可以创建一个额外的计划,该计划从这 10 个子计划获取输入,并以排序方式合并这 10 个输入流。

这种方法很受欢迎,因为它是在多个服务器上分配查询的方式,因此它被广泛支持并且易于理解。Acero 今天没有这样做,但没有理由阻止它。向 Acero 添加 shuffle 和分区节点应该是一个高度优先事项,并且可以使 Acero 被分布式系统使用。一旦完成,如果需要,应该可以进行本地 shuffle(本地意味着在单个系统上的多个 exec 计划实例之间交换)。

../../_images/dist_plan.svg

即使计划本身串行运行,分布式计划也可以提供并行性 #

管道并行性 #

Acero 尝试使用管道并行性来最大化并行性。随着每个数据批次从源到达,我们立即创建一个任务并开始处理它。这意味着我们可能会在批次 X-1 的处理完成之前开始处理批次 X。这非常灵活且强大。但是,这也意味着正确实现 ExecNode 很困难。

例如,ExecNode 的 InputReceived 方法应该是可重入的。换句话说,应该预期在先前对 InputReceived 的调用完成之前将调用 InputReceived。这意味着具有任何类型可变状态的节点将需要互斥锁或类似机制来保护该状态免受竞争条件的影响。这也意味着任务很容易变得无序,并且节点不应期望其输入的任何特定排序(稍后会详细介绍)。

../../_images/pipeline.svg

在具有 3 个 CPU 线程和 2 个 I/O 线程的系统上进行管道并行性的示例 #

异步性 #

某些操作需要很长时间,并且可能不需要 CPU。从文件系统读取数据就是一个例子。如果我们每个核心只有一个线程,那么在我们等待这些操作完成时,时间将被浪费。解决这个问题有两种常见的方法。一种同步解决方案通常是创建比核心更多的线程,并期望其中一些线程将被阻塞,这没关系。这种方法往往更简单,但它可能导致过多的线程争用,并且需要微调。

另一种解决方案是使慢速操作异步。当慢速操作开始时,调用者放弃线程,并允许其他任务在此期间运行。一旦慢速操作完成,就会创建一个新任务来获取结果并继续处理。这有助于最大限度地减少线程争用,但往往更难实现。

由于缺乏标准的 C++ 异步 API,Acero 使用了这两种方法的组合。Acero 有两个线程池。第一个是 CPU 线程池。此线程池每个核心有一个线程。此线程池中的任务永远不应阻塞(除了用于同步的微小延迟),并且通常应尽可能多地主动使用 CPU。I/O 线程池上的线程预计大部分时间都处于空闲状态。他们应避免进行任何 CPU 密集型工作。他们的工作基本上是等待数据可用并在 CPU 线程池上调度后续任务。

../../_images/async.svg

Arrow 通过组合 CPU 和 I/O 线程池来实现异步执行 #

注意

Acero 中的大多数节点不需要担心异步性。它们是完全同步的,并且不生成任务。

每个管道的任务(有时超出) #

引擎可以选择为节点的每次执行创建一个线程任务。但是,如果没有仔细的调度,这会导致缓存局部性问题。例如,假设我们有一个包含三个执行节点的基本计划:扫描(scan)、投影(project)和过滤(filter)(这是一个非常常见的用例)。现在假设有 100 个批次。在每个算子一个任务的模型中,我们将会有诸如“扫描批次 5”、“投影批次 5”和“过滤批次 5”之类的任务。每个任务都可能访问相同的数据。例如,可能 projectfilter 节点需要读取同一列。该列最初是在 scan 节点的解码阶段创建的。为了最大化缓存利用率,我们需要仔细调度我们的任务,以确保这三个任务都连续运行并分配给同一个 CPU 核心。

为了避免这个问题,我们设计的任务尽可能地运行通过尽可能多的节点才结束任务。这一系列的节点通常被称为“管道”(pipeline),结束管道(从而结束任务)的节点通常被称为“管道断路器”(pipeline breakers)。有些节点甚至可能介于两者之间。例如,在一个哈希连接节点中,当我们在探测端(probe side)接收到一个批次,并且哈希表已经构建完成时,我们不需要结束任务,而是继续运行。这意味着任务有时可能在连接节点结束,有时可能继续通过连接节点。

../../_images/pipeline_task.svg

计划中管道的逻辑视图和两个任务,显示管道边界在计划期间可能会发生变化#

线程池和调度器#

CPU 和 I/O 线程池是 Arrow-C++ 核心库的一部分。它们包含一个任务的 FIFO 队列,并在线程可用时执行它们。对于 Acero,我们需要额外的能力。为此,我们使用 AsyncTaskScheduler。在最简单的操作模式下,调度器只是将任务提交给底层的线程池。但是,它也能够创建子调度器,这些子调度器可以应用节流、优先级排序和任务跟踪。

  • 节流调度器(throttled scheduler)将成本与每个任务关联起来。只有在有空间时,任务才会被提交给底层调度器。如果没有空间,则任务会被放入队列中。写入节点使用大小为 1 的节流来避免重入调用数据集写入器(dataset writer)(数据集写入器有其自身的内部调度)。节流调度器可以手动暂停和取消暂停。当暂停时,所有任务都会排队,即使有空间,排队的任务也不会被提交。这在源节点中可以用来实现 PauseProducing 和 ResumeProducing。

  • 优先级可以应用于节流调度器,以控制队列中的任务的提交顺序。如果有空间,则立即提交任务(无论优先级如何)。但是,如果节流已满,则任务会排队并接受优先级排序。扫描节点会节流它生成的读取请求数量,并尽可能按顺序优先读取数据集。

  • 任务组可以用于跟踪一组任务,并在所有任务完成后运行一个最终任务。这对于分叉-汇合(fork-join)风格的问题很有用。写入节点使用任务组来关闭文件,一旦该文件的所有未完成的写入任务都已完成。

关于在执行引擎中优先排序任务的不同方法,已经有很多研究和例子。Acero 尚未解决这个问题。让我们来看一些常见的情况

  • 引擎通常会优先从连接节点的构建端(build side)读取数据,然后再从探测端(probe side)读取数据。这在 Acero 中可以通过应用背压(backpressure)更容易地处理。

  • 另一个常见的用例是控制内存积累。引擎会优先处理更靠近 sink 节点的任务,以缓解内存压力。但是,Acero 目前假设溢出(spilling)将被添加到管道断路器中,并且计划中的内存使用量将或多或少是静态的(每个核心),并且远低于硬件的限制。如果 Acero 需要在具有许多计算资源但内存有限的环境中使用(例如 GPU),这种情况可能会发生变化。

  • 引擎通常会使用工作窃取(work stealing)算法来优先在同一核心上运行任务,以提高缓存局部性。但是,由于 Acero 使用每个管道一个任务的模型,因此调度器可以回收的缓存并行性机会损失不多。只有在无法使用数据完成更多工作时,任务才会结束。

虽然 Acero 今天没有太多优先级排序,但我们确实有工具在需要时应用它。

注意

除了 AsyncTaskScheduler 之外,还有一个名为 TaskScheduler 的类。这个类早于 AsyncTaskScheduler,旨在为高效的同步分叉-汇合工作负载提供任务跟踪。如果此特殊用途满足您的需求,您可以考虑使用它。将其与 AsyncTaskScheduler 进行分析比较,看看两者有多接近,将会很有趣。

节点内并行#

某些节点可能会在一个任务内利用并行性。例如,在扫描节点中,我们可以并行解码列。在哈希连接节点中,有时会针对复杂的任务(例如构建哈希表)利用并行性。这种并行性不太常见,但不一定不鼓励。但是,首先应该进行分析以确保这种额外的并行性对您的工作负载有帮助。

所有工作都在任务中进行#

Acero 中的所有工作都是作为任务的一部分进行的。当一个计划开始时,AsyncTaskScheduler 被创建并赋予一个初始任务。这个初始任务调用节点的 StartProducing。任务可能会调度额外的任务。例如,源节点通常会在调用 StartProducing 期间调度任务。管道断路器通常会在它们积累了所有需要的数据时调度任务。一旦计划中的所有任务都完成,则该计划被认为是完成的。

某些节点使用外部线程。这些线程必须使用 BeginExternalTask 方法注册为外部任务。例如,asof 连接节点使用专用处理线程来实现串行执行。这个专用线程被注册为外部任务。应尽可能避免外部任务,因为它们需要小心处理,以避免在错误情况下发生死锁。

有序执行#

某些节点要么建立对其传出批次的排序,要么需要能够按顺序处理批次。Acero 使用 ExecBatch 上的 batch_index 属性来处理排序。如果一个节点具有确定的输出顺序,那么它应该对其发出的批次应用批次索引。例如,OrderByNode 对批次应用新的排序(无论传入的排序如何)。扫描节点能够将一个隐式的排序附加到批次上,该排序反映了正在扫描的文件中行的顺序。

如果一个节点需要按顺序处理数据,那么它会更复杂一些。由于执行的并行性,我们不能保证批次会按顺序到达一个节点。但是,通常可以预期它们是“大部分有序的”。因此,我们可以将批次插入到一个排序队列(sequencing queue)中。排序队列被赋予一个回调,保证该回调会按顺序、串行地在批次上运行。例如,fetch 节点使用一个排序队列。该回调检查我们是否需要包含部分或全部批次,如果需要,则对批次进行切片。

即使一个节点不关心顺序,它也应该尽量保持批次索引,如果可以的话。投影和过滤节点不关心顺序,但它们确保输出批次保持与其输入批次相同的索引。如果需要,过滤节点甚至会发出空批次,以便它可以保持批次顺序而没有间隔。

../../_images/ordered.svg

有序执行的示例#

分区执行#

如果行以某种方式分组在一起,则流被分区(或有时称为分段)。目前,没有分区的正式概念。但是,一个概念正在开始发展(例如,分段聚合),我们最终可能会在 Acero 中引入更正式的分区概念。

溢出(Spillover)#

溢出尚未在 Acero 中实现。

分布式执行#

当在分布式环境中使用引擎时,某些执行节点很有用。术语可能有所不同,因此我们将使用 Substrait 术语。交换节点(exchange node)将数据发送给不同的工作节点(workers)。通常,这是一个分区交换(partitioned exchange),因此 Acero 预计会分区每个批次并将分区分布在 N 个不同的工作节点上。在另一端,我们有捕获节点(capture node)。此节点接收来自不同工作节点的数据。

这些节点今天在 Acero 中不存在。但是,它们将在范围内,我们希望有一天能拥有这样的节点。

性能分析 & 追踪#

Acero 的追踪目前只实现了一半,并且在性能分析工具中存在重大空白。但是,已经有一些使用 open telemetry 进行追踪的努力,并且大多数必要的组件已经到位。目前主要缺少的是某种对追踪结果的有效可视化。

为了使用今天存在的追踪功能,您需要使用 ARROW_WITH_OPENTELEMETRY=ON 构建 Arrow。然后,您需要设置环境变量 ARROW_TRACING_BACKEND=otlp_http。这将配置 open telemetry 以将跟踪结果(作为 OTLP)导出到 HTTP 端点 http://localhost:4318/v1/traces。您将需要配置一个 open telemetry 收集器以在该端点上收集结果,并且您将需要配置某种跟踪查看器,例如 Jaeger:https://jaeger.golang.ac.cn/docs/1.21/opentelemetry/

基准测试#

Acero 最完整的宏基准测试由 voltrondata-labs/arrowbench 提供。这些测试包括一套 TPC-H 基准测试,从 R-dplyr 集成执行,每次 Arrow 提交时都会运行,并在 Conbench 上报告:https://conbench.ursa.dev/

除了这些 TPC-H 基准测试之外,还有许多针对各种节点(hash-join、asof-join 等)的微基准测试。最后,计算函数本身应该大部分都有微基准测试。有关微基准测试的更多信息,您可以参考 https://arrow.apache.org/docs/developers/benchmarks.html

任何新功能都应包含微基准测试,以避免回归。

绑定#

公共 API#

Acero 的公共 API 包括 Declaration 和各种 DeclarationToXyz 方法。此外,每个节点的选项类也是公共 API 的一部分。但是,节点是可扩展的,因此此 API 也是可扩展的。

R (dplyr)#

Dplyr 是一个 R 库,用于以编程方式构建查询。 arrow-r 包具有 dplyr 绑定,可以调整 dplyr API 来创建 Acero 执行计划。此外,还有一个正在开发的 dplyr-substrait 后端,最终可能会取代 Acero 感知绑定。

Python#

pyarrow 库以两种不同的方式绑定到 Acero。首先,在 pyarrow.acero 中有一个直接绑定,它直接绑定到公共 API。其次,有许多计算实用程序,例如 pyarrow.Table.group_by,它使用 Acero,但这对于用户来说是不可见的。

Java#

Java 实现公开了来自 Arrow 数据集的一些功能。这些功能隐式地使用 Acero。目前,Java 实现中没有直接绑定到 Acero 或 Substrait。

设计理念#

引擎独立的计算#

如果一个节点需要复杂的计算,那么它应该将该工作封装在不依赖于任何特定引擎设计的抽象中。例如,hash join 节点使用行编码器、哈希表和 exec 批构建器等实用程序。其他地方共享排序队列和行分段器的实现。节点本身应该保持最小化,并且只需从 Acero 映射到抽象即可。

这有助于将设计与 Acero 的设计细节解耦,并使它们更能抵抗引擎中的更改。 它还有助于将这些抽象作为自身的能力进行推广。 无论是用于其他引擎还是用于 pyarrow 的潜在新添加作为计算实用程序。

创建任务而不是线程#

如果需要并行运行某些内容,则应使用线程任务而不是专用线程。

  • 这可以减少线程数(减少线程争用和上下文切换)

  • 这可以防止死锁(如果发生故障,任务会自动取消)

  • 这简化了性能分析(可以轻松地测量任务,更容易知道所有工作都在哪里)

  • 这使得无需线程即可运行(有时用户正在进行自己的线程处理,有时我们需要在线程受限的环境中运行,例如 emscripten)

注意:我们目前并不总是遵循此建议。 asof join 节点中有一个专用的进程线程。 专用线程对于实验性使用来说是“可以的”,但我们希望从它们那里迁移开。

不要阻塞 CPU 线程#

如果您需要运行可能长时间运行的活动,而该活动并未主动使用 CPU 资源(例如,从磁盘读取、网络 I/O、等待使用其自身线程的外部库),则应使用异步实用程序来确保您不会阻塞 CPU 线程。

不要重新发明轮子#

每个节点不应是实用程序的独立孤岛。 在可能的情况下,应将计算推送到计算函数或通用共享实用程序中。 这是像这样大的项目可以希望维护的唯一方法。

避免查询优化#

编写高效的 Acero 计划可能具有挑战性。 例如,应将过滤器表达式和列选择推送到扫描节点中,以便不从磁盘读取数据。 应该简化表达式并分解出公共子表达式。 hash join 节点的构建端应该是两个输入中较小的一个。

但是,解决这些问题是查询计划程序或查询优化程序所保留的挑战。 创建查询优化程序是一项具有挑战性的任务,超出了 Acero 的范围。 随着 Substrait 的采用,我们希望最终会出现解决这些问题的实用程序。 因此,我们通常避免在 Acero 中进行任何类型的查询优化。 Acero 应尽可能按字面意义解释声明。 这有助于减少维护并避免意外。

我们也意识到这并非总是可能的。 例如,hash join 节点目前会检测是否存在 hash join 运算符链,如果存在,则在运算符之间配置布隆过滤器。 从技术上讲,这是一项可以留给查询优化程序完成的任务。 但是,此行为相当特定于 Acero 且相当小众,因此不太可能很快将其引入优化程序。

性能指南#

批量大小#

也许讨论最多的性能标准是批量大小。 Acero 最初是基于研究设计的,遵循 morsel-batch 模型。 基于一大批行(一个 morsel)创建任务。 目标是使 morsel 足够大以证明任务的开销是合理的。 在任务中,数据进一步细分为批次。 每个批次应足够小,以便舒适地放入 CPU 缓存中(通常是 L2 缓存)。

这设置了两个循环。 外循环是并行的,内循环不是

for morsel in dataset: # parallel
  for batch in morsel:
    run_pipeline(batch)

这种执行方式的优点是,访问同一列的连续节点(或 exec 节点中的连续操作)可能会受益于缓存。 这对于需要随机访问数据的函数也是必不可少的。 它最大限度地提高了并行性,同时最大限度地减少了从主存储器到 CPU 缓存的数据传输。

../../_images/microbatch.svg

如果需要多次遍历数据(或随机访问)并且批次比缓存大得多,则性能会受到影响。 将任务分成较小的批次有助于提高任务的局部性。#

morsel/batch 模型反映在 Acero 中的几个地方

  • 在大多数源节点中,我们将尝试获取 1Mi 行的批次。 这通常是可配置的。

  • 然后在源节点中,我们迭代并切出 32Ki 行的批次。 这目前是不可配置的。

  • hash join 节点目前要求批次包含至少 32Ki 行或更少,因为它在某些地方使用 16 位有符号整数作为行索引。

但是,此指南是有争议的。 性能分析表明,移动到较小的批量大小并没有给我们带来任何实际好处。 似乎我们获得的任何优势都在每个批次的开销中丢失了。 大部分开销似乎是由于每个批次的各种分配造成的。 此外,根据您的硬件,CPU Cache<->RAM 是否始终是瓶颈尚不清楚。 线性访问、预取和高 CPU<->RAM 带宽的组合可以减轻缓存未命中的惩罚。

因此,本节包含在本指南中以提供历史背景,但不应被视为具有约束力。

正在进行和已弃用的工作#

以下工作正在进行中。 在这里描述它们是为了解释代码库中的某些重复以及解释即将消失的类型。

Scanner v2#

扫描程序目前是 datasets 模块中的一个节点,注册到工厂注册表中为“scan”。 该节点是在 Acero 之前编写的,并且广泛使用 AsyncGenerator 来并行扫描多个文件。 不幸的是,AsyncGenerator 的使用使得扫描难以进行性能分析、难以调试且无法取消。 一个新的扫描节点正在开发中。 它目前注册的名称为“scan2”。 新的扫描节点使用 AsyncTaskScheduler 而不是 AsyncGenerator,并且应该提供额外的功能,例如跳过行和处理嵌套列投影(对于支持它的格式)的能力

OrderBySink 和 SelectKSink#

这两个 exec 节点提供了自定义的 sink 实现。 它们是在将有序执行添加到 Acero 之前编写的,并且是生成有序输出的唯一方法。 但是,它们必须放置在计划的末尾,并且它们是自定义 sink 节点的事实使得很难用 Declaration 来描述它们。 OrderByNode 和 FetchNode 替换了这些。 这些暂时保留,直到现有的绑定从它们那里移开。

向上游传输更改#

Acero 的设计使其无需重新编译即可扩展。 您可以轻松地添加新的计算函数和 exec 节点,而无需创建分支或编译 Acero。 但是,当您开发出通常有用的新功能时,我们希望您能抽出时间来向上游传输您的更改。

即使我们欢迎这些更改,我们也不得不承认这个过程是有成本的。 向上游传输代码需要新模块的行为正确,但这通常是更容易审查的部分。 更重要的是,向上游传输代码是将维护负担从您自己转移到更广泛的 Arrow C++ 项目维护者的过程。 这要求维护人员对代码有深刻的理解,要求代码与项目的风格一致,并且要求使用单元测试对代码进行充分的测试,以帮助进行回归。

因此,我们强烈建议您采取以下步骤

  • 在您开始时,您应该向邮件列表发送一条消息,宣布您的意图和设计。 这将帮助您确定对该功能是否有更广泛的兴趣,并且其他人可能有一些想法或建议可以在过程的早期做出贡献。

    • 如果对该功能没有太大的兴趣,请记住,最终可能很难向上游传输更改。 团队的维护能力是有限的,我们尽量优先考虑需求量大的功能。

  • 我们建议您在自己的分支上开发和测试更改,直到您对事情的运行情况非常有信心为止。 如果更改很大,那么您还可以考虑如何将更改分解为更小的部分。 在执行此操作时,您可以同时共享较大的 PR(作为草稿 PR 或本地分支上的一个分支)和较小的 PR。 这样我们就可以看到较小 PR 的上下文。 但是,如果您确实分解了内容,则较小的 PR 仍然应该理想地独立存在。

  • 任何 PR 都需要包含以下内容

    • 单元测试,用于转换新功能

    • 如果正在进行任何重要的计算工作,则需要进行微基准测试

    • 演示如何使用新功能的示例

    • 对 API 参考和本指南的更新

    • 通过 CI(您可以在您的分支上启用 GitHub Actions,这将允许在您创建 PR 之前运行大多数 CI 作业)