我们在 F5 使用 Apache Arrow 的旅程(第二部分):自适应模式和排序以优化 Arrow 的使用


已发布 2023 年 6 月 26 日
作者 Laurent Quérel

在前一篇文章中,我们讨论了在 OpenTelemetry 项目中使用 Apache Arrow 的情况。 我们研究了各种技术来最大限度地提高 Apache Arrow 的效率,旨在找到数据压缩率和可查询性之间的最佳平衡。 压缩结果不言自明,比原始 OTLP 协议提高了 1.5 倍到 5 倍。 在本文中,我们将深入研究三种技术,这些技术使我们能够增强当前版本 OTel Arrow 协议中 Apache Arrow 缓冲区的压缩率和内存使用率。

我们将讨论的第一种技术旨在优化模式的内存使用。 正如你将看到的,收益可能是巨大的,在某些情况下可能会减少一半的内存使用。 第二部分将更深入地探讨可用于处理递归模式定义的各种方法。 最后,我们将强调模式的设计以及可以在记录级别应用的排序在最大化 Apache Arrow 及其柱状表示的优势方面发挥着关键作用。

处理动态和未知的数据分布

在某些情况下,Arrow 模式的完整定义最终可能会过于宽泛和复杂,以便涵盖你打算以柱状形式表示的所有可能情况。 然而,与复杂的模式一样,通常只有该模式的子集实际上会被用于特定的部署。 同样,也不总是可以提前确定一个或多个字段的最佳字典编码。 采用涵盖所有情况的广泛且非常通用的模式通常会占用更多的内存。 这是因为,对于大多数实现来说,没有值的列仍然会消耗内存空间。 同样,索引 uint64 的字典编码的列将占用比基于 uint8 的字典编码的同一列多四倍的内存。

为了更具体地说明这一点,让我们考虑一个位于生产环境输出端的 OTel 收集器,它接收来自大型动态服务器集的遥测数据流。 毫无疑问,随着时间的推移,此遥测流的内容将在数量和性质上发生变化。 在这种情况下预测最佳模式具有挑战性,并且同样难以提前知道通过此点的遥测数据的特定属性的分布。

为了优化这种情况,我们采用了一种中间方法,我们将其命名为动态 Arrow 模式,旨在根据观察到的数据逐步调整模式。 一般原则相对简单。 我们从定义应该表示的最大范围的通用模式开始。 此模式的某些字段将被声明为可选,而其他字段将根据观察到的分布使用多种可能的选项进行编码。 理论上,此原则可以应用于其他类型的转换(例如,递归列创建),但我们将让你的想象力探索这些其他选项。 因此,如果你遇到某些字段未被使用、某些 union 变体保持未使用状态和/或无法先验地确定字段的值分布的数据流,则可能值得花时间实现此模型。 这可以提高压缩率、内存使用率和处理速度方面的效率。

以下 Go Arrow 模式定义提供了此类模式的示例,该模式配备了一系列注释。 这些注释将由增强的记录构建器处理,该构建器能够动态调整模式。 该系统的结构如图 1 所示。

var (
  // Arrow schema for the OTLP Arrow Traces record (without attributes, links, and events).
  TracesSchema = arrow.NewSchema([]arrow.Field{
      // Nullabe:true means the field is optional, in this case of 16 bit unsigned integers
      {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
      {Name: constants.Resource, Type: arrow.StructOf([]arrow.Field{
        {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
        // --- Use dictionary with 8 bit integers initially ----
        {Name: constants.SchemaUrl,Type: arrow.BinaryTypes.String,Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
        {Name: constants.DroppedAttributesCount,Type: arrow.PrimitiveTypes.Uint32,Nullable: true},
      }...), Nullable: true},
      {Name: constants.Scope, Type: arrow.StructOf([]arrow.Field{
          {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Metadata: acommon.Metadata(acommon.DeltaEncoding), Nullable: true},
          // --- Use dictionary with 8 bit integers initially ----
          {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
          {Name: constants.Version, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
          {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      }...), Nullable: true},
      {Name: constants.SchemaUrl, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
      {Name: constants.DurationTimeUnixNano, Type: arrow.FixedWidthTypes.Duration_ms, Metadata: schema.Metadata(schema.Dictionary8)},
      {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
      {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
      {Name: constants.TraceState, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.ParentSpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}, Nullable: true},
      {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8)},
      {Name: constants.KIND, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.DroppedEventsCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.DroppedLinksCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
      {Name: constants.Status, Type: arrow.StructOf([]arrow.Field{
        {Name: constants.StatusCode, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
        {Name: constants.StatusMessage, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
      }...), Nullable: true},
    }, nil)
  )

在此示例中,Arrow 字段级元数据用于指定字段何时是可选的(Nullable:true)或指定适用于特定字段的最小字典编码(元数据 Dictionary8/16/…)。 现在让我们想象一下,在一个简单的场景中使用此模式,其中实际上只使用少数几个字段,并且大多数字典编码字段的基数较低(即,低于 2^8)。 理想情况下,我们希望有一个能够动态构建以下简化模式的系统,该模式本质上是原始模式的严格子集。

var (
  // Simplified schema definition generated by the Arrow Record encoder based on
  // the data observed.
  TracesSchema = arrow.NewSchema([]arrow.Field{
    {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
    {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
    {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
    {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
    {Name: constants.Name, Type: &arrow.DictionaryType {
      IndexType: arrow.PrimitiveTypes.Uint8,
      ValueType: arrow.BinaryTypes.String}},
    {Name: constants.KIND, Type: &arrow.DictionaryType {
      IndexType: arrow.PrimitiveTypes.Uint8,
      ValueType: arrow.PrimitiveTypes.Int32,
    }, Nullable: true},
  }, nil)
)

此外,我们希望有一个系统能够自动调整上述模式,如果它在将来的批处理中遇到新字段或基数超过当前字典定义大小的现有字段。 在极端情况下,如果特定字段的基数超过某个阈值,我们希望系统自动恢复为非字典表示形式(字典溢出机制)。 这正是我们将在本节的其余部分中详细阐述的内容。

图 1 描绘了用于实现此方法的不同组件和事件的概述。

Fig 1: Adaptive Arrow schema architecture overview.
图 1:自适应 Arrow 模式架构概述。

整个自适应 Arrow 模式组件采用分段成批处理的数据流,并生成一个或多个 Arrow 记录流(每个流一个模式)。 这些记录中的每一个都使用 Arrow 模式定义,该模式基于带注释的 Arrow 模式和传入数据中观察到的字段形状。

更具体地说,自适应 Arrow 模式组件的过程包括四个主要阶段

初始化阶段

在初始化阶段,Arrow 记录编码器读取带注释的 Arrow 模式(即,参考模式)并生成转换集合。 当这些转换应用于参考模式时,它们会生成符合这些注释描述的约束的第一个最小 Arrow 模式。 在此初始迭代中,将消除所有可选字段,并且所有字典编码字段都配置为使用注释定义的最小编码(在前面的示例中仅为 Dictionary8)。 这些转换形成一棵树,反映了参考模式的结构。

馈送阶段

初始化之后是馈送阶段。 在这里,Arrow 记录编码器扫描批处理并尝试将所有字段存储在 Arrow 记录构建器中,该构建器由先前步骤中创建的模式定义。 如果数据中存在某个字段但未包含在模式中,则编码器将触发 缺少字段事件。 此过程将继续,直到当前批处理完全处理完毕。 在 Arrow 记录构建器中的所有字典编码字段上执行额外的内部检查,以确保没有字典溢出(即,唯一条目多于索引允许的基数)。 如果检测到这种情况,则会生成 字典溢出事件。 因此,到最后,所有未知字段和字典溢出都将被检测到,或者,如果数据与模式完全对齐,则不会出现任何差异。

纠正阶段

如果至少生成了一个事件,则将启动纠正阶段以修复模式。 此可选阶段会考虑在上一个阶段中生成的所有事件,并相应地调整转换树,以与观察到的数据对齐。 缺少字段事件将删除相应字段的 NoField 转换。 字典溢出事件将修改字典转换以反映该事件(例如,将索引类型从 uint8 更改为 uint16,或者如果已达到最大索引大小,则转换将删除字典编码并恢复为原始的非字典编码类型)。 然后,更新后的转换树将用于创建新模式和新的 Arrow 记录构建器。 然后,此记录构建器用于使用未正确处理的批处理重播前面的馈送阶段。

路由阶段

一旦 Record Builder 被正确地填充数据后,就会创建一个 Arrow Record,系统随即进入路由阶段。路由组件会计算 Record 的 schema 签名,并利用此签名将 Record 路由到与该签名兼容的现有 Arrow 流,如果不存在匹配的流,则启动一个新流。

这个四阶段的过程应该逐步调整和稳定 schema,使其适应并优化为特定数据流的结构和定义。未使用的字段永远不会不必要地消耗内存。字典编码的字段将根据观察到的数据基数,使用最佳索引大小进行定义,而基数超过特定阈值(由配置定义)的字段将自动恢复为其非字典编码版本。

为了有效地执行此方法,您必须确保接收端具有足够的灵活性。至关重要的是,即使 schema 中缺少某些字段或采用各种字典索引配置时,您的下游管道仍能正常工作。虽然这可能并非始终可行,需要在接收时实施额外的转换,但在某些情况下,这确实值得。

以下结果突出了通过应用各种优化技术所实现的显著内存使用量减少。这些结果是使用与之前展示的 schema 类似的 schema 收集的。 显著的内存效率突显了这种方法的有效性。

Fig 2: Comparative analysis of memory usage for different schema optimizations.
图 2:不同 schema 优化的内存使用情况比较分析。

转换树的概念支持一种通用的方法,可以基于从数据中获得的知识执行各种类型的 schema 优化。 这种架构非常灵活; 当前的实现允许删除未使用的字段,应用最具体的字典编码以及优化联合类型变体。 未来,有可能引入可以表示为初始 schema 转换的其他优化。 此方法的实现可在此处找到:这里

处理递归 schema 定义

Apache Arrow 不支持递归 schema 定义,这意味着无法直接表示具有可变深度的数据结构。 图 3 举例说明了这样的递归定义,其中属性的值可以是简单数据类型、值列表或值映射。 无法预先确定此定义的深度。

Fig 3: Recursive definition of OTel attributes.
图 3:OTel 属性的递归定义。

可以采用几种策略来规避此限制。 从技术上讲,我们提出的动态 schema 概念可以扩展为动态更新 schema,以包含任何缺失的递归级别。 但是,对于此用例,此方法很复杂,并且存在无法保证 schema 最大大小的显著缺点。 这种约束的缺乏可能会带来安全问题; 因此,不会详细阐述此方法。

第二种方法包括通过采用支持递归 schema 定义的序列化格式来打破递归。 然后,可以将此序列化的结果作为二进制类型列集成到 Arrow record 中,从而有效地在特定级别停止递归。 为了充分利用列式表示的优势,至关重要的是,尽可能在数据结构中深入应用这种特殊的序列化。 在 OpenTelemetry 的上下文中,这在属性级别执行,更具体地说,是在属性的第二层执行。

可以使用各种序列化格式(例如 protobuf 或 CBOR)来编码递归数据。 如果没有特殊处理,现有的 Arrow 查询引擎可能不容易查询这些二进制列。 因此,至关重要的是认真确定何时何地应用这种技术。 虽然我不知道在 Arrow 系统中尝试解决此限制的任何尝试,但这似乎并非无法克服,并且将构成有价值的扩展。 这将有助于降低将 Arrow 与依赖于此类递归定义的其他系统集成在一起的复杂性。

排序的重要性

在我们之前的文章中,我们探讨了各种表示分层数据模型的策略,包括基于 struct/list/map/union 的嵌套结构、反规范化和平展表示以及多 record 方法。 每种方法都具有其独特的优势和劣势。 但是,在本节中,我们将更深入地研究多 record 方法,特别关注其提供通用排序选项的能力,以及这些选项如何提高压缩率。

在 OTel Arrow 协议中,我们利用多 record 方法来表示指标、日志和跟踪。 以下实体关系图提供了各种 record schema 的简化版本,并说明了它们的关系,特别是用于表示 gauges 和 sums 的关系。 有关 OpenTelemetry 中使用的 Arrow 数据模型的完整描述,可在此处访问:这里

这些 Arrow record,也称为表,形成一个层次结构,其中 METRICS 充当主要入口点。 每个表都可以根据一个或多个列独立排序。 这种排序策略有助于对重复数据进行分组,从而提高压缩率。

Fig 4: A simplified entity-relationship diagram representing OTel sum & gauge metrics.
图 4:表示 OTel sum & gauge 指标的简化实体关系图。

METRICS 表与辅助 RESOURCE_ATTRSSCOPE_ATTRSNUMBER_DATA_POINTS 表之间的关系通过主表中的唯一 id 和每个辅助表中的 parent_id 列建立。 这个 {id,parent_id} 对表示一个开销,应在压缩后尽可能地减少。

为了实现此目的,不同表的排序过程遵循层次结构,从主表向下到叶表。 主表按(一个或多个列)排序,然后为每一行分配一个增量 id。 此数值 id 使用增量编码存储,该编码在 Arrow 之上实现。

直接连接到主表的辅助表使用相同的原则进行排序,但 parent_id 列始终用作排序语句中的最后一列。 在排序语句中包含 parent_id 列可以使用增量编码的变体。 此方法的效率总结在下面的图表中。

Fig 5: Comparative analysis of compression ratios - OTLP Protocol vs. Two variations of the OTel Arrow Protocol with multivariate metrics stream. (lower percentage is better)
图 5:压缩率比较分析 - 具有多元指标流的 OTLP 协议与 OTel Arrow 协议的两种变体。 (百分比越低越好)

第二列显示了 OTLP 批处理在各种大小的批处理中进行 ZSTD 压缩之前和之后的平均大小。 此列用作后续两列的参考点。 第三列显示了未应用任何排序的 OTel Arrow 协议的结果,而最后一列显示了启用排序的 OTel Arrow 协议的结果。

在压缩之前,两种 OTel Arrow 配置的平均批处理大小预计相似。 但是,压缩后,对每个单独的表进行排序对压缩率的好处立即显现出来。 在没有排序的情况下,OTel Arrow 协议的压缩率比参考值高 1.40 到 1.67 倍。 启用排序后,OTel Arrow 协议的性能比参考值高 4.94 到 7.21 倍!

压缩方面的收益显然取决于您的数据以及数据批处理中存在的冗余信息。 根据我们的观察,选择良好的排序通常会将压缩率提高 1.5 到 8 倍。

将复杂的 schema 分解为多个更简单的 schema 以增强排序功能,再加上一种有针对性的方法来有效地编码表示关系的标识符,成为增强整体数据压缩的有效策略。 此方法还消除了复杂的 Arrow 数据类型,例如列表、映射和联合。 因此,它不仅改进而且简化了数据查询能力。 这种简化证明对现有的查询引擎有益,这些引擎可能难以处理复杂的 schema。

结论和后续步骤

本文结束了我们关于 Apache Arrow 的分为两部分的文章系列,在其中我们探讨了在特定上下文中最大化 Apache Arrow 效用的各种策略。 本系列第二部分中介绍的自适应 schema 架构为未来的优化可能性铺平了道路。 我们期待看到社区可以基于此贡献添加的内容。

Apache Arrow 是一个卓越的项目,通过蓬勃发展的生态系统不断增强。 但是,在我们的探索过程中,我们注意到某些差距或摩擦点,如果解决这些差距或摩擦点,可以显着丰富整体体验。

  • 在某些情况下,设计高效的 Arrow schema 可能是一项具有挑战性的任务。 拥有在 record 级别收集统计信息的能力可以促进此设计阶段(每个字段的数据分布、字典统计信息、压缩前/后的 Arrow 数组大小等等)。 这些统计信息还将有助于识别基于其排序 record 的最有效列。
  • 对递归 schema 的原生支持也将通过简化在复杂场景中 Arrow 的使用来增加采用率。 虽然我不知道在 Arrow 系统中尝试解决此限制的任何尝试,但这似乎并非无法克服,并且将构成有价值的扩展。 这将有助于降低将 Arrow 与依赖于此类递归定义的其他系统集成在一起的复杂性。
  • 协调对数据类型以及 IPC 流功能的支持也将是一项重大好处。 主要客户端库支持嵌套和分层 schema,但由于生态系统其余部分缺乏完全支持,因此它们的使用受到限制。 例如,查询引擎或 Parquet 桥接器对列表和/或联合类型的支持不佳。 此外,不同实现之间 IPC 流中的高级字典支持不一致(即,并非所有实现都支持增量字典和替换字典)。
  • 通过本地集成本文中介绍的动态 schema 的概念,可以优化复杂 schema 在内存消耗和压缩率方面的支持
  • 检测字典溢出(索引级别)不是一件容易在运行时测试的事情。 可以改进 API,以便在发生插入时立即指示此溢出。

我们努力将 Apache Arrow 与 OpenTelemetry 结合使用,并取得了令人鼓舞的结果。 虽然这需要在开发、探索和基准测试方面进行大量投资,但我们希望这些文章将有助于加速您使用 Apache Arrow 的旅程。 展望未来,我们设想与 Apache Arrow 进行端到端集成,并计划显着扩展我们对 Arrow 生态系统的使用。 此扩展涉及提供与 Parquet 的桥接并与 DataFusion 等查询引擎集成,以实现处理收集器中的遥测流的目的。