跳到内容

本文介绍由 arrow 提供的各种数据对象类型,并记录这些对象的结构。

arrow 包提供了几个对象类,用于表示数据。RecordBatchTableDataset 对象是二维矩形数据结构,用于存储表格数据。对于柱状、一维数据,提供了 ArrayChunkedArray 类。最后,Scalar 对象表示单个值。下表总结了这些对象,并展示了如何使用 R6 类对象创建新实例,以及以更传统的 R 风格提供相同功能的便捷函数

维度 如何创建实例 便捷函数
0 标量 Scalar$create(值, 类型)
1 数组 Array$create(向量, 类型) as_arrow_array(x)
1 分块数组 ChunkedArray$create(..., 类型) chunked_array(..., 类型)
2 记录批次 RecordBatch$create(...) record_batch(...)
2 表格 Table$create(...) arrow_table(...)
2 数据集 Dataset$create(来源, 模式) open_dataset(来源, 模式)

稍后在本文中,我们将更详细地了解这些对象。现在我们注意到,这些对象类中的每一个都对应于底层 Arrow C++ 库中同名的类。

除了这些数据对象之外,arrow 还定义了以下类来表示元数据

  • SchemaField 对象的一个列表,用于描述表格数据对象的结构;其中
  • Field 指定一个字符串名称和一个 DataType;并且
  • DataType 是一个控制值如何表示的属性

这些元数据对象在确保数据正确表示方面发挥着重要作用,并且所有三种表格数据对象类型(记录批次、表格和数据集)都包含显式的 Schema 对象,用于表示元数据。要了解有关这些元数据类的更多信息,请参见元数据文章

标量

Scalar 对象只是一个可以具有任何类型的单个值。它可以是整数、字符串、时间戳或 Arrow 支持的任何不同的 DataType 对象。arrow R 包的大多数用户不太可能直接创建标量,但如果需要,您可以通过调用 Scalar$create() 方法来执行此操作

Scalar$create("hello")
## Scalar
## hello

数组

Array 对象是 Scalar 值的有序集合。与 Scalars 一样,大多数用户不需要直接创建 Arrays,但如果需要,有一个 Array$create() 方法允许您创建新的 Arrays

integer_array <- Array$create(c(1L, NA, 2L, 4L, 8L))
integer_array
## Array
## <int32>
## [
##   1,
##   null,
##   2,
##   4,
##   8
## ]
string_array <- Array$create(c("hello", "amazing", "and", "cruel", "world"))
string_array
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]

可以使用方括号对数组进行子集化,如下所示

string_array[4:5]
## Array
## <string>
## [
##   "cruel",
##   "world"
## ]

数组是不可变对象:一旦创建了数组,就无法修改或扩展它。

分块数组

在实践中,arrow R 包的大多数用户可能使用分块数组而不是简单数组。在底层,分块数组是一个或多个数组的集合,可以像好像它们是单个数组一样进行索引。Arrow 提供此功能的原因在数据对象布局文章中进行了描述,但就目前而言,足以注意到分块数组在常规数据分析中的行为类似于数组。

为了说明这一点,让我们使用 chunked_array() 函数

chunked_string_array <- chunked_array(
  string_array,
  c("I", "love", "you")
)

chunked_array() 函数只是 ChunkedArray$create() 提供的功能的包装器。让我们打印对象

chunked_string_array
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love",
##     "you"
##   ]
## ]

此输出中的双重括号旨在突出显示分块数组是一个或多个数组的包装器。但是,尽管由多个不同的数组组成,但可以将分块数组索引为好像它们在单个“类似向量”的对象中首尾相连。如下所示

我们可以使用 chunked_string_array 来说明这一点

chunked_string_array[4:7]
## ChunkedArray
## <string>
## [
##   [
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love"
##   ]
## ]

需要注意的重要一点是,“分块”在语义上没有意义。这只是一个实现细节:用户永远不应将块视为有意义的单元。例如,将数据写入磁盘通常会导致数据被组织成不同的块。同样,包含相同值的两个分块数组被分配给不同的块被认为是等效的。为了说明这一点,我们可以创建一个分块数组,其中包含与 chunked_string_array[4:7] 相同的四个相同值,但组织成一个块而不是分成两个

cruel_world <- chunked_array(c("cruel", "world", "I", "love"))
cruel_world
## ChunkedArray
## <string>
## [
##   [
##     "cruel",
##     "world",
##     "I",
##     "love"
##   ]
## ]

使用 == 测试相等性会产生逐个元素的比较,结果是一个新的分块数组,其中包含四个(布尔类型)true

cruel_world == chunked_string_array[4:7]
## ChunkedArray
## <bool>
## [
##   [
##     true,
##     true,
##     true,
##     true
##   ]
## ]

简而言之,其目的是让用户与分块数组进行交互,就好像它们是普通的一维数据结构一样,而无需过多考虑底层的分块安排。

分块数组是可变的,在特定的意义上:可以从分块数组中添加和删除数组。

记录批次

记录批次是由命名数组组成的表格数据结构,以及一个随附的 Schema,用于指定与每个数组关联的名称和数据类型。记录批次是 Arrow 中数据交换的基本单元,但通常不用于数据分析。表格和数据集在分析上下文中通常更方便。

这些数组可以是不同的类型,但必须都具有相同的长度。每个数组都称为记录批次的“字段”或“列”之一。您可以使用 record_batch() 函数或使用 RecordBatch$create() 方法创建记录批次。这些函数很灵活,可以接受多种格式的输入:您可以传递数据帧、一个或多个命名向量、输入流,甚至包含适当二进制数据的原始向量。例如

rb <- record_batch(
  strs = string_array,
  ints = integer_array,
  dbls = c(1.1, 3.2, 0.2, NA, 11)
)
rb
## RecordBatch
## 5 rows x 3 columns
## $strs <string>
## $ints <int32>
## $dbls <double>

这是一个包含 5 行 3 列的记录批次,其概念结构如下所示

arrow 包为记录批次对象提供了一个 $ 方法,用于按名称提取单个列

rb$strs
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]

您可以使用双括号 [[ 按位置引用列。rb$ints 数组是我们记录批次中的第二列,因此我们可以使用它来提取它

rb[[2]]
## Array
## <int32>
## [
##   1,
##   null,
##   2,
##   4,
##   8
## ]

还有一个 [ 方法,允许您以与数据帧相同的方式提取记录批次的子集。命令 rb[1:3, 1:2] 提取前三行和前两列

rb[1:3, 1:2]
## RecordBatch
## 3 rows x 2 columns
## $strs <string>
## $ints <int32>

记录批次无法连接:因为它们由数组组成,而数组是不可变对象,所以一旦创建记录批次,就无法向其添加新行。

表格

表格由命名分块数组组成,其方式与记录批次由命名数组组成的方式相同。与记录批次一样,表格包含一个显式的 Schema,用于指定每个分块数组的名称和数据类型。

您可以使用 $[[[ 对表格进行子集化,其方式与对记录批次进行子集化的方式相同。与记录批次不同,表格可以连接(因为它们由分块数组组成)。假设到达第二个记录批次

new_rb <- record_batch(
  strs = c("I", "love", "you"),
  ints = c(5L, 0L, 0L),
  dbls = c(7.1, -0.1, 2)
)

不可能创建一个将 new_rb 中的数据附加到 rb 中的数据的记录批次,而不在内存中创建全新的对象。但是,使用表格,我们可以

df <- arrow_table(rb)
new_df <- arrow_table(new_rb)

现在我们将数据集的两个片段表示为表格。表格和记录批次之间的区别在于,列都表示为分块数组。原始记录批次中的每个数组都是表格中相应分块数组中的一个块

rb$strs
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]
df$strs
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ]
## ]

它是相同的基础数据 – 实际上,同一个不可变数组被两者引用 – 只是被一个新的、灵活的分块数组包装器包围。但是,正是这个包装器使我们能够连接表格

concat_tables(df, new_df)
## Table
## 8 rows x 3 columns
## $strs <string>
## $ints <int32>
## $dbls <double>

结果对象在下图中以示意图方式显示

请注意,新表格中的分块数组保留了这种分块结构,因为没有移动任何原始数组

df_both <- concat_tables(df, new_df)
df_both$strs
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love",
##     "you"
##   ]
## ]

数据集

与记录批次和表格对象一样,数据集用于表示表格数据。在抽象级别上,数据集可以被视为由行和列组成的对象,并且与记录批次和表格一样,它包含一个显式的 Schema,用于指定与每一列关联的名称和数据类型。

但是,表格和记录批次是在内存中显式表示的数据,而数据集则不是。相反,数据集是一个抽象,它引用存储在一个或多个文件中的磁盘上的数据。存储在数据文件中的值作为批量过程加载到内存中。加载仅在需要时发生,并且仅在针对数据执行查询时发生。在这方面,Arrow 数据集与 Arrow 表格是一种非常不同的对象,但用于分析它们的 dplyr 命令本质上是相同的。在本节中,我们将讨论数据集的结构。如果您想了解有关分析数据集的实际细节的更多信息,请参见有关分析多文件数据集的文章。

磁盘上的数据文件

简化到最简单的形式,Dataset 的磁盘结构仅仅是数据文件的集合,每个文件存储数据的一个子集。这些子集有时被称为“片段”(fragments),而分区过程有时被称为“分片”(sharding)。按照惯例,这些文件被组织成一种称为 Hive 风格分区的文件夹结构:有关详细信息,请参阅 hive_partition()

为了说明它是如何工作的,让我们手动将一个多文件数据集写入磁盘,而不使用任何 Arrow Dataset 功能来完成这项工作。我们将从三个小数据框开始,每个数据框包含我们要存储的数据的一个子集

df_a <- data.frame(id = 1:5, value = rnorm(5), subset = "a")
df_b <- data.frame(id = 6:10, value = rnorm(5), subset = "b")
df_c <- data.frame(id = 11:15, value = rnorm(5), subset = "c")

我们的意图是,每个数据框都应该存储在一个单独的数据文件中。正如你所看到的,这是一个结构化的分区:所有 subset = "a" 的数据属于一个文件,所有 subset = "b" 的数据属于另一个文件,所有 subset = "c" 的数据属于第三个文件。

第一步是定义和创建一个将保存所有文件的文件夹

ds_dir <- "mini-dataset"
dir.create(ds_dir)

下一步是手动创建 Hive 风格的文件夹结构

ds_dir_a <- file.path(ds_dir, "subset=a")
ds_dir_b <- file.path(ds_dir, "subset=b")
ds_dir_c <- file.path(ds_dir, "subset=c")

dir.create(ds_dir_a)
dir.create(ds_dir_b)
dir.create(ds_dir_c)

请注意,我们以“key=value”格式命名了每个文件夹,这准确地描述了将写入该文件夹的数据子集。这种命名结构是 Hive 风格分区的本质。

现在我们有了文件夹,我们将使用 write_parquet() 为每个子集创建一个单独的 parquet 文件

write_parquet(df_a, file.path(ds_dir_a, "part-0.parquet"))
write_parquet(df_b, file.path(ds_dir_b, "part-0.parquet"))
write_parquet(df_c, file.path(ds_dir_c, "part-0.parquet"))

如果我们想,我们可以进一步细分数据集。如果需要,一个文件夹可以包含多个文件(part-0.parquetpart-1.parquet 等)。 同样,没有特别的理由以 part-0.parquet 这种方式命名文件:如果我们愿意,也可以将这些文件称为 subset-a.parquetsubset-b.parquetsubset-c.parquet。如果我们愿意,我们可以编写其他文件格式,并且我们不一定必须使用 Hive 风格的文件夹。你可以通过阅读 open_dataset() 的帮助文档了解更多关于支持的格式,并通过 help("Dataset", package = "arrow") 了解如何进行细粒度控制。

无论如何,我们已经使用 Hive 风格的分区创建了一个磁盘上的 parquet Dataset。我们的 Dataset 由这些文件定义

list.files(ds_dir, recursive = TRUE)
## [1] "subset=a/part-0.parquet" "subset=b/part-0.parquet"
## [3] "subset=c/part-0.parquet"

为了验证一切正常,让我们用 open_dataset() 打开数据,并调用 glimpse() 来检查其内容

ds <- open_dataset(ds_dir)
glimpse(ds)
## FileSystemDataset with 3 Parquet files
## 15 rows x 3 columns
## $ id      <int32> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
## $ value  <double> -1.400043517, 0.255317055, -2.437263611, -0.005571287, 0.62155~
## $ subset <string> "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "c~
## Call `print()` for full schema details

正如你所看到的,ds Dataset 对象聚合了三个单独的数据文件。事实上,在这种特殊情况下,Dataset 非常小,以至于所有三个文件的值都出现在 glimpse() 的输出中。

应该注意的是,在日常数据分析工作中,你不需要以这种方式手动编写数据文件。上面的例子完全是为了说明问题。可以使用以下命令创建完全相同的数据集

ds |>
  group_by(subset) |>
  write_dataset("mini-dataset")

事实上,即使 ds 恰好引用一个大于内存的数据源,这个命令仍然应该有效,因为 Dataset 功能的编写是为了确保在此类管道期间,数据被分段加载,以避免耗尽内存。

Dataset 对象

在上一节中,我们检查了 Dataset 的磁盘结构。现在我们转向 Dataset 对象本身的内存结构(即上一个例子中的 ds)。创建 Dataset 对象时,arrow 搜索数据集文件夹以查找适当的文件,但不加载这些文件的内容。这些文件的路径存储在活动绑定 ds$files

ds$files
## [1] "/build/r/vignettes/mini-dataset/subset=a/part-0.parquet"
## [2] "/build/r/vignettes/mini-dataset/subset=b/part-0.parquet"
## [3] "/build/r/vignettes/mini-dataset/subset=c/part-0.parquet"

调用 open_dataset() 时发生的另一件事是,为 Dataset 构建了一个显式的 Schema 并将其存储为 ds$schema

ds$schema
## Schema
## id: int32
## value: double
## subset: string
## 
## See $metadata for additional Schema metadata

默认情况下,此 Schema 仅通过检查第一个文件来推断,尽管可以在检查所有文件后构建统一的 schema。为此,在调用 open_dataset() 时设置 unify_schemas = TRUE。也可以使用 schema 参数来 open_dataset() 来显式指定 Schema(有关详细信息,请参阅 schema() 函数)。

读取数据的行为由 Scanner 对象执行。在使用 dplyr 界面分析 Dataset 时,你永远不需要手动构建 Scanner,但为了便于解释,我们在此处进行构建

scan <- Scanner$create(dataset = ds)

调用 ToTable() 方法将 Dataset(在磁盘上)物化为 Table(在内存中)

scan$ToTable()
## Table
## 15 rows x 3 columns
## $id <int32>
## $value <double>
## $subset <string>
## 
## See $metadata for additional Schema metadata

默认情况下,此扫描过程是多线程的,但如果需要,可以通过在调用 Scanner$create() 时设置 use_threads = FALSE 来禁用线程。

查询 Dataset

当针对 Dataset 执行查询时,将启动新的扫描并将结果拉回到 R 中。例如,考虑以下 dplyr 表达式

ds |>
  filter(value > 0) |>
  mutate(new_value = round(100 * value)) |>
  select(id, subset, new_value) |>
  collect()
## # A tibble: 6 x 3
##      id subset new_value
##   <int> <chr>      <dbl>
## 1     2 a             26
## 2     5 a             62
## 3     6 b            115
## 4    12 c             63
## 5    13 c            207
## 6    15 c             51

我们可以通过指定 filterprojection 参数来 Scanner$create() 来创建新的扫描,从而使用底层 Dataset 接口来复制此操作。要使用这些参数,你需要了解一些关于 Arrow 表达式的知识,为此你可能会发现阅读 help("Expression", package = "arrow") 中的帮助文档很有用。

下面定义的 scanner 模仿了上面显示的 dplyr 管道,

scan <- Scanner$create(
  dataset = ds,
  filter = Expression$field_ref("value") > 0,
  projection = list(
    id = Expression$field_ref("id"),
    subset = Expression$field_ref("subset"),
    new_value = Expression$create("round", 100 * Expression$field_ref("value"))
  )
)

如果我们调用 as.data.frame(scan$ToTable()),它将产生与 dplyr 版本相同的结果,尽管行可能不会以相同的顺序出现。

为了更好地了解查询执行时发生的情况,我们将在此处调用 scan$ScanBatches()。与 ToTable() 方法非常相似,ScanBatches() 方法针对每个文件单独执行查询,但它返回 Record Batches 的列表,每个文件一个。此外,我们将单独将这些 Record Batches 转换为数据帧

lapply(scan$ScanBatches(), as.data.frame)
## [[1]]
##   id subset new_value
## 1  2      a        26
## 2  5      a        62
## 
## [[2]]
##   id subset new_value
## 1  6      b       115
## 
## [[3]]
##   id subset new_value
## 1 12      c        63
## 2 13      c       207
## 3 15      c        51

如果我们返回到之前所做的 dplyr 查询,并使用 compute() 返回一个 Table,而不是使用 collect() 返回一个数据帧,我们可以看到此过程的工作原理的证据。Table 对象是通过连接在针对三个数据文件执行查询时生成的三个 Record Batches 来创建的,因此,定义 Table 列的 Chunked Array 反映了数据文件中存在的分区结构

tbl <- ds |>
  filter(value > 0) |>
  mutate(new_value = round(100 * value)) |>
  select(id, subset, new_value) |>
  compute()

tbl$subset
## ChunkedArray
## <string>
## [
##   [
##     "a",
##     "a"
##   ],
##   [
##     "b"
##   ],
##   [
##     "c",
##     "c",
##     "c"
##   ]
## ]

补充说明

  • 之前的讨论中忽略了一个区别,即 FileSystemDatasetInMemoryDataset 对象之间的区别。通常情况下,组成 Dataset 的数据存储在磁盘上的文件中。毕竟,这是 Dataset 相对于 Table 的主要优势。但是,在某些情况下,从已经存储在内存中的数据创建 Dataset 可能很有用。在这种情况下,创建的对象将具有 InMemoryDataset 类型。

  • 之前的讨论假设存储在 Dataset 中的所有文件都具有相同的 Schema。通常情况下,这是正确的,因为每个文件在概念上都是单个矩形表的一个子集。但这并非绝对要求。

有关这些主题的更多信息,请参阅 help("Dataset", package = "arrow")

进一步阅读

  • 要了解有关数组内部结构的更多信息,请参阅有关 数据对象布局的文章。
  • 要了解更多关于 Arrow 使用的不同数据类型的信息,请参阅关于 数据类型的文章。
  • 要了解有关 Arrow 对象如何实现的更多信息,请参阅 Arrow 规范页面。