开发者指南#

此页面更详细地介绍了 Acero 的设计。它讨论了如何创建自定义执行节点,并描述了 Acero 设计和实现背后的某些理念。最后,它概述了如何使用新行为扩展 Acero,以及如何将这种新行为上游到核心 Arrow 存储库中。

理解 ExecNode#

ExecNode 是一个抽象类,具有几个纯虚方法,这些方法控制节点的操作方式

ExecNode::StartProducing()#

此方法在计划开始时调用一次。大多数节点忽略此方法(任何必要的初始化都应在构造函数或 Init 中完成)。但是,源节点通常会提供自定义实现。源节点通常会安排启动读取和提供数据所需的任何任务。源节点通常是计划中任务的主要创建者。

注意

ExecPlan 在基于推送的模型上运行。源通常是基于拉取的。例如,您的源可能是一个迭代器。源节点通常会安排任务从源中拉取一个项目,并将该项目推送到源的输出节点(通过 InputReceived)。

示例#

  • table_source 节点中,输入表被分成批次。为每个批次创建一个任务,该任务调用节点输出的 InputReceived

  • scan 节点中,创建一个任务来开始列出数据集中的片段。然后每个列出任务都会异步创建任务来从片段中读取批次。当批次完全读入时,一个延续会安排一个新的任务,该任务具有执行计划。此任务调用扫描节点输出的 InputReceived

ExecNode::InputReceived()#

此方法在计划执行期间多次调用。它就是节点相互传递数据的方式。输入节点将在其输出上调用 InputReceived。Acero 的执行模型是基于推送的。每个节点都通过调用 InputReceived 并传入一批数据来将数据推送到其输出。

InputReceived 方法通常是节点实际工作发生的地方。例如,一个投影节点将执行其表达式,并创建一个新的扩展输出批次。然后它将调用其输出的 InputReceived。InputReceived 永远不会在源节点上调用。接收节点永远不会调用 InputReceived。所有其他节点都会同时经历两者。

某些节点(通常称为“管道中断器”)必须在生成任何输出之前累积输入。例如,一个排序节点必须在排序数据并生成输出之前累积所有输入。在这些节点中,InputReceived 方法通常会将数据放置到某种累积队列中。如果节点没有足够的数据来操作,那么它将不会调用 InputReceived。然后这将是当前任务的结束。

示例#

  • project 节点运行其表达式,使用接收到的批次作为表达式的输入。从输入批次和表达式的结果创建一个新批次。新批次被赋予与输入批次相同的顺序索引,然后节点在其输出上调用 InputReceived

  • order_by 节点将批次插入累积队列。如果这是最后一个批次,那么节点将对累积队列中的所有内容进行排序。然后节点将在排序结果中的每个批次上调用其输出的 InputReceived。一个新的批次索引将被分配给每个批次。请注意,此最终输出步骤也可能作为对 InputFinished(下面描述)的调用而发生。

ExecNode::InputFinished()#

此方法将为每个输入调用一次。节点将在其输出上调用 InputFinished,一旦它知道它将向该输出发送多少个批次。通常情况下,这发生在节点完成工作时。例如,扫描节点将在完成读取文件后调用 InputFinished。但是,如果它知道(可能来自文件元数据)将创建多少个批次,它可以更早地调用它。

某些节点将使用此信号来触发某些处理。例如,排序节点需要等到它接收到所有输入才能对数据进行排序。它依赖于 InputFinished 调用来了解这种情况已发生。

即使节点在完成时不执行任何特殊处理(例如,投影节点或过滤器节点不需要执行任何流结束处理),该节点仍然会在其输出上调用 InputFinished。

警告

InputFinished 调用可能在对 InputReceived 的最后一次调用之前到达。实际上,它甚至可能在任何对 InputReceived 的调用开始之前就被发送出去。例如,表源节点总是准确地知道它将产生多少个批次。它可以选择在它曾经调用 InputReceived 之前调用 InputFinished。如果节点需要执行“流结束”处理,那么它通常使用 AtomicCounter,它是一个帮助类,用于确定何时到达所有数据。

示例#

  • order_by 检查它是否已经接收到所有批次。如果是,它将执行在 InputReceived 示例中描述的排序步骤。在它开始发送输出数据之前,它会检查它有多少输出批次(在累积或排序期间,批次大小可能会发生变化),并在节点的输出上调用 InputFinished

  • fetch 节点在对 InputReceived 的调用期间,意识到它已经接收到它要求的所有行。它立即在它的输出上调用 InputFinished(即使它自己的 InputFinished 方法还没有被调用)

ExecNode::PauseProducing() / ExecNode::ResumeProducing()#

这些方法控制背压。某些节点可能需要暂停其输入,以避免积累过多数据。例如,当用户使用 RecordBatchReader 使用接收节点时,我们使用一个接收节点。接收节点将数据放置在 RecordBatchReader 从中拉取的队列中(这是从基于推送的模型到基于拉取的模型的转换)。如果用户正在缓慢地读取 RecordBatchReader,那么该队列可能会开始填满。另一个例子可以考虑写入节点。此节点将数据写入文件系统。如果写入很慢,那么数据可能会在写入节点处累积。结果,写入节点将需要应用背压。

当节点意识到它需要应用一些背压时,它将在其输入上调用 PauseProducing。一旦节点有足够的可用空间继续,它将在其输入上调用 ResumeProducing。例如,接收节点在队列过满时会暂停。当用户继续从 RecordBatchReader 中读取时,我们预计队列会逐渐清空。一旦队列清空到足够的程度,接收节点就可以调用 ResumeProducing。

源节点通常需要为 PauseProducing 和 ResumeProducing 提供特殊行为。例如,从文件读取的扫描节点可以暂停读取文件。但是,某些源节点可能无法以任何有意义的方式暂停。表源节点暂停没有太大意义,因为它的数据已经存在于内存中。

既不是源也不是接收的节点仍然应该转发背压信号。例如,当在投影节点上调用 PauseProducing 时,它应该在其输入上调用 PauseProducing。如果节点有多个输入,那么它应该将信号转发到每个输入。

示例#

  • write 节点在它的 InputReceived 方法中,将一个批次添加到数据集写入者的队列中。如果数据集写入者随后已满,它将返回一个未完成的期货,该期货将在它有更多空间时完成。然后 write 节点在其输入上调用 PauseProducing。然后它在期货中添加一个延续,该延续将在其输入上调用 ResumeProducing

  • scan 节点使用一个 AsyncTaskScheduler 来跟踪它安排的所有任务。此调度程序受到限制,以限制 scan 节点允许执行的并发 I/O 量。当调用 PauseProducing 时,节点将暂停调度程序。这意味着排在节流阀后面的任何任务都不会被提交。但是,任何正在进行的 I/O 将继续(背压不能立即生效)。当调用 ResumeProducing 时,scan 节点将取消暂停调度程序。

ExecNode::StopProducing()#

当计划需要提前结束时,将调用 StopProducing。这可能是因为用户取消了计划,也可能是因为发生了错误。大多数节点不需要在这里执行任何操作。不期望或要求节点发送它拥有的任何剩余数据。任何安排任务的节点(例如源节点)都应停止生成新数据。

除了计划范围的取消之外,如果节点决定它已经接收到它需要的所有数据,那么它可能会在其输入上调用此方法。但是,由于并行性的原因,节点在停止其输入后可能仍然会接收到对 InputReceived 的一些调用。

如果使用任何外部资源,则清理应作为此调用的一部分进行。

示例#

  • asofjoin 节点有一个专用的处理线程,它使用队列与主要 Acero 线程通信。当调用 StopProducing 时,节点将一个毒丸插入队列。这告诉处理线程立即停止。处理线程停止后,它将标记其外部任务(下面描述)为已完成,这将允许计划完成。

  • fetch 节点在 InputReceived 中可能会决定它已经拥有它需要的所有数据。然后它可以在其输入上调用 StopProducing

初始化/构造/销毁#

简单的初始化逻辑(不能出错)可以在构造函数中完成。如果初始化逻辑可能会返回无效的状态,那么它可以完成在执行节点的工厂方法或 Init 方法中。对于简单的验证,首选工厂方法。如果初始化可能执行昂贵的分配或其他资源消耗,则首选 Init 方法。在调用 StartProducing 之前,总是会调用 Init。初始化也可以在 StartProducing 中完成,但请记住,其他节点可能已经开始运行。

此外,还有一个 Validate 方法可以被重载以提供自定义验证。此方法通常在 Init 之前调用,但在添加所有输入和输出之后调用。

析构函数会在今天执行最终化操作。今天有一些示例可能会导致最终化操作速度缓慢。例如,在写入节点中,如果在计划期间发生错误,那么我们可能需要在这里关闭一些打开的文件。如果有重要的最终化操作是异步的或可能触发错误,那么我们可以在 ExecNode 生命周期中引入 Finalize 方法。之所以还没有这样做,是因为目前还没有这种需求。

摘要#

ExecNode 生命周期#

方法名称

调用时机…

节点在…时调用此方法…

StartProducing

计划开始

N/A

InputReceived

从输入接收数据

向输出发送数据

InputFinished

输入知道批次的数量

节点可以告诉其输出批次的数量

StopProducing

计划中止或输出有足够的数据

节点拥有其需要的所有数据

扩展 Acero#

Acero 实例化了一个单例 ExecFactoryRegistry,它将名称与执行节点工厂(从选项创建 ExecNode 的方法)进行映射。要创建一个新的 ExecNode,您可以将节点注册到此注册表中,然后您的节点就可以被 Acero 使用。如果您想在 Substrait 计划中使用此节点,您还需要配置 Substrait 注册表,以便它知道如何将 Substrait 映射到您的自定义节点。

这意味着您可以创建并添加新的节点到 Acero,而无需从源代码重新编译 Acero。

调度和并行性#

数据引擎可以利用多种方式来利用多个计算资源(例如,多个内核)。在我们深入了解 Acero 的调度细节之前,我们将介绍一些高级主题。

计划的并行执行#

用户可能希望同时执行多个计划,并且他们可以随意这样做。但是,Acero 并没有跨计划调度的概念。每个计划都会尝试最大限度地利用其计算资源,并且可能会出现 CPU、内存和磁盘资源的争用。如果计划使用默认的 CPU 和 I/O 线程池,则争用程度会有一定程度的缓解,因为它们将共享同一个线程池。

本地分布式计划#

解决多线程问题的一种常见方法是将输入分成多个分区,然后为每个分区创建一个计划,最后以某种方式合并这些计划的结果。例如,假设您有 20 个文件和 10 个内核,并且您希望读取和排序所有数据。您可以为每 2 个文件创建一个计划来读取和排序这些文件。然后,您可以创建一个额外的计划,该计划接收来自这 10 个子计划的输入,并将这 10 个输入流以排序的方式合并。

这种方法很受欢迎,因为它是在多个服务器上分布式执行查询的方式,因此得到了广泛的支持并且得到了很好的理解。Acero 目前还没有这样做,但没有任何理由阻止它。向 Acero 添加 shuffle 和分区节点应该是一个优先事项,这将使 Acero 可以被分布式系统使用。一旦完成,就可以在需要时执行本地 shuffle(本地是指在单个系统上的多个执行计划实例之间交换数据)。

../../_images/dist_plan.svg

即使计划本身是串行运行的,分布式计划也可以提供并行性#

流水线并行性#

Acero 尝试使用流水线并行性来最大限度地提高并行性。当每批数据从源头到达时,我们立即创建一个任务并开始处理它。这意味着我们可能会在处理批次 X-1 完成之前就开始处理批次 X。这非常灵活且功能强大。但是,这也意味着正确实现 ExecNode 很困难。

例如,ExecNode 的 InputReceived 方法应该是可重入的。换句话说,应该预期在先前对 InputReceived 的调用完成之前,InputReceived 会被调用。这意味着具有任何类型可变状态的节点需要使用互斥锁或类似机制来保护该状态免受竞争条件的影响。这也意味着任务很容易变得无序,节点不应该期望其输入有任何特定的顺序(稍后会详细介绍)。

../../_images/pipeline.svg

在具有 3 个 CPU 线程和 2 个 I/O 线程的系统上的流水线并行性示例#

异步性#

某些操作需要很长时间,并且可能不需要 CPU。从文件系统读取数据就是一个例子。如果我们每个内核只有一个线程,那么在等待这些操作完成时,时间就会浪费。这个问题有两个常见的解决方案。同步解决方案通常是创建比内核更多的线程,并期望其中一些线程会被阻塞,这是可以接受的。这种方法通常更简单,但会导致过多的线程争用,并且需要进行微调。

另一个解决方案是使缓慢的操作异步化。当缓慢的操作开始时,调用者放弃线程并允许其他任务在此时运行。一旦缓慢的操作完成,就会创建一个新的任务来获取结果并继续处理。这有助于最大限度地减少线程争用,但实现起来往往更复杂。

由于缺乏标准的 C++ 异步 API,Acero 使用了这两种方法的组合。Acero 有两个线程池。第一个是 CPU 线程池。这个线程池每个内核有一个线程。在这个线程池中的任务不应该阻塞(除了同步方面的轻微延迟之外),并且通常应该尽可能积极地使用 CPU。I/O 线程池中的线程应该大部分时间处于空闲状态。它们应该避免执行任何 CPU 密集型工作。它们的任务基本上是等待数据可用,并在 CPU 线程池上调度后续任务。

../../_images/async.svg

Arrow 通过组合 CPU 和 I/O 线程池来实现异步执行#

注意

Acero 中的大多数节点不需要担心异步性。它们是完全同步的,不会生成任务。

每个流水线一个任务(有时甚至更多)#

引擎可以选择为每个节点的执行创建一个线程任务。但是,如果没有仔细的调度,这会导致缓存局部性问题。例如,假设我们有一个由三个执行节点组成的基本计划,分别是扫描、投影和过滤(这是一个非常常见的用例)。现在假设有 100 个批次。在每个操作一个任务的模型中,我们将有诸如“扫描批次 5”、“投影批次 5”和“过滤批次 5”之类的任务。这些任务中的每一个都可能访问相同的数据。例如,也许 projectfilter 节点需要读取同一列。该列最初是在 scan 节点的解码阶段创建的。为了最大限度地利用缓存,我们需要仔细调度任务,以确保这三个任务都连续运行并分配给同一个 CPU 内核。

为了避免这个问题,我们设计了在任务结束之前尽可能多地运行多个节点的任务。这组节点通常被称为“流水线”,而结束流水线的节点(因此也结束任务)通常被称为“流水线断路器”。有些节点甚至可能介于两者之间。例如,在散列连接节点中,当我们在探测侧接收一个批次时,并且散列表已经构建完毕,我们不需要结束任务,而是可以继续运行。这意味着任务有时会在连接节点结束,有时会继续运行到连接节点之后。

../../_images/pipeline_task.svg

计划中流水线的逻辑视图和两个任务,显示流水线边界在计划期间可能发生变化#

线程池和调度器#

CPU 和 I/O 线程池是 Arrow-C++ 核心库的一部分。它们包含一个任务的 FIFO 队列,并且会在线程可用时执行这些任务。对于 Acero,我们需要额外的功能。为此,我们使用 AsyncTaskScheduler。在最简单的操作模式下,调度器只是将任务提交到底层线程池。但是,它也能够创建子调度器,子调度器可以应用限流、优先级和任务跟踪。

  • 限流调度器与每个任务相关联一个成本。只有在有空间的情况下,任务才会提交到底层调度器。如果没有空间,则任务将被放入队列中。写入节点使用大小为 1 的限流来避免递归调用数据集写入器(数据集写入器进行自己的内部调度)。限流调度器可以手动暂停和恢复。暂停时,所有任务都会被排队,即使有空间,排队的任务也不会被提交。这在源节点中很有用,可以实现 PauseProducing 和 ResumeProducing。

  • 可以将优先级应用于限流调度器来控制排队任务的提交顺序。如果有空间,则任务会立即提交(无论优先级如何)。但是,如果限流已满,则任务会被排队并受到优先级的影响。扫描节点会限制其生成的读取请求数量,并优先读取数据集,如果可能的话按顺序读取。

  • 任务组可以用于跟踪一组任务,并在所有任务完成后运行一个最终化任务。这对于 fork-join 样式的问题很有用。写入节点使用任务组来关闭一个文件,一旦该文件的所有未完成的写入任务都完成了。

有很多研究和示例展示了在执行引擎中优先处理任务的不同方法。Acero 还没有遇到需要解决这个问题的情况。让我们来看一些常见的情况。

  • 引擎通常会优先读取连接节点的构建侧,然后再读取探测侧。这在 Acero 中可以通过应用反压更容易地处理。

  • 另一个常见的用例是控制内存累积。引擎会优先处理更靠近接收节点的任务,以缓解内存压力。但是,Acero 目前假设溢出将在流水线断路器处添加,并且计划中的内存使用量或多或少是静态的(每个内核),并且远低于硬件的限制。如果 Acero 需要在具有大量计算资源和有限内存的环境(例如 GPU)中使用,这可能会发生变化。

  • 引擎通常会使用工作窃取算法来优先处理在同一个内核上运行的任务,以提高缓存局部性。但是,由于 Acero 使用每个流水线一个任务的模型,因此调度器可以回收的缓存并行性机会并不多。任务只有在没有更多可以使用的数据的工作时才会结束。

虽然 Acero 目前还没有实现很多优先级处理,但我们拥有在需要时应用优先级处理的工具。

注意

除了 AsyncTaskScheduler 之外,还有一个名为 TaskScheduler 的类。这个类早于 AsyncTaskScheduler,设计用于为高效的同步 fork-join 工作负载提供任务跟踪。如果这个专门用途符合您的需求,那么您可以考虑使用它。有趣的是,可以将它与 AsyncTaskScheduler 进行性能对比,看看它们之间的差距有多大。

节点内并行性#

某些节点有可能在一个任务内利用并行性。例如,在扫描节点中,我们可以并行解码列。在散列连接节点中,有时会利用并行性来执行诸如构建散列表之类的复杂任务。这种类型的并行性不太常见,但不一定不鼓励。但是,应该先进行性能分析,以确保这种额外的并行性对您的工作负载有帮助。

所有工作都在任务中完成#

Acero 中的所有工作都在任务中完成。当计划启动时,会创建 AsyncTaskScheduler,并为它提供一个初始任务。这个初始任务会调用节点上的 StartProducing 方法。任务可能会调度额外的任务。例如,源节点通常会在调用 StartProducing 时调度任务。流水线断路器通常会在它们累积所有需要的数据时调度任务。一旦计划中的所有任务都完成了,则该计划就被认为已完成。

某些节点使用外部线程。这些线程必须使用 BeginExternalTask 方法注册为外部任务。例如,asof 连接节点使用一个专用的处理线程来实现串行执行。这个专用线程被注册为一个外部任务。应尽量避免使用外部任务,因为它们需要仔细处理,以避免在错误情况下出现死锁。

有序执行#

某些节点要么为其传出的批次建立顺序,要么需要能够按顺序处理批次。Acero 使用 ExecBatch 上的 batch_index 属性来处理排序。如果节点具有确定性的输出顺序,则应该在其发出的批次上应用批次索引。例如,OrderByNode 会对批次应用新的顺序(无论传入的顺序如何)。扫描节点能够将隐式顺序附加到批次,该顺序反映了正在扫描的文件中行的顺序。

如果节点需要按顺序处理数据,那么它会变得更加复杂。由于执行的并行性质,我们无法保证批次会按顺序到达节点。但是,一般可以预期它们是“大致有序”的。因此,我们可以将批次插入到一个排序队列中。排序队列被提供了一个回调,该回调保证会按顺序对批次进行串行运行。例如,获取节点使用排序队列。回调会检查是否需要包含批次的一部分或全部,然后在需要时对批次进行切片。

即使节点不关心顺序,也应该尝试尽可能维护批次索引。投影和过滤节点不关心顺序,但它们会确保输出批次保留与其输入批次相同的索引。如果需要,过滤节点甚至会发出空批次,以便它可以维护批次顺序而不会出现间隙。

../../_images/ordered.svg

有序执行示例#

分区执行#

如果以某种方式将行分组在一起,则流将被分区(有时称为分段)。目前还没有关于分区的正式概念。但是,一个正在开始发展(例如,分段聚合),我们最终可能还会在 Acero 中引入一个更正式的分区概念。

溢出#

溢出尚未在 Acero 中实现。

分布式执行#

当引擎在分布式环境中使用时,某些执行节点很有用。术语可能有所不同,因此我们将使用 Substrait 术语。交换节点将数据发送到不同的工作器。通常,这是一个分区交换,因此预计 Acero 将对每个批次进行分区,并将分区分布在 N 个不同的工作器之间。在另一端,我们有捕获节点。此节点接收来自不同工作器的数据。

这些节点目前不存在于 Acero 中。但是,它们将在范围内,我们希望将来能够拥有这样的节点。

分析和跟踪#

Acero 的跟踪目前处于半实现状态,分析工具存在重大差距。但是,在使用开放式遥测进行跟踪方面已经做出了一些努力,并且大部分必要的组件都已到位。目前主要缺少的是对跟踪结果的某种有效可视化。

为了使用现有的跟踪,您需要使用 ARROW_WITH_OPENTELEMETRY=ON 构建 Arrow。然后,您需要设置环境变量 ARROW_TRACING_BACKEND=otlp_http。这将配置开放式遥测,以将跟踪结果(作为 OTLP)导出到 HTTP 端点 https://127.0.0.1:4318/v1/traces。您需要配置开放式遥测收集器以收集该端点上的结果,并且您需要配置某种类型的跟踪查看器,例如 Jaeger:https://jaeger.golang.ac.cn/docs/1.21/opentelemetry/

基准测试#

Acero 最完整的宏观基准测试由 voltrondata-labs/arrowbench 提供。这些包括一组 TPC-H 基准测试,从 R-dplyr 集成执行,这些基准测试在每次 Arrow 提交时运行,并报告给 https://conbench.ursa.dev/ 上的 Conbench。

除了这些 TPC-H 基准测试外,还有许多针对不同节点(哈希连接、asof 连接等)的微基准测试。最后,计算函数本身应该大多具有微基准测试。有关微基准测试的更多信息,请参阅 https://arrow.apache.org/docs/developers/benchmarks.html

任何新功能都应包括微基准测试,以避免回归。

绑定#

公共 API#

Acero 的公共 API 由 Declaration 和各种 DeclarationToXyz 方法组成。此外,每个节点的选项类也是公共 API 的一部分。但是,节点是可扩展的,因此此 API 也是可扩展的。

R (dplyr)#

Dplyr 是一个用于以编程方式构建查询的 R 库。arrow-r 包具有 dplyr 绑定,这些绑定会调整 dplyr API 以创建 Acero 执行计划。此外,还有一个正在开发中的 dplyr-substrait 后端,它最终可能会取代 Acero 意识绑定。

Python#

pyarrow 库以两种不同的方式绑定到 Acero。首先,在 pyarrow.acero 中有一个直接绑定,它直接绑定到公共 API。其次,有一些计算实用程序,例如 pyarrow.Table.group_by,它使用 Acero,尽管对用户来说这是不可见的。

Java#

Java 实现公开了 Arrow 数据集的一些功能。这些隐式地使用 Acero。目前,Java 实现中没有直接绑定到 Acero 或 Substrait。

设计理念#

引擎独立计算#

如果一个节点需要复杂的计算,那么它应该将这些工作封装在不依赖于任何特定引擎设计的抽象中。例如,哈希连接节点使用诸如行编码器、哈希表和执行批次构建器之类的实用程序。其他地方共享排序队列和行分段器的实现。节点本身应该保持最小化,并且只是从 Acero 到抽象的映射。

这有助于将设计与 Acero 的设计细节分离,并使它们对引擎更改更具弹性。它还有助于将这些抽象作为其自身的功能来推广。无论是在其他引擎中使用,还是作为计算实用程序在 pyarrow 中潜在的新增功能。

使任务成为非线程#

如果您需要并行运行某些东西,那么您应该使用线程任务而不是专用线程。

  • 这将线程计数保持在较低水平(减少线程争用和上下文切换)

  • 这可以防止死锁(在发生故障的情况下,任务会自动取消)

  • 这简化了分析(可以轻松地测量任务,更容易了解所有工作在哪里)

  • 这使得在没有线程的情况下运行成为可能(有时用户正在执行自己的线程,有时我们需要在像 emscripten 这样的线程限制环境中运行)

注意:我们目前并不总是遵循此建议。asof 连接节点中有一个专用进程线程。专用线程在实验使用中是“可以的”,但我们希望从它们迁移。

不要阻塞 CPU 线程#

如果您需要运行一个可能运行时间很长的活动,该活动没有积极使用 CPU 资源(例如,从磁盘读取、网络 I/O、等待使用其自身线程的外部库),那么您应该使用异步实用程序来确保您不阻塞 CPU 线程。

不要重新发明轮子#

每个节点不应该是独立的实用程序孤岛。在可能的情况下,应将计算推送到计算函数或公共共享实用程序中。这是像这样的大型项目有望维护的唯一途径。

避免查询优化#

编写高效的 Acero 计划可能具有挑战性。例如,过滤器表达式和列选择应向下推送到扫描节点,以便数据不会从磁盘读取。表达式应简化,并且公共子表达式应被分解出来。哈希连接节点的构建端应该是两个输入中较小的一个。

但是,找出这些问题是为查询规划器或查询优化器保留的一项挑战。创建查询优化器是一项艰巨的任务,超出了 Acero 的范围。随着 Substrait 的采用,我们希望最终会出现解决这些问题的实用程序。因此,我们通常会避免在 Acero 中执行任何类型的查询优化。Acero 应该尽可能地按字面解释声明。这有助于减少维护并避免意外情况。

我们还意识到这并不总是可能的。例如,哈希连接节点目前会检测是否存在一串哈希连接运算符,如果存在,它会在运算符之间配置布隆过滤器。从技术上讲,这可以留给查询优化器来完成。但是,这种行为相当特定于 Acero 并且相当利基,因此不太可能很快将其引入优化器。

性能指南#

批次大小#

也许最常讨论的性能标准是批次大小。Acero 最初是基于研究来遵循 morsel-batch 模型设计的。任务是基于大量行(一个 morsel)创建的。目标是让 morsel 足够大,以证明任务的开销是合理的。在任务中,数据进一步细分为批次。每个批次应该足够小,以便舒适地放入 CPU 缓存(通常是 L2 缓存)。

这建立了两个循环。外循环是并行的,内循环不是。

for morsel in dataset: # parallel
  for batch in morsel:
    run_pipeline(batch)

这种执行风格的优点是,访问同一列的连续节点(或执行节点内的连续操作)可能从缓存中受益。对于需要随机访问数据的函数,它也是必不可少的。它最大程度地提高了并行性,同时最大程度地减少了从主内存到 CPU 缓存的数据传输。

../../_images/microbatch.svg

如果需要对数据进行多次遍历(或随机访问),并且批次比缓存大得多,那么性能就会下降。将任务分解成更小的批次有助于提高任务局部性。#

morsel/batch 模型反映在 Acero 的几个地方

  • 在大多数源节点中,我们将尝试获取 1Mi 行的批次。这通常是可配置的。

  • 在源节点中,我们然后迭代并切除 32Ki 行的批次。这目前不可配置。

  • 哈希连接节点目前要求批次包含至少 32Ki 行或更少,因为它在某些地方使用 16 位有符号整数作为行索引。

但是,此指南存在争议。分析表明,我们没有从转到更小的批次大小中获得任何真正的收益。似乎我们获得的任何优势都会在每个批次的开销中消失。这些开销中的大多数似乎是由于各种每个批次的分配造成的。此外,根据您的硬件,CPU Cache<->RAM 并不总是瓶颈。线性访问、预取和高 CPU<->RAM 带宽的组合可以减轻缓存未命中的惩罚。

因此,本节包含在本指南中以提供历史背景,但不应被视为约束性。

正在进行的和已弃用的工作#

以下工作正在进行中。它们在此处描述,以解释代码库中的某些重复,以及解释即将消失的类型。

扫描程序 v2#

扫描程序目前是数据集模块中的一个节点,在工厂注册表中注册为“scan”。此节点是在 Acero 之前编写的,并且广泛使用 AsyncGenerator 来并行扫描多个文件。不幸的是,使用 AsyncGenerator 使扫描难以分析、难以调试,并且无法取消。一个新的扫描节点正在开发中。它目前注册了“scan2”名称。新的扫描节点使用 AsyncTaskScheduler 而不是 AsyncGenerator,并且应提供额外的功能,例如跳过行和处理嵌套列投影(对于支持它的格式)

OrderBySink 和 SelectKSink#

这两个执行节点提供了自定义接收器实现。它们是在将有序执行添加到 Acero 之前编写的,并且是生成有序输出的唯一方式。但是,它们必须放置在计划的末尾,并且它们是自定义接收器节点这一事实使它们难以用 Declaration 描述。OrderByNode 和 FetchNode 替换了它们。这些目前保留着,直到现有的绑定从它们迁移。

上游更改#

Acero 的设计使其可以在不重新编译的情况下进行扩展。您可以轻松地添加新的计算函数和执行节点,而无需创建分支或编译 Acero。但是,在您开发了普遍有用的新功能时,我们希望您能抽出时间将更改上游。

即使我们欢迎这些更改,我们也必须承认此过程存在成本。上游代码要求新模块的行为正确,但这通常是更容易审查的部分。更重要的是,上游代码是一个将维护负担从您自己转移到更广泛的 Arrow C++ 项目维护者的过程。这需要维护者深入了解代码,需要代码与项目的风格一致,并且需要代码通过单元测试得到良好测试,以帮助进行回归。

因此,我们强烈建议您采取以下步骤

  • 在开始时,您应该向邮件列表发送一条消息,宣布您的意图和设计。这将帮助您确定该功能是否有更广泛的兴趣,并且其他人可能会有想法或建议在过程的早期阶段做出贡献。

    • 如果对该功能没有太多兴趣,请记住,最终上游更改可能会很困难。团队的维护能力有限,我们试图优先考虑需求量大的功能。

  • 我们建议您在自己的分支上开发和测试更改,直到您将其提升到对工作正常感到相当自信的程度。如果更改很大,您可能还想考虑如何将更改分解成更小的部分。在您这样做时,您可以共享更大的 PR(作为草稿 PR 或本地分支上的分支)和更小的 PR。这样,我们可以看到较小 PR 的上下文。但是,如果您确实将事物分解,则较小的 PR 仍然应该能够独立运行。

  • 任何 PR 都需要包含以下内容

    • 转换新功能的单元测试

    • 如果有任何重大计算工作正在进行,则需要进行微基准测试

    • 演示如何使用新功能的示例

    • 对 API 参考和本指南的更新

    • 通过 CI(您可以在您的分支上启用 GitHub Actions,这将允许大多数 CI 作业在您创建 PR 之前运行)