F5 在 Apache Arrow 上的探索之旅(第二部分):自适应模式与排序以优化 Arrow 的使用


发布日期 2023 年 6 月 26 日
作者 Laurent Quérel

在上一篇文章中,我们讨论了在 OpenTelemetry 项目中使用 Apache Arrow 的情况。我们研究了各种技术来最大化 Apache Arrow 的效率,旨在找到数据压缩率和可查询性之间的最佳平衡。压缩结果不言自明,比原始 OTLP 协议提高了 1.5 倍到 5 倍。在本文中,我们将深入探讨三种技术,这些技术使我们能够提高当前版本的 OTel Arrow 协议中 Apache Arrow 缓冲区的压缩率和内存使用。

我们将讨论的第一种技术旨在优化模式的内存使用。正如您将看到的,收益可能非常可观,在某些情况下可能会将内存使用量减半。第二部分将更深入地探讨可用于处理递归模式定义的各种方法。最后,我们将强调您的模式设计以及您可以在记录级别应用的排序对于最大化 Apache Arrow 及其列式表示的优势起着关键作用。

处理动态和未知数据分布

在某些情况下,Arrow 模式的全面定义可能过于宽泛和复杂,以至于无法涵盖您打算以列式形式表示的所有可能情况。然而,通常情况下,对于复杂模式,只有该模式的一个子集实际上会用于特定部署。同样,也并非总是能够提前确定一个或多个字段的最佳字典编码。采用一个涵盖所有情况的宽泛且非常通用的模式通常会占用更多内存。这是因为,对于大多数实现而言,没有值的列仍然会继续占用内存空间。同样,索引 uint64 的字典编码列将比基于 uint8 的字典编码的相同列占用多四倍的内存。

为了更具体地说明这一点,让我们考虑一个位于生产环境输出端的 OTel 收集器,它接收由大量动态服务器产生的遥测数据流。遥测流的内容不可避免地会随着时间的推移在容量和性质上发生变化。在这种情况下,预测最佳模式具有挑战性,并且也很难提前知道通过该点的遥测数据的特定属性的分布。

为了优化此类场景,我们采用了一种中间方法,我们将其命名为动态 Arrow 模式,旨在根据观察到的数据逐步调整模式。其一般原理相对简单。我们从一个通用模式开始,定义应表示的最大范围。该模式的一些字段将声明为可选,而其他字段将根据观察到的分布以多种可能的选项进行编码。理论上,此原理可以应用于其他类型的转换(例如,递归列创建),但我们将让您的想象力探索这些其他选项。因此,如果您遇到某些字段未被利用、某些联合变体未被使用以及/或某个字段的值分布无法先验确定的数据流,那么投入时间实现此模型可能是有益的。这可以提高压缩率、内存使用和处理速度方面的效率。

以下 Go Arrow 模式定义提供了一个此类模式的示例,并附带了一组注解。这些注解将由一个增强的记录构建器处理,该构建器具有动态调整模式的能力。此系统的结构如图 1 所示。

var (
  // Arrow schema for the OTLP Arrow Traces record (without attributes, links, and events).
  TracesSchema = arrow.NewSchema([]arrow.Field{
      // Nullabe:true means the field is optional, in this case of 16 bit unsigned integers
      {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
      {Name: constants.Resource, Type: arrow.StructOf([]arrow.Field{
        {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
        // --- Use dictionary with 8 bit integers initially ----
        {Name: constants.SchemaUrl,Type: arrow.BinaryTypes.String,Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
        {Name: constants.DroppedAttributesCount,Type: arrow.PrimitiveTypes.Uint32,Nullable: true},
      }...), Nullable: true},
      {Name: constants.Scope, Type: arrow.StructOf([]arrow.Field{
          {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Metadata: acommon.Metadata(acommon.DeltaEncoding), Nullable: true},
          // --- Use dictionary with 8 bit integers initially ----
          {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
          {Name: constants.Version, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
          {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      }...), Nullable: true},
      {Name: constants.SchemaUrl, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
      {Name: constants.DurationTimeUnixNano, Type: arrow.FixedWidthTypes.Duration_ms, Metadata: schema.Metadata(schema.Dictionary8)},
      {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
      {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
      {Name: constants.TraceState, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.ParentSpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}, Nullable: true},
      {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8)},
      {Name: constants.KIND, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.DroppedEventsCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.DroppedLinksCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.Status, Type: arrow.StructOf([]arrow.Field{
        {Name: constants.StatusCode, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
        {Name: constants.StatusMessage, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      }...), Nullable: true},
    }, nil)
  )

在此示例中,Arrow 字段级元数据用于指定字段何时可选 (Nullable: true) 或指定适用于特定字段的最小字典编码 (Metadata Dictionary8/16/...)。现在让我们设想一个场景,在一个简单的场景中使用此模式,其中只有少数字段实际在使用,并且大多数字典编码字段的基数很低(即,低于 2^8)。理想情况下,我们希望有一个系统能够动态构建以下简化模式,该模式本质上是原始模式的严格子集。

var (
  // Simplified schema definition generated by the Arrow Record encoder based on
  // the data observed.
  TracesSchema = arrow.NewSchema([]arrow.Field{
    {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
    {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
    {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
    {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
    {Name: constants.Name, Type: &arrow.DictionaryType {
      IndexType: arrow.PrimitiveTypes.Uint8,
      ValueType: arrow.BinaryTypes.String}},
    {Name: constants.KIND, Type: &arrow.DictionaryType {
      IndexType: arrow.PrimitiveTypes.Uint8,
      ValueType: arrow.PrimitiveTypes.Int32,
    }, Nullable: true},
  }, nil)
)

此外,我们希望有一个系统能够自动调整上述模式,如果它在未来的批次中遇到新字段或现有字段的基数超过当前字典定义的范围。在极端情况下,如果特定字段的基数超过某个阈值,我们希望系统自动恢复到非字典表示(字典溢出机制)。这正是我们将在本节的其余部分中详细阐述的内容。

图1描述了实现此方法的不同组件和事件的概述。

Fig 1: Adaptive Arrow schema architecture overview.
图 1:自适应 Arrow 模式架构概览。

整体自适应 Arrow 模式组件接收一个按批次分段的数据流,并生成一个或多个 Arrow 记录流(每个流一个模式)。这些记录中的每一个都使用 Arrow 模式定义,该模式基于带注解的 Arrow 模式和传入数据中观察到的字段形状。

更具体地说,自适应 Arrow 模式组件的过程包含四个主要阶段:

初始化阶段

在初始化阶段,Arrow 记录编码器读取带注解的 Arrow 模式(即参考模式),并生成一组转换。当这些转换应用于参考模式时,它们会产生第一个符合这些注解所描绘约束的最小 Arrow 模式。在此初始迭代中,所有可选字段都被消除,并且所有字典编码字段都被配置为使用注解定义的最小编码(在之前的示例中仅为 Dictionary8)。这些转换形成一棵树,反映了参考模式的结构。

进给阶段

初始化之后是进给阶段。在此阶段,Arrow 记录编码器扫描批次并尝试将所有字段存储在 Arrow 记录构建器中,该构建器由前一步中创建的模式定义。如果数据中存在字段但未包含在模式中,编码器将触发“缺失字段”事件。此过程一直持续到当前批次完全处理完毕。对 Arrow 记录构建器中所有字典编码字段进行额外的内部检查,以确保没有字典溢出(即,唯一条目数多于索引基数允许的数量)。如果检测到此类情况,将生成“字典溢出”事件。因此,最终,所有未知字段和字典溢出都将被检测到,或者,如果数据与模式完美匹配,则不会出现任何差异。

纠正阶段

如果至少生成了一个事件,将启动纠正阶段以修复模式。此可选阶段会考虑上一步中生成的所有事件,并相应地调整转换树,使其与观察到的数据对齐。“缺失字段”事件将删除相应字段的 NoField 转换。“字典溢出”事件将修改字典转换以反映该事件(例如,将索引类型从 uint8 更改为 uint16,或者如果已达到最大索引大小,则转换将删除字典编码并恢复到原始的非字典编码类型)。更新后的转换树随后用于创建新模式和新的 Arrow Record Builder。然后,此 Record Builder 用于使用未正确处理的批次重新执行之前的馈送阶段。

路由阶段

一旦记录构建器被正确填充,就会创建一个 Arrow 记录,系统进入路由阶段。路由器组件计算记录的模式签名,并利用此签名将记录路由到与签名兼容的现有 Arrow 流,或者如果没有匹配项,则启动一个新的流。

这个四阶段过程应该会逐步调整并稳定模式,使其结构和定义针对特定数据流进行优化。未使用的字段永远不会不必要地占用内存。字典编码字段将根据观察到的数据基数以最优索引大小定义,并且基数超过特定阈值(由配置定义)的字段将自动恢复为其非字典编码版本。

要有效地执行此方法,您必须确保接收方具有足够的灵活性。关键是您的下游管道即使在模式中缺少某些字段或使用各种字典索引配置时也能正常运行。虽然这并非总能在不实施额外接收转换的情况下实现,但在某些场景中证明是值得的。

以下结果突显了通过应用各种优化技术所实现的显着内存使用量减少。这些结果是使用与前面介绍的模式类似的模式收集的。显着的内存效率凸显了这种方法的有效性。

Fig 2: Comparative analysis of memory usage for different schema optimizations.
图 2:不同模式优化内存使用情况的比较分析。

转换树的概念使得能够基于从数据中获得的知识执行各种类型的模式优化的通用方法。此架构高度灵活;当前的实现允许删除未使用的字段、应用最具体的字典编码以及优化联合类型变体。未来,有可能引入可以表示为初始模式上的转换的其他优化。此方法的实现可在此处获取。

处理递归模式定义

Apache Arrow 不支持递归模式定义,这意味着无法直接表示具有可变深度的数据结构。图 3 示例了这样一个递归定义,其中属性的值可以是简单数据类型、值列表或值映射。此定义的深度无法预先确定。

Fig 3: Recursive definition of OTel attributes.
图 3:OTel 属性的递归定义。

可以采用几种策略来规避此限制。从技术上讲,我们提出的动态模式概念可以扩展为动态更新模式,以包含任何缺失的递归级别。然而,对于此用例,此方法很复杂,并且有一个显着的缺点,即不提供任何关于模式最大大小的保证。这种缺乏约束会带来安全问题;因此,不对此方法进行详细阐述。

第二种方法是通过采用支持递归模式定义的序列化格式来打破递归。然后可以将此序列化的结果作为二进制类型列集成到 Arrow 记录中,从而在特定级别有效地停止递归。为了充分利用列式表示的优势,至关重要的是在数据结构中尽可能深地应用这种特设序列化。在 OpenTelemetry 的上下文中,这是在属性级别执行的——更具体地说,是在属性的第二级别。

各种序列化格式,如 protobuf 或 CBOR,可用于编码递归数据。未经特殊处理,这些二进制列可能不易被现有 Arrow 查询引擎查询。因此,仔细确定何时何地应用这种技术至关重要。虽然我不知道 Arrow 系统内有任何尝试解决此限制的尝试,但它似乎并非不可克服,并且将构成一个有价值的扩展。这将有助于降低将 Arrow 与依赖此类递归定义的其他系统集成的复杂性。

排序的重要性

在之前的文章中,我们探讨了表示层次数据模型的各种策略,包括基于结构体/列表/映射/联合的嵌套结构、去规范化和扁平化表示以及多记录方法。每种方法都有其独特的优点和缺点。然而,在本节的最后一部分,我们将更深入地研究多记录方法,特别关注其提供多功能排序选项的能力以及这些选项如何有助于提高压缩率。

在 OTel Arrow 协议中,我们利用多记录方法来表示指标、日志和跟踪。以下实体关系图提供了各种记录模式的简化版本,并说明了它们之间的关系,特别是用于表示计量器和求和的模式。OpenTelemetry 中使用的 Arrow 数据模型的全面描述可在此处访问

这些 Arrow 记录(也称为表)形成一个层次结构,其中 METRICS 作为主要入口点。每个表都可以根据一个或多个列独立排序。这种排序策略有助于对重复数据进行分组,从而提高压缩率。

Fig 4: A simplified entity-relationship diagram representing OTel sum & gauge metrics.
图 4:表示 OTel 求和与计量指标的简化实体关系图。

METRICS 表与次级 RESOURCE_ATTRSSCOPE_ATTRSNUMBER_DATA_POINTS 表之间的关系通过主表中的唯一 id 和每个次级表中的 parent_id 列建立。此 {id,parent_id} 对代表一种开销,应在压缩后最大限度地最小化。

为了实现这一点,不同表的排序过程遵循层次结构,从主表到叶表。主表排序(按一列或多列),然后为每一行分配一个增量 ID。此数字 ID 使用 Arrow 上实现的增量编码存储。

与主表直接连接的次级表采用相同的原理进行排序,但 parent_id 列始终用作排序语句中的最后一列。在排序语句中包含 parent_id 列可以采用增量编码的变体。这种方法的效率总结在下面的图表中。

Fig 5: Comparative analysis of compression ratios - OTLP Protocol vs. Two variations of the OTel Arrow Protocol with multivariate metrics stream. (lower percentage is better)
图 5:压缩率比较分析 - OTLP 协议与 OTel Arrow 协议的两种变体,均采用多变量指标流。(百分比越低越好)

第二列显示了不同大小的 OTLP 批次在 ZSTD 压缩前后的平均大小。此列作为后续两列的参考点。第三列显示了未应用任何排序的 OTel Arrow 协议的结果,而最后一列显示了启用排序的 OTel Arrow 协议的结果。

在压缩之前,两种 OTel Arrow 配置的平均批次大小预计是相似的。然而,在压缩之后,对每个独立表进行排序对压缩率的好处立即显现出来。在没有排序的情况下,OTel Arrow 协议的压缩率比参考协议好 1.40 到 1.67 倍。当启用排序时,OTel Arrow 协议的性能比参考协议提高了 4.94 到 7.21 倍!

压缩方面的收益显然取决于您的数据以及数据批次中信息的冗余程度。根据我们的观察,选择一个好的排序通常可以将压缩率提高 1.5 到 8 倍。

将复杂模式分解为多个更简单的模式以增强排序能力,并采用有针对性的方法高效编码表示关系的标识符,成为提高整体数据压缩的有效策略。这种方法还消除了复杂的 Arrow 数据类型,例如列表、映射和联合。因此,它不仅提高了而且简化了数据查询能力。这种简化对现有查询引擎非常有利,因为它们可能难以在复杂模式上运行。

结论和下一步

本文结束了我们关于 Apache Arrow 的两部分系列文章,其中我们探讨了在特定上下文中最大化 Apache Arrow 效用的各种策略。本系列第二部分中提出的自适应模式架构为未来的优化可能性铺平了道路。我们期待看到社区能在此贡献的基础上添加什么。

Apache Arrow 是一个卓越的项目,并不断被蓬勃发展的生态系统所增强。然而,在我们的探索过程中,我们注意到了一些差距或摩擦点,如果加以解决,可能会显著丰富整体体验。

  • 在某些情况下,设计一个高效的 Arrow 模式可能是一项具有挑战性的任务。在记录级别收集统计信息的能力可以促进此设计阶段(每个字段的数据分布、字典统计信息、压缩前后 Arrow 数组大小等等)。这些统计信息还将有助于确定最有效的列,以便在此基础上进行记录排序。
  • 对递归模式的原生支持也将通过简化 Arrow 在复杂场景中的使用来提高采用率。虽然我不知道 Arrow 系统内有任何尝试解决此限制的尝试,但它似乎并非不可克服,并且将构成一个有价值的扩展。这将有助于降低将 Arrow 与依赖此类递归定义的其他系统集成的复杂性。
  • 协调数据类型和 IPC 流功能的支援也将带来巨大的好处。主要的客户端库支持嵌套和分层模式,但由于缺乏整个生态系统的全面支持,其使用受到限制。例如,列表和/或联合类型在查询引擎或 Parquet 桥接器中支持不佳。此外,IPC 流中的高级字典支持在不同实现之间不一致(即,增量字典和替换字典并非所有实现都支持)。
  • 通过原生集成本文中提出的动态模式概念,可以改进复杂模式的内存消耗和压缩率的优化支持
  • 检测字典溢出(索引级别)并非易于实时测试。API 可以改进以在插入发生时立即指示此溢出。

我们将 Apache Arrow 与 OpenTelemetry 结合使用的努力取得了令人鼓舞的成果。虽然这在开发、探索和基准测试方面投入了大量资金,但我们希望这些文章能帮助您加快使用 Apache Arrow 的旅程。展望未来,我们设想与 Apache Arrow 进行端到端集成,并计划大幅扩展我们对 Arrow 生态系统的使用。这种扩展涉及提供与 Parquet 的桥接,并与 DataFusion 等查询引擎集成,目标是在收集器中处理遥测流。