跳至内容

本文介绍了 arrow 提供的各种数据对象类型,并记录了这些对象的结构。

arrow 包提供了几个用于表示数据的对象类。RecordBatchTableDataset 对象是用于存储表格数据的二维矩形数据结构。对于列式的一维数据,提供了 ArrayChunkedArray 类。最后,Scalar 对象表示单个值。下表总结了这些对象,并展示了如何使用 R6 类对象创建新实例,以及提供相同功能的更传统 R 式便利函数

维度 如何创建实例 便利函数
0 标量 Scalar$create(value, type)
1 数组 Array$create(vector, type) as_arrow_array(x)
1 分块数组 ChunkedArray$create(..., type) chunked_array(..., type)
2 记录批次 RecordBatch$create(...) record_batch(...)
2 Table$create(...) arrow_table(...)
2 数据集 Dataset$create(sources, schema) open_dataset(sources, schema)

在本文后面,我们将更详细地介绍每个对象。目前,我们注意到每个对象类都对应于底层 Arrow C++ 库中同名的类。

除了这些数据对象外,arrow 还定义了以下用于表示元数据的类

  • 一个 Schema 是一个 Field 对象列表,用于描述表格数据对象的结构;其中
  • 一个 Field 指定一个字符字符串名称和一个 DataType;以及
  • 一个 DataType 是一个控制值表示方式的属性

这些元数据对象在确保数据正确表示方面发挥着重要作用,并且所有三种表格数据对象类型(记录批次、表和数据集)都包含用于表示元数据的显式 Schema 对象。要了解有关这些元数据类的更多信息,请参阅 元数据文章

标量

标量对象只是一个可以是任何类型的单个值。它可以是整数、字符串、时间戳或 Arrow 支持的任何 DataType 对象。arrow R 包的大多数用户不太可能直接创建标量,但如果需要,您可以通过调用 Scalar$create() 方法来实现

Scalar$create("hello")
## Scalar
## hello

数组

数组对象是有序的标量值集。与标量一样,大多数用户不需要直接创建数组,但如果需要,有一个 Array$create() 方法允许您创建新的数组

integer_array <- Array$create(c(1L, NA, 2L, 4L, 8L))
integer_array
## Array
## <int32>
## [
##   1,
##   null,
##   2,
##   4,
##   8
## ]
string_array <- Array$create(c("hello", "amazing", "and", "cruel", "world"))
string_array
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]

可以使用方括号对数组进行子集化,如下所示

string_array[4:5]
## Array
## <string>
## [
##   "cruel",
##   "world"
## ]

数组是不可变对象:一旦创建了数组,就无法修改或扩展它。

分块数组

实际上,arrow R 包的大多数用户可能会使用分块数组而不是简单的数组。在后台,分块数组是一个或多个数组的集合,可以单个数组一样对其进行索引。Arrow 提供此功能的原因在 数据对象布局文章 中进行了描述,但就目前而言,注意到分块数组在常规数据分析中表现得像数组就足够了。

为了说明,让我们使用 chunked_array() 函数

chunked_string_array <- chunked_array(
  string_array,
  c("I", "love", "you")
)

chunked_array() 函数只是 ChunkedArray$create() 提供的功能的包装器。让我们打印该对象

chunked_string_array
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love",
##     "you"
##   ]
## ]

此输出中的双括号旨在突出显示分块数组是围绕一个或多个数组的包装器这一事实。但是,尽管由多个不同的数组组成,但可以将分块数组索引为它们在一个“类似向量”的对象中首尾相接。如下所示

我们可以使用 chunked_string_array 来说明这一点

chunked_string_array[4:7]
## ChunkedArray
## <string>
## [
##   [
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love"
##   ]
## ]

需要注意的一件重要事情是,“分块”在语义上没有意义。它只是一个实现细节:用户永远不应该将块视为有意义的单元。例如,将数据写入磁盘通常会导致数据被组织成不同的块。类似地,包含相同值的两个分块数组被分配到不同的块,被认为是等效的。为了说明这一点,我们可以创建一个分块数组,它包含与 chunked_string_array[4:7] 相同的四个相同的值,但组织成一个块而不是分成两个块

cruel_world <- chunked_array(c("cruel", "world", "I", "love"))
cruel_world
## ChunkedArray
## <string>
## [
##   [
##     "cruel",
##     "world",
##     "I",
##     "love"
##   ]
## ]

使用 == 进行相等性测试会产生逐元素比较,结果是一个包含四个(布尔类型)true 值的新分块数组

cruel_world == chunked_string_array[4:7]
## ChunkedArray
## <bool>
## [
##   [
##     true,
##     true,
##     true,
##     true
##   ]
## ]

简而言之,目的是让用户像对待普通的一维数据结构一样与分块数组交互,而无需过多考虑底层的分块安排。

分块数组是可变的,从特定意义上讲:可以向分块数组添加和删除数组。

记录批次

记录批次是由命名数组和伴随的 Schema 组成的表格数据结构,该 Schema 指定与每个数组关联的名称和数据类型。记录批次是 Arrow 中数据交换的基本单元,但通常不用于数据分析。在分析环境中,表和数据集通常更方便。

这些数组可以是不同类型,但必须都具有相同的长度。每个数组被称为记录批次的“字段”或“列”之一。您可以使用 record_batch() 函数或 RecordBatch$create() 方法创建记录批次。这些函数很灵活,可以接受多种格式的输入:您可以传递数据框、一个或多个命名向量、输入流,甚至包含适当二进制数据的原始向量。例如

rb <- record_batch(
  strs = string_array,
  ints = integer_array,
  dbls = c(1.1, 3.2, 0.2, NA, 11)
)
rb
## RecordBatch
## 5 rows x 3 columns
## $strs <string>
## $ints <int32>
## $dbls <double>

这是一个包含 5 行 3 列的记录批次,其概念结构如下所示

arrow 包为记录批次对象提供了 $ 方法,用于按名称提取单个列

rb$strs
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]

您可以使用双括号 [[ 按位置引用列。rb$ints 数组是我们的记录批次中的第二列,因此我们可以用它来提取它

rb[[2]]
## Array
## <int32>
## [
##   1,
##   null,
##   2,
##   4,
##   8
## ]

还有一个 [ 方法,允许您以与数据框相同的方式提取记录批次的子集。命令 rb[1:3, 1:2] 提取前三行和前两列

rb[1:3, 1:2]
## RecordBatch
## 3 rows x 2 columns
## $strs <string>
## $ints <int32>

记录批次无法连接:因为它们由数组组成,并且数组是不可变对象,所以一旦创建,就无法向记录批次添加新行。

表由命名分块数组组成,就像记录批次由命名数组组成一样。与记录批次一样,表包含一个显式 Schema,用于指定每个分块数组的名称和数据类型。

您可以使用 $[[[ 对表进行子集化,就像对记录批次一样。与记录批次不同,表可以连接(因为它们由分块数组组成)。假设第二个记录批次到达

new_rb <- record_batch(
  strs = c("I", "love", "you"),
  ints = c(5L, 0L, 0L),
  dbls = c(7.1, -0.1, 2)
)

无法创建将 new_rb 中的数据附加到 rb 中数据的记录批次,除非在内存中创建全新的对象。但是,使用表,我们可以

df <- arrow_table(rb)
new_df <- arrow_table(new_rb)

现在,我们有两个表示为表的数据库片段。表和记录批次之间的区别在于,所有列都表示为分块数组。原始记录批次中的每个数组都是表中相应分块数组中的一个块

rb$strs
## Array
## <string>
## [
##   "hello",
##   "amazing",
##   "and",
##   "cruel",
##   "world"
## ]
df$strs
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ]
## ]

这是相同的基础数据——实际上,相同不可变的数组由两者引用——只是包含在一个新的、灵活的分块数组包装器中。但是,正是这个包装器允许我们连接表

concat_tables(df, new_df)
## Table
## 8 rows x 3 columns
## $strs <string>
## $ints <int32>
## $dbls <double>

结果对象在下图中以示意图形式显示

请注意,新表中的分块数组保留此分块结构,因为没有移动任何原始数组

df_both <- concat_tables(df, new_df)
df_both$strs
## ChunkedArray
## <string>
## [
##   [
##     "hello",
##     "amazing",
##     "and",
##     "cruel",
##     "world"
##   ],
##   [
##     "I",
##     "love",
##     "you"
##   ]
## ]

数据集

与记录批次和表对象一样,数据集用于表示表格数据。在抽象级别上,数据集可以被视为一个由行和列组成的对象,就像记录批次和表一样,它包含一个明确的 Schema,用于指定与每一列关联的名称和数据类型。

但是,表和记录批次是显式地表示在内存中的数据,而数据集则不是。相反,数据集是一个抽象概念,它引用一个或多个文件中存储在磁盘上的数据。存储在数据文件中的值以批处理过程加载到内存中。加载仅在需要时发生,并且仅在对数据执行查询时发生。在这方面,Arrow 数据集与 Arrow 表是一种非常不同的对象,但用于分析它们 dplyr 命令基本相同。在本节中,我们将讨论数据集的结构。如果您想了解有关分析数据集的实用细节的更多信息,请参阅有关 分析多文件数据集 的文章。

磁盘上的数据文件

简化到最简单的形式,数据集的磁盘结构只是一个数据文件的集合,每个文件存储数据的一个子集。这些子集有时称为“片段”,分区过程有时称为“分片”。按照约定,这些文件组织到一个称为 Hive 样式分区的文件夹结构中:有关详细信息,请参阅 hive_partition()

为了说明其工作原理,让我们手动将多文件数据集写入磁盘,而无需使用任何 Arrow 数据集功能来完成这项工作。我们将从三个小型数据框开始,每个数据框包含我们要存储的数据的一个子集

df_a <- data.frame(id = 1:5, value = rnorm(5), subset = "a")
df_b <- data.frame(id = 6:10, value = rnorm(5), subset = "b")
df_c <- data.frame(id = 11:15, value = rnorm(5), subset = "c")

我们的目的是将每个数据框存储在单独的数据文件中。如您所见,这是一个非常结构化的分区:所有 subset = "a" 的数据属于一个文件,所有 subset = "b" 的数据属于另一个文件,所有 subset = "c" 的数据属于第三个文件。

第一步是定义并创建一个将保存所有文件的文件夹

ds_dir <- "mini-dataset"
dir.create(ds_dir)

下一步是手动创建 Hive 样式文件夹结构

ds_dir_a <- file.path(ds_dir, "subset=a")
ds_dir_b <- file.path(ds_dir, "subset=b")
ds_dir_c <- file.path(ds_dir, "subset=c")

dir.create(ds_dir_a)
dir.create(ds_dir_b)
dir.create(ds_dir_c)

请注意,我们已使用“key=value”格式命名每个文件夹,该格式准确描述了将写入该文件夹的数据子集。此命名结构是 Hive 样式分区的本质。

现在我们有了文件夹,我们将使用write_parquet()为三个子集中的每一个创建一个单独的parquet文件。

write_parquet(df_a, file.path(ds_dir_a, "part-0.parquet"))
write_parquet(df_b, file.path(ds_dir_b, "part-0.parquet"))
write_parquet(df_c, file.path(ds_dir_c, "part-0.parquet"))

如果需要,我们可以进一步细分数据集。如果需要,一个文件夹可以包含多个文件(part-0.parquetpart-1.parquet等)。同样,也没有特别的理由将文件命名为part-0.parquet:如果需要,我们可以将这些文件命名为subset-a.parquetsubset-b.parquetsubset-c.parquet。如果需要,我们可以写入其他文件格式,并且我们不一定非要使用Hive风格的文件夹。您可以通过阅读open_dataset()的帮助文档来了解支持的格式,并了解如何使用help("Dataset", package = "arrow")进行细粒度的控制。

无论如何,我们已经使用Hive风格的分区创建了一个磁盘上的parquet数据集。我们的数据集由这些文件定义。

list.files(ds_dir, recursive = TRUE)
## [1] "subset=a/part-0.parquet" "subset=b/part-0.parquet"
## [3] "subset=c/part-0.parquet"

为了验证一切是否正常工作,让我们使用open_dataset()打开数据,并调用glimpse()来检查其内容。

ds <- open_dataset(ds_dir)
glimpse(ds)
## FileSystemDataset with 3 Parquet files
## 15 rows x 3 columns
## $ id      <int32> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15
## $ value  <double> -1.400043517, 0.255317055, -2.437263611, -0.005571287, 0.62155~
## $ subset <string> "a", "a", "a", "a", "a", "b", "b", "b", "b", "b", "c", "c", "c~
## Call `print()` for full schema details

如您所见,ds数据集对象聚合了三个单独的数据文件。事实上,在这个特定案例中,数据集非常小,以至于所有三个文件中的值都出现在glimpse()的输出中。

需要注意的是,在日常数据分析工作中,您不需要以这种方式手动写入数据文件。上面的示例完全是为了说明目的。可以使用以下命令创建完全相同的数据集。

ds |>
  group_by(subset) |>
  write_dataset("mini-dataset")

事实上,即使ds碰巧引用了一个大于内存的数据源,此命令也应该可以工作,因为数据集功能的编写是为了确保在此类管道期间按段加载数据以避免耗尽内存。

数据集对象

在上一节中,我们检查了数据集的磁盘结构。现在我们转向数据集对象本身的内存结构(即,前面示例中的ds)。创建数据集对象时,arrow会在数据集文件夹中搜索合适的文件,但不会加载这些文件的内容。这些文件的路径存储在一个活动绑定ds$files中。

ds$files
## [1] "/build/r/vignettes/mini-dataset/subset=a/part-0.parquet"
## [2] "/build/r/vignettes/mini-dataset/subset=b/part-0.parquet"
## [3] "/build/r/vignettes/mini-dataset/subset=c/part-0.parquet"

调用open_dataset()时发生的另一件事是,为数据集构建一个显式的Schema并将其存储为ds$schema

ds$schema
## Schema
## id: int32
## value: double
## subset: string
## 
## See $metadata for additional Schema metadata

默认情况下,此Schema仅通过检查第一个文件来推断,尽管可以在检查所有文件后构建统一的schema。为此,在调用open_dataset()时设置unify_schemas = TRUE。还可以使用schema参数传递给open_dataset()来显式指定Schema(有关详细信息,请参阅schema()函数)。

读取数据的操作由Scanner对象执行。使用dplyr接口分析数据集时,您无需手动构建Scanner,但出于解释目的,我们将在此处进行操作。

scan <- Scanner$create(dataset = ds)

调用ToTable()方法将数据集(磁盘上)具体化为表(内存中)。

scan$ToTable()
## Table
## 15 rows x 3 columns
## $id <int32>
## $value <double>
## $subset <string>
## 
## See $metadata for additional Schema metadata

此扫描过程默认情况下是多线程的,但如果需要,可以通过在调用Scanner$create()时设置use_threads = FALSE来禁用线程。

查询数据集

当对数据集执行查询时,会启动新的扫描并将结果拉回到R中。例如,请考虑以下dplyr表达式。

ds |>
  filter(value > 0) |>
  mutate(new_value = round(100 * value)) |>
  select(id, subset, new_value) |>
  collect()
## # A tibble: 6 x 3
##      id subset new_value
##   <int> <chr>      <dbl>
## 1     2 a             26
## 2     5 a             62
## 3     6 b            115
## 4    12 c             63
## 5    13 c            207
## 6    15 c             51

我们可以使用低级数据集接口通过为Scanner$create()指定filterprojection参数来创建新的扫描来复制此操作。要使用这些参数,您需要了解一些关于Arrow表达式的知识,您可能会发现阅读help("Expression", package = "arrow")中的帮助文档很有用。

下面定义的扫描程序模拟了上面显示的dplyr管道,

scan <- Scanner$create(
  dataset = ds,
  filter = Expression$field_ref("value") > 0,
  projection = list(
    id = Expression$field_ref("id"),
    subset = Expression$field_ref("subset"),
    new_value = Expression$create("round", 100 * Expression$field_ref("value"))
  )
)

如果我们调用as.data.frame(scan$ToTable()),它将产生与dplyr版本相同的结果,尽管行可能不会按相同的顺序出现。

为了更好地了解查询执行时发生的情况,我们将在此处调用scan$ScanBatches()。与ToTable()方法非常相似,ScanBatches()方法分别对每个文件执行查询,但它返回一个记录批次的列表,每个文件一个。此外,我们将这些记录批次分别转换为数据框。

lapply(scan$ScanBatches(), as.data.frame)
## [[1]]
##   id subset new_value
## 1  2      a        26
## 2  5      a        62
## 
## [[2]]
##   id subset new_value
## 1  6      b       115
## 
## [[3]]
##   id subset new_value
## 1 12      c        63
## 2 13      c       207
## 3 15      c        51

如果我们回到之前进行的dplyr查询,并使用compute()返回一个表而不是使用collect()返回一个数据框,我们可以看到此过程正在起作用的证据。表对象是通过连接查询在三个数据文件上执行时生成的三个记录批次创建的,因此定义表的列的块状数组反映了数据文件中存在的分区结构。

tbl <- ds |>
  filter(value > 0) |>
  mutate(new_value = round(100 * value)) |>
  select(id, subset, new_value) |>
  compute()

tbl$subset
## ChunkedArray
## <string>
## [
##   [
##     "a",
##     "a"
##   ],
##   [
##     "b"
##   ],
##   [
##     "c",
##     "c",
##     "c"
##   ]
## ]

其他说明

  • 前面讨论中忽略的一个区别是FileSystemDatasetInMemoryDataset对象之间的区别。在通常情况下,构成数据集的数据存储在磁盘上的文件中。毕竟,这是数据集优于表的首要优势。但是,在某些情况下,可能需要从已存储在内存中的数据创建数据集。在这种情况下,创建的对象将具有类型InMemoryDataset

  • 前面的讨论假设存储在数据集中的所有文件都具有相同的Schema。在通常情况下,这将是正确的,因为每个文件在概念上都是单个矩形表的子集。但这并不是严格要求的。

有关这些主题的更多信息,请参阅help("Dataset", package = "arrow")

进一步阅读

  • 要了解有关数组内部结构的更多信息,请参阅有关数据对象布局的文章。
  • 要了解有关Arrow使用的不同数据类型的更多信息,请参阅有关数据类型的文章。
  • 要了解有关Arrow对象的实现方式的更多信息,请参阅Arrow规范页面。