用户手册

本文档的目标是提供对 Arrow 数据格式的简要介绍,然后分步介绍 Arrow.jl Julia 包提供的功能,旨在揭示一些“幕后”的机制,以帮助解释工作原理以及这对 Arrow 数据格式的实际用例有何影响。

了解 Apache Arrow 项目的最佳地点是其网站本身,特别是数据格式规范。简而言之,Arrow 项目为如何有效地在内存中布局列式“表”数据提供了正式规范,以标准化和最大化跨语言/平台共享数据的能力。在当前的apache/arrow GitHub 仓库中,存在针对 C++、Java、Go、Javascript、Rust 等语言的实现。其他数据库供应商和数据处理框架/应用程序也内置了对 Arrow 格式的支持,从而为应用程序“说”Arrow 的数据语言提供了广泛的可能性。

Arrow.jl Julia 包是另一种实现,它能够以 Arrow 格式读写数据。作为一种数据格式,Arrow 指定了列式表数据要使用的确切内存布局,因此,“读取”涉及自定义的 Julia 对象(Arrow.TableArrow.Stream),它们读取“Arrow 内存块”的元数据,然后封装其中包含的数组数据,并从元数据中了解了类型和大小等属性。让我们仔细看看这种“读取”Arrow 内存到底意味着/看起来是什么样的。

对通用类路径类型的支持

Arrow.jl 尝试在任何函数接受路径作为参数的地方支持任何类路径类型。只要该类型支持以下方法,Arrow.jl API 就应该能通用工作:

  • Base.open(path, mode)::I where I <: IO

当返回一个自定义的 IO 类型的子类 (I) 时,还需要定义以下方法:

  • Base.read(io::I, ::Type{UInt8})Base.read(io::I)
  • Base.write(io::I, x)

读取 Arrow 数据

在安装了 Arrow.jl Julia 包(通过 ] add Arrow)之后,如果您有一些 Arrow 数据,比如一个名为 data.arrow 的文件,该文件由pyarrow库(一个用于与 Arrow 数据交互的 Python 库)生成,您可以通过以下方式将其读入 Julia 会话中:

using Arrow

table = Arrow.Table("data.arrow")

Arrow.Table

此示例中 table 的类型将是 Arrow.Table。在“读取”Arrow 数据时,Arrow.Table 首先“内存映射”data.arrow 文件,这是一种处理大于系统可用 RAM 的数据的重要技术。通过“内存映射”文件,操作系统不会同时将整个文件内容加载到 RAM 中,但文件内容会在请求文件的不同区域时被“换入”到 RAM 中。内存映射后,Arrow.Table 会检查文件中的元数据,以确定列数、它们的名称和类型、每列在文件数据中开始的字节偏移量,甚至该文件中包含多少“批次”(Arrow 表可以划分为一个或多个“记录批次”,每个批次包含数据的一部分)。在掌握了所有适当的元数据后,Arrow.Table 创建了自定义数组对象(Arrow.ArrowVector),它们充当原始 Arrow 内存字节的“视图”。这是一个重要的点,因为读取 Arrow 数据时不会为“数据”分配额外内存。这与如果我们想将 CSV 文件中的数据作为列读入 Julia 结构体形成对比;我们需要自己分配那些数组结构体,然后解析文件,用从文件中解析出的数据“填充”数组的每个元素。另一方面,Arrow 数据已经以二进制格式布局在内存或磁盘上,只要我们有元数据来解释原始字节,我们就可以弄清楚是将这些字节视为 Vector{Float64} 等。当反序列化 Arrow 数据时,您可能会看到的一些 Arrow 数组类型示例包括:

  • Arrow.Primitive:最常见的数组类型,用于简单的、固定大小的元素,如整数、浮点数、时间类型和十进制数
  • Arrow.List:一种数组类型,其元素本身也是某种数组,例如字符串列,其中每个元素可以看作是字符数组
  • Arrow.FixedSizeList:与 List 类型类似,但每个数组元素本身具有固定数量的元素;您可以将其视为 Vector{NTuple{N, T}},其中 N 是固定大小的宽度
  • Arrow.Map:一种数组类型,其中每个元素类似于 Julia 的 Dict;一个键值对的列表,如 Vector{Dict}
  • Arrow.Struct:一种数组类型,其中每个元素都是一个自定义 struct 的实例,即命名和类型化字段的有序集合,有点像 Vector{NamedTuple}
  • Arrow.DenseUnion:一种数组类型,其中元素可以是几种不同类型,紧凑地存储;可以看作是 Vector{Union{A, B}}
  • Arrow.SparseUnion:另一种数组类型,其中元素可以是几种不同类型,但存储方式是它们由每个可能类型的等长子数组组成(不如 DenseUnion 节省内存)
  • Arrow.DictEncoded:一种特殊的数组类型,其中值被“字典编码”,意味着数组可能的唯一值列表存储在内部的“编码池”中,而数组中存储的每个元素只是一个整数“代码”,用于索引编码池以获取实际值。

虽然这些自定义数组类型是 AbstractArray 的子类型,但目前不支持 setindex!。请记住,这些数组是原始 Arrow 字节的“视图”,因此对于除 Arrow.Primitive 以外的数组类型,操作那些原始 Arrow 字节会变得非常棘手。尽管如此,对于任何 ArrowVector 类型的对象,只需调用 copy(x) 即可将其完全具体化为一个普通的 Julia Vector 类型(之后就可以修改/操作值了)。

那么,一个装满了数据的 Arrow.Table 能做什么呢?实际上有很多!

因为 Arrow.Table 实现了 Tables.jl 接口,它为使用 Arrow 数据开辟了与各种工具集成的世界。以下是一些示例:

  • df = DataFrame(Arrow.Table(file)):构建一个 DataFrame,使用 Arrow 向量本身;这允许直接在 Arrow 数据上利用 DataFrames.jl 的大量功能;分组、连接、选择等。
  • df = copy(DataFrame(Arrow.Table(file))):构建一个 DataFrame,其中列是常规的内存中向量(特别是 Base.Vector 和/或 PooledVector)。这要求您有足够的内存将整个 DataFrame 加载到内存中。
  • Tables.datavaluerows(Arrow.Table(file)) |> @map(...) |> @filter(...) |> DataFrame:使用 Query.jl 的行处理实用程序直接对 Arrow 数据进行映射、分组、过滤、突变等。
  • Arrow.Table(file) |> SQLite.load!(db, "arrow_table"):将 Arrow 数据直接加载到 sqlite 数据库/表中,可以在数据上执行 SQL 查询
  • Arrow.Table(file) |> CSV.write("arrow.csv"):将 Arrow 数据写入 CSV 文件

有关利用 Tables.jl 接口的 Julia 包的完整列表,请参阅此处

除了让其他包尽情享用之外,Arrow.Table 本身也很有用。例如,使用 tbl = Arrow.Table(file)

  • tbl[1]:通过索引检索第一列;可以通过 length(tbl) 查询列数
  • tbl[:col1]tbl.col1:通过使用列名作为 Symbol 进行索引,或通过“点访问”检索名为 col1 的列
  • for col in tbl:遍历表中的列
  • AbstractDict 方法,如 haskey(tbl, :col1)get(tbl, :col1, nothing)keys(tbl)values(tbl)

Arrow 类型

在 Arrow 数据格式中,支持特定的逻辑类型,其列表可以在此处找到。这些包括布尔值、各种位宽的整数、浮点数、十进制数、时间类型以及二进制/字符串。虽然其中大多数自然地映射到 Julia 本身内置的类型,但在少数情况下,定义略有不同,在这些情况下,默认情况下,它们会被转换为更“友好”的 Julia 类型(可以通过向 Arrow.Table 传递 convert=false 来避免这种自动转换,例如 Arrow.Table(file; convert=false))。Arrow 到 Julia 类型映射的示例包括:

  • DateTimeTimestampDuration 在 Julia 中分别具有自然定义在 Dates.DateDates.TimeTimeZones.ZonedDateTimeDates.Period 子类型中。
  • CharSymbol Julia 类型映射到 Arrow 字符串类型,并带有原始 Julia 类型的附加元数据;这允许直接反序列化为 Julia 中的 CharSymbol,而其他语言实现只会将这些列视为字符串
  • 与上述情况类似,UUID Julia 类型映射到 128 位 FixedSizeBinary Arrow 类型。
  • Decimal128Decimal256 没有对应的内置 Julia 类型,因此使用 Arrow.jl 本身中兼容的类型定义进行反序列化:Arrow.Decimal

请注意,当传递 convert=false 时,数据将以 Arrow.jl 定义的类型返回,这些类型与 Arrow 定义完全匹配;关于每种类型如何表示其数据的权威来源可以在 Arrow 的Schema.fbs文件中找到。

关于性能的一个说明:当使用 Arrow.writeTimeZones.ZonedDateTime 列写入 Arrow 格式时,最好使用 Arrow.ToTimestamp(col)“包装”这些列,前提是该列的 ZonedDateTime 元素都具有共同的时区。这确保了写入过程可以“预先”知道将编码哪个时区,因此效率更高、性能更好。

自定义类型

为了支持写入自定义 Julia struct,Arrow.jl 利用该格式的“扩展类型”机制,允许在字段元数据中存储 Julia 类型名称和元数据。要“挂钩”到此机制中,自定义类型可以使用定义在 Arrow.ArrowTypes 子模块中的接口方法。例如:

using Arrow

struct Person
    id::Int
    name::String
end

# overload interface method for custom type Person; return a symbol as the "name"
# this instructs Arrow.write what "label" to include with a column with this custom type
ArrowTypes.arrowname(::Type{Person}) = :Person
# overload JuliaType on `Val{:Person}`, which is like a dispatchable string
# return our custom *type* Person; this enables Arrow.Table to know how the "label"
# on a custom column should be mapped to a Julia type and deserialized
ArrowTypes.JuliaType(::Val{:Person}) = Person

table = (col1=[Person(1, "Bob"), Person(2, "Jane")],)
io = IOBuffer()
Arrow.write(io, table)
seekstart(io)
table2 = Arrow.Table(io)

在此示例中,我们正在写入我们的 table,它是一个 NamedTuple,其中一列名为 col1,它有两个元素,分别是我们的自定义 Person struct 的实例。我们重载 Arrowtypes.arrowname,以便 Arrow.jl 知道如何序列化我们的 Person struct。然后我们重载 ArrowTypes.JuliaType,以便反序列化过程知道如何从我们的类型标签映射回我们的 Person struct 类型。然后我们可以将数据以 Arrow 格式写入内存中的 IOBuffer,然后使用 Arrow.Table 读回该表。我们得到的表将是一个 Arrow.Table,带有一个单 Arrow.Struct 列,其元素类型为 Person

请注意,如果不调用 Arrowtypes.JuliaType,我们可能会陷入一种奇怪的“中间”状态,即我们已经将包含 Person struct 的表写入,但在读回时,Arrow.jl 不知道 Person 是什么;反序列化不会失败,但我们只会得到一个 Namedtuple{(:id, :name), Tuple{Int, String}} 而不是 Person

虽然这个示例非常简单,但它展示了允许自定义类型序列化/反序列化的基础知识。但 ArrowTypes 模块提供了更强大的功能,用于将非原生 Arrow 类型“挂钩”到序列化/反序列化过程中。让我们再看几个示例;如果您对自定义类型的“花招”感到厌倦了,请随时跳到下一节。

让我们看看 Arrow.jl 如何允许序列化 nothing 值,在 Julia 中,nothing 通常被称为“软件工程师的 NULL”。虽然 Arrow.jl 将 missing 视为默认的 Arrow NULL 值,但 nothing 非常相似,但如果可能,我们仍然希望将其单独对待。以下是如何在 ArrowTypes 模块中启用序列化/反序列化的方法:

ArrowTypes.ArrowKind(::Type{Nothing}) = ArrowTypes.NullKind()
ArrowTypes.ArrowType(::Type{Nothing}) = Missing
ArrowTypes.toarrow(::Nothing) = missing
const NOTHING = Symbol("JuliaLang.Nothing")
ArrowTypes.arrowname(::Type{Nothing}) = NOTHING
ArrowTypes.JuliaType(::Val{NOTHING}) = Nothing
ArrowTypes.fromarrow(::Type{Nothing}, ::Missing) = nothing

让我们逐行看看这里发生了什么:

  • ArrowKind 重载:ArrowKind 是 Arrow 格式支持的类型的通用“类别”,例如 PrimitiveKindListKind 等。它们各自对应于 Arrow 格式支持的不同数据布局策略。在这里,我们将 nothing 的种类定义为 NullKind,这意味着不需要实际内存来存储,它严格来说是一种“元数据”类型,我们在其中存储类型和元素数量。在我们的 Person 示例中,我们不需要重载此项,因为声明为 struct Tmutable struct T 的类型默认被定义为 ArrowTypes.StructKind
  • ArrowType 重载:在这里我们表示我们的类型 (Nothing) 映射到原生的 Arrow 类型 Missing;这对序列化器很重要,这样它就知道要序列化哪种 Arrow 类型。同样,我们不需要为 Person 重载此项,因为序列化器知道如何自动序列化自定义 struct,方法是使用反射方法,如 fieldnames(T)getfield(x, i)
  • ArrowTypes.toarrow 重载:这是 ArrowType 的姊妹方法;我们说我们的类型将映射到 Missing Arrow 类型,所以在这里我们实际定义了 ___如何___ 将其转换为 Arrow 类型;在这种情况下,它只返回 missing。这也是 Person 不需要重载的另一个方法;为什么呢?正如我们在 ArrowType 中提到的,序列化器已经知道如何通过使用其所有字段来序列化自定义 struct;如果由于某种原因我们想省略一些字段或以其他方式转换内容,那么我们可以定义相应的 ArrowTypetoarrow 方法。
  • arrowname 重载:类似于我们的 Person 示例,我们需要指示序列化器如何标记 Arrow 类型元数据中的自定义类型;这里我们给它符号 Symbol("JuliaLang.Nothing")。请注意,虽然这最终允许我们在读取 Arrow 数据时区分 nothingmissing,但如果我们将此数据传递给其他语言实现,它们只会将数据视为 missing,因为它们(可能)不知道如何“理解”JuliaLang.Nothing 类型标签。
  • JuliaType 重载:同样,就像我们的 Person 示例一样,我们指示反序列化器在遇到 JuliaLang.Nothing 类型标签时,应将这些值视为 Nothing 类型。
  • 最后是 fromarrow 重载:这允许指定如何将原生 Arrow 数据转换回我们的自定义类型。默认情况下,fromarrow(T, x...) 会调用 T(x...),这就是我们不需要为 Person 重载此项的原因,但在本例中,Nothing(missing) 不会起作用,所以我们定义了自己的自定义转换。

让我们再通过一个复杂的例子,只是为了好玩,真正看看系统能被推到多远。

using Intervals
table = (col = [
    Interval{Closed,Unbounded}(1,nothing),
],)
const NAME = Symbol("JuliaLang.Interval")
ArrowTypes.arrowname(::Type{Interval{T, L, R}}) where {T, L, R} = NAME
const LOOKUP = Dict(
    "Closed" => Closed,
    "Unbounded" => Unbounded
)
ArrowTypes.arrowmetadata(::Type{Interval{T, L, R}}) where {T, L, R} = string(L, ".", R)
function ArrowTypes.JuliaType(::Val{NAME}, ::Type{NamedTuple{names, types}}, meta) where {names, types}
    L, R = split(meta, ".")
    return Interval{fieldtype(types, 1), LOOKUP[L], LOOKUP[R]}
end
ArrowTypes.fromarrow(::Type{Interval{T, L, R}}, first, last) where {T, L, R} = Interval{L, R}(first, R == Unbounded ? nothing : last)
io = Arrow.tobuffer(table)
tbl = Arrow.Table(io)

再次,让我们分解一下这里发生的事情:

  • 这里我们试图将一个 Interval 类型保存到 Arrow 格式中;该类型很独特,因为它有两个类型参数(ClosedUnbounded),它们不是通过字段推断的,而是仅仅是类型本身的“类型标签”。
  • 注意,我们定义了一个通用的 arrowname 方法,用于所有 Intervals,无论类型参数如何。我们只是想让 Arrow 知道我们正在处理哪种通用类型。
  • 接下来,我们使用一个新的方法 ArrowTypes.arrowmetadata 将两个非字段类型的参数编码为带有点分隔符的字符串;我们在这里编码此信息是因为请记住,我们需要在 JuliaType(::Val(name)) 定义中匹配我们的 arrowname 符号类型名,以便正确分派;如果我们把类型参数编码在 arrowname 中,我们就需要为每种类型参数的唯一组合定义单独的 arrowname 定义,以及相应的 JuliaType 定义;太麻烦了。相反,我们让 arrowname 对我们的类型是通用的,并使用 arrowmetadata 来存储特定于此列的类型参数。
  • 现在在 JuliaType 中,注意我们使用了三参数重载;我们想要序列化 Interval 的原生 Arrow 类型,即 NamedTuple 类型;我们用它来检索 Interval 的第一个类型参数,它就是两个字段 firstlast 的类型。然后我们使用第三个参数,即我们从 arrowmetadata 返回的任何字符串。我们调用 L, R = split(meta, ".") 来解析两个类型参数(在本例中为 ClosedUnbounded),然后从预定义的 LOOKUP 字典中查找这些字符串,该字典将类型参数名称(作为字符串)与其对应的实际类型相匹配。然后我们拥有了重建完整的 Interval 类型所需的所有信息。真棒!
  • 最后一个小问题是在我们的 fromarrow 方法中;UnboundedIntervals 实际上将 nothing 作为第二个参数。因此,让默认的 fromarrow 定义调用 Interval{T, L, R}(first, last)(其中 firstlast 都是整数)将不起作用。因此,我们检查 R 类型参数是否为 Unbounded,如果是,则传递 nothing 作为第二个参数,否则我们可以传递 last

如果你盯着看久了,这些东西绝对会让你眼花缭乱。一如既往,请随时通过 #data slack 频道或打开一个新 issue 详细说明您要完成的工作,以获得快速问题的帮助。

Arrow.Stream

除了 Arrow.Table 之外,Arrow.jl 包还提供 Arrow.Stream 用于处理 Arrow 数据。虽然 Arrow.Table 会迭代 Arrow 文件/流中的所有记录批次,并将列连接起来,但 Arrow.Stream 提供了一种逐个迭代记录批次的方法。每次迭代都会产生一个 Arrow.Table 实例,其中包含单个记录批次(的列/数据)。如果需要,这允许一次一个记录批次地对 Arrow 数据进行“批处理”处理,而不是像 Arrow.Table 那样创建一个单一的长表。

自定义应用程序元数据

Arrow 格式允许数据生产者附加自定义元数据到各种 Arrow 对象。

Arrow.jl 通过 Arrow.getmetadata 提供了一个便捷的访问器。Arrow.getmetadata(t::Arrow.Table) 将返回一个不可变的 AbstractDict{String,String},表示Schema 关联的 custom_metadata(如果不存在此类元数据,则返回 nothing),而 Arrow.getmetadata(c::Arrow.ArrowVector) 将返回Field 关联的 custom_metadata的类似表示(如果不存在此类元数据,则返回 nothing)。

要在序列化时向 Arrow 表附加自定义 Schema/列元数据,请参阅 Arrow.writemetadatacolmetadata 关键字参数。

写入 Arrow 数据

好的,这对读取 Arrow 数据做了一个很好的概述,但是如何生成 Arrow 数据呢?Arrow.write 应运而生。

Arrow.write

使用 Arrow.write,您可以提供一个 io::IO 参数或一个 file_path 来写入 Arrow 数据,以及一个兼容 Tables.jl 的源,其中包含要写入的数据。

哪些是兼容 Tables.jl 的源的示例?以下是一些示例:

  • Arrow.write(io, df::DataFrame)DataFrame 是可索引列的集合
  • Arrow.write(io, CSV.File(file)):从 CSV 文件读取数据并以 Arrow 格式写入
  • Arrow.write(io, DBInterface.execute(db, sql_query)):通过DBInterface.jl接口对数据库执行 SQL 查询,并将查询结果集直接以 Arrow 格式写入。实现 DBInterface 的包包括SQLite.jlMySQL.jlODBC.jl
  • df |> @map(...) |> Arrow.write(io):将 Query.jl 操作链的结果直接写入 Arrow 数据
  • jsontable(json) |> Arrow.write(io):将 JSON 对象数组或数组对象视为“表”,并使用JSONTables.jl包将其写入 Arrow 数据
  • Arrow.write(io, (col1=data1, col2=data2, ...))NamedTupleAbstractVectors 或 AbstractVectorNamedTuples 默认都被视为表,因此如果您已经有数据列,可以快速构建它们以便于写入 Arrow 数据

这些只是众多集成中的几个示例。

除了仅仅将单个数据“表”写入单个 Arrow 记录批次之外,Arrow.write 还支持在输入支持 Tables.partitions 功能时写入多个记录批次。一个直接的,虽然可能不是非常实用的例子是 Arrow.StreamArrow.Stream 在迭代“表”(特别是 Arrow.Table)时实现了 Tables.partitions,因此 Arrow.write 将迭代 Arrow.Stream,并将每个 Arrow.Table 写入为单独的记录批次。另一个重要的点是为什么这个例子有效,是因为 Arrow.Stream 迭代的 Arrow.Table 都具有相同的 Schema。这很重要,因为在写入 Arrow 数据时,始终首先写入一个“Schema”消息,所有后续的记录批次都写入与初始 Schema 匹配的数据。

除了支持 Tables.partitions 的输入外,请注意 Tables.jl 本身提供了 Tables.partitioner 函数,它允许您提供自己独立的、具有相似 Schema 的表作为“分区”,例如:

# treat 2 separate NamedTuples of vectors with same schema as 1 table, 2 partitions
tbl_parts = Tables.partitioner([(col1=data1, col2=data2), (col1=data3, col2=data4)])
Arrow.write(io, tbl_parts)

# treat an array of csv files with same schema where each file is a partition
# in this form, a function `CSV.File` is applied to each element of 2nd argument
csv_parts = Tables.partitioner(CSV.File, csv_files)
Arrow.write(io, csv_parts)

Arrow.Writer

使用 Arrow.Writer,您可以实例化一个 Arrow.Writer 对象,使用它写入源,然后关闭它。这允许对同一接收器进行增量写入。它类似于 Arrow.append,但无需在每次写入之间关闭和重新打开接收器,并且没有仅支持 IPC 流格式的限制。

多线程写入

默认情况下,Arrow.write 将使用多个线程同时写入多个记录批次(例如,如果 Julia 是用 julia -t 8 启动的或设置了 JULIA_NUM_THREADS 环境变量)。可用于写入的并发任务数可以通过将 ntasks 关键字参数传递给 Arrow.write 来控制。传递 ntasks=1 会避免在写入时的任何多线程。

压缩

写入时通过 compress 关键字参数支持压缩。可能的值包括 :lz4:zstd,或您自己初始化的 LZ4FrameCompressorZstdCompressor 对象;这将导致每个记录批次中的所有缓冲区使用相应的压缩编码或压缩器。