分离式 IPC 协议#
警告
实验性:分离式 IPC 协议目前处于实验阶段。根据反馈和使用情况,协议定义可能会在完全标准化之前发生变化。
原理#
Arrow IPC 格式描述了一种将 Arrow 数据作为记录批次流进行传输的协议。该协议期望一个连续的字节流,该字节流被分成离散的消息(使用长度前缀和继续指示符)。每个离散消息包含两部分:
一个 Flatbuffers 头部消息
一系列字节,包含展平并打包的体缓冲区(某些消息类型,如 Schema 消息,没有此部分)——这在 IPC 格式规范中称为消息体。
对于大多数情况,现有的 IPC 格式已经足够高效
以 IPC 格式接收数据允许零拷贝利用体缓冲区的字节,无需反序列化即可形成 Arrow Arrays
IPC 文件格式可以进行内存映射,因为它与位置无关,并且文件的字节与内存中预期的一致。
然而,存在一些当前格式无法处理的使用场景
构建 IPC 记录批次消息需要分配一个连续的字节块,并将所有数据缓冲区复制到其中,紧密地打包在一起。这使得将现有可直接使用的数据封装到 IPC 消息中的常见情况变得低效。
即使 Arrow 数据位于可跨进程边界或传输(如 UCX)访问的内存中,也没有标准方法可以向可能利用该共享位置的消费者指定该位置。
位于非 CPU 设备(例如 GPU)上的 Arrow 数据无法使用 Arrow IPC 发送,除非将数据复制回主机设备或将 Flatbuffers 元数据字节复制到设备内存中。
同样地,将 IPC 消息接收到设备内存中需要将 Flatbuffers 元数据复制回主机 CPU 设备。这是因为 IPC 流在单个流中交错数据和元数据。
本协议试图以高效的方式解决这些使用场景。
目标#
定义一种通用的协议,用于传输 Arrow IPC 数据,不与任何特定传输绑定,并且允许利用非 CPU 设备内存、共享内存以及新的“高性能”传输(如 UCX 或 libfabric)。
这使得可以将体中的数据保存在非 CPU 设备(如 GPU)上,而无需昂贵的设备到主机复制。
允许纯粹使用 Flight RPC 进行控制流,方法是将 IPC 元数据流与 IPC 体字节分离
定义#
- IPC 元数据
构成 Arrow IPC 消息头部的 Flatbuffers 消息字节
- 标签
一个小端序的
uint64
值,用于流控制和确定如何解释消息体。可以屏蔽特定位,仅根据标签的一部分来识别消息,剩余的位可用于控制流或其他消息元数据。某些传输(如 UCX)内置支持此类标签值,并且无论消息体是否驻留在非 CPU 设备上,都会在 CPU 内存中提供这些值。- 序列号
一个小端序的 4 字节无符号整数,流的起始值为 0,表示消息的顺序。它还用于标识特定消息,将 IPC 元数据头部与其对应的体关联起来,因为元数据和体可以通过单独的管道/流/传输发送。
如果序列号达到
UINT32_MAX
,应允许其回绕,因为不太可能有足够的待处理消息等待处理而导致序列号重叠。序列号有两个作用:识别对应的元数据和带标签的体数据消息,并确保我们不依赖消息必须按顺序到达。客户端应使用序列号对到达的消息进行正确排序以便处理。
协议#
利用 libcudf 和 UCX 的参考示例实现可以在 arrow-experiments 仓库中找到。
要求#
实现此协议的传输层**必须**提供两项功能
消息发送
分隔消息(如 gRPC),而不是非分隔流(如没有进一步成帧的纯 TCP)。
或者,可以使用 IPC 协议的封装消息格式等成帧机制,同时省略体字节。
带标签的消息发送
发送带有附加的小端序、无符号 64 位整型标签的消息,用于控制流。这样的标签允许控制流在消息体位于非 CPU 设备上的情况下进行操作,而无需将消息本身从设备上复制下来。
URI 规范#
当向消费者提供用于联系本协议的 URI 时(例如通过 Flight 的 Location URI),URI 应指定易于识别的方案,如 ucx: 或 fabric:。此外,URI 应编码以下 URI 查询参数
注意
随着该协议的成熟,本文档将更新,包含与其一同使用的常用传输方案。
want_data
- **必需** - uint64 整数值该值应用于标记发送给服务器的初始消息,以启动数据传输。启动消息的体应是所请求数据流的不透明二进制标识符(类似于 Flight RPC 协议中的
Ticket
)
free_data
- **可选** - uint64 整数值如果服务器可能使用偏移/地址发送消息以进行远程内存访问或共享内存位置,则 URI 应包含此参数。该值用于标记从客户端发送到数据服务器的消息,其中包含客户端不再需要的特定偏移/地址(即,直接引用这些内存位置的任何操作,例如将远程数据复制到本地内存,已完成)。
remote_handle
- **可选** - base64 编码字符串当使用共享内存或远程内存时,该值指示访问内存所需的任何句柄或标识符。
使用 UCX,这将是 *rkey* 值
使用 CUDA IPC,这将是 GPU 基本指针或内存句柄的值,后续地址将是相对于该基本指针的偏移量。
背压处理#
*目前*,本提案未指定任何方法来管理消息的背压,以出于内存和带宽原因进行节流。目前,这将是**传输层定义**的,而不是锁定到次优方案。
随着不同传输和库的使用增加,将出现常见模式,从而提供一种通用且高效的方式来处理跨不同用例的背压。
注意
虽然协议本身是传输无关的,但目前的用法和示例仅使用 UCX 和 libfabric 传输进行了测试,仅此而已。
协议描述#
可能发生两种情况
元数据流和体数据流通过单独的连接发送
元数据流和体数据流通过同一连接同时发送
服务器顺序#
可以有一个服务器同时处理 IPC 元数据流和体数据流,也可以有单独的服务器处理 IPC 元数据和体数据。这允许根据需要通过单个传输管道或两个管道传输数据。
元数据流顺序#
服务器的常态是等待带有特定 <want_data>
标签值的**带标签**消息,以启动传输。该 <want_data>
值由服务器定义,并通过提供给客户端的 URI 传播。本协议不规定任何特定值,以免干扰依赖标签值的任何其他现有协议。该消息的体将包含一个不透明的二进制标识符,用于指示要发送的特定数据集/数据流。
注意
例如,与 FlightInfo 消息一起传递的 **ticket** 将是此消息的体。由于它是不透明的,服务器可以使用任何内容。URI 和标识符不需要通过 Flight RPC 提供给客户端,而是可以通过任何所需的传输或协议传递。
收到 <want_data>
请求后,服务器*应该*通过发送包含以下内容的流消息进行响应
一个 5 字节前缀
消息的第一个字节指示消息类型,目前只允许两种消息类型(将来可能会添加更多类型)
流结束
Flatbuffers IPC 元数据消息
接下来的 4 个字节是一个小端序的无符号 32 位整数,指示消息的序列号。流中的第一条消息(**必须**始终是 Schema 消息)的序列号**必须**为
0
。每个后续消息**必须**将序列号增加1
。
Arrow IPC 头部的完整 Flatbuffers 字节
如 Arrow IPC 格式中所定义,每个元数据消息可以表示供数据流使用的数据块或字典。
发送最后一个元数据消息后,服务器**必须**通过发送一个由**恰好** 5 个字节组成的消息来指示流的结束
第一个字节是
0
,表示**流结束**消息最后 4 个字节是序列号(4 字节,无符号整数,小端字节序)
数据流顺序#
如果单个服务器同时处理数据流和元数据流,则数据消息**应该**与元数据消息并行开始发送到客户端。否则,与元数据顺序一样,服务器的常态是等待带有 <want_data>
标签值的**带标签**消息,其体指示要发送给客户端的数据集/数据流。
对于数据流中的每条 IPC 消息,如果该消息有体(即 Record Batch 或 Dictionary 消息),则**必须**在数据流上发送一个**带标签**的消息。每条消息的标签结构应如下:
标签的*最低有效* 4 个字节(位 0 - 31)应该是消息的无符号 32 位、小端序序列号。
标签的*最高有效*字节(位 56 - 63)指示消息体的**类型**,为一个 8 位无符号整数。目前只指定了两种消息类型,但可以根据需要添加更多类型来扩展协议
体包含原始体缓冲区字节,作为一个打包的缓冲区(即标准的 IPC 格式体字节)
体包含一系列无符号、小端序的 64 位整数对,用于表示共享或远程内存,其结构示意如下
前两个整数(例如前 16 个字节)代表所有缓冲区的*总*大小(以字节为单位)以及此消息中的缓冲区数量(因此也是后面
uint64
对的数量)每个后续的
uint64
对是一个地址/偏移量,后跟该特定缓冲区的长度。
标签的所有未指定位(位 32 - 55)均**保留**供本协议的潜在更新将来使用。目前它们**必须**为 0。
注意
任何跨传输发送的共享/远程内存地址**必须**由服务器保持存活,直到收到相应的带标签的 <free_data>
消息。如果客户端在发送任何 <free_data>
消息之前断开连接,则如果服务器希望清理内存,可以假定是安全的。
发送最后一个带标签的 IPC 体消息后,服务器应保持连接并等待带标签的 <free_data>
消息。这些 <free_data>
消息的结构很简单:一个或多个无符号、小端序的 64 位整数,指示可以释放的地址/偏移量。
一旦没有待释放的地址,此流的工作就完成了。
客户端顺序#
本协议的客户端需要同时处理数据流和元数据流的消息,这两个流可能来自同一服务器或不同的服务器。下面是流程图,展示客户端如何处理元数据流和数据流
首先,客户端发送一个带标签的消息,使用 URI 中提供的
<want_data>
值作为标签,不透明 ID 作为体。如果元数据和数据服务器是分开的,则需要分别向每个服务器发送
<want_data>
消息。在任一场景下,元数据流和数据流都可以根据传输层的性质并发和/或异步处理。
对于客户端在元数据流中接收到的每条**不带标签**的消息
消息的第一个字节指示它是*流结束*消息(值为
0
)还是元数据消息(值为1
)。接下来的 4 个字节是消息的序列号,一个无符号 32 位整数,小端字节序。
如果**不是***流结束*消息,剩余字节是 IPC Flatbuffer 字节,可以正常解析。
如果消息有体(即 Record Batch 或 Dictionary 消息),则客户端应使用相同的序列号从数据流中检索相应的带标签消息。
如果**是***流结束*消息,并且接收到的序列号没有中断,则可以安全地关闭元数据连接。
当收到需要体的元数据消息时,**应该**使用标签掩码
0x00000000FFFFFFFF
和序列号一起匹配消息,而不管高位字节(例如,我们只关心将低 4 个字节与序列号匹配)收到后,最高有效字节的值决定客户端如何处理体数据
如果最高有效字节是 0:则消息的体是原始 IPC 打包体缓冲区,可以轻松地与相应的元数据头部字节一起处理。
如果最高有效字节是 1:则消息的体将由一系列无符号、小端序的 64 位整数对组成。
前两个整数代表 *1)* 所有体缓冲区的总大小,以便在需要中间缓冲区时易于分配,以及 *2)* 正在发送的缓冲区数量(
nbuf
)。消息的其余部分将是
nbuf
对整数,每对对应一个缓冲区。每对是 *1)* 缓冲区的地址/偏移量和 *2)* 该缓冲区的长度。然后可以根据底层传输通过共享或远程内存例程检索内存。这些地址/偏移量**必须**保留,以便稍后可以在<free_data>
消息中发送回去,告知服务器客户端不再需要共享内存。
收到*流结束*消息后,客户端应处理任何剩余的未处理 IPC 元数据消息。
在远程服务器可以释放单个内存地址/偏移量后(如果它发送的是这些而不是完整的体字节),客户端应向服务器发送相应的
<free_data>
消息。单个
<free_data>
消息包含任意数量的无符号 64 位整数值,表示可以释放的地址/偏移量。将其设为*任意数量*的原因是允许客户端选择是发送多个消息来释放多个地址,还是将多个地址合并到较少的消息中进行释放(从而根据需要使协议不那么“冗长”)
持续开发#
如果您决定在自己的环境和系统中使用此协议,我们非常欢迎反馈,并乐于了解您的用例。由于这目前是一个**实验性**协议,我们需要实际使用情况,以便促进其改进并找到适合跨传输进行标准化的通用性。
请使用 Arrow 开发者邮件列表参与讨论:https://arrow.apache.org/community/#mailing-lists