我们 F5 在 Apache Arrow 上的历程(第 2 部分):自适应模式和排序以优化 Arrow 的使用


已发布 2023 年 6 月 26 日
作者 Laurent Quérel

在上一篇文章中,我们讨论了在 OpenTelemetry 项目中使用 Apache Arrow 的情况。我们研究了各种技术来最大限度地提高 Apache Arrow 的效率,旨在在数据压缩率和可查询性之间找到最佳平衡。压缩结果不言自明,比原始 OTLP 协议的改进幅度从 1.5 倍到 5 倍不等。在本文中,我们将深入探讨三种技术,这些技术使我们能够提高当前版本的 OTel Arrow 协议中 Apache Arrow 缓冲区的压缩率和内存使用情况。

我们将讨论的第一种技术旨在优化内存使用方面的模式。正如您将看到的,收益可能是可观的,在某些情况下甚至可以将内存使用量减少一半。第二部分将更深入地探讨可用于处理递归模式定义的方法。最后,我们将强调,您的模式的设计,以及您可以在记录级别应用的排序,在最大化 Apache Arrow 及其列式表示的好处方面起着关键作用。

处理动态和未知的数据分布

在某些情况下,Arrow 模式的全面定义可能会变得过于宽泛和复杂,以涵盖您打算以列式形式表示的所有可能情况。然而,与复杂模式一样,此模式的很大一部分实际上只会被特定部署使用。同样,并非总能提前确定一个或多个字段的最佳字典编码。采用涵盖所有情况的广泛且通用的模式通常更占用内存。这是因为,对于大多数实现来说,一个没有值的列仍然会消耗内存空间。同样,一个基于 uint64 进行索引的字典编码列将比基于 uint8 的字典编码消耗四倍的内存。

为了更具体地说明这一点,让我们考虑一个位于生产环境输出处的 OTel 收集器,它接收由一组庞大且动态的服务器产生的遥测数据流。该遥测流的内容将不可避免地随时间在数量和性质上发生变化。在这种情况下,预测最佳模式是具有挑战性的,并且同样难以提前知道通过该点的遥测数据的特定属性的分布情况。

为了优化此类场景,我们采用了一种中间方法,我们称之为**动态 Arrow 模式**,旨在根据观察到的数据逐渐调整模式。基本原理相对简单。我们从一个定义了应表示的最大范围的通用模式开始。该模式的一些字段将被声明为可选,而其他字段将根据观察到的分布,用多种可能的选项进行编码。理论上,此原理可应用于其他类型的转换(例如,递归列创建),但我们将把这些其他选项留给您的想象力去探索。因此,如果您遇到某些字段未被使用的、某些联合体变体未被使用的,以及/或者某个字段的值分布无法先验确定的数据流,那么投入时间来实现此模型可能是值得的。这可以提高压缩率、内存使用和处理速度方面的效率。

以下 Go Arrow 模式定义提供了一个此类模式的示例,其中包含一组注释。这些注释将被增强的 Record Builder 处理,该构建器具备动态调整模式的能力。该系统的结构如图 1 所示。

var (
  // Arrow schema for the OTLP Arrow Traces record (without attributes, links, and events).
  TracesSchema = arrow.NewSchema([]arrow.Field{
      // Nullabe:true means the field is optional, in this case of 16 bit unsigned integers
      {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
      {Name: constants.Resource, Type: arrow.StructOf([]arrow.Field{
        {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
        // --- Use dictionary with 8 bit integers initially ----
        {Name: constants.SchemaUrl,Type: arrow.BinaryTypes.String,Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
        {Name: constants.DroppedAttributesCount,Type: arrow.PrimitiveTypes.Uint32,Nullable: true},
      }...), Nullable: true},
      {Name: constants.Scope, Type: arrow.StructOf([]arrow.Field{
          {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Metadata: acommon.Metadata(acommon.DeltaEncoding), Nullable: true},
          // --- Use dictionary with 8 bit integers initially ----
          {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
          {Name: constants.Version, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
          {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      }...), Nullable: true},
      {Name: constants.SchemaUrl, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
      {Name: constants.DurationTimeUnixNano, Type: arrow.FixedWidthTypes.Duration_ms, Metadata: schema.Metadata(schema.Dictionary8)},
      {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
      {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
      {Name: constants.TraceState, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.ParentSpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}, Nullable: true},
      {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8)},
      {Name: constants.KIND, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.DroppedEventsCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.DroppedLinksCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.Status, Type: arrow.StructOf([]arrow.Field{
        {Name: constants.StatusCode, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
        {Name: constants.StatusMessage, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      }...), Nullable: true},
    }, nil)
  )

在此示例中,Arrow 字段级别的元数据用于指定字段是否为可选(Nullable: true)或指定适用于特定字段的最小字典编码(Metadata Dictionary8/16/…)。现在,让我们想象一个场景,在该场景中,仅使用了少数几个字段,并且大多数字典编码字段的基数较低(即低于 $2^8$)。理想情况下,我们希望系统能够动态地构建以下简化的模式,该模式本质上是原始模式的严格子集。

var (
  // Simplified schema definition generated by the Arrow Record encoder based on
  // the data observed.
  TracesSchema = arrow.NewSchema([]arrow.Field{
    {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
    {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
    {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
    {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
    {Name: constants.Name, Type: &arrow.DictionaryType {
      IndexType: arrow.PrimitiveTypes.Uint8,
      ValueType: arrow.BinaryTypes.String}},
    {Name: constants.KIND, Type: &arrow.DictionaryType {
      IndexType: arrow.PrimitiveTypes.Uint8,
      ValueType: arrow.PrimitiveTypes.Int32,
    }, Nullable: true},
  }, nil)
)

此外,我们希望系统能够在未来批次中遇到新字段或现有字段的基数超过当前字典定义大小时,能够自动调整上述模式。在极端情况下,如果特定字段的基数超过某个阈值,我们希望系统能够自动恢复到非字典表示(字典溢出机制)。这正是我们将在本节剩余部分详细介绍的内容。

用于实现此方法的不同组件和事件的概述如图 1 所示。

Fig 1: Adaptive Arrow schema architecture overview.
图 1:自适应 Arrow 模式架构概述。

整体自适应 Arrow 模式组件接收分割成批次的数据流,并产生一个或多个 Arrow 记录流(每个流一个模式)。这些记录中的每一个都由一个 Arrow 模式定义,该模式基于带注释的 Arrow 模式和传入数据中字段的形状。

更具体地说,自适应 Arrow 模式组件的过程包括四个主要阶段

初始化阶段

在初始化阶段,Arrow 记录编码器读取带注释的 Arrow 模式(即参考模式)并生成一组转换。当将这些转换应用于参考模式时,它们会产生第一个最小的 Arrow 模式,该模式符合这些注释所描述的约束。在初始迭代中,所有可选字段都被消除,所有字典编码字段都被配置为使用注释中定义的最短编码(在前面的示例中仅为 Dictionary8)。这些转换形成一个树,反映了参考模式的结构。

进料阶段

初始化之后是进料阶段。在这里,Arrow 记录编码器扫描批次,并尝试将所有字段存储在由上一步创建的模式定义的 Arrow 记录构建器中。如果数据中存在但未包含在模式中的字段,编码器将触发 missing field(缺少字段)事件。此过程将持续进行,直到当前批次完全处理完毕。Arrow 记录构建器中对所有字典编码字段进行额外的内部检查,以确保没有字典溢出(即唯一的条目数超过索引允许的基数)。如果检测到这种情况,则会生成 Dictionary overflow(字典溢出)事件。因此,到最后,所有未知字段和字典溢出都将被检测到,或者,如果数据与模式完全对齐,则不会出现任何差异。

更正阶段

如果在上一个阶段生成了至少一个事件,将启动更正阶段以修复模式。这个可选阶段会考虑上一个阶段生成的所有事件,并相应地调整转换树以与观察到的数据对齐。missing field 事件将删除相应字段的 NoField 转换。dictionary overflow 事件将修改字典转换以反映事件(例如,将索引类型从 uint8 更改为 uint16,或者如果达到了最大索引大小,转换将删除字典编码并恢复到原始的非字典编码类型)。然后,更新后的转换树用于创建新的模式和新的 Arrow 记录构建器。然后,此记录构建器用于使用未正确处理的批次重新播放前面的进料阶段。

路由阶段

一旦记录构建器被正确进料,就会创建一个 Arrow 记录,系统进入路由阶段。路由器组件计算记录的模式签名,并利用此签名将记录路由到与签名兼容的现有 Arrow 流,或者在没有匹配项时启动一个新流。

这个四阶段的过程应该逐渐调整和稳定模式,使其结构和定义针对特定数据流进行了优化。未使用的字段永远不会不必要地消耗内存。字典编码字段将根据观察到的数据基数定义为最合适的索引大小,并且基数超过某个阈值(由配置定义)的字段将自动恢复到其非字典编码版本。

要有效地执行此方法,您必须确保接收端具有足够的灵活性。至关重要的是,即使模式中缺少某些字段或使用了各种字典索引配置,您的下游管道仍然可以正常工作。虽然这在不实施额外的接收端转换的情况下并不总是可行的,但在某些情况下是值得的。

以下结果突出了通过应用各种优化技术实现的显著内存使用减少。这些结果是使用与前面介绍的模式类似 ​​的模式收集的。巨大的内存效率凸显了该方法的有效性。

Fig 2: Comparative analysis of memory usage for different schema optimizations.
图 2:不同模式优化内存使用情况的比较分析。

转换树的概念使得能够基于从数据中获得的知识来执行各种模式优化的通用方法。这种架构非常灵活;当前实现允许删除未使用的字段、应用最特定的字典编码以及优化联合体类型变体。将来,有可能引入可以表示为对初始模式进行转换的额外优化。此方法的实现在此处可用。

处理递归模式定义

Apache Arrow 不支持递归模式定义,这意味着具有可变深度的嵌套结构无法直接表示。图 3 举例说明了这种递归定义,其中属性的值可以是简单数据类型、值列表或值的映射。此定义的深度无法预先确定。

Fig 3: Recursive definition of OTel attributes.
图 3:OTel 属性的递归定义。

可以使用多种策略来规避此限制。从技术上讲,我们提出的动态模式概念可以扩展为动态更新模式以包含任何缺失的递归级别。然而,对于此用例,此方法很复杂,并且有一个明显的缺点,即无法保证模式的最大大小。这种缺乏约束可能会带来安全问题;因此,不详细介绍此方法。

第二种方法是使用支持递归模式定义序列化格式来打破递归。该序列化的结果可以作为二进制类型列集成到 Arrow 记录中,从而在特定级别上停止递归。为了充分利用列式表示的优势,至关重要的是尽可能深入地将这种特设序列化应用于数据结构中。在 OpenTelemetry 的上下文中,这在属性级别执行——更具体地说,在属性的第二级别执行。

可以使用各种序列化格式(如 protobuf 或 CBOR)来编码递归数据。如果没有特殊的处理,现有的 Arrow 查询引擎可能难以查询这些二进制列。因此,仔细确定何时以及何地应用此类技术至关重要。虽然我没有意识到 Arrow 系统内有任何解决此限制的尝试,但这似乎并非不可克服,将是一个有价值的扩展。这将有助于减少将 Arrow 与依赖此类递归定义的其他系统集成的复杂性。

排序的重要性

在我们先前的文章中,我们探讨了表示分层数据模型的各种策略,包括基于 struct/list/map/union 的嵌套结构、反规范化和展平表示,以及多记录方法。每种方法都有其独特的优点和缺点。然而,在最后一部分中,我们将深入探讨多记录方法,特别关注它提供通用排序选项的能力,以及这些选项如何有助于提高压缩率。

在 OTel Arrow 协议中,我们利用多记录方法来表示指标、日志和跟踪。以下实体关系图提供了各种记录模式的简化版本,并说明了它们之间的关系,特别是用于表示量规 (gauges) 和总和 (sums) 的关系。有关 OpenTelemetry 中使用的 Arrow 数据模型的完整描述,请此处访问。

这些 Arrow 记录,也称为表,形成一个以 METRICS 为主要入口点的层次结构。每个表都可以根据一个或多个列独立排序。这种排序策略有利于分组重复数据,从而提高压缩率。

Fig 4: A simplified entity-relationship diagram representing OTel sum & gauge metrics.
图 4:代表 OTel 总和和量规指标的简化实体关系图。

METRICS 表与次要 RESOURCE_ATTRSSCOPE_ATTRSNUMBER_DATA_POINTS 表之间的关系是通过主表中的唯一 id 和每个次要表中的 parent_id 列建立的。这个 {id,parent_id} 对代表一种开销,在压缩后应尽可能减少。

为实现此目的,不同表的排序过程遵循层次结构,从主表到底层。对主表进行排序(按一个或多个列),然后为每一行分配一个增量 id。该数字 id 使用 delta 编码进行存储,delta 编码基于 Arrow 实现。

直接连接到主表的次要表使用相同的原理进行排序,但 parent_id 列始终用作排序语句中的最后一列。在排序语句中包含 parent_id 列可以利用 delta 编码的变体。此方法的效率如下表所示。

Fig 5: Comparative analysis of compression ratios - OTLP Protocol vs. Two variations of the OTel Arrow Protocol with multivariate metrics stream. (lower percentage is better)
图 5:压缩率的比较分析 - OTLP 协议与 OTel Arrow 协议的两种变体与多变量指标流。(百分比越低越好)

第二列显示了不同大小批次的 OTLP 批次在 ZSTD 压缩前后的平均大小。此列作为后两列的参考点。第三列显示了未应用任何排序的 OTel Arrow 协议的结果,而最后一列显示了启用排序的 OTel Arrow 协议的结果。

在压缩之前,两种 OTel Arrow 配置的平均批次大小可以预见地相似。然而,在压缩之后,对每个单独的表进行排序对压缩率的好处立即显现出来。如果没有排序,OTel Arrow 协议的压缩率比参考值好 1.40 到 1.67 倍。启用排序后,OTel Arrow 协议的性能比参考值高出 4.94 到 7.21 倍!

压缩方面的收益显然取决于您的数据以及数据批次中信息的冗余程度。根据我们的观察,选择良好的排序通常可以将压缩率提高 1.5 到 8 倍。

将复杂模式分解为多个更简单的模式以增强排序功能,以及采用有针对性的方法来有效编码表示关系的标识符,成为提高整体数据压缩的有效策略。此方法还消除了复杂的 Arrow 数据类型,如列表、映射和联合体。因此,它不仅提高了,而且简化了数据查询能力。这种简化对可能难以操作复杂模式的现有查询引擎来说是有益的。

结论和后续步骤

本文结束了我们关于 Apache Arrow 的两部分系列文章,在其中我们探讨了在特定情况下最大化 Apache Arrow 实用性的各种策略。本系列第二部分介绍的自适应模式架构为未来的优化可能性铺平了道路。我们期待看到社区在此贡献的基础上能增加些什么。

Apache Arrow 是一个杰出的项目,并不断得到一个蓬勃发展的生态系统的增强。然而,在我们的探索过程中,我们注意到某些差距或摩擦点,如果得到解决,可以极大地丰富整体体验。

  • 设计高效的 Arrow 模式在某些情况下可能具有挑战性。能够在记录级别**收集统计数据**可以帮助进行此设计阶段(每个字段的数据分布、字典统计数据、压缩前/后的 Arrow 数组大小等)。这些统计数据还将有助于确定最有效的列来作为记录排序的基础。
  • **对递归模式的本机支持**也将通过简化 Arrow 在复杂场景中的使用来增加其采用率。虽然我没有意识到 Arrow 系统内有任何解决此限制的尝试,但这似乎并非不可克服,将是一个有价值的扩展。这将有助于减少将 Arrow 与依赖此类递归定义的其他系统集成的复杂性。
  • **统一数据类型的支持以及 IPC 流功能**也将是一大福音。主要的客户端库支持嵌套和分层模式,但由于生态系统其余部分缺乏完全支持,其实用性受到限制。例如,列表和/或联合体类型并未得到查询引擎或 Parquet 桥的充分支持。此外,不同实现之间 IPC 流中高级字典支持不一致(即,并非所有实现都支持增量字典和替换字典)。
  • 通过在本介绍的动态模式概念的本机集成,可以在内存消耗和压缩率方面**优化复杂模式的支持**。
  • **检测字典溢出**(索引级别)不是可以即时测试的事情。API 可以改进,以便在发生插入时立即指示此溢出。

我们利用 Apache Arrow 结合 OpenTelemetry 的努力取得了令人鼓舞的结果。虽然这需要大量的开发、探索和基准测试投入,但我们希望这些文章能帮助加速您使用 Apache Arrow 的历程。展望未来,我们设想与 Apache Arrow 进行端到端集成,并计划显著扩展我们对 Arrow 生态系统的使用。这种扩展涉及提供与 Parquet 的桥接,并与 DataFusion 等查询引擎集成,目标是在收集器内处理遥测流。