开发 Acero#
本页将更详细地介绍 Acero 的设计。它讨论了如何创建自定义执行节点,并描述了 Acero 设计和实现背后的一些理念。最后,它概述了如何用新行为扩展 Acero,以及如何将这些新行为上游到 Arrow 核心代码库中。
理解 ExecNode#
ExecNode 是一个抽象类,有几个控制节点如何操作的纯虚方法。
ExecNode::StartProducing()#
此方法在计划开始时调用一次。大多数节点会忽略此方法(任何必要的初始化都应在构造函数或 Init 中进行)。然而,源节点通常会提供自定义实现。源节点应调度开始读取和提供数据所需的任何任务。源节点通常是计划中任务的主要创建者。
注意
ExecPlan 基于推模型(push-based)运行。源通常是基于拉模型(pull-based)的。例如,你的源可能是一个迭代器。源节点通常会调度任务从源中拉取一个项目,并将该项目推送到源的输出节点(通过 InputReceived)。
示例#
在
table_source节点中,输入表被分成多个批次。为每个批次创建一个任务,该任务在节点的输出上调用InputReceived。在
scan节点中,会创建一个任务来开始从数据集中列出片段(fragment)。然后,每个列出任务都会创建用于从片段中异步读取批次数据的任务。当批次完全读入后,一个延续(continuation)会用执行计划调度一个新任务。这个任务在 scan 节点的输出上调用InputReceived。
ExecNode::InputReceived()#
在计划执行期间,此方法会被多次调用。这是节点之间传递数据的方式。输入节点会在其输出上调用 InputReceived。Acero 的执行模型是基于推的(push-based)。每个节点通过调用 InputReceived 并传入一批数据来将数据推入其输出。
InputReceived 方法通常是节点实际工作发生的地方。例如,一个 project 节点将执行其表达式并创建一个新的扩展输出批次。然后它会在其输出上调用 InputReceived。InputReceived 永远不会在源节点上被调用。汇节点(Sink node)永远不会调用 InputReceived。所有其他节点都会经历这两种情况。
一些节点(通常称为“管道断路器”)必须在累积输入后才能生成任何输出。例如,一个排序节点必须累积所有输入后才能对数据进行排序并生成输出。在这些节点中,InputReceived 方法通常会将数据放入某种累积队列中。如果节点没有足够的数据来操作,它将不会调用 InputReceived。这将是当前任务的结束。
示例#
project节点运行其表达式,使用接收到的批次作为表达式的输入。从输入批次和表达式的结果创建一个新批次。新批次被赋予与输入批次相同的顺序索引,然后节点在其输出上调用InputReceived。order_by节点将批次插入一个累积队列中。如果这是最后一个批次,那么节点将对累积队列中的所有内容进行排序。然后,节点将为排序结果中的每个批次在输出上调用InputReceived。将为每个批次分配一个新的批次索引。请注意,这个最终的输出步骤也可能由于调用InputFinished(如下所述)而发生。
ExecNode::InputFinished()#
此方法将针对每个输入调用一次。一个节点一旦知道它将发送多少批次到其输出,就会在其输出上调用 InputFinished。通常这发生在节点完成工作时。例如,一个扫描节点在读完文件后会调用 InputFinished。然而,如果它知道(可能从文件元数据)将创建多少批次,它也可以更早地调用它。
一些节点将使用此信号来触发某些处理。例如,一个排序节点需要等到它收到所有输入后才能对数据进行排序。它依赖于 InputFinished 调用来知道这已经发生。
即使一个节点在完成时没有进行任何特殊处理(例如,project 节点或 filter 节点不需要进行任何流末端处理),该节点仍然会在其输出上调用 InputFinished。
警告
InputFinished 调用可能在最后一次调用 InputReceived 之前到达。实际上,它甚至可以在任何 InputReceived 调用开始之前就发送出去。例如,table source 节点总是确切地知道它将产生多少批次。它可以选择在调用 InputReceived 之前调用 InputFinished。如果一个节点需要进行“流末端”处理,它通常会使用一个 AtomicCounter,这是一个辅助类,用于确定所有数据何时到达。
示例#
order_by检查是否已经收到了它所有的批次。如果已经收到了,它就执行在InputReceived示例中描述的排序步骤。在开始发送输出数据之前,它会检查它有多少输出批次(在累积或排序过程中,批次大小可能发生了变化),并在节点的输出上调用InputFinished。fetch节点在一次InputReceived调用期间,意识到它已经收到了所有它被要求的行。它会立即在其输出上调用InputFinished(即使它自己的InputFinished方法还没有被调用)。
ExecNode::PauseProducing() / ExecNode::ResumeProducing()#
这些方法控制背压。一些节点可能需要暂停其输入以避免累积过多数据。例如,当用户使用 RecordBatchReader 消费计划时,我们使用一个 SinkNode。SinkNode 将数据放入一个 RecordBatchReader 从中拉取的队列中(这是从推模型到拉模型的转换)。如果用户读取 RecordBatchReader 的速度很慢,那么这个队列可能会开始填满。另一个例子是 write 节点。这个节点将数据写入文件系统。如果写入速度慢,数据可能会在 write 节点累积。因此,write 节点将需要应用背压。
当一个节点意识到需要施加一些背压时,它会在其输入上调用 PauseProducing。一旦节点有足够的空间继续,它就会在其输入上调用 ResumeProducing。例如,当 SinkNode 的队列变得太满时,它会暂停。随着用户继续从 RecordBatchReader 中读取,我们可以预期队列会慢慢排空。一旦队列排空到一定程度,SinkNode 就可以调用 ResumeProducing。
源节点通常需要为 PauseProducing 和 ResumeProducing 提供特殊的行为。例如,一个从文件中读取的扫描节点可以暂停读取文件。然而,有些源节点可能无法以任何有意义的方式暂停。对于一个 table source 节点来说,暂停没有太大意义,因为它的数据已经在内存中了。
既不是源节点也不是汇节点的节点仍然应该转发背压信号。例如,当在 project 节点上调用 PauseProducing 时,它应该在其输入上调用 PauseProducing。如果一个节点有多个输入,它应该将信号转发给每个输入。
示例#
write节点在其InputReceived方法中,将一个批次添加到一个数据集写入器的队列中。如果数据集写入器此时已满,它将返回一个未完成的 future,当它有更多空间时该 future 将会完成。然后write节点在其输入上调用PauseProducing。然后它向 future 添加一个延续(continuation),这个延续将在其输入上调用ResumeProducing。scan节点使用一个AsyncTaskScheduler来跟踪它调度的所有任务。这个调度器是受节流的,以限制scan节点允许执行的并发 I/O 量。当调用PauseProducing时,节点将暂停调度器。这意味着任何排在节流阀后面的任务都不会被提交。然而,任何正在进行的 I/O 将继续(背压不能立即生效)。当调用ResumeProducing时,scan节点将取消暂停调度器。
ExecNode::StopProducing()#
当计划需要提前结束时,会调用 StopProducing。这可能是因为用户取消了计划,也可能是因为发生了错误。大多数节点在这里不需要做任何事情。不期望或要求节点发送它拥有的任何剩余数据。任何调度任务的节点(例如,源节点)都应该停止产生新数据。
除了计划范围的取消之外,如果一个节点决定它已经收到了所需的所有数据,它可以在其输入上调用此方法。然而,由于并行性,一个节点在停止其输入后可能仍然会收到几次对 InputReceived 的调用。
如果使用了任何外部资源,清理工作应该作为此调用的一部分进行。
示例#
asofjoin节点有一个专用的处理线程,该线程使用队列与 Acero 主线程通信。当调用StopProducing时,该节点会在队列中插入一个毒丸(poison pill)。这告诉处理线程立即停止。一旦处理线程停止,它会将其外部任务(下文描述)标记为已完成,从而允许计划结束。fetch节点,在InputReceived中,可能会判定它已经获得了所需的所有数据。然后它可以对其输入调用StopProducing。
初始化 / 构造 / 销毁#
简单的初始化逻辑(不会出错的)可以在构造函数中完成。如果初始化逻辑可能返回无效状态,那么它可以在执行节点的工厂方法中完成,或者在 Init 方法中完成。对于简单的验证,首选工厂方法。如果初始化可能进行昂贵的分配或其他资源消耗,则首选 Init 方法。Init 将总是在 StartProducing 调用之前被调用。初始化也可以在 StartProducing 中完成,但请记住,到那时其他节点可能已经启动了。
此外,还有一个 Validate 方法可以被重载以提供自定义验证。该方法通常在 Init 之前但在所有输入和输出添加之后调用。
目前,最终化(Finalization)发生在析构函数中。有一些例子表明这样做可能会很慢。例如,在 write 节点中,如果计划执行期间发生了错误,我们可能会在这里关闭一些打开的文件。如果存在重要的最终化操作,且这些操作是异步的或可能触发错误,那么我们可以为 ExecNode 的生命周期引入一个 Finalize 方法。至今尚未这样做,仅仅是因为还没有这个需要。
总结#
方法名称 |
这在何时被调用… |
节点在何时调用它… |
|---|---|---|
StartProducing |
计划开始时 |
N/A |
InputReceived |
从输入接收数据时 |
向输出发送数据时 |
InputFinished |
输入知道有多少批次时 |
节点可以告知其输出有多少批次时 |
StopProducing |
计划被中止或输出有足够数据时 |
节点拥有它需要的所有数据时 |
扩展 Acero#
Acero 实例化了一个单例的 ExecFactoryRegistry,它在名称和执行节点工厂(从选项创建 ExecNode 的方法)之间进行映射。要创建一个新的 ExecNode,你可以将该节点注册到此注册表中,然后你的节点就可以被 Acero 使用了。如果你希望能够将此节点与 Substrait 计划一起使用,你还需要配置 Substrait 注册表,以便它知道如何将 Substrait 映射到你的自定义节点。
这意味着你可以在不从源代码重新编译 Acero 的情况下创建并向 Acero 添加新节点。
调度和并行#
数据引擎有多种方式可以利用多个计算资源(例如多个核心)。在我们深入了解 Acero 的调度细节之前,我们将介绍几个高层次的主题。
计划的并行执行#
用户可能希望并发执行多个计划,他们当然可以这样做。然而,Acero 没有计划间调度的概念。每个计划都会试图最大化其计算资源的使用,并且很可能会出现 CPU、内存和磁盘资源的争用。如果计划使用默认的 CPU 和 I/O 线程池,这种情况会得到一定程度的缓解,因为它们将共享同一个线程池。
本地分布式计划#
一种常见处理多线程的方法是将输入分割成多个分区,然后为每个分区创建一个计划,最后以某种方式合并这些计划的结果。例如,假设你有 20 个文件和 10 个核心,并且你想要读取并排序所有数据。你可以为每 2 个文件创建一个计划来读取和排序这些文件。然后你可以再创建一个额外的计划,它接收这 10 个子计划的输入,并以排序的方式合并这 10 个输入流。
这种方法很受欢迎,因为这是在多个服务器之间分配查询的方式,因此得到了广泛支持和充分理解。Acero 目前没有这样做,但没有理由阻止它。向 Acero 添加 shuffle 和 partition 节点应该是一个高优先级事项,这将使 Acero 能够被分布式系统使用。一旦完成,如果需要,应该可以进行本地 shuffle(本地意味着在单个系统上的多个执行计划实例之间交换数据)。
即使计划本身是串行运行的,分布式计划也可以提供并行性#
流水线并行#
Acero 试图通过流水线并行来最大化并行度。每当一批数据从源头到达时,我们立即创建一个任务并开始处理它。这意味着我们很可能会在批次 X-1 的处理完成之前就开始处理批次 X。这是非常灵活和强大的。然而,这也意味着正确实现一个 ExecNode 是困难的。
例如,一个 ExecNode 的 InputReceived 方法应该是可重入的。换句话说,应该预料到在上一次 InputReceived 调用完成之前,InputReceived 就会被再次调用。这意味着具有任何类型可变状态的节点将需要互斥锁或类似的机制来保护该状态免受竞争条件的影响。这也意味着任务很容易乱序,节点不应该期望其输入的任何特定顺序(稍后会详细介绍)。
一个在拥有 3 个 CPU 线程和 2 个 I/O 线程的系统上的流水线并行示例#
异步性#
一些操作需要很长时间,并且可能不需要CPU。从文件系统读取数据就是一个例子。如果我们每个核心只有一个线程,那么在等待这些操作完成时就会浪费时间。这个问题有两个常见的解决方案。一个同步的解决方案通常是创建比核心数更多的线程,期望其中一些会被阻塞,这是可以接受的。这种方法往往更简单,但可能导致过多的线程争用,并且需要微调。
另一个解决方案是使慢速操作异步化。当慢速操作开始时,调用者放弃线程,允许其他任务在此期间运行。一旦慢速操作完成,就会创建一个新任务来接收结果并继续处理。这有助于最小化线程争用,但实现起来往往更复杂。
由于缺乏标准的 C++ 异步 API,Acero 采用了两种方法的结合。Acero 有两个线程池。第一个是 CPU 线程池。这个线程池每个核心有一个线程。这个线程池中的任务永远不应该阻塞(除了同步时的微小延迟),并且通常应该尽可能多地积极使用 CPU。I/O 线程池上的线程则预计大部分时间处于空闲状态。它们应该避免做任何 CPU 密集型的工作。它们的工作基本上是等待数据可用,然后在 CPU 线程池上调度后续任务。
Arrow 通过结合 CPU 和 I/O 线程池来实现异步执行#
注意
Acero 中的大多数节点不需要担心异步性。它们是完全同步的,并且不会产生任务。
每个流水线一个任务(有时甚至更多)#
一个引擎可以选择为每个节点的执行创建一个线程任务。然而,如果没有仔细的调度,这会导致缓存局部性问题。例如,假设我们有一个由三个执行节点组成的基本计划:scan、project,然后是 filter(这是一个非常常见的用例)。现在假设有 100 个批次。在“每个算子一个任务”的模型中,我们会有像“Scan Batch 5”、“Project Batch 5”和“Filter Batch 5”这样的任务。这些任务中的每一个都有可能访问相同的数据。例如,也许 project 和 filter 节点需要读取同一列。这一列最初是在 scan 节点的解码阶段创建的。为了最大化缓存利用率,我们需要仔细地调度我们的任务,以确保这三个任务是连续运行的,并且被分配到同一个 CPU 核心。
为了避免这个问题,我们设计的任务会尽可能地贯穿多个节点,直到任务结束。这一系列节点通常被称为“流水线(pipeline)”,而结束流水线(从而结束任务)的节点通常被称为“流水线断路器(pipeline breakers)”。有些节点甚至可能介于两者之间。例如,在一个哈希连接节点中,当我们在探测端(probe side)收到一个批次,并且哈希表已经构建完成时,我们不需要结束任务,而是继续运行。这意味着任务有时可能会在连接节点处结束,有时可能会继续经过连接节点。
一个计划中流水线的逻辑视图和两个任务,显示了流水线边界在计划执行期间可能会变化#
线程池和调度器#
CPU 和 I/O 线程池是 Arrow-C++ 核心库的一部分。它们包含一个先进先出(FIFO)的任务队列,并在线程可用时执行它们。对于 Acero,我们需要额外的功能。为此,我们使用 AsyncTaskScheduler。在最简单的操作模式下,调度器只是将任务提交给底层的线程池。然而,它也能够创建子调度器,这些子调度器可以应用节流、优先级排序和任务跟踪。
一个节流的调度器会为每个任务关联一个成本。只有在有足够空间的情况下,任务才会被提交给底层的调度器。如果没有,任务就会被放入一个队列中。写节点使用大小为1的节流阀,以避免重入地调用数据集写入器(数据集写入器会自己进行内部调度)。节流的调度器可以手动暂停和恢复。暂停时,所有任务都会被排队,即使有空间,排队的任务也不会被提交。这在源节点中实现 PauseProducing 和 ResumeProducing 时很有用。
优先级可以应用于节流调度器,以控制排队任务的提交顺序。如果有空间,任务会立即被提交(无论优先级如何)。然而,如果节流阀已满,任务就会被排队并受优先级排序的影响。scan 节点会节流它生成的读取请求数量,并优先按顺序读取数据集(如果可能的话)。
任务组可用于跟踪一组任务,并在所有任务完成后运行一个最终化任务。这对于分叉-合并(fork-join)风格的问题很有用。写节点使用一个任务组,在一个文件的所有未完成的写任务都完成后关闭该文件。
关于在执行引擎中如何优先处理任务,已经有相关的研究和例子。Acero 尚未需要解决这个问题。让我们来看一些常见情况。
引擎通常会优先从连接节点的构建端(build side)读取,然后再从探测端(probe side)读取。在 Acero 中,通过应用背压可以更容易地处理这个问题。
另一个常见的用例是控制内存累积。引擎会优先处理更接近汇(sink)节点的任务,以努力缓解内存压力。然而,Acero 目前假设溢出(spilling)将被添加到流水线断路器处,并且计划中的内存使用将或多或少是静态的(每个核心),并且远低于硬件的限制。如果 Acero 需要在计算资源多而内存有限的环境(例如 GPU)中使用,这种情况可能会改变。
引擎通常会使用工作窃取算法来优先在同一个核心上运行任务,以提高缓存局部性。然而,由于 Acero 使用的是“每个流水线一个任务”的模型,调度器能够回收的缓存并行性损失机会并不多。任务只在没有更多可以用该数据完成的工作时才会结束。
虽然目前 Acero 中没有太多的优先级设置,但我们确实有工具可以在需要时应用它。
注意
除了 AsyncTaskScheduler,还有一个名为 TaskScheduler 的类。这个类早于 AsyncTaskScheduler,旨在为高效的同步分叉-合并(fork-join)工作负载提供任务跟踪。如果这个专门的用途满足你的需求,你可以考虑使用它。将它与 AsyncTaskScheduler 进行性能分析,看看两者比较结果如何,将是很有趣的。
节点内并行#
一些节点有可能在一个任务内部利用并行性。例如,在扫描节点中,我们可以并行解码列。在哈希连接节点中,有时会利用并行性来完成复杂的任务,比如构建哈希表。这种并行性不太常见,但不一定不鼓励。不过,应该首先进行性能分析,以确保这种额外的并行性对你的工作负载有帮助。
所有工作都在任务中进行#
Acero 中所有的工作都是作为任务的一部分发生的。当一个计划启动时,会创建一个 AsyncTaskScheduler 并给予一个初始任务。这个初始任务会在节点上调用 StartProducing。任务可能会调度额外的任务。例如,源节点通常会在调用 StartProducing 期间调度任务。流水线断路器通常会在累积了所需的所有数据后调度任务。一旦一个计划中的所有任务都完成了,这个计划就被认为是完成的。
一些节点使用外部线程。这些线程必须使用 BeginExternalTask 方法注册为外部任务。例如,asof join 节点使用一个专用的处理线程来实现串行执行。这个专用线程被注册为一个外部任务。应尽可能避免使用外部任务,因为它们需要小心处理以避免在错误情况下出现死锁。
有序执行#
一些节点要么为其输出的批次建立一个顺序,要么需要能够按顺序处理批次。Acero 使用 ExecBatch 上的 batch_index 属性来处理排序。如果一个节点具有确定性的输出顺序,它应该对其发出的批次应用一个批次索引。例如,OrderByNode 为批次应用一个新的顺序(不管传入的顺序如何)。扫描节点能够为批次附加一个隐式顺序,该顺序反映了被扫描文件中行的顺序。
如果一个节点需要按顺序处理数据,情况会稍微复杂一些。由于执行的并行性,我们无法保证批次会按顺序到达一个节点。然而,通常可以预期它们是“大部分有序”的。因此,我们可以将批次插入到一个排序队列中。这个排序队列被赋予一个回调函数,保证按顺序、串行地对批次执行。例如,fetch 节点使用一个排序队列。回调函数会检查我们是否需要包含批次的一部分或全部,然后根据需要对批次进行切片。
即使一个节点不关心顺序,它也应该尽可能地维护批次索引。project 和 filter 节点不关心顺序,但它们确保输出批次保持与其输入批次相同的索引。filter 节点甚至会在需要时发出空批次,以便在不产生间隙的情况下保持批次顺序。
一个有序执行的例子#
分区执行#
如果行以某种方式组合在一起,那么流就是分区的(或有时称为分段的)。目前还没有正式的分区概念。然而,一个概念正在开始形成(例如,分段聚合),我们可能最终也会在某个时候向 Acero 引入一个更正式的分区概念。
溢出#
溢出(Spillover)功能尚未在 Acero 中实现。
分布式执行#
当一个引擎在分布式环境中使用时,有一些执行节点非常有用。术语可能有所不同,因此我们将使用 Substrait 的术语。交换(exchange)节点将数据发送到不同的工作节点。通常这是一个分区交换,因此 Acero 应该对每个批次进行分区,并将分区分布到 N 个不同的工作节点上。在另一端,我们有捕获(capture)节点。此节点从不同的工作节点接收数据。
这些节点目前在 Acero 中不存在。然而,它们属于项目范围之内,我们希望将来能有这样的节点。
性能分析与追踪#
Acero 的追踪功能目前只实现了一半,性能分析工具也存在重大缺口。然而,在使用 OpenTelemetry 进行追踪方面已经有了一些努力,并且大部分必要的组件已经就位。目前主要缺乏的是对追踪结果的某种有效可视化。
为了使用目前已有的追踪功能,你需要使用 ARROW_WITH_OPENTELEMETRY=ON 来构建 Arrow。然后你需要设置环境变量 ARROW_TRACING_BACKEND=otlp_http。这会配置 OpenTelemetry 将追踪结果(以 OTLP 格式)导出到 HTTP 端点 https://:4318/v1/traces。你需要配置一个 OpenTelemetry 收集器来在该端点上收集结果,并且你需要配置某种追踪查看器,例如 Jaeger:https://jaeger.golang.ac.cn/docs/1.21/opentelemetry/
基准测试#
Acero 最完整的宏观基准测试由 voltrondata-labs/arrowbench 提供。这些包括一组 TPC-H 基准测试,通过 R-dplyr 集成执行,它们在每次 Arrow 提交时运行,并报告到 Conbench,网址为 https://conbench.ursa.dev/
除了这些TPC-H基准测试外,还有许多针对各种节点(hash-join、asof-join 等)的微基准测试。最后,计算函数本身大多也应该有微基准测试。关于微基准测试的更多信息,您可以参考 https://arrow.apache.org/docs/developers/benchmarks.html
任何新功能都应包含微基准测试以避免性能衰退。
绑定#
公共 API#
Acero 的公共 API 由 Declaration 和各种 DeclarationToXyz 方法组成。此外,每个节点的选项类也是公共 API 的一部分。然而,节点是可扩展的,因此这个 API 也是可扩展的。
R (dplyr)#
Dplyr 是一个用于以编程方式构建查询的 R 库。arrow-r 包具有 dplyr 绑定,可将 dplyr API 适配为创建 Acero 执行计划。此外,还有一个正在开发中的 dplyr-substrait 后端,最终可能会取代这个感知 Acero 的绑定。
Python#
pyarrow 库以两种不同的方式绑定到 Acero。首先,在 pyarrow.acero 中有一个直接绑定到公共 API 的绑定。其次,有许多计算工具,如 pyarrow.Table.group_by,它使用了 Acero,尽管这对用户来说是不可见的。
Java#
Java 实现暴露了 Arrow 数据集的一些功能。这些功能隐式地使用了 Acero。目前 Java 实现中没有直接绑定到 Acero 或 Substrait。
设计哲学#
引擎无关的计算#
如果一个节点需要复杂的计算,它应该将这项工作封装在不依赖于任何特定引擎设计的抽象中。例如,哈希连接节点使用了行编码器、哈希表和执行批次构建器等工具。其他地方共享了排序队列和行分段器的实现。节点本身应该保持最小化,仅仅是从 Acero 到抽象的映射。
这有助于将设计与 Acero 的设计细节解耦,并使它们对引擎的变化更具弹性。它还有助于将这些抽象作为独立的功能进行推广。既可以用于其他引擎,也可以作为 pyarrow 中潜在的新增计算工具。
创建任务而非线程#
如果你需要并行运行某些东西,你应该使用线程任务,而不是专用的线程。
这可以减少线程数量(减少线程争用和上下文切换)。
这可以防止死锁(任务在发生故障时会自动取消)。
这简化了性能分析(任务可以被轻松度量,更容易知道所有工作都在哪里)。
这使得在没有线程的情况下运行成为可能(有时用户自己进行线程处理,有时我们需要在线程受限的环境中运行,如 emscripten)。
注意:我们目前并不总是遵循这个建议。asof join 节点中有一个专用的处理线程。专用线程对于实验性使用是“可以接受的”,但我们希望能够迁移掉它们。
不要在 CPU 线程上阻塞#
如果你需要运行一个可能长时间运行但又不积极使用 CPU 资源的活动(例如,从磁盘读取、网络 I/O、等待使用其自身线程的外部库),你应该使用异步工具来确保你不会阻塞 CPU 线程。
不要重复造轮子#
每个节点不应该是一个独立的工具孤岛。在可能的情况下,计算应该被推送到计算函数或通用的共享工具中。这是像这样的大型项目能够得以维护的唯一方式。
避免查询优化#
编写一个高效的 Acero 计划可能具有挑战性。例如,过滤表达式和列选择应该被下推到扫描节点中,这样数据就不会从磁盘读取。表达式应该被简化,并且公共子表达式应该被提取出来。哈希连接节点的构建端应该是两个输入中较小的那一个。
然而,解决这些问题是查询计划器或查询优化器的任务。创建一个查询优化器是一项具有挑战性的任务,超出了 Acero 的范围。随着 Substrait 的采用,我们希望最终会出现解决这些问题的工具。因此,我们通常避免在 Acero 内部进行任何类型的查询优化。Acero 应该尽可能地字面解释声明。这有助于减少维护并避免意外。
我们也认识到这并不总是可能的。例如,哈希连接节点目前会检测是否存在一个哈希连接操作符链,如果存在,它会在操作符之间配置布隆过滤器。从技术上讲,这是一项可以留给查询优化器的任务。然而,这种行为对 Acero 来说相当特定,也相当小众,因此它不太可能在短期内被引入优化器。
性能指南#
批次大小#
也许讨论最多的性能标准是批次大小。Acero 最初是根据研究设计的,遵循一种 morsel-batch 模型。任务是基于一大批行(一个 morsel)创建的。目标是让 morsel 足够大,以证明任务的开销是合理的。在一个任务内部,数据被进一步细分为多个批次。每个批次应该小到可以轻松放入 CPU 缓存(通常是 L2 缓存)。
这设置了两个循环。外层循环是并行的,内层循环不是。
for morsel in dataset: # parallel
for batch in morsel:
run_pipeline(batch)
这种执行方式的优点是,访问同一列的连续节点(或执行节点内的连续操作)很可能从缓存中受益。对于需要随机访问数据的函数来说,这也是必不可少的。它在最大化并行性的同时,最小化了从主内存到CPU缓存的数据传输。
如果需要多次遍历数据(或随机访问)并且批次远大于缓存,性能就会下降。将任务分解成更小的批次有助于提高任务局部性。#
morsel/batch 模型在 Acero 的几个地方有所体现。
在大多数源节点中,我们会尝试抓取 1 百万行的批次。这通常是可配置的。
在源节点中,我们然后迭代并切分出 32Ki 行的批次。这目前是不可配置的。
哈希连接节点目前要求批次包含 32Ki 行或更少,因为它在某些地方使用 16 位有符号整数作为行索引。
然而,这个指导意见是有争议的。性能分析显示,我们并没有从改用更小的批次大小中获得任何实际的好处。似乎我们获得的任何优势都被每个批次的开销所抵消。这些开销大部分似乎是由于各种每个批次的分配造成的。此外,根据你的硬件,CPU 缓存与 RAM 之间不一定总是瓶颈。线性访问、预取和高 CPU 与 RAM 带宽的结合可以减轻缓存未命中的惩罚。
因此,本节被包含在指南中是为了提供历史背景,但不应被视为必须遵守的规定。
进行中和已弃用的工作#
以下工作正在进行中。在此处描述它们是为了解释代码库中的某些重复之处,以及解释即将被移除的类型。
Scanner v2#
扫描器目前是 datasets 模块中的一个节点,在工厂注册表中注册为“scan”。这个节点是在 Acero 之前编写的,并广泛使用 AsyncGenerator 来并行扫描多个文件。不幸的是,AsyncGenerator 的使用使得扫描难以进行性能分析、难以调试,并且无法取消。一个新的扫描节点正在开发中。它目前注册的名称是“scan2”。新的扫描节点使用 AsyncTaskScheduler 而不是 AsyncGenerator,并且应该提供额外的功能,例如跳过行和处理嵌套列投影(对于支持它的格式)。
OrderBySink 和 SelectKSink#
这两个执行节点提供了自定义的汇(sink)实现。它们是在 Acero 添加有序执行之前编写的,是生成有序输出的唯一方式。然而,它们必须被放置在计划的末尾,而且它们是自定义汇节点这一事实使得用 Declaration 描述它们变得困难。OrderByNode 和 FetchNode 取代了它们。目前保留这些节点,直到现有的绑定不再使用它们为止。
向上游提交变更#
Acero 的设计使其可以在不重新编译的情况下进行扩展。你可以轻松地添加新的计算函数和执行节点,而无需创建分支或编译 Acero。然而,当你开发出具有普遍用途的新功能时,我们希望你能花时间将你的变更上游化。
尽管我们欢迎这些变更,但我们必须承认这个过程是有成本的。向上游提交代码要求新模块行为正确,但这通常是审查中比较容易的部分。更重要的是,向上游提交代码是将维护负担从你自己转移到更广泛的 Arrow C++ 项目维护者的过程。这需要维护者对代码有深入的理解,要求代码与项目的风格保持一致,并且要求代码通过单元测试得到充分的测试以帮助回归。
因此,我们强烈建议采取以下步骤。
在你刚开始时,应该向邮件列表发送一封邮件,宣布你的意图和设计。这将帮助你确定该功能是否有更广泛的兴趣,其他人也可能在过程的早期贡献想法或建议。
如果对该功能没有太多兴趣,请记住,最终将变更上游可能会很困难。团队的维护能力有限,我们会优先考虑需求量大的功能。
我们建议在你自己的分叉(fork)上开发和测试这个变更,直到你相当确信一切都正常工作。如果变更很大,你也可以考虑如何将变更分解成更小的部分。在这样做的时候,你可以同时分享较大的 PR(作为草稿 PR 或在你本地分叉上的一个分支)和较小的 PR。这样我们就可以看到较小 PR 的上下文。然而,如果你确实进行了分解,较小的 PR 最好仍然能够独立存在。
任何 PR 都需要包含以下内容:
覆盖新功能的单元测试
如果有任何重要的计算工作,需要有微基准测试
演示如何使用新功能的示例
更新 API 参考和本指南
通过 CI(你可以在你的 fork 上启用 GitHub Actions,这样大多数 CI 作业就可以在你创建 PR 之前运行)