分离式 IPC 协议#

警告

实验性功能:分离式 IPC 协议目前处于实验阶段。基于反馈和使用情况,在最终标准化之前,协议定义可能会发生变化。

原理#

Arrow IPC 格式描述了一种将 Arrow 数据作为记录批次流进行传输的协议。该协议期望一个连续的字节流,并将其划分为离散的消息(使用长度前缀和延续指示符)。每个离散消息由两部分组成

  • 一个 Flatbuffers 头消息

  • 一系列由扁平化和打包后的主体缓冲区组成的字节(某些消息类型,如 Schema 消息,没有这一部分)——这在 IPC 格式规范中被称为消息主体

在大多数情况下,现有的 IPC 格式已经足够高效

  • 以 IPC 格式接收数据允许对主体缓冲区字节进行零拷贝利用,无需反序列化即可形成 Arrow 数组

  • IPC 文件格式可以进行内存映射,因为它与位置无关,且文件的字节与内存中的期望内容完全一致。

然而,有些用例无法通过这种方式处理

  • 构建 IPC 记录批次消息需要分配一块连续的内存并复制所有数据缓冲区,将它们紧密地打包在一起。这使得将现有的、可直接使用的数据封装到 IPC 消息中的常见情况变得效率低下。

  • 即使 Arrow 数据位于跨进程边界或传输层(如 UCX)可访问的内存中,也没有标准的方法向能够利用它的消费者指定该共享位置。

  • 位于非 CPU 设备(如 GPU)上的 Arrow 数据,在不将数据复制回主机设备或将 Flatbuffers 元数据字节复制到设备内存的情况下,无法使用 Arrow IPC 发送。

    • 同样地,将 IPC 消息接收到设备内存中需要将 Flatbuffers 元数据复制回主机 CPU 设备。这是因为 IPC 流在单个流中交织了数据和元数据。

本协议旨在以高效的方式解决这些用例。

目标#

  • 定义一种用于传递 Arrow IPC 数据的通用协议,它不绑定于任何特定的传输层,并允许利用非 CPU 设备内存、共享内存以及较新的“高性能”传输层(如 UCXlibfabric)。

    • 这使得主体数据可以保存在非 CPU 设备(如 GPU)上,而无需昂贵的设备到主机的拷贝操作。

  • 通过将 IPC 元数据流与 IPC 主体字节分离,允许纯粹将 Flight RPC 用于控制流

定义#

IPC 元数据#

包含 Arrow IPC 消息头部的 Flatbuffers 消息字节

标签 (Tag)#

一个用于流量控制并确定如何解释消息主体的低端字节序(little-endian) uint64 值。特定位可以被屏蔽,以便仅通过标签的一部分来识别消息,其余位可用于控制流或其他消息元数据。某些传输层(如 UCX)对这类标签值有内置支持,无论消息主体是否位于非 CPU 设备上,它们都会在 CPU 内存中提供这些标签。

序列号 (Sequence Number)#

一个 4 字节的低端字节序无符号整数,对于一个流从 0 开始,指示消息的顺序。它也用于识别特定消息,从而将 IPC 元数据头与其对应的主体关联起来,因为元数据和主体可以在不同的管道/流/传输层上发送。

如果序列号达到 UINT32_MAX,应该允许其翻转,因为不太可能有足够的未处理消息在等待处理从而导致序列号重叠。

序列号有两个作用:识别相应的元数据和带标签的主体数据消息,以及确保我们不依赖于消息必须按顺序到达。客户端应使用序列号来正确排序到达的消息以便进行处理。

协议#

利用 libcudfUCX 的参考实现可以在 arrow-experiments 仓库中找到。

要求#

实现此协议的传输层必须提供两项功能

  • 消息发送

    • 分界消息(如 gRPC),而不是无界流(如没有进一步分帧的普通 TCP)。

    • 或者,可以使用像 封装消息格式 那样的分帧机制,但去掉主体字节。

  • 带标签的消息发送

    • 发送带有附加的低端字节序、64 位无符号整数标签的消息以进行控制流。这样的标签允许控制流操作主体位于非 CPU 设备上的消息,而无需将消息本身从设备上拷贝出来。

URI 规范#

当提供 URI 给消费者以连接此协议时(例如通过 Flight 的位置 URI),URI 应指定一个易于识别的方案,如 ucx:fabric:。此外,URI 应编码以下 URI 查询参数

注意

随着该协议的成熟,本文档将更新常用传输方案。

  • want_data - 必需 - uint64 整数值

    • 此值应用于标记发往服务器的初始消息以启动数据传输。该初始消息的主体应为被请求数据流的不透明二进制标识符(类似于 Flight RPC 协议中的 Ticket

  • free_data - 可选 - uint64 整数值

    • 如果服务器可能使用偏移量/地址来访问远程内存或共享内存位置,则 URI 应包含此参数。此值用于标记从客户端发送到数据服务器的消息,包含不再需要使用的特定偏移量/地址(即,直接引用这些内存位置的操作,例如将远程数据拷贝到本地内存,已经完成)。

  • remote_handle - 可选 - base64 编码的字符串

    • 当使用共享内存或远程内存时,此值指示访问内存所需的任何句柄或标识符。

      • 使用 UCX 时,这将是 rkey

      • 对于 CUDA IPC,这将是基础 GPU 指针或内存句柄的值,随后的地址将是相对于此基础指针的偏移量。

背压处理#

目前,本提议没有指定任何管理消息背压以因内存和带宽原因进行限制的方法。目前,这交由传输层自行定义,而不是锁定在某种次优方案上。

随着不同传输层和库之间使用量的增长,将出现通用且高效的方法来处理不同用例下的背压。

注意

虽然协议本身是传输无关的,但目前的使用和示例仅在 UCX 和 libfabric 传输上进行了测试。

协议描述#

可能会出现两种情况

  1. 元数据流和主体数据流通过单独的连接发送

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% https://apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. sequenceDiagram participant D as Data Stream participant C as Client participant M as Metadata Stream activate C C-->>+M: TaggedMessage(server.want_data, bytes=ID_of_desired_data) C-->>+D: TaggedMessage(server.want_data, bytes=ID_of_desired_data) M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata) loop each batch par M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata) and alt D-->>C: TaggedMessage((bytes[0] << 55) | le_bytes(sequence_number),<br/>bytes=batch_data) else D-->>C: TaggedMessage((bytes[1] << 55) | le_bytes(sequence_number),<br/>bytes=uint64_pairs) end end end M-->>C: Message(bytes([0]) + le_bytes(sequence_number)) deactivate M loop C-->>D: TaggedMessage(server.free_data, bytes=uint64_list) end deactivate D deactivate C

  1. 元数据流和主体数据流同时通过同一个连接发送

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% https://apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. sequenceDiagram participant C as Client participant S as Server activate C C-->>+S: TaggedMessage(server.want_data, bytes=ID_of_desired_data) S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata) par loop each chunk S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata) end S-->>C: Message(bytes([0]) + le_bytes(sequence_number)) and loop each chunk alt S-->>C: TaggedMessage((bytes[0] << 55) | le_bytes(sequence_number),<br/>bytes=batch_data) else S-->>C: TaggedMessage((bytes[1] << 55) | le_bytes(sequence_number),<br/>bytes=uint64_pairs) end end end loop C-->>S: TaggedMessage(server.free_data, bytes=uint64_list) end deactivate S deactivate C

服务器序列#

可以有一个单一的服务器处理 IPC 元数据流和主体数据流,也可以有单独的服务器分别处理 IPC 元数据和主体数据。这允许根据需要通过单个传输管道或两个管道进行数据流传输。

元数据流序列#

服务器的静止状态是等待带有特定 <want_data> 标签值的带标签消息以启动传输。此 <want_data> 值由服务器定义,并通过 URI 传播给任何客户端。本协议不规定任何特定值,以免干扰任何其他依赖标签值的现有协议。该消息的主体将包含一个不透明的二进制标识符,以指示要发送的特定数据集/数据流。

注意

例如,随 FlightInfo 消息传递的 ticket 就是此消息的主体。因为它是不透明的,它可以是服务器想要使用的任何内容。URI 和标识符不需要通过 Flight RPC 提供给客户端,也可以通过任何所需的传输层或协议传输。

收到 <want_data> 请求后,服务器通过发送一系列由以下内容组成的消息进行响应

block-beta columns 8 block:P["\n\n\n\nPrefix"]:5 T["Message type\nByte 0"] S["Sequence number\nBytes 1-4"] end H["Flatbuffer bytes\nRest of the message"]:3

  • 5 字节前缀

    • 消息的第一个字节指示消息类型,目前仅允许两种消息类型(未来可能会添加更多类型)

      1. 流结束 (End of Stream)

      2. Flatbuffers IPC 元数据消息

    • 接下来的 4 个字节是一个低端字节序、32 位无符号整数,指示消息的序列号。流中的第一条消息(必须始终是 schema 消息)必须具有 0 的序列号。随后的每条消息必须将该数字递增 1

  • Arrow IPC 头部的完整 Flatbuffers 字节

如 Arrow IPC 格式中所定义,每个元数据消息可以代表数据流使用的数据块或字典。

发送最后一条元数据消息后,服务器必须通过发送恰好 5 个字节的消息来指示流结束

  • 第一个字节为 0,表示流结束消息

  • 最后 4 个字节是序列号(4 字节、无符号整数,低端字节序)

数据流序列#

如果单一服务器同时处理数据流和元数据流,则数据消息开始与元数据消息并行发送给客户端。否则,与元数据序列一样,服务器的静止状态是等待带有 <want_data> 标签值的带标签消息,其主体指示要发送给客户端的数据集/数据流。

对于数据流中的每条 IPC 消息,如果该消息有主体(即 Record Batch 或 Dictionary 消息),则必须在数据流上发送一条带标签的消息。每条消息的 标签 应按如下结构构造

block-beta columns 8 S["Sequence number\nBytes 0-3"]:4 U["Unused (Reserved)\nBytes 4-6"]:3 T["Message type\nByte 7"]:1

  • 标签的最低有效 4 字节(位 0 - 31)应为该消息的 32 位无符号、低端字节序序列号。

  • 标签的最高有效字节(位 56 - 63)指示消息主体的 8 位无符号整数类型。目前仅指定了两种消息类型,但可以根据需要添加更多以扩展协议

    1. 主体包含原始主体缓冲区字节作为打包缓冲区(即标准 IPC 格式主体字节)

    2. 主体包含一系列无符号、低端字节序 64 位整数对,以表示共享或远程内存,其结构如下

      • 前两个整数(例如前 16 字节)表示此消息中所有缓冲区的大小(字节)和缓冲区数量(即随后的 uint64 对的数量)

      • 随后的每个 uint64 值对分别是该特定缓冲区的地址/偏移量及其长度。

  • 标签的所有未指定位(位 32 - 55)保留供本协议的潜在未来更新使用。目前它们必须为 0。

注意

任何传输的共享/远程内存地址必须由服务器保持活动,直到收到相应的带标签 <free_data> 消息。如果客户端在发送任何 <free_data> 消息之前断开连接,则服务器可以认为清理该内存是安全的。

发送最后一条带标签的 IPC 主体消息后,服务器应保持连接并等待带标签的 <free_data> 消息。这些 <free_data> 消息的结构很简单:一个或多个无符号、低端字节序 64 位整数,指示可以释放的地址/偏移量。

一旦没有未完成的待释放地址,该流的工作即告完成。

客户端序列#

此协议的客户端需要并发处理数据消息流和元数据消息流,它们可能来自同一服务器,也可能来自不同服务器。下面是一个流程图,展示了客户端如何处理元数据流和数据流

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% https://apache.org/licenses/LICENSE-2.0 %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. graph LR client((Client))-->c1{{Send #60;want_data#gt; Msg}} subgraph meta [Meta Message] direction LR m1[/Msg Type #40;byte 0#41;<br/>Seq Num #40;bytes 1-5#41;/]-- type 1 -->m2[[Process IPC Header]] m2-- IPC has body -->m3[Get Corresponding<br/>Tagged Msg] m2-- Schema Msg -->m4[/Store Schema/] m1-- type 0 -->e[Indicate End of Stream] end subgraph data [Data Stream] direction LR d1[Request Msg<br/>for Seq Num]-->d2{Most Significant<br/>Byte} d2-- 0 -->d3[Construct from<br/>Metadata and Body] d2-- 1 -->d4[Get shared/remote<br/>buffers] d4 -->d5[Construct from<br/>Metadata and buffers] d3 & d5 -->e2[Output Batch] end client -- recv untagged msg --> meta client -- get tagged msg --> data

  1. 首先,客户端发送一条带标签的消息,使用其在 URI 中获得的 <want_data> 值作为标签,并以不透明 ID 作为主体。

    • 如果元数据服务器和数据服务器是分开的,则需要分别向每个服务器发送一条 <want_data> 消息。

    • 在任何情况下,元数据流和数据流都可以根据传输层的性质并发和/或异步处理。

  2. 对于客户端在元数据流中接收到的每条未打标签消息

    • 消息的第一个字节指示它是流结束消息(值为 0)还是元数据消息(值为 1)。

    • 接下来的 4 个字节是消息的序列号,是一个 32 位无符号、低端字节序整数。

    • 如果它不是流结束消息,剩余字节是 IPC Flatbuffer 字节,可以按常规方式解释。

      • 如果消息有主体(即 Record Batch 或 Dictionary 消息),则客户端应使用相同的序列号从数据流中检索带标签的消息。

    • 如果它流结束消息,则如果没有接收到的序列号缺口,则可以安全地关闭元数据连接。

  3. 当收到需要主体的元数据消息时,应使用 0x00000000FFFFFFFF 的标签掩码结合序列号来匹配消息,而忽略高位字节(例如,我们只关心将低 4 位与序列号匹配)

    • 接收到后,最高有效字节的值决定了客户端处理主体数据的方式

      • 如果最高有效字节为 0:则消息主体是原始 IPC 打包主体缓冲区,从而允许其与相应的元数据头字节轻松处理。

      • 如果最高有效字节为 1:消息主体将由一系列 64 位无符号、低端字节序整数对组成。

        • 前两个整数代表 1) 所有主体缓冲区加在一起的总大小,以便在需要中间缓冲区时易于分配,以及 2) 正在发送的缓冲区数量 (nbuf)。

        • 消息的其余部分将是 nbuf 个整数对,每个缓冲区一个。每一对是 1) 缓冲区的地址/偏移量和 2) 该缓冲区的长度。然后可以根据底层传输层通过共享或远程内存例程检索内存。这些地址/偏移量必须保留,以便稍后可以在 <free_data> 消息中发回,向服务器指示客户端不再需要共享内存。

  4. 一旦收到流结束消息,客户端应处理任何剩余的未处理 IPC 元数据消息。

  5. 在单个内存地址/偏移量能够被远程服务器释放后(在它发送了这些而非完整主体字节的情况下),客户端应向服务器发送相应的 <free_data> 消息。

    • 单条 <free_data> 消息由任意数量的 64 位无符号整数值组成,表示可以释放的地址/偏移量。其数量为任意的原因是允许客户端选择是发送多条消息来释放多个地址,还是将多个地址合并为较少的消息以供释放(从而根据需要使协议不那么“频繁交互”)

持续开发#

如果您决定在自己的环境和系统中尝试此协议,我们期待您的反馈并了解您的用例。由于这是一个目前的实验性协议,我们需要真实世界的使用情况来促进改进,并找到跨传输层进行标准化的正确通用方法。

请通过 Arrow 开发人员邮件列表参与讨论:https://arrow.apache.org/community/#mailing-lists