跳转到内容

本文描述了 arrow 提供的各种数据对象类型,并记录了这些对象的结构。

arrow 包提供了几个用于表示数据的对象类。RecordBatchTableDataset 对象是用于存储表格数据的二维矩形数据结构。 对于列式一维数据,提供了 ArrayChunkedArray 类。 最后,Scalar 对象表示单个值。 下表总结了这些对象,并展示了如何使用 R6 类对象创建新实例,以及以更传统的类似 R 的方式提供相同功能的便捷函数。

维度 如何创建实例 便捷函数
0 标量 Scalar$create(value, type)
1 数组 Array$create(vector, type) as_arrow_array(x)
1 分块数组 ChunkedArray$create(..., type) chunked_array(..., type)
2 记录批次 RecordBatch$create(...) record_batch(...)
2 Table$create(...) arrow_table(...)
2 数据集 Dataset$create(sources, schema) open_dataset(sources, schema)

在本文的后面,我们将更详细地介绍每个对象。 现在我们注意到,这些对象类中的每一个都对应于底层 Arrow C++ 库中同名的类。

除了这些数据对象之外,arrow 还定义了以下用于表示元数据的类:

  • Schema 是用于描述表格数据对象结构的 Field 对象列表;其中
  • Field 指定一个字符串名称和一个 DataType;并且
  • DataType 是一个控制值如何表示的属性。

这些元数据对象在确保正确表示数据方面发挥着重要作用,所有三种表格数据对象类型(记录批次、表和数据集)都包含用于表示元数据的显式 Schema 对象。 要了解有关这些元数据类的更多信息,请参阅元数据文章

标量

标量对象只是一个可以是任何类型的单个值。 它可以是整数、字符串、时间戳或 Arrow 支持的任何其他 DataType 对象。 arrow R 包的大多数用户不太可能直接创建标量,但如果需要,您可以通过调用 Scalar$create() 方法来创建。

Scalar$create("hello")
## Scalar
## hello

数组

数组对象是有序的标量值集合。 与标量一样,大多数用户不需要直接创建数组,但如果需要,可以使用 Array$create() 方法创建新的数组。

integer_array <- Array$create(c(1L, NA, 2L, 4L, 8L))
integer_array
## Array
## <int32>
## [
##   1,
##   null,
##   2,
##   4,
##   8
## ]
string_array <- Array$create(c("hello", "amazing", "and", "cruel", "world"))
string_array
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]

可以使用方括号对数组进行子集化,如下所示。

string_array[4:5]
## Array
## <string>
## [
##   "cruel",
##   "world"
## ]

数组是不可变对象:一旦创建了数组,就不能修改或扩展它。

分块数组

在实践中,arrow R 包的大多数用户可能会使用分块数组而不是简单数组。 在底层,分块数组是一个或多个数组的集合,可以像单个数组一样对其进行索引。 数据对象布局文章中描述了 Arrow 提供此功能的原因,但就目前而言,只需注意分块数组在常规数据分析中的行为类似于数组就足够了。

为了说明这一点,让我们使用 chunked_array() 函数。

chunked_string_array <- chunked_array(
  string_array,
  c("I", "love", "you")
)

chunked_array() 函数只是 ChunkedArray$create() 提供的功能的包装器。 让我们打印该对象。

chunked_string_array
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love",
##     "you"
##   ]
## ]

此输出中的双括号旨在强调分块数组是围绕一个或多个数组的包装器。 但是,尽管由多个不同的数组组成,但分块数组可以像它们在单个“类似向量”的对象中首尾相连一样进行索引。 如下所示。

我们可以使用 `chunked_string_array` 来演示这一点。

chunked_string_array[4:7]
## ChunkedArray
## <string>
## [
##   [
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love"
##   ]
## ]

需要注意的重要一点是,“分块”在语义上没有意义。 它只是一个实现细节:用户永远不应该将块视为一个有意义的单元。 例如,将数据写入磁盘通常会导致数据被组织成不同的块。 同样,包含分配给不同块的相同值的两个分块数组被认为是等效的。 为了说明这一点,我们可以创建一个分块数组,它包含与 `chunked_string_array[4:7]` 相同的四个相同值,但组织成一个块而不是分成两个块。

cruel_world <- chunked_array(c("cruel", "world", "I", "love"))
cruel_world
## ChunkedArray
## <string>
## [
##   [
##     "cruel",
##     "world",
##     "I",
##     "love"
##   ]
## ]

使用 `==` 测试相等性会产生元素级比较,结果是一个包含四个(布尔类型)`true` 值的新分块数组。

cruel_world == chunked_string_array[4:7]
## ChunkedArray
## <bool>
## [
##   [
##     true,
##     true,
##     true,
##     true
##   ]
## ]

简而言之,其目的是让用户与分块数组交互,就好像它们是普通的一维数据结构一样,而无需过多考虑底层的分块排列。

分块数组是可变的,在特定意义上:可以从分块数组中添加和删除数组。

记录批次

记录批次是由命名数组和随附的 Schema 组成的表格数据结构,该 Schema 指定与每个数组关联的名称和数据类型。 记录批次是 Arrow 中数据交换的基本单元,但通常不用于数据分析。 表和数据集在分析上下文中通常更方便。

这些数组可以是不同类型的,但长度必须相同。 每个数组都被称为记录批次的“字段”或“列”之一。 您可以使用 ` record_batch()` 函数或使用 `RecordBatch$create()` 方法创建记录批次。 这些函数非常灵活,可以接受多种格式的输入:您可以传递数据框、一个或多个命名向量、输入流,甚至是包含适当二进制数据的原始向量。 例如:

rb <- record_batch(
  strs = string_array,
  ints = integer_array,
  dbls = c(1.1, 3.2, 0.2, NA, 11)
)
rb
## RecordBatch
## 5 rows x 3 columns
## $strs <string>
## $ints <int32>
## $dbls <double>

这是一个包含 5 行 3 列的记录批次,其概念结构如下所示。

arrow 包为 Record Batch 对象提供了一个 `$` 方法,用于按名称提取单个列。

rb$strs
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]

您可以使用双括号 `[[` 按位置引用列。 `rb$ints` 数组是我们记录批次中的第二列,因此我们可以使用它来提取它。

rb[[2]]
## Array
## <int32>
## [
##   1,
##   null,
##   2,
##   4,
##   8
## ]

还有一个`[`方法,允许您以与数据框相同的方式提取记录批次的子集。 命令 `rb[1:3, 1:2]` 提取前三行和前两列。

rb[1:3, 1:2]
## RecordBatch
## 3 rows x 2 columns
## $strs <string>
## $ints <int32>

记录批次不能连接:因为它们由数组组成,而数组是不可变对象,所以一旦创建就无法向记录批次添加新行。

表由命名的分块数组组成,就像记录批次由命名的数组组成一样。 与记录批次一样,表包含一个显式 Schema,用于指定每个分块数组的名称和数据类型。

您可以使用 `$`、`[[` 和 `[` 对表进行子集化,就像对记录批次一样。 与记录批次不同,表可以连接(因为它们由分块数组组成)。 假设第二个记录批次到达。

new_rb <- record_batch(
  strs = c("I", "love", "you"),
  ints = c(5L, 0L, 0L),
  dbls = c(7.1, -0.1, 2)
)

不可能创建一个将 `new_rb` 中的数据追加到 `rb` 中的数据的记录批次,除非在内存中创建全新的对象。 但是,使用表,我们可以这样做。

df <- arrow_table(rb)
new_df <- arrow_table(new_rb)

我们现在将数据集的两个片段表示为表。 表和记录批次之间的区别在于列都表示为分块数组。 原始记录批次中的每个数组都是表中相应分块数组中的一个块。

rb$strs
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]
df$strs
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ]
## ]

它是相同的底层数据——实际上,相同的不可变数组被两者引用——只是被一个新的、灵活的分块数组包装器包围。 但是,正是这个包装器允许我们连接表。

concat_tables(df, new_df)
## Table
## 8 rows x 3 columns
## $strs <string>
## $ints <int32>
## $dbls <double>

结果对象如下所示。

请注意,新表中的分块数组保留了这种分块结构,因为原始数组都没有移动。

df_both <- concat_tables(df, new_df)
df_both$strs
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love",
##     "you"
##   ]
## ]

数据集

与记录批次和表对象一样,数据集用于表示表格数据。 在抽象层面上,数据集可以被视为由行和列组成的对象,并且就像记录批次和表一样,它包含一个显式 Schema,用于指定与每列关联的名称和数据类型。

但是,表和记录批次是显式表示在内存中的数据,而数据集不是。 相反,数据集是一个抽象概念,指的是存储在一个或多个文件中的磁盘上的数据。 存储在数据文件中的值作为批处理加载到内存中. 仅在需要时才加载,并且仅在对数据执行查询时才加载。 在这方面,Arrow 数据集与 Arrow 表是截然不同的对象,但用于分析它们的 dplyr 命令基本相同。 在本节中,我们将讨论数据集的结构。 如果您想了解有关分析数据集的实际细节的更多信息,请参阅关于 分析多文件数据集 的文章。

磁盘上的数据文件

简而言之,数据集的磁盘结构只是一组数据文件,每个文件存储数据的一个子集。 这些子集有时被称为“片段”,分区过程有时被称为“分片”。 按照惯例,这些文件被组织成一个称为 Hive 样式分区的文件夹结构:有关详细信息,请参阅 ` hive_partition()`。

为了说明这是如何工作的,让我们手动将一个多文件数据集写入磁盘,而不使用任何 Arrow 数据集功能来完成这项工作。 我们将从三个小的数据框开始,每个数据框都包含我们要存储的数据的一个子集。

df_a <- data.frame(id = 1:5, value = rnorm(5), subset = "a")
df_b <- data.frame(id = 6:10, value = rnorm(5), subset = "b")
df_c <- data.frame(id = 11:15, value = rnorm(5), subset = "c")

我们的目标是将每个数据框存储在一个单独的数据文件中。 如您所见,这是一个非常结构化的分区:`subset = "a"` 的所有数据都属于一个文件,`subset = "b"` 的所有数据都属于另一个文件,`subset = "c"` 的所有数据都属于第三个文件。

第一步是定义并创建一个将保存所有文件的文件夹。

ds_dir <- "mini-dataset"
dir.create(ds_dir)

下一步是手动创建 Hive 样式文件夹结构。

ds_dir_a <- file.path(ds_dir, "subset=a")
ds_dir_b <- file.path(ds_dir, "subset=b")
ds_dir_c <- file.path(ds_dir, "subset=c")

dir.create(ds_dir_a)
dir.create(ds_dir_b)
dir.create(ds_dir_c)

请注意,我们已将每个文件夹命名为“键=值”格式,该格式精确地描述了将写入该文件夹的数据子集。这种命名结构是 Hive 风格分区的本质。

现在我们有了这些文件夹,我们将使用 write_parquet() 为三个子集中的每一个创建一个单独的 parquet 文件。

write_parquet(df_a, file.path(ds_dir_a, "part-0.parquet"))
write_parquet(df_b, file.path(ds_dir_b, "part-0.parquet"))
write_parquet(df_c, file.path(ds_dir_c, "part-0.parquet"))

如果我们愿意,我们可以进一步细分数据集。如果需要,一个文件夹可以包含多个文件(part-0.parquetpart-1.parquet 等)。同样,也没有特别的理由以这种方式命名文件 part-0.parquet:如果我们愿意,将这些文件称为 subset-a.parquetsubset-b.parquetsubset-c.parquet 也是可以的。如果我们愿意,我们可以写入其他文件格式,并且我们不一定要使用 Hive 风格的文件夹。您可以通过阅读 open_dataset() 的帮助文档来了解更多受支持的格式,并通过 help("Dataset", package = "arrow") 了解如何进行细粒度控制。

在任何情况下,我们都使用 Hive 风格的分区创建了一个磁盘上的 parquet 数据集。我们的数据集由以下文件定义

list.files(ds_dir, recursive = TRUE)
## [1] "subset=a/part-0.parquet" "subset=b/part-0.parquet"
## [3] "subset=c/part-0.parquet"

为了验证一切正常,让我们使用 open_dataset() 打开数据并调用 glimpse() 来检查其内容。

ds <- open_dataset(ds_dir)
glimpse(ds)
## FileSystemDataset with 3 Parquet files
## 15 rows x 3 columns
## $ id      <int32> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
## $ value  <double> -1.400043517, 0.255317055, -2.437263611, -0.005571287, 0.62155~
## $ subset <string> "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "c~
## Call `print()` for full schema details

如您所见,ds 数据集对象聚合了三个单独的数据文件。事实上,在这种特殊情况下,数据集非常小,以至于所有三个文件的值都出现在 glimpse() 的输出中。

应该注意的是,在日常数据分析工作中,您不需要以这种方式手动写入数据文件。上面的例子完全是为了说明目的。可以使用以下命令创建完全相同的数据集

ds |>
  group_by(subset) |>
  write_dataset("mini-dataset")

事实上,即使 ds 指的是大于内存的数据源,此命令仍然应该有效,因为数据集功能的编写是为了确保在此类管道期间分段加载数据,以避免耗尽内存。

数据集对象

在上一节中,我们检查了数据集的磁盘结构。现在我们转向数据集对象本身(即前一个示例中的 ds)的内存结构。创建数据集对象时,arrow 会搜索数据集文件夹以查找适当的文件,但不会加载这些文件的内容。这些文件的路径存储在活动绑定 ds$files 中。

ds$files
## [1] "/build/r/vignettes/mini-dataset/subset=a/part-0.parquet"
## [2] "/build/r/vignettes/mini-dataset/subset=b/part-0.parquet"
## [3] "/build/r/vignettes/mini-dataset/subset=c/part-0.parquet"

调用 open_dataset() 时发生的另一件事是为数据集构建显式模式并将其存储为 ds$schema

ds$schema
## Schema
## id: int32
## value: double
## subset: string
## 
## See $metadata for additional Schema metadata

默认情况下,此模式是通过仅检查第一个文件来推断的,但可以在检查所有文件后构建统一模式。为此,在调用 open_dataset() 时设置 unify_schemas = TRUE。也可以使用 schema 参数 open_dataset() 显式指定模式(有关详细信息,请参阅 schema() 函数)。

读取数据的行为由扫描器对象执行。使用 dplyr 接口分析数据集时,您永远不需要手动构造扫描器,但为了解释的目的,我们将在此处进行操作。

scan <- Scanner$create(dataset = ds)

调用 ToTable() 方法会将数据集(磁盘上)物化成表(内存中)。

scan$ToTable()
## Table
## 15 rows x 3 columns
## $id <int32>
## $value <double>
## $subset <string>
## 
## See $metadata for additional Schema metadata

默认情况下,此扫描过程是多线程的,但如果需要,可以通过在调用 Scanner$create() 时设置 use_threads = FALSE 来禁用线程。

查询数据集

当对数据集执行查询时,将启动新的扫描并将结果拉回到 R 中。例如,考虑以下 dplyr 表达式

ds |>
  filter(value > 0) |>
  mutate(new_value = round(100 * value)) |>
  select(id, subset, new_value) |>
  collect()
## # A tibble: 6 x 3
##      id subset new_value
##   <int> <chr>      <dbl>
## 1     2 a             26
## 2     5 a             62
## 3     6 b            115
## 4    12 c             63
## 5    13 c            207
## 6    15 c             51

我们可以通过指定 filterprojection 参数来创建新的扫描来使用低级数据集接口复制它到Scanner$create()。要使用这些参数,您需要了解一些关于 Arrow 表达式的知识,为此,您可能会发现阅读 help("Expression", package = "arrow") 中的帮助文档很有帮助。

下面定义的扫描器模仿了上面显示的 dplyr 管道,

scan <- Scanner$create(
  dataset = ds,
  filter = Expression$field_ref("value") > 0,
  projection = list(
    id = Expression$field_ref("id"),
    subset = Expression$field_ref("subset"),
    new_value = Expression$create("round", 100 * Expression$field_ref("value"))
  )
)

如果我们调用 as.data.frame(scan$ToTable()),它将产生与 dplyr 版本相同的结果,尽管行可能不会以相同的顺序出现。

为了更好地了解查询执行时会发生什么,我们将在这里调用 scan$ScanBatches()。与 ToTable() 方法非常相似,ScanBatches() 方法分别针对每个文件执行查询,但它返回一个记录批列表,每个文件一个。此外,我们将分别将这些记录批转换为数据帧。

lapply(scan$ScanBatches(), as.data.frame)
## [[1]]
##   id subset new_value
## 1  2      a        26
## 2  5      a        62
## 
## [[2]]
##   id subset new_value
## 1  6      b       115
## 
## [[3]]
##   id subset new_value
## 1 12      c        63
## 2 13      c       207
## 3 15      c        51

如果我们回到之前进行的 dplyr 查询,并使用 compute() 返回一个表而不是使用 collect() 返回一个数据帧,我们可以看到这个过程的证据在起作用。表对象是通过连接对三个数据文件执行查询时产生的三个记录批创建的,因此,定义表列的分块数组反映了数据文件中存在的分区结构。

tbl <- ds |>
  filter(value > 0) |>
  mutate(new_value = round(100 * value)) |>
  select(id, subset, new_value) |>
  compute()

tbl$subset
## ChunkedArray
## <string>
## [
##   [
##     "a",
##     "a"
##   ],
##   [
##     "b"
##   ],
##   [
##     "c",
##     "c",
##     "c"
##   ]
## ]

补充说明

  • 先前讨论中忽略的一个区别是 FileSystemDatasetInMemoryDataset 对象之间的区别。通常情况下,构成数据集的数据存储在磁盘上的文件中。毕竟,这是数据集优于表的首要优势。但是,在某些情况下,从已存储在内存中的数据创建数据集可能很有用。在这种情况下,创建的对象将具有 InMemoryDataset 类型。

  • 前面的讨论假设数据集中存储的所有文件都具有相同的模式。通常情况下,这是正确的,因为每个文件在概念上都是单个矩形表的一个子集。但这并非严格要求。

有关这些主题的更多信息,请参阅 help("Dataset", package = "arrow")

延伸阅读

  • 要了解有关数组内部结构的更多信息,请参阅关于 数据对象布局 的文章。
  • 要了解有关 Arrow 使用的不同数据类型的更多信息,请参阅关于 数据类型 的文章。
  • 要了解有关如何实现 Arrow 对象的更多信息,请参阅 Arrow 规范 页面。