分离式 IPC 协议#

警告

实验性:分离式 IPC 协议在其当前形式下处于实验阶段。根据反馈和使用情况,协议定义可能会发生变化,直到其完全标准化。

基本原理#

Arrow IPC 格式描述了一种将 Arrow 数据作为记录批处理流传输的协议。此协议期望一个连续的字节流,分为离散的消息(使用长度前缀和继续指示符)。每条离散消息包含两个部分

  • 一个Flatbuffers 头消息

  • 一系列由扁平化和打包的主体缓冲区组成的字节(某些消息类型,如 Schema 消息,没有此部分) - 这在 IPC 格式规范中称为消息主体

在大多数情况下,现有的 IPC 格式在其当前存在形式下已经足够高效

  • 以 IPC 格式接收数据允许零拷贝利用主体缓冲区字节,无需反序列化即可形成 Arrow 数组

  • IPC 文件格式可以内存映射,因为它与位置无关,并且文件中的字节与内存中预期的字节完全相同。

但是,有些用例没有得到处理

  • 构建 IPC 记录批处理消息需要分配一个连续的字节块并将所有数据缓冲区复制到其中,并将其紧密地打包在一起。这会使将现有、可直接使用的数据包装到 IPC 消息中的常见情况变得最差。

  • 即使 Arrow 数据位于跨进程边界或传输(如 UCX)可访问的内存中,也没有标准方法来指定该共享位置给消费者,而消费者可以利用它。

  • 位于非 CPU 设备(例如 GPU)上的 Arrow 数据不能使用 Arrow IPC 发送,除非必须将数据复制回主机设备或将 Flatbuffers 元数据字节复制到设备内存中。

    • 同样,将 IPC 消息接收到的设备内存中需要执行将 Flatbuffers 元数据复制回主机 CPU 设备的操作。这是由于 IPC 流在一个流中交错数据和元数据的事实。

此协议试图以有效的方式解决这些用例。

目标#

  • 定义一个用于传递 Arrow IPC 数据的通用协议,该协议不依赖于任何特定的传输,并且还允许利用非 CPU 设备内存、共享内存和更新的“高性能”传输,例如UCXlibfabric

    • 这允许主体中的数据保留在非 CPU 设备(如 GPU)上,而无需昂贵的设备到主机的复制。

  • 允许使用Flight RPC仅用于控制流,方法是将 IPC 元数据流与 IPC 主体字节分离

定义#

IPC 元数据

包含 Arrow IPC 消息头的 Flatbuffers 消息字节

标记

用于流控制和小端uint64值,用于确定如何解释消息的主体。可以屏蔽特定的位以允许仅通过标记的一部分识别消息,并将其余位用于控制流或其他消息元数据。某些传输(如 UCX)内置支持此类标记值,并且无论消息主体是否可能驻留在非 CPU 设备上,都将在 CPU 内存中提供它们。

序列号

一个小端、4 字节无符号整数,从流的 0 开始,指示消息的序列顺序。它也用于识别特定消息以将 IPC 元数据头与其对应的正文关联起来,因为元数据和正文可以通过单独的管道/流/传输发送。

如果序列号达到UINT32_MAX,则应允许其回绕,因为不太可能存在足够多的未处理消息等待处理,从而导致序列号重叠。

序列号有两个用途:识别相应的元数据和标记的主体数据消息,并确保我们不依赖于消息必须按顺序到达。客户端应使用序列号正确排序到达的消息以进行处理。

协议#

可以在arrow-experiments 存储库中找到利用libcudfUCX的参考示例实现。

要求#

实现此协议的传输**必须**提供两部分功能

  • 消息发送

    • 定界消息(如 gRPC),而不是非定界流(如没有进一步帧的普通 TCP)。

    • 或者,可以使用类似于封装消息格式的 IPC 协议的帧机制,同时省略主体字节。

  • 标记消息发送

    • 发送一条消息,该消息附加了一个小端、64 位无符号整数值标记用于控制流。这样的标记允许控制流操作一条其主体位于非 CPU 设备上的消息,而无需将消息本身从设备上复制。

URI 规范#

当向消费者提供 URI 以与该协议一起使用时(例如通过Flight 的位置 URI),URI 应指定一个易于识别的方案,例如ucx:fabric:。此外,URI 应编码以下 URI 查询参数

注意

随着此协议的成熟,本文档将使用与其一起使用的常用识别传输方案进行更新。

  • want_data - **必需** - uint64 整数值

    • 此值应用于标记发送到服务器的初始消息以启动数据传输。启动消息的主体应为请求的数据流的不透明二进制标识符(如 Flight RPC 协议中的Ticket

  • free_data - **可选** - uint64 整数值

    • 如果服务器可能使用偏移量/地址发送消息以进行远程内存访问或共享内存位置,则 URI 应包含此参数。此值用于标记从客户端发送到数据服务器的消息,其中包含不再被客户端需要的特定偏移量/地址(即任何直接引用这些内存位置的操作,例如将远程数据复制到本地内存中,已完成)。

  • remote_handle - **可选** - base64 编码的字符串

    • 在使用共享内存或远程内存时,此值指示访问内存所需的任何必需句柄或标识符。

      • 使用 UCX,这将是rkey

      • 使用 CUDA IPC,这将是基本 GPU 指针或内存句柄的值,后续地址将从此基本指针偏移。

反压处理#

当前此提案未指定任何管理消息反压以抑制内存和带宽原因的方法。目前,这将是**传输定义的**,而不是锁定到次优的东西。

随着不同传输和库之间使用的增长,将出现常见模式,这些模式将允许跨不同用例以通用但有效的方式处理反压。

注意

虽然协议本身与传输无关,但到目前为止,当前的使用和示例仅使用 UCX 和 libfabric 传输进行了测试,仅此而已。

协议描述#

有两种可能性可能发生

  1. 元数据和主体数据的流通过单独的连接发送

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. sequenceDiagram participant D as Data Stream participant C as Client participant M as Metadata Stream activate C C-->>+M: TaggedMessage(server.want_data, bytes=ID_of_desired_data) C-->>+D: TaggedMessage(server.want_data, bytes=ID_of_desired_data) M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata) loop each batch par M-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata) and alt D-->>C: TaggedMessage((bytes[0] << 55) | le_bytes(sequence_number),<br/>bytes=batch_data) else D-->>C: TaggedMessage((bytes[1] << 55) | le_bytes(sequence_number),<br/>bytes=uint64_pairs) end end end M-->>C: Message(bytes([0]) + le_bytes(sequence_number)) deactivate M loop C-->>D: TaggedMessage(server.free_data, bytes=uint64_list) end deactivate D deactivate C

  1. 元数据和主体数据的流通过同一连接同时发送

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. sequenceDiagram participant C as Client participant S as Server activate C C-->>+S: TaggedMessage(server.want_data, bytes=ID_of_desired_data) S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + schema_metadata) par loop each chunk S-->>C: Message(bytes([1]) + le_bytes(sequence_number) + batch_metadata) end S-->>C: Message(bytes([0]) + le_bytes(sequence_number)) and loop each chunk alt S-->>C: TaggedMessage((bytes[0] << 55) | le_bytes(sequence_number),<br/>bytes=batch_data) else S-->>C: TaggedMessage((bytes[1] << 55) | le_bytes(sequence_number),<br/>bytes=uint64_pairs) end end end loop C-->>S: TaggedMessage(server.free_data, bytes=uint64_list) end deactivate S deactivate C

服务器序列#

可以有一个服务器同时处理 IPC 元数据流和主体数据流,或者分别为处理 IPC 元数据和主体数据设置单独的服务器。如果需要,这允许跨单个传输管道或两个管道流式传输数据。

元数据流序列#

服务器的待机状态是等待具有特定<want_data>标记值的**标记**消息以启动传输。此<want_data>值由服务器定义,并通过它们提供的 URI 传播到任何客户端。此协议没有规定任何特定值,以便它不会干扰任何其他依赖于标记值的现有协议。该消息的主体将包含一个不透明的二进制标识符,以指示要发送的特定数据集/数据流。

注意

例如,与FlightInfo消息一起传递的**票证**将是此消息的主体。因为它是不透明的,所以它可以是服务器想要使用的任何东西。URI 和标识符不需要通过 Flight RPC 传递给客户端,但可以来自任何所需的传输或协议。

收到<want_data>请求后,服务器通过发送由以下内容组成的一系列消息来响应

block-beta columns 8 block:P["\n\n\n\nPrefix"]:5 T["Message type\nByte 0"] S["Sequence number\nBytes 1-4"] end H["Flatbuffer bytes\nRest of the message"]:3

  • 5 字节前缀

    • 消息的第一个字节指示消息的类型,目前只有两种允许的消息类型(将来可能会添加更多类型)

      1. 流结束

      2. Flatbuffers IPC 元数据消息

    • 接下来的 4 个字节是一个小端、32 位无符号整数,指示消息的序列号。流中的第一条消息(**必须**始终是模式消息)**必须**具有序列号0。每条后续消息**必须**将数字增加1

  • Arrow IPC 头的完整 Flatbuffers 字节

如 Arrow IPC 格式中所定义,每个元数据消息可以表示数据块或字典,供数据流使用。

发送最后一个元数据消息后,服务器**必须**通过发送一条仅包含**正好**5 个字节的消息来指示流的结束

  • 第一个字节是0,表示**流结束**消息

  • 最后 4 个字节是序列号(4 字节,小端字节顺序的无符号整数)

数据流序列#

如果单个服务器同时处理数据和元数据流,则**应**与元数据消息并行开始将数据消息发送到客户端。否则,与元数据序列一样,服务器的待机状态是等待具有<want_data>标记值的**标记**消息,其主体指示要发送到客户端的数据集/数据流。

对于数据流中的每条 IPC 消息,如果该消息具有主体(即记录批处理或字典消息),则**必须**在数据流上发送**标记**消息。每条消息的标记应按如下方式构造

block-beta columns 8 S["Sequence number\nBytes 0-3"]:4 U["Unused (Reserved)\nBytes 4-6"]:3 T["Message type\nByte 7"]:1

  • 标记的最低有效 4 个字节(位 0 - 31)应为消息的 32 位无符号小端序列号。

  • 标记的最高有效字节(位 56 - 63)指示消息主体的**类型**,为 8 位无符号整数。当前仅指定了两种消息类型,但可以根据需要添加更多类型以扩展协议

    1. 主体包含原始主体缓冲区字节作为打包缓冲区(即标准 IPC 格式主体字节)

    2. 主体包含一系列无符号的小端 64 位整数对,以表示共享或远程内存,示意性结构如下

      • 前两个整数(例如,前 16 个字节)表示所有缓冲区的大小(以字节为单位)以及此消息中的缓冲区数量(以及因此以下uint64对的数量)

      • 每对后续的uint64值是地址/偏移量,后跟该特定缓冲区的长度。

  • 标记的所有未指定位(位 32 - 55)保留供此协议的潜在更新将来使用。目前,它们**必须**为 0。

注意

跨发送的任何共享/远程内存地址**必须**由服务器保持活动状态,直到收到相应的标记<free_data>消息。如果客户端在发送任何<free_data>消息之前断开连接,则可以假设如果服务器需要,可以安全地清理内存。

发送最后一个标记的 IPC 主体消息后,服务器应保持连接并等待标记的<free_data>消息。这些<free_data>消息的结构很简单:一个或多个无符号的小端 64 位整数,指示可以释放的地址/偏移量。

一旦没有更多待释放的地址,此流的工作就完成了。

客户端序列#

此协议的客户端需要同时处理消息的数据流和元数据流,这两个流可能来自同一个服务器,也可能来自不同的服务器。下图是一个流程图,展示了客户端如何处理元数据流和数据流。

%% Licensed to the Apache Software Foundation (ASF) under one %% or more contributor license agreements. See the NOTICE file %% distributed with this work for additional information %% regarding copyright ownership. The ASF licenses this file %% to you under the Apache License, Version 2.0 (the %% "License"); you may not use this file except in compliance %% with the License. You may obtain a copy of the License at %% http://www.apache.org/licenses/LICENSE-2.0 %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. graph LR client((Client))-->c1{{Send #60;want_data#gt; Msg}} subgraph meta [Meta Message] direction LR m1[/Msg Type #40;byte 0#41;<br/>Seq Num #40;bytes 1-5#41;/]-- type 1 -->m2[[Process IPC Header]] m2-- IPC has body -->m3[Get Corresponding<br/>Tagged Msg] m2-- Schema Msg -->m4[/Store Schema/] m1-- type 0 -->e[Indicate End of Stream] end subgraph data [Data Stream] direction LR d1[Request Msg<br/>for Seq Num]-->d2{Most Significant<br/>Byte} d2-- 0 -->d3[Construct from<br/>Metadata and Body] d2-- 1 -->d4[Get shared/remote<br/>buffers] d4 -->d5[Construct from<br/>Metadata and buffers] d3 & d5 -->e2[Output Batch] end client -- recv untagged msg --> meta client -- get tagged msg --> data

  1. 首先,客户端使用 URI 中提供的 `<want_data>` 值作为标签,以及不透明 ID 作为主体,发送一个带标签的消息。

    • 如果元数据服务器和数据服务器是分开的,则需要分别向每个服务器发送 `<want_data>` 消息。

    • 在任何一种情况下,元数据流和数据流都可以根据传输的性质并行和/或异步处理。

  2. 对于客户端在元数据流中接收到的每个**无标签**消息

    • 消息的第一个字节指示它是流结束消息(值 `0`)还是元数据消息(值 `1`)。

    • 接下来的 4 个字节是消息的序列号,一个以小端字节序表示的无符号 32 位整数。

    • 如果它**不是**流结束消息,则其余字节是 IPC Flatbuffer 字节,可以像普通字节一样解释。

      • 如果消息有主体(即记录批次或字典消息),则客户端应该使用相同的序列号从数据流中检索带标签的消息。

    • 如果它**是**流结束消息,则如果接收到的序列号没有间隙,则可以安全地关闭元数据连接。

  3. 当接收到需要主体的元数据消息时,`0x00000000FFFFFFFF` 的标签掩码**应该**与序列号一起使用以匹配消息,而不管高位字节如何(例如,我们只关心将低 4 个字节与序列号匹配)。

    • 一旦接收到,最高有效字节的值决定了客户端如何处理主体数据。

      • 如果最高有效字节为 0:则消息的主体是原始 IPC 打包的主体缓冲区,允许它与相应的元数据头字节轻松处理。

      • 如果最高有效字节为 1:则消息的主体将包含一系列以小端字节序表示的无符号 64 位整数对。

        • 前两个整数分别表示1)所有主体缓冲区的总大小,以便在需要中间缓冲区时轻松分配,以及2)正在发送的缓冲区数量(`nbuf`)。

        • 消息的其余部分将是 `nbuf` 对整数,每个缓冲区一对。每一对是1)缓冲区的地址/偏移量和2)该缓冲区的长度。然后可以通过基于底层传输的共享或远程内存例程检索内存。这些地址/偏移量**必须**保留,以便以后可以在 `<free_data>` 消息中发送回,指示服务器客户端不再需要共享内存。

  4. 接收到流结束消息后,客户端应该处理任何剩余的未处理 IPC 元数据消息。

  5. 在远程服务器能够释放各个内存地址/偏移量(在它发送这些地址/偏移量而不是完整主体字节的情况下)之后,客户端应该向服务器发送相应的 `<free_data>` 消息。

    • 单个 `<free_data>` 消息包含任意数量的无符号 64 位整数值,表示可以释放的地址/偏移量。它之所以是任意数量,是为了允许客户端选择是发送多条消息来释放多个地址,还是将多个地址合并成较少的要释放的消息(因此,如果需要,可以使协议不那么“健谈”)。

持续开发#

如果您决定在自己的环境和系统中尝试此协议,我们非常乐意收到您的反馈并了解您的用例。由于这目前是一个**实验性**协议,我们需要真实的用例才能促进改进它并在跨传输标准化方面找到正确的泛化。

请使用 Apache Arrow 开发者邮件列表参与讨论:https://arrow.apache.org/community/#mailing-lists