<!–- 根据一项或多项贡献者许可协议,已获得 Apache 软件基金会 (ASF) 的许可。有关版权所有权的更多信息,请参阅随此作品分发的 NOTICE 文件。ASF 根据 Apache 许可证 2.0 版(“许可证”)向您授予此文件的许可证;除非遵守许可证,否则您不得使用此文件。您可以从以下位置获取许可证的副本:

https://apache.org/licenses/LICENSE-2.0

除非适用法律要求或书面同意,否则根据许可证分发的软件按“原样”分发,不提供任何类型的明示或暗示的保证或条件。有关许可证授予的权限和限制的特定语言,请参阅许可证。 –>

用户手册

本文档旨在简要介绍 Arrow 数据格式,然后逐步讲解 Arrow.jl Julia 包提供的功能,目标是揭示一些“底层”机制,以帮助解释其工作原理以及它如何影响 Arrow 数据格式在实际用例中的应用。

学习 Apache Arrow 项目的最佳场所是其官方网站,特别是数据格式规范。简而言之,Arrow 项目提供了一种正式规范,规定了如何高效地在内存中布局列式“表格”数据,以标准化并最大限度地提高跨语言/平台共享数据的能力。在当前的apache/arrow GitHub 仓库中,存在针对 C++、Java、Go、Javascript、Rust 等语言的实现。其他数据库供应商和数据处理框架/应用程序也构建了对 Arrow 格式的支持,从而为应用程序“使用 Arrow 数据语言”提供了广泛的可能性。

Arrow.jl Julia 包是另一种实现,它能够读取和写入 Arrow 格式的数据。作为一种数据格式,Arrow 指定了用于列式表格数据的精确内存布局,因此,“读取”操作涉及自定义 Julia 对象(Arrow.TableArrow.Stream),它们读取“Arrow 内存块”的*元数据*,然后*包装*其中包含的数组数据,并从元数据中了解类型、大小以及其他属性。让我们仔细看看这种 Arrow 内存“读取”的真正含义/样子。

对泛型路径类型的支持

只要函数将路径作为参数,Arrow.jl 就会尝试支持任何类似路径的类型。只要该类型支持以下操作,Arrow.jl API 应该都能正常工作:

  • Base.open(path, mode)::I where I <: IO

当返回自定义 IO 子类型(I)时,还需要定义以下方法

  • Base.read(io::I, ::Type{UInt8})Base.read(io::I)
  • Base.write(io::I, x)

读取 Arrow 数据

安装 Arrow.jl Julia 包(通过 ] add Arrow)后,如果您有一些 Arrow 数据,例如由pyarrow库(用于与 Arrow 数据交互的 Python 库) 生成的名为 data.arrow 的文件,则可以通过执行以下操作将 Arrow 数据读取到 Julia 会话中:

using Arrow

table = Arrow.Table("data.arrow")

Arrow.Table

此示例中 table 的类型将是 Arrow.Table。在“读取”Arrow 数据时,Arrow.Table 首先 “内存映射”data.arrow 文件,这是一种处理大于系统可用 RAM 的数据的重要技术。通过“内存映射”文件,操作系统实际上不会同时将整个文件内容加载到 RAM 中,而是在请求文件的不同区域时将文件内容“交换”到 RAM 中。一旦“内存映射”完成,Arrow.Table 就会检查文件中的元数据,以确定列数、列名和类型、每列在文件数据中开始的字节偏移量,甚至文件中包含多少个“批次”(Arrow 表可以划分为一个或多个“记录批次”,每个批次都包含部分数据)。获得所有适当的元数据后,Arrow.Table 然后创建自定义数组对象(Arrow.ArrowVector),这些对象充当原始 Arrow 内存字节的“视图”。这是一个重要的点,因为在读取 Arrow 数据时不会为“数据”分配额外的内存。这与我们想要将 csv 文件中的数据作为列读取到 Julia 结构中形成对比;我们需要自己分配这些数组结构,然后解析文件,用从文件中解析的数据“填充”数组的每个元素。另一方面,Arrow 数据*已以二进制格式在内存或磁盘上布局*,只要我们有解释原始字节的元数据,我们就可以确定是将这些字节视为 Vector{Float64} 还是其他类型。反序列化 Arrow 数据时可能会看到的一些 Arrow 数组类型示例包括:

  • Arrow.Primitive:最常见的数组类型,用于简单的固定大小元素,如整数、浮点数、时间类型和小数
  • Arrow.List:一种数组类型,其自身的元素也是某种数组,如字符串列,其中每个元素都可以视为字符数组
  • Arrow.FixedSizeList:类似于 List 类型,但每个数组元素本身都具有固定数量的元素;您可以将其视为 Vector{NTuple{N, T}},其中 N 是固定大小的宽度
  • Arrow.Map:一种数组类型,其中每个元素都类似于 Julia Dict;键值对列表,如 Vector{Dict}
  • Arrow.Struct:一种数组类型,其中每个元素都是自定义结构体的实例,即命名和类型字段的有序集合,有点像 Vector{NamedTuple}
  • Arrow.DenseUnion:一种数组类型,其中元素可以是几种不同类型,并紧凑存储;可以将其视为 Vector{Union{A, B}}
  • Arrow.SparseUnion:另一种数组类型,其中元素可以是几种不同类型,但存储方式类似于由每种可能类型的等长子数组组成(内存效率低于 DenseUnion
  • Arrow.DictEncoded:一种特殊数组类型,其中值是“字典编码”的,这意味着数组的唯一可能值列表存储在内部“编码池”中,而数组的每个存储元素都只是一个整数“代码”,用于索引编码池以获取实际值。

虽然这些自定义数组类型确实是 AbstractArray 的子类型,但目前不支持 setindex!。请记住,这些数组是原始 Arrow 字节的“视图”,因此对于 Arrow.Primitive 以外的数组类型,允许操作这些原始 Arrow 字节变得非常棘手。尽管如此,只需调用 copy(x)(其中 x 是任何 ArrowVector 类型)即可,一个普通的 Julia Vector 类型将被完全实例化(然后就可以修改/操作值)。

那么,您可以使用充满数据的 Arrow.Table 做些什么呢?实际上有很多!

因为 Arrow.Table 实现了 Tables.jl 接口,所以它为使用 Arrow 数据开辟了一个集成世界。一些例子包括

  • df = DataFrame(Arrow.Table(file)):使用 Arrow 向量本身构建 DataFrame;这允许直接在 Arrow 数据上利用 DataFrames.jl 的大量功能;分组、连接、选择等。
  • df = copy(DataFrame(Arrow.Table(file))):构建一个 DataFrame,其中列是常规内存向量(特别是 Base.Vector 和/或 PooledVector)。这要求您有足够的内存将整个 DataFrame 加载到内存中。
  • Tables.datavaluerows(Arrow.Table(file)) |> @map(...) |> @filter(...) |> DataFrame:使用 Query.jl 的行处理实用程序直接在 Arrow 数据上进行映射、分组、过滤、修改等操作。
  • Arrow.Table(file) |> SQLite.load!(db, "arrow_table"):将 Arrow 数据直接加载到 sqlite 数据库/表中,可以在其中对数据执行 sql 查询
  • Arrow.Table(file) |> CSV.write("arrow.csv"):将 Arrow 数据写入 csv 文件

可以在此处找到利用 Tables.jl 接口的 Julia 包的完整列表。

除了让其他软件包发挥所有功能外,Arrow.Table 本身也非常有用。例如,使用 tbl = Arrow.Table(file)

  • tbl[1]:通过索引检索第一列;可以通过 length(tbl) 查询列数
  • tbl[:col1]tbl.col1:检索名为 col1 的列,可以通过使用作为 Symbol 给出的列名进行索引,也可以通过“点访问”进行检索
  • for col in tbl:迭代表中的列
  • AbstractDict 方法,如 haskey(tbl, :col1)get(tbl, :col1, nothing)keys(tbl)values(tbl)

Arrow 类型

在 Arrow 数据格式中,支持特定的逻辑类型,其列表可以在此处找到。这些类型包括布尔值、各种位宽的整数、浮点数、小数、时间类型和二进制/字符串。虽然其中大多数类型可以自然地映射到 Julia 自身内置的类型,但也有一些情况下的定义略有不同,在这些情况下,默认情况下,它们会被转换为更“友好”的 Julia 类型(可以通过将convert=false传递给Arrow.Table来避免这种自动转换,例如Arrow.Table(file; convert=false))。Arrow 到 Julia 类型映射的示例包括:

  • DateTimeTimestampDuration 分别在 Dates.DateDates.TimeTimeZones.ZonedDateTimeDates.Period 子类型中具有自然的 Julia 定义。
  • CharSymbol Julia 类型映射到 Arrow 字符串类型,并带有原始 Julia 类型的附加元数据;这允许在 Julia 中直接反序列化为 CharSymbol,而其他语言实现将这些列视为字符串。
  • 与上述类似,UUID Julia 类型映射到 128 位的 FixedSizeBinary Arrow 类型。
  • Decimal128Decimal256 没有相应的内置 Julia 类型,因此它们使用 Arrow.jl 本身中的兼容类型定义 Arrow.Decimal 进行反序列化。

请注意,当传递 convert=false 时,数据将以 Arrow.jl 定义的类型返回,这些类型与这些类型的 Arrow 定义完全匹配;关于每种类型如何表示其数据的权威来源可以在 Arrow 的Schema.fbs文件中找到。

关于性能的注意事项:将 TimeZones.ZonedDateTime 列写入 Arrow 格式时(通过 Arrow.write),只要该列的 ZonedDateTime 元素都共享一个公共时区,最好将这些列“包装”在 Arrow.ToTimestamp(col) 中。这确保了写入过程可以“预先”知道将要编码哪个时区,从而更加高效和高性能。

自定义类型

为了支持写入自定义 Julia 结构体,Arrow.jl 利用格式的“扩展类型”机制,允许在字段元数据中存储 Julia 类型名称和元数据。为了“介入”此机制,自定义类型可以使用 Arrow.ArrowTypes 子模块中定义的接口方法。例如:

using Arrow

struct Person
    id::Int
    name::String
end

# overload interface method for custom type Person; return a symbol as the "name"
# this instructs Arrow.write what "label" to include with a column with this custom type
ArrowTypes.arrowname(::Type{Person}) = :Person
# overload JuliaType on `Val{:Person}`, which is like a dispatchable string
# return our custom *type* Person; this enables Arrow.Table to know how the "label"
# on a custom column should be mapped to a Julia type and deserialized
ArrowTypes.JuliaType(::Val{:Person}) = Person

table = (col1=[Person(1, "Bob"), Person(2, "Jane")],)
io = IOBuffer()
Arrow.write(io, table)
seekstart(io)
table2 = Arrow.Table(io)

在此示例中,我们正在写入 table,它是一个 NamedTuple,其中一列名为 col1,它有两个元素是自定义 Person 结构体的实例。我们重载 Arrowtypes.arrowname,以便 Arrow.jl 知道如何序列化我们的 Person 结构体。然后,我们重载 ArrowTypes.JuliaType,以便反序列化过程知道如何将我们的类型标签映射回我们的 Person 结构体类型。然后,我们可以将 Arrow 格式的数据写入内存中的 IOBuffer,然后使用 Arrow.Table 将表读回。我们取回的表将是一个 Arrow.Table,其中包含一个元素类型为 PersonArrow.Struct 列。

请注意,如果不调用 Arrowtypes.JuliaType,我们可能会进入一种奇怪的中间状态,我们将 Person 结构体作为表写出,但在读回时,Arrow.jl 不知道 Person 是什么;反序列化不会失败,但我们只会得到一个 Namedtuple{(:id, :name), Tuple{Int, String}} 而不是 Person

虽然此示例非常简单,但它展示了允许自定义类型进行序列化/反序列化的基础知识。但是 ArrowTypes 模块提供了更强大的功能,可以将非原生 Arrow 类型“挂钩”到序列化/反序列化过程中。让我们再来看几个例子;如果您已经厌倦了自定义类型的操作,请随时跳到下一节。

让我们看看 Arrow.jl 如何允许序列化 nothing 值,该值在 Julia 中通常被称为“软件工程师的 NULL”。虽然 Arrow.jl 将 missing 视为默认的 Arrow NULL 值,但 nothing 非常相似,但如果可能,我们仍然希望单独处理它。以下是我们在 ArrowTypes 模块中启用序列化/反序列化的方法:

ArrowTypes.ArrowKind(::Type{Nothing}) = ArrowTypes.NullKind()
ArrowTypes.ArrowType(::Type{Nothing}) = Missing
ArrowTypes.toarrow(::Nothing) = missing
const NOTHING = Symbol("JuliaLang.Nothing")
ArrowTypes.arrowname(::Type{Nothing}) = NOTHING
ArrowTypes.JuliaType(::Val{NOTHING}) = Nothing
ArrowTypes.fromarrow(::Type{Nothing}, ::Missing) = nothing

让我们逐行分析一下这里发生的事情:

  • ArrowKind 重载:ArrowKind 是 Arrow 格式支持的类型的通用“类别”,例如 PrimitiveKindListKind 等。它们 each 对应于 Arrow 格式中支持的不同数据布局策略。在这里,我们将 nothing 的类型定义为 NullKind,这意味着不需要实际内存进行存储,它严格来说是一种“元数据”类型,我们可以在其中存储类型和元素数量。在我们的 Person 示例中,我们不需要重载它,因为像 struct Tmutable struct T 这样声明的类型默认定义为 ArrowTypes.StructKind
  • ArrowType 重载:这里我们发出信号,表明我们的类型 (Nothing) 映射到原生支持的 Arrow 类型 Missing;这对于序列化器很重要,因为它知道它将序列化哪个 Arrow 类型。同样,我们不需要为 Person 重载它,因为序列化器知道如何通过使用反射方法(如 fieldnames(T)getfield(x, i))自动序列化自定义结构体。
  • ArrowTypes.toarrow 重载:这是 ArrowType 的一个姐妹方法;我们说我们的类型将映射到 Missing Arrow 类型,因此这里我们实际定义了它如何转换为 Arrow 类型;在这种情况下,它只返回 missing。这是另一个没有出现在 Person 中的方法;为什么?嗯,正如我们在 ArrowType 中指出的那样,序列化器已经知道如何通过使用所有字段来序列化自定义结构体;如果由于某种原因,我们想要省略某些字段或以其他方式转换事物,那么我们可以定义相应的 ArrowTypetoarrow 方法。
  • arrowname 重载:与我们的 Person 示例类似,我们需要指示序列化器如何在 Arrow 类型元数据中标记我们的自定义类型;这里我们给它符号 Symbol("JuliaLang.Nothing")。请注意,虽然这最终将允许我们在读取 Arrow 数据时将 nothingmissing 区分开来,但如果我们将此数据传递给其他语言实现,它们只会将数据视为 missing,因为它们(可能)不会知道如何“理解” JuliaLang.Nothing 类型标签。
  • JuliaType 重载:再次,与我们的 Person 示例一样,我们指示反序列化器,当它遇到 JuliaLang.Nothing 类型标签时,它应该将这些值视为 Nothing 类型。
  • 最后是 fromarrow 重载:这允许指定如何将原生 Arrow 数据转换回我们的自定义类型。 fromarrow(T, x...) 默认情况下将调用 T(x...),这就是为什么我们不需要为 Person 重载它,但在本例中,Nothing(missing) 将不起作用,因此我们定义我们自己的自定义转换。

让我们再来看一个更复杂的例子,只是为了好玩,并真正看看系统可以被推到多远。

using Intervals
table = (col = [
    Interval{Closed,Unbounded}(1,nothing),
],)
const NAME = Symbol("JuliaLang.Interval")
ArrowTypes.arrowname(::Type{Interval{T, L, R}}) where {T, L, R} = NAME
const LOOKUP = Dict(
    "Closed" => Closed,
    "Unbounded" => Unbounded
)
ArrowTypes.arrowmetadata(::Type{Interval{T, L, R}}) where {T, L, R} = string(L, ".", R)
function ArrowTypes.JuliaType(::Val{NAME}, ::Type{NamedTuple{names, types}}, meta) where {names, types}
    L, R = split(meta, ".")
    return Interval{fieldtype(types, 1), LOOKUP[L], LOOKUP[R]}
end
ArrowTypes.fromarrow(::Type{Interval{T, L, R}}, first, last) where {T, L, R} = Interval{L, R}(first, R == Unbounded ? nothing : last)
io = Arrow.tobuffer(table)
tbl = Arrow.Table(io)

再次,让我们分解一下这里发生的事情:

  • 这里我们试图以 Arrow 格式保存一个 Interval 类型;这种类型的独特之处在于它有两个类型参数(ClosedUnbounded),它们不是根据字段推断/基于的,而只是类型本身的“类型标签”。
  • 请注意,我们为所有 Interval 定义了一个通用的 arrowname 方法,而不管类型参数如何。我们只想让 Arrow 知道我们在这里处理的是哪种通用类型。
  • 接下来,我们使用一个新的方法 ArrowTypes.arrowmetadata 将两个非基于字段的类型参数编码为一个以点分隔的字符串;我们在这里编码此信息是因为请记住,我们必须在 JuliaType(::Val(name)) 定义中匹配我们的 arrowname 符号类型名称才能正确分派;如果我们在 arrowname 中编码类型参数,我们将需要为这两个类型参数的每个唯一组合分别定义 arrowname,并为每个组合分别定义相应的 JuliaType;哎呀。相反,我们让 arrowname 对我们的类型通用,并使用 arrowmetadata 存储*此特定列*的类型参数。
  • 现在在 JuliaType 中,请注意我们正在使用三参数重载;我们需要 NamedTuple 类型,它是我们的 Interval 正在序列化为的原生 Arrow 类型;我们使用它来检索 Interval 的第一个类型参数,它只是两个 firstlast 字段的类型。然后我们使用第三个参数,它是我们从 arrowmetadata 返回的任何字符串。我们调用 L, R = split(meta, ".") 来解析这两个类型参数(在本例中为 ClosedUnbounded),然后从预定义的 LOOKUP 字典中查找这些字符串,该字典将类型参数名称作为字符串与实际类型匹配。然后,我们就拥有了重新创建完整 Interval 类型的所有信息。真棒!
  • 最后一个问题是我们的 fromarrow 方法;UnboundedInterval 实际上将 nothing 作为第二个参数。因此,让默认的 fromarrow 定义调用 Interval{T, L, R}(first, last),其中 firstlast 都是整数,将不起作用。相反,我们检查 R 类型参数是否为 Unbounded,如果是,则传递 nothing 作为第二个参数,否则我们可以传递 last

如果您长时间盯着这些东西,肯定会让您头晕目眩。与往常一样,如有任何疑问,请随时在 #data Slack 频道上提问,或 提出新的问题,详细说明您正在尝试做什么。

Arrow.Stream

除了 Arrow.Table 之外,Arrow.jl 包还提供 Arrow.Stream 用于处理 Arrow 数据。 Arrow.Table 将迭代 Arrow 文件/流中的所有记录批次,并连接列,而 Arrow.Stream 提供了一种*迭代*记录批次的方式,一次一个。每次迭代都会产生一个 Arrow.Table 实例,其中包含单个记录批次的列/数据。如果需要,这允许对 Arrow 数据进行“批量处理”,一次一个记录批次,而不是通过 Arrow.Table 创建一个长表。

自定义应用程序元数据

Arrow 格式允许数据生产者将自定义元数据附加到各种 Arrow 对象。

Arrow.jl 通过 Arrow.getmetadata 为此元数据提供了一个便捷的访问器。 Arrow.getmetadata(t::Arrow.Table) 将返回一个不可变的 AbstractDict{String,String},它表示与表关联的 Schemacustom_metadata(如果不存在此类元数据,则返回 nothing),而 Arrow.getmetadata(c::Arrow.ArrowVector) 将返回与列关联的 Field custom_metadata 的类似表示形式(如果不存在此类元数据,则返回 nothing)。

要在序列化时将自定义模式/列元数据附加到 Arrow 表,请参阅 Arrow.writemetadatacolmetadata 关键字参数。

写入 Arrow 数据

好的,这是对*读取* Arrow 数据的一个很好的概述,但是您如何*生成* Arrow 数据呢?输入 Arrow.write

Arrow.write

使用 Arrow.write,您可以提供一个 io::IO 参数或一个 file_path 来写入 Arrow 数据,以及一个包含要写入数据的 Tables.jl 兼容源。

Tables.jl 兼容源有哪些示例?以下是一些示例:

  • Arrow.write(io, df::DataFrame)DataFrame 是一个可索引列的集合。
  • Arrow.write(io, CSV.File(file)):从 csv 文件读取数据并写入 Arrow 格式。
  • Arrow.write(io, DBInterface.execute(db, sql_query)):通过 DBInterface.jl 接口对数据库执行 SQL 查询,并将查询结果集直接以 Arrow 格式写入。实现 DBInterface 的软件包包括 SQLite.jlMySQL.jlODBC.jl
  • df |> @map(...) |> Arrow.write(io):将 Query.jl 操作链的结果直接作为 Arrow 数据写入。
  • jsontable(json) |> Arrow.write(io):使用 JSONTables.jl 软件包将 json 对象数组或数组对象视为“表格”并将其作为 Arrow 数据写入。
  • Arrow.write(io, (col1=data1, col2=data2, ...))NamedTupleAbstractVectorAbstractVectorNamedTuple 默认都被视为表格,因此如果已经有数据列,则可以快速构建它们以便轻松写入 Arrow 数据。

这些只是众多 集成 中的几个示例。

除了将单个“表格”数据作为单个 Arrow 记录批次写入之外,当输入支持 Tables.partitions 功能时,Arrow.write 还支持写入多个记录批次。一个直接的例子,虽然可能不是非常有用,是 Arrow.StreamArrow.Stream 实现了 Tables.partitions,它迭代“表格”(特别是 Arrow.Table),因此,Arrow.write 将迭代 Arrow.Stream,并将每个 Arrow.Table 作为单独的记录批次写入。这个例子有效的另一个重要原因是 Arrow.Stream 迭代的 Arrow.Table 都具有相同的模式。这很重要,因为在写入 Arrow 数据时,始终会先写入“模式”消息,所有后续记录批次都使用与初始模式匹配的数据写入。

除了支持 Tables.partitions 的输入之外,请注意 Tables.jl 本身提供了 Tables.partitioner 函数,该函数允许将您自己的具有相似模式的表格的单独实例作为“分区”提供,例如:

# treat 2 separate NamedTuples of vectors with same schema as 1 table, 2 partitions
tbl_parts = Tables.partitioner([(col1=data1, col2=data2), (col1=data3, col2=data4)])
Arrow.write(io, tbl_parts)

# treat an array of csv files with same schema where each file is a partition
# in this form, a function `CSV.File` is applied to each element of 2nd argument
csv_parts = Tables.partitioner(CSV.File, csv_files)
Arrow.write(io, csv_parts)

Arrow.Writer

使用 Arrow.Writer,您可以实例化一个 Arrow.Writer 对象,使用它写入源,然后将其关闭。这允许增量写入同一个接收器。它类似于 Arrow.append,无需在写入之间关闭并重新打开接收器,并且没有仅支持 IPC 流格式的限制。

多线程写入

默认情况下,Arrow.write 将使用多个线程同时写入多个记录批次(例如,如果使用 julia -t 8 启动 julia 或设置了 JULIA_NUM_THREADS 环境变量)。可以通过将 ntasks 关键字参数传递给 Arrow.write 来控制写入时要使用的并发任务数。传递 ntasks=1 可避免写入时的任何多线程。

压缩

通过 compress 关键字参数支持写入时进行压缩。可能的值包括 :lz4:zstd 或您自己初始化的 LZ4FrameCompressorZstdCompressor 对象;将导致每个记录批次中的所有缓冲区使用相应的压缩编码或压缩器。