开发者指南#

本页面更详细地介绍了 Acero 的设计。它讨论了如何创建自定义执行节点,并描述了 Acero 设计和实现背后的一些理念。最后,它概述了如何使用新行为扩展 Acero 以及如何将此新行为上游到核心 Arrow 存储库中。

理解 ExecNode#

ExecNode 是一个抽象类,它有几个纯虚方法来控制节点的操作方式

ExecNode::StartProducing()#

此方法在计划开始时调用一次。大多数节点会忽略此方法(任何必要的初始化都应该在构造函数或 Init 中进行)。但是,源节点通常会提供自定义实现。源节点应该调度启动读取和提供数据所需的任何任务。源节点通常是计划中任务的主要创建者。

注意

ExecPlan 基于推模型运行。源通常是基于拉模型的。例如,您的源可能是一个迭代器。然后,源节点通常会调度任务以从源中拉取一个项目并将该项目推送到源的输出节点(通过 InputReceived)。

示例#

  • table_source 节点中,输入表被分成多个批次。为每个批次创建一个任务,该任务在其输出上调用节点的 InputReceived

  • scan 节点中,创建一个任务以开始列出数据集中的片段。然后,每个列出任务都会创建任务以异步读取片段中的批次。当批次读取完毕后,一个 continuation 会使用 exec 计划调度一个新任务。此任务在其输出上调用扫描节点的 InputReceived

ExecNode::InputReceived()#

此方法在计划执行期间多次调用。它是节点之间传递数据的方式。输入节点将调用其输出上的 InputReceived。Acero 的执行模型是基于推模型的。每个节点通过调用 InputReceived 并传入一批数据来将数据推送到其输出。

InputReceived 方法通常是节点实际工作发生的地方。例如,一个 project 节点将执行其表达式并创建一个新的扩展输出批次。然后,它将在其输出上调用 InputReceived。InputReceived 永远不会在源节点上调用。Sink 节点永远不会调用 InputReceived。所有其他节点都会同时经历这两者。

某些节点(通常称为“流水线断路器”)必须在生成任何输出之前累积输入。例如,排序节点必须累积所有输入,然后才能对数据进行排序并生成输出。在这些节点中,InputReceived 方法通常会将数据放入某种累积队列中。如果节点没有足够的数据来操作,则它不会调用 InputReceived。然后这将是当前任务的结束。

示例#

  • project 节点运行其表达式,使用接收到的批次作为表达式的输入。从输入批次和表达式的结果创建一个新批次。新批次被赋予与输入批次相同的顺序索引,然后节点在其输出上调用 InputReceived

  • order_by 节点将批次插入到累积队列中。如果这是最后一个批次,则该节点将对累积队列中的所有内容进行排序。然后,该节点将为排序结果中的每个批次在其输出上调用 InputReceived。将为每个批次分配一个新的批次索引。请注意,此最终输出步骤也可能由于调用 InputFinished(如下所述)而发生。

ExecNode::InputFinished()#

此方法将为每个输入调用一次。一个节点一旦知道它将向该输出发送多少批次,就会在其输出上调用 InputFinished。通常情况下,这发生在节点完成工作时。例如,扫描节点在读取完其文件后将调用 InputFinished。但是,如果它知道(可能从文件元数据中)将创建多少批次,它可以更早地调用它。

某些节点将使用此信号来触发某些处理。例如,排序节点需要等到它接收到所有输入后才能对数据进行排序。它依赖于 InputFinished 调用来知道这已经发生。

即使节点在完成后没有执行任何特殊处理(例如,project 节点或 filter 节点不需要执行任何流结束处理),该节点仍然会在其输出上调用 InputFinished。

警告

InputFinished 调用可能在最后一次调用 InputReceived 之前到达。事实上,它甚至可以在任何对 InputReceived 的调用开始之前发送出去。例如,表源节点始终确切地知道它将生成多少批次。它可以选择在调用 InputReceived 之前调用 InputFinished。如果节点需要执行“流结束”处理,则它通常使用 AtomicCounter,这是一个辅助类,用于确定所有数据何时到达。

示例#

  • order_by 检查它是否已经收到了所有批次。如果是,则它执行 InputReceived 示例中描述的排序步骤。在开始发送输出数据之前,它会检查它有多少输出批次(在累积或排序过程中批次大小可能会更改),并在节点的输出上调用 InputFinished

  • fetch 节点在调用 InputReceived 期间意识到它已收到请求的所有行。它立即在其输出上调用 InputFinished(即使它自己的 InputFinished 方法尚未被调用)。

ExecNode::PauseProducing() / ExecNode::ResumeProducing()#

这些方法控制反压。某些节点可能需要暂停其输入以避免累积过多的数据。例如,当用户使用 RecordBatchReader 使用计划时,我们使用 SinkNode。SinkNode 将数据放入 RecordBatchReader 拉取的队列中(这是从推模型到拉模型的转换)。如果用户读取 RecordBatchReader 的速度很慢,则此队列可能会开始填满。另一个例子是 write 节点。此节点将数据写入文件系统。如果写入速度慢,则数据可能会在写入节点处累积。因此,写入节点需要应用反压。

当一个节点意识到需要应用一些背压时,它将调用其输入的 PauseProducing 方法。一旦该节点有足够的空间继续运行,它将调用其输入的 ResumeProducing 方法。例如,当 SinkNode 的队列过满时,它将暂停。随着用户继续从 RecordBatchReader 读取数据,我们可以预期队列将缓慢排空。一旦队列排空到足够程度,SinkNode 就可以调用 ResumeProducing。

源节点通常需要为 PauseProducing 和 ResumeProducing 提供特殊的行为。例如,从文件读取数据的扫描节点可以暂停读取文件。但是,某些源节点可能无法以任何有意义的方式暂停。表源节点暂停并没有多大意义,因为它的数据已经在内存中了。

既不是源节点也不是汇聚节点的节点仍然应该转发背压信号。例如,当在投影节点上调用 PauseProducing 时,它应该在其输入上调用 PauseProducing。如果一个节点有多个输入,那么它应该将信号转发到每个输入。

示例#

  • write 节点在其 InputReceived 方法中将一批数据添加到数据集写入器的队列中。如果数据集写入器已满,它将返回一个未完成的 future,该 future 将在有更多空间时完成。write 节点然后在其输入上调用 PauseProducing。然后,它向 future 添加一个 continuation,该 continuation 将在其输入上调用 ResumeProducing

  • scan 节点使用 AsyncTaskScheduler 来跟踪它调度的所有任务。此调度程序受到限制,以限制 scan 节点允许执行的并发 I/O 量。当调用 PauseProducing 时,节点将暂停调度程序。这意味着排在限制后面的任何任务都不会被提交。但是,任何正在进行的 I/O 将继续(背压不能立即生效)。当调用 ResumeProducing 时,scan 节点将取消暂停调度程序。

ExecNode::StopProducing()#

当计划需要提前结束时,将调用 StopProducing。这可能是因为用户取消了计划,也可能是因为发生了错误。大多数节点不需要在这里做任何事情。节点没有发送其拥有的任何剩余数据的期望或要求。任何调度任务的节点(例如源节点)都应停止生成新数据。

除了计划范围的取消之外,如果节点确定它已收到所需的所有数据,则它可以对其输入调用此方法。但是,由于并行性,节点在停止其输入后仍可能收到几次对 InputReceived 的调用。

如果使用了任何外部资源,则应在此调用中进行清理。

示例#

  • asofjoin 节点有一个专用的处理线程,该线程使用队列与主要的 Acero 线程进行通信。当调用 StopProducing 时,节点会将一个毒丸插入队列中。这告诉处理线程立即停止。处理线程停止后,它会将其外部任务(如下所述)标记为已完成,从而允许计划完成。

  • fetch 节点在 InputReceived 中可能会确定它拥有所需的所有数据。然后它可以对其输入调用 StopProducing

初始化/构造/析构#

简单的初始化逻辑(不会出错)可以在构造函数中完成。如果初始化逻辑可能返回无效状态,则可以在执行节点的工厂方法或 Init 方法中完成。对于简单的验证,首选工厂方法。如果初始化可能进行昂贵的分配或其他资源消耗,则首选 Init 方法。在调用 StartProducing 之前,将始终调用 Init。初始化也可以在 StartProducing 中完成,但请记住,到那时其他节点可能已经启动。

此外,还有一个 Validate 方法可以重载以提供自定义验证。此方法通常在 Init 之前但在添加所有输入和输出之后调用。

目前,最终确定在析构函数中进行。现在有一些例子可能很慢。例如,在写入节点中,如果计划期间出现错误,那么我们可能会在此处关闭一些打开的文件。如果存在可能异步或可能触发错误的重大最终确定操作,那么我们可以向 ExecNode 生命周期引入 Finalize 方法。它之所以还没有完成,仅仅是因为还没有必要。

总结#

ExecNode 生命周期#

方法名称

调用时机…

节点调用时机…

StartProducing

计划开始时

不适用

InputReceived

从输入接收到数据时

将数据发送到输出时

InputFinished

输入知道有多少批次时

节点可以告诉其输出有多少批次时

StopProducing

计划中止或输出有足够的数据时

节点拥有所需的所有数据时

扩展 Acero#

Acero 实例化一个单例 ExecFactoryRegistry,它在名称和执行节点工厂(从选项创建 ExecNode 的方法)之间进行映射。要创建新的 ExecNode,您可以向此注册表注册该节点,然后您的节点就可以被 Acero 使用。如果您希望能够将此节点与 Substrait 计划一起使用,您还需要配置 Substrait 注册表,以便它知道如何将 Substrait 映射到您的自定义节点。

这意味着您可以创建新节点并将其添加到 Acero,而无需从源代码重新编译 Acero。

调度和并行性#

数据引擎可以通过多种方式利用多个计算资源(例如多个内核)。在我们深入了解 Acero 调度的细节之前,我们将介绍一些高级主题。

计划的并行执行#

用户可能希望并发执行多个计划,欢迎他们这样做。但是,Acero 没有计划间调度的概念。每个计划都将尝试最大限度地利用计算资源,并且可能会出现 CPU、内存和磁盘资源的争用。如果计划使用默认的 CPU 和 I/O 线程池,则这种情况会有所缓解,因为它们将共享相同的线程池。

本地分布式计划#

处理多线程的一种常见方法是将输入拆分为多个分区,然后为每个分区创建一个计划,最后以某种方式合并这些计划的结果。例如,假设您有 20 个文件和 10 个内核,并且您想要读取并排序所有数据。您可以为每 2 个文件创建一个计划来读取和排序这些文件。然后,您可以再创建一个计划,该计划接收来自这 10 个子计划的输入,并以排序方式合并这 10 个输入流。

这种方法很流行,因为它是查询在多台服务器上分布的方式,因此它得到广泛支持和理解。Acero 目前没有这样做,但没有理由阻止它。向 Acero 添加 shuffle 和分区节点应该是一个高优先级事项,这将使 Acero 能够被分布式系统使用。一旦完成,如果需要,就可以进行本地 shuffle(本地是指在单个系统上的多个执行计划实例之间交换)。

../../_images/dist_plan.svg

即使计划本身串行运行,分布式计划也可以提供并行性#

流水线并行性#

Acero 尝试使用流水线并行性来最大限度地提高并行性。当每一批数据从源到达时,我们立即创建一个任务并开始处理它。这意味着我们可能会在批次 X-1 的处理完成之前开始处理批次 X。这非常灵活和强大。但是,这也意味着正确实现 ExecNode 比较困难。

例如,ExecNode 的 InputReceived 方法应该是可重入的。换句话说,应该预期在之前的 InputReceived 调用完成之前会调用 InputReceived。这意味着具有任何类型可变状态的节点将需要互斥锁或类似机制来保护该状态免受竞争条件的影响。这也意味着任务很容易乱序,节点不应该期望其输入的任何特定顺序(稍后会详细介绍)。

../../_images/pipeline.svg

具有 3 个 CPU 线程和 2 个 I/O 线程的系统上的流水线并行性示例#

异步性#

某些操作耗时很长,并且可能不需要 CPU。从文件系统读取数据就是一个例子。如果每个内核只有一个线程,那么在等待这些操作完成时会浪费时间。这个问题有两个常见的解决方案。同步解决方案通常是创建比内核数量更多的线程,预期其中一些线程会被阻塞,这是可以接受的。这种方法往往更简单,但它可能导致过多的线程争用,并且需要进行微调。

另一种解决方案是使慢速操作异步化。当慢速操作开始时,调用者放弃线程并允许其他任务同时运行。一旦慢速操作完成,就会创建一个新任务来获取结果并继续处理。这有助于最大限度地减少线程争用,但实现起来往往更复杂。

由于缺乏标准的 C++ 异步 API,Acero 结合使用了这两种方法。Acero 有两个线程池。第一个是 CPU 线程池。此线程池每个内核有一个线程。此线程池中的任务永远不应阻塞(同步造成的轻微延迟除外),并且通常应尽可能积极地使用 CPU。I/O 线程池上的线程预计大部分时间处于空闲状态。它们应避免执行任何 CPU 密集型工作。它们的工作基本上是等待数据可用并在 CPU 线程池上调度后续任务。

../../_images/async.svg

Arrow 通过组合 CPU 和 I/O 线程池来实现异步执行#

注意

Acero 中的大多数节点无需担心异步性。它们是完全同步的,不会生成任务。

每个流水线一个任务(有时不止一个)#

引擎可以选择为节点的每次执行创建一个线程任务。然而,如果没有仔细的调度,这会导致缓存局部性问题。例如,假设我们有一个由三个执行节点组成的基本计划:扫描、投影和过滤(这是一个非常常见的用例)。现在假设有 100 个批次。在每个操作员一个任务的模型中,我们将拥有诸如“扫描批次 5”、“投影批次 5”和“过滤批次 5”之类的任务。每个任务都可能访问相同的数据。例如,也许`project`和`filter`节点需要读取同一列。该列最初是在`scan`节点的解码阶段创建的。为了最大限度地提高缓存利用率,我们需要仔细安排我们的任务,以确保所有这三个任务连续运行并分配给同一个 CPU 内核。

为了避免这个问题,我们设计了在任务结束之前尽可能多地运行节点的任务。这个节点序列通常被称为“流水线”,而结束流水线(从而结束任务)的节点通常被称为“流水线断路器”。有些节点甚至可能介于两者之间。例如,在哈希连接节点中,当我们在探测端接收到一个批次并且哈希表已构建时,我们不需要结束任务,而是继续运行。这意味着任务有时可能在连接节点处结束,有时可能会继续越过连接节点。

../../_images/pipeline_task.svg

计划中流水线的逻辑视图和两个任务,表明流水线边界在计划期间可能会有所不同#

线程池和调度器#

CPU 和 I/O 线程池是核心 Arrow-C++ 库的一部分。它们包含一个 FIFO 任务队列,并在线程可用时执行它们。对于 Acero,我们需要额外的功能。为此,我们使用 AsyncTaskScheduler。在最简单的操作模式下,调度器只是将任务提交到底层线程池。但是,它也能够创建子调度器,这些子调度器可以应用限制、优先级排序和任务跟踪。

  • 受限调度器将成本与每个任务相关联。仅当有空间时,任务才会提交到底层调度器。如果没有,则将任务放入队列中。写入节点使用大小为 1 的限制来避免重复调用数据集写入器(数据集写入器执行其自身的内部调度)。可以手动暂停和取消暂停受限调度器。暂停时,所有任务都将排队,即使有空间,排队的任务也不会提交。这在源节点中对于实现 PauseProducing 和 ResumeProducing 很有用。

  • 可以将优先级应用于受限调度器,以控制排队任务的提交顺序。如果有空间,则立即提交任务(无论优先级如何)。但是,如果限制已满,则任务将排队并进行优先级排序。扫描节点会限制其生成的读取请求数量,并尽可能优先按顺序读取数据集。

  • 任务组可用于跟踪任务集合并在所有任务完成后运行最终任务。这对于 fork-join 样式的问题很有用。写入节点使用任务组在文件的所有未完成写入任务完成后关闭文件。

有一些关于在执行引擎中对任务进行不同优先级排序的研究和示例。Acero 还没有解决这个问题。让我们来看一些常见的情况。

  • 引擎通常会优先读取连接节点的构建端,然后再读取探测端。通过应用背压,在 Acero 中更容易处理这种情况。

  • 另一个常见的用例是控制内存累积。引擎将优先处理更接近接收器节点的任务,以减轻内存压力。但是,Acero 目前假设溢出会添加到流水线断路器,并且计划中的内存使用量将或多或少是静态的(每个内核),并且远低于硬件的限制。如果 Acero 需要在计算资源多而内存有限的环境(例如 GPU)中使用,这种情况可能会改变。

  • 引擎通常会使用工作窃取算法来优先运行同一内核上的任务,以提高缓存局部性。但是,由于 Acero 使用了每个流水线一个任务的模型,因此调度器可以回收的缓存并行性损失机会并不多。只有在无法对数据进行更多处理时,任务才会结束。

虽然目前 Acero 中没有太多的优先级排序,但如果需要,我们确实有应用它的工具。

注意

除了 AsyncTaskScheduler 之外,还有一个名为 TaskScheduler 的类。此类早于 AsyncTaskScheduler,旨在为高效的同步 fork-join 工作负载提供任务跟踪。如果此专门用途满足您的需求,那么您可以考虑使用它。将此与 AsyncTaskScheduler 进行比较,看看两者之间的比较结果会很有趣。

节点内并行性#

某些节点可能可以在任务内利用并行性。例如,在扫描节点中,我们可以并行解码列。在哈希连接节点中,有时会利用并行性来执行复杂的任务,例如构建哈希表。这种并行性不太常见,但不一定不鼓励。不过,应首先进行分析,以确保这种额外的并行性对您的工作负载有所帮助。

所有工作都在任务中进行#

Acero 中的所有工作都作为任务的一部分进行。启动计划时,将创建 AsyncTaskScheduler 并为其分配初始任务。此初始任务会在节点上调用 StartProducing。任务可以安排其他任务。例如,源节点通常会在调用 StartProducing 期间安排任务。流水线断路器通常会在积累了所需的所有数据时安排任务。计划中的所有任务完成后,该计划即被视为完成。

某些节点使用外部线程。必须使用 BeginExternalTask 方法将这些线程注册为外部任务。例如,asof 连接节点使用专用处理线程来实现串行执行。此专用线程注册为外部任务。应尽可能避免使用外部任务,因为它们需要小心处理以避免在错误情况下出现死锁。

有序执行#

某些节点要么对其传出批次建立顺序,要么需要能够按顺序处理批次。Acero 使用 ExecBatch 上的`batch_index`属性来处理排序。如果节点具有确定性输出顺序,则它应对其发出的批次应用批次索引。例如,OrderByNode 会对批次应用新的排序(无论传入顺序如何)。扫描节点能够将隐式排序附加到批次,这反映了正在扫描的文件中行的顺序。

如果节点需要按顺序处理数据,则会稍微复杂一些。由于执行的并行性,我们无法保证批次会按顺序到达节点。但是,通常可以预期它们“大部分是有序的”。因此,我们可以将批次插入排序队列。排序队列被赋予一个回调,该回调保证按顺序对批次进行串行运行。例如,提取节点使用排序队列。回调检查是否需要包含部分或全部批次,然后根据需要对批次进行切片。

即使节点不关心顺序,它也应该尝试在可能的情况下维护批次索引。投影和过滤节点不关心顺序,但它们确保输出批次与其输入批次保持相同的索引。过滤节点甚至会在需要时发出空批次,以便它可以维护批次顺序而没有间隙。

../../_images/ordered.svg

有序执行示例#

分区执行#

如果行以某种方式分组在一起,则流被分区(有时称为分段)。目前还没有正式的分区概念。然而,一种概念正在开始发展(例如分段聚合),我们最终也可能在某个时候向 Acero 引入更正式的分区概念。

溢出#

Acero 中尚未实现溢出。

分布式执行#

当引擎在分布式环境中使用时,某些执行节点非常有用。术语可能会有所不同,因此我们将使用 Substrait 术语。交换节点将数据发送到不同的工作器。通常这是一个分区交换,因此 Acero 需要对每个批次进行分区并将分区分布到 N 个不同的工作器。另一方面,我们有捕获节点。此节点接收来自不同工作器的数据。

这些节点今天在 Acero 中不存在。但是,它们在范围内,我们希望将来拥有这样的节点。

分析和跟踪#

Acero 的跟踪目前只实现了一半,并且分析工具存在重大差距。但是,已经有一些使用 OpenTelemetry 进行跟踪的工作,并且大部分必要的组件都已到位。目前主要缺乏的是对跟踪结果进行某种有效的可视化。

为了使用目前存在的跟踪功能,您需要使用带有 ARROW_WITH_OPENTELEMETRY=ON 的 Arrow 进行构建。然后,您需要设置环境变量 ARROW_TRACING_BACKEND=otlp_http。这将配置 OpenTelemetry 将跟踪结果(作为 OTLP)导出到 HTTP 端点 https://127.0.0.1:4318/v1/traces。您需要配置一个 OpenTelemetry 收集器来收集该端点上的结果,并且您需要配置某种跟踪查看器,例如 Jaeger:https://jaeger.golang.ac.cn/docs/1.21/opentelemetry/

基准测试#

Acero 最完整的宏基准测试由 voltrondata-labs/arrowbench 提供。这些基准测试包括一组 TPC-H 基准测试,它们从 R-dplyr 集成执行,在每次 Arrow 提交时运行,并报告给 Conbench,网址为 https://conbench.ursa.dev/

除了这些 TPC-H 基准测试之外,还有许多针对各种节点(哈希连接、asof 连接等)的微基准测试。最后,计算函数本身应该主要具有微基准测试。有关微基准测试的更多信息,您可以参考 https://arrow.apache.org/docs/developers/benchmarks.html

任何新功能都应包含微基准测试,以避免回归。

绑定#

公共 API#

Acero 的公共 API 由 Declaration 和各种 DeclarationToXyz 方法组成。此外,每个节点的选项类也是公共 API 的一部分。但是,节点是可扩展的,因此此 API 也是可扩展的。

R (dplyr)#

Dplyr 是一个用于以编程方式构建查询的 R 库。arrow-r 包具有 dplyr 绑定,可适配 dplyr API 以创建 Acero 执行计划。此外,还有一个正在开发中的 dplyr-substrait 后端,它最终可能会取代 Acero 感知绑定。

Python#

pyarrow 库以两种不同的方式绑定到 Acero。首先,pyarrow.acero 中有一个直接绑定,它直接绑定到公共 API。其次,有许多计算实用程序,如 pyarrow.Table.group_by,它使用 Acero,但这对用户是不可见的。

Java#

Java 实现公开了 Arrow 数据集的一些功能。它们隐式地使用 Acero。目前,Java 实现中没有与 Acero 或 Substrait 的直接绑定。

设计理念#

引擎无关计算#

如果一个节点需要复杂的计算,那么它应该将该工作封装在不依赖于任何特定引擎设计的抽象中。例如,哈希连接节点使用诸如行编码器、哈希表和执行批处理构建器之类的实用程序。其他地方共享排序队列和行分段器的实现。节点本身应保持最小化,并简单地从 Acero 映射到抽象。

这有助于将设计与 Acero 的设计细节分离,并使它们更能适应引擎的变化。它也有助于将这些抽象本身提升为能力。用于其他引擎或作为 pyarrow 的计算实用程序的潜在新增功能。

创建任务而不是线程#

如果需要并行运行某些内容,则应使用线程任务而不是专用线程。

  • 这可以减少线程数(减少线程争用和上下文切换)

  • 这可以防止死锁(任务在发生故障时自动取消)

  • 这简化了分析(可以轻松测量任务,更容易知道所有工作在哪里)

  • 这使得无需线程即可运行(有时用户正在执行自己的线程,有时我们需要在受线程限制的环境中运行,例如 emscripten)

注意:我们目前并不总是遵循此建议。asof 连接节点中有一个专用进程线程。专用线程对于实验性使用“可以”,但我们希望迁移到不使用它们。

不要阻塞 CPU 线程#

如果您需要运行一个可能长时间运行且没有积极使用 CPU 资源的活动(例如,从磁盘读取、网络 I/O、使用其自身线程等待外部库),则应使用异步实用程序来确保不阻塞 CPU 线程。

不要重复造轮子#

每个节点不应是独立的实用程序孤岛。在可能的情况下,计算应推送到计算函数或公共共享实用程序中。这是像这样的大型项目能够维护的唯一方法。

避免查询优化#

编写高效的 Acero 计划可能具有挑战性。例如,应将过滤表达式和列选择下推到扫描节点中,以便不从磁盘读取数据。表达式应简化,公共子表达式应分解出来。哈希连接节点的构建侧应该是两个输入中较小的一个。

然而,解决这些问题是查询计划器或查询优化器的挑战。创建查询优化器是一项超出 Acero 范围的挑战性任务。随着 Substrait 的采用,我们希望最终会出现解决这些问题的实用程序。因此,我们通常避免在 Acero 中进行任何类型的查询优化。Acero 应尽可能从字面上解释声明。这有助于减少维护并避免意外。

我们也意识到这并非总是可行的。例如,哈希连接节点当前会检测是否存在哈希连接运算符链,如果存在,它会在运算符之间配置布隆过滤器。从技术上讲,这可以留给查询优化器来完成。但是,这种行为特定于 Acero,而且相当特殊,因此不太可能很快引入到优化器中。

性能准则#

批处理大小#

讨论最多的性能标准可能是批处理大小。Acero 最初是根据研究设计的,遵循 morsel-batch 模型。任务是根据大量的行(一个 morsel)创建的。目标是使 morsel 足够大以证明任务的开销是合理的。在任务中,数据被进一步细分为批次。每个批次应足够小,以便舒适地放入 CPU 缓存(通常是 L2 缓存)中。

这设置了两个循环。外循环是并行的,内循环不是

for morsel in dataset: # parallel
  for batch in morsel:
    run_pipeline(batch)

这种执行方式的优点是访问同一列的连续节点(或执行节点内的连续操作)可能会受益于缓存。它对于需要随机访问数据的函数也是必不可少的。它最大限度地提高了并行性,同时最大限度地减少了从主内存到 CPU 缓存的数据传输。

../../_images/microbatch.svg

如果需要多次遍历数据(或随机访问)并且批次远大于缓存,则性能会下降。将任务分解成更小的批次有助于提高任务局部性。#

morsel/batch 模型反映在 Acero 的几个地方

  • 在大多数源节点中,我们将尝试获取 1Mi 行的批次。这通常是可配置的。

  • 在源节点中,我们然后迭代并切掉 32Ki 行的批次。这目前是不可配置的。

  • 哈希连接节点当前要求批次包含不超过 32Ki 行,因为它在某些地方使用 16 位有符号整数作为行索引。

然而,此指导存在争议。分析表明,迁移到较小的批处理大小并没有带来任何实际好处。我们获得的任何优势似乎都损失在每批处理的开销中。大部分开销似乎是由于各种每批处理的分配造成的。此外,根据您的硬件,CPU 缓存<->RAM 不一定是瓶颈。线性访问、预取和高 CPU<->RAM 带宽的组合可以减轻缓存未命中的损失。

因此,本节包含在本指南中是为了提供历史背景,但不应被视为具有约束力。

正在进行和已弃用的工作#

以下工作正在进行中。这里描述它们是为了解释代码库中的某些重复,以及解释即将消失的类型。

扫描器 v2#

扫描器当前是数据集模块中的一个节点,在工厂注册表中注册为“scan”。此节点是在 Acero 之前编写的,并广泛使用 AsyncGenerator 来并行扫描多个文件。不幸的是,AsyncGenerator 的使用使得扫描难以分析、难以调试且无法取消。新的扫描节点正在开发中。它目前注册的名称为“scan2”。新的扫描节点使用 AsyncTaskScheduler 而不是 AsyncGenerator,并且应该提供额外的功能,例如能够跳过行和处理嵌套列投影(对于支持它的格式)。

OrderBySink 和 SelectKSink#

这两个 exec 节点提供了自定义 sink 实现。它们是在 Acero 添加有序执行之前编写的,并且是生成有序输出的唯一方法。但是,它们必须放在计划的末尾,而且它们是自定义 sink 节点的事实使得它们难以用 Declaration 来描述。OrderByNode 和 FetchNode 替代了它们。目前保留这些节点,直到现有绑定不再使用它们。

上游变更#

Acero 的设计使其无需重新编译即可扩展。您可以轻松添加新的计算函数和 exec 节点,而无需创建分支或编译 Acero。但是,当您开发出普遍有用的新功能时,我们希望您能抽出时间将您的更改提交到上游。

即使我们欢迎这些更改,我们也必须承认此过程存在成本。上游代码要求新模块行为正确,但这通常是更容易审查的部分。更重要的是,上游代码是将维护负担从您自己转移到更广泛的 Arrow C++ 项目维护者的过程。这需要维护者对代码有深入的了解,要求代码与项目的风格一致,并且要求代码经过单元测试的充分测试以帮助回归。

因此,我们强烈建议采取以下步骤

  • 开始时,您应该向邮件列表发送消息,宣布您的意图和设计。这将帮助您确定该功能是否更广泛地受到关注,其他人可能在流程早期就有想法或建议。

    • 如果对该功能没有太多兴趣,那么请记住,最终可能难以将更改提交到上游。团队的维护能力有限,我们尝试优先考虑需求量大的功能。

  • 我们建议您在自己的分支上开发和测试更改,直到您确信事情正常工作为止。如果更改很大,那么您可能还需要考虑如何将更改分解成更小的部分。这样做时,您可以共享较大的 PR(作为草稿 PR 或本地分支上的分支)和较小的 PR。这样我们就可以看到较小 PR 的上下文。但是,如果您确实将事情分解了,那么较小的 PR 理想情况下仍然应该独立存在。

  • 任何 PR 都需要具备以下内容

    • 转换新功能的单元测试

    • 如果有任何重要的计算工作正在进行,则进行微基准测试

    • 演示如何使用新功能的示例

    • API 参考和本指南的更新

    • 通过 CI(您可以在您的分支上启用 GitHub Actions,这将允许大多数 CI 作业在您创建 PR 之前运行)