跳到内容

Arrow 数据集允许您查询已拆分为多个文件的数据。数据的这种分片可能表示分区,这可以加速仅触及某些分区(文件)的查询。调用 open_dataset() 指向数据文件目录并返回 Dataset,然后使用 dplyr 方法对其进行查询。

用法

open_dataset(
  sources,
  schema = NULL,
  partitioning = hive_partition(),
  hive_style = NA,
  unify_schemas = NULL,
  format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text", "json"),
  factory_options = list(),
  ...
)

参数

sources

其中之一

  • 包含数据文件的目录的字符串路径或 URI

  • 引用包含数据文件的目录的 FileSystem(例如 s3_bucket() 返回的内容)

  • 单个文件的字符串路径或 URI

  • 单个数据文件的路径或 URI 的字符向量

  • 由此函数创建的 Dataset 对象列表

  • dataset_factory() 创建的 DatasetFactory 对象列表。

sources 是文件 URI 向量时,它们都必须使用相同的协议并指向位于同一文件系统中且具有相同格式的文件。

schema

DatasetSchema。如果为 NULL(默认值),则将从数据源推断架构。

partitioning

sources 是目录路径/URI 时,其中之一

  • 一个 Schema,在这种情况下,相对于 sources 的文件路径将被解析,并且路径段将与架构字段匹配。

  • 定义与这些路径段对应的字段名称的字符向量(即,您提供与 Schema 对应的名称,但类型将自动检测)

  • 一个 PartitioningPartitioningFactory,例如 hive_partition() 返回的内容

  • NULL 表示没有分区

默认情况下是自动检测 Hive 样式分区,除非 hive_style = FALSE。有关详细信息,请参见“分区”部分。当 sources 不是目录路径/URI 时,将忽略 partitioning

hive_style

逻辑值:partitioning 是否应解释为 Hive 样式?默认值为 NA,这意味着检查文件路径以查找 Hive 样式分区并相应地执行。

unify_schemas

逻辑值:是否应扫描所有数据片段(文件、Dataset)以从它们创建统一的架构?如果为 FALSE,则只会检查第一个片段的架构。当您知道并相信所有片段都具有相同的架构时,请使用此快速路径。当从目录路径/URI 或文件路径/URI 向量创建数据集时,默认为 FALSE(因为可能有很多文件并且扫描可能很慢),但当 sourcesDataset 列表时为 TRUE(因为列表中应该只有很少的 Dataset 并且它们的 Schema 已经在内存中)。

format

一个 FileFormat 对象,或 x 中文件的格式的字符串标识符。当 sourcesDataset 对象列表时,此参数将被忽略。当前支持的值

  • "parquet"

  • "ipc"/"arrow"/"feather",它们都是彼此的别名;对于 Feather,请注意仅支持版本 2 文件

  • "csv"/"text",表示相同内容的别名(因为逗号是文本文件的默认分隔符)

  • "tsv",等效于传递 format = "text", delimiter = "\t"

  • "json",用于 JSON 格式的数据集注意:当前仅支持换行分隔的 JSON(又名 ND-JSON)数据集默认值为 "parquet",除非还指定了 delimiter,在这种情况下,它被假定为 "text"。

factory_options

可选 FileSystemFactoryOptions 列表

  • partition_base_dir:要忽略的字符串路径段前缀,用于使用 DirectoryPartitioning 发现分区信息。对于 HivePartitioning 没有意义(将发出警告并忽略),当提供文件路径向量时也不有效。

  • exclude_invalid_files:逻辑值:是否应排除不是有效数据文件的文件?默认为 FALSE,因为预先检查所有文件会产生 I/O,因此速度会变慢,尤其是在远程文件系统上。如果为 false 并且存在无效文件,则在扫描时会出现错误。这是在提供要发现文件的目录路径以及提供文件路径向量时都有效的唯一 FileSystemFactoryOption。

  • selector_ignore_prefixes:在目录中发现文件时要忽略的文件前缀的字符向量。如果可以通过常见的文件名前缀排除无效文件,则可以避免 exclude_invalid_files 的 I/O 成本。当提供文件路径向量时无效(但是如果您提供文件列表,则可以自己过滤无效文件)。

...

sources 是目录路径/URI 或文件路径/URI 向量时,传递给 dataset_factory() 的其他参数,否则将被忽略。这些可能包括 format 以指示文件格式,或其他特定于格式的选项(请参阅 read_csv_arrow()read_parquet()read_feather() 如何指定这些)。

返回值

一个 Dataset R6 对象。使用 dplyr 方法对其进行查询,或调用 $NewScan() 直接构建查询。

分区

数据通常根据数据中一个或多个列的值拆分为多个文件并嵌套在子目录中。它可能是一个在查询中经常引用的列,或者它可能是基于时间的,举几个例子。以这种方式划分的数据是“分区”,并且这些分区列的值被编码到文件路径段中。这些路径段实际上是数据集中的虚拟列,并且由于它们的值在读取文件本身之前就已经知道,因此我们可以通过完全跳过某些文件来大大加快过滤查询的速度。

Arrow 支持以两种形式从文件路径读取分区信息

  • "Hive 样式",源自 Apache Hive 项目并且一些数据库系统中很常见。分区被编码为路径段中的“key=value”,例如 "year=2019/month=1/file.parquet"。虽然它们作为文件名可能很笨拙,但它们具有自描述的优点。

  • "目录"分区,它是没有键名称的 Hive,例如 "2019/01/file.parquet"。为了使用它们,我们至少需要知道要为来自路径段的虚拟列提供什么名称。

open_dataset() 中的默认行为是检查提供的目录中包含的文件路径,如果它们看起来像 Hive 样式,则将其解析为 Hive。如果您的数据集在文件路径中具有 Hive 样式分区,则无需在 partitioning 参数中向 open_dataset() 提供任何内容即可使用它们。如果您确实提供了分区列名称的字符向量,如果它们与检测到的内容匹配,则将被忽略,如果不匹配,则会收到错误。(如果您想重命名分区列,请在打开数据集后使用 select()rename() 执行此操作)。如果您提供了一个 Schema 并且名称与检测到的内容匹配,它将使用 Schema 定义的类型。在上例文件路径中,您可以提供一个 Schema 来指定“month”应为 int8() 而不是默认情况下将被解析为的 int32()

如果您的文件路径看起来不像 Hive 样式,或者如果您传递 hive_style = FALSE,则将使用 partitioning 参数创建目录分区。需要一个名称的字符向量来创建分区;您也可以提供一个 Schema 将这些名称映射到所需的列类型,如上所述。如果没有提供两者,则不会从文件路径中获取任何分区信息。

另请参阅

示例

# Set up directory for examples
tf <- tempfile()
dir.create(tf)
on.exit(unlink(tf))

write_dataset(mtcars, tf, partitioning = "cyl")

# You can specify a directory containing the files for your dataset and
# open_dataset will scan all files in your directory.
open_dataset(tf)
#> FileSystemDataset with 3 Parquet files
#> 11 columns
#> mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> cyl: int32
#> 
#> See $metadata for additional Schema metadata

# You can also supply a vector of paths
open_dataset(c(file.path(tf, "cyl=4/part-0.parquet"), file.path(tf, "cyl=8/part-0.parquet")))
#> FileSystemDataset with 2 Parquet files
#> 10 columns
#> mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> 
#> See $metadata for additional Schema metadata

## You must specify the file format if using a format other than parquet.
tf2 <- tempfile()
dir.create(tf2)
on.exit(unlink(tf2))
write_dataset(mtcars, tf2, format = "ipc")
# This line will results in errors when you try to work with the data
if (FALSE) { # \dontrun{
open_dataset(tf2)
} # }
# This line will work
open_dataset(tf2, format = "ipc")
#> FileSystemDataset with 1 Feather file
#> 11 columns
#> mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> 
#> See $metadata for additional Schema metadata

## You can specify file partitioning to include it as a field in your dataset
# Create a temporary directory and write example dataset
tf3 <- tempfile()
dir.create(tf3)
on.exit(unlink(tf3))
write_dataset(airquality, tf3, partitioning = c("Month", "Day"), hive_style = FALSE)

# View files - you can see the partitioning means that files have been written
# to folders based on Month/Day values
tf3_files <- list.files(tf3, recursive = TRUE)

# With no partitioning specified, dataset contains all files but doesn't include
# directory names as field names
open_dataset(tf3)
#> FileSystemDataset with 153 Parquet files
#> 4 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> 
#> See $metadata for additional Schema metadata

# Now that partitioning has been specified, your dataset contains columns for Month and Day
open_dataset(tf3, partitioning = c("Month", "Day"))
#> FileSystemDataset with 153 Parquet files
#> 6 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> Month: int32
#> Day: int32
#> 
#> See $metadata for additional Schema metadata

# If you want to specify the data types for your fields, you can pass in a Schema
open_dataset(tf3, partitioning = schema(Month = int8(), Day = int8()))
#> FileSystemDataset with 153 Parquet files
#> 6 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> Month: int8
#> Day: int8
#> 
#> See $metadata for additional Schema metadata