跳转至内容

Arrow 数据集允许您查询分布在多个文件中的数据。这种数据分片可能表示分区,这可以加速仅涉及某些分区(文件)的查询。调用 open_dataset() 指向数据文件目录并返回一个 Dataset,然后使用 dplyr 方法对其进行查询。

用法

open_dataset(
  sources,
  schema = NULL,
  partitioning = hive_partition(),
  hive_style = NA,
  unify_schemas = NULL,
  format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text", "json"),
  factory_options = list(),
  ...
)

参数

sources

以下之一

  • 包含数据文件的目录的字符串路径或 URI

  • 引用包含数据文件的目录的 FileSystem(例如 s3_bucket() 返回的内容)

  • 单个文件的字符串路径或 URI

  • 指向各个数据文件的路径或 URI 的字符向量

  • 此函数创建的 Dataset 对象列表

  • dataset_factory() 创建的 DatasetFactory 对象列表。

sources 是文件 URI 向量时,它们必须全部使用相同的协议,并指向位于同一文件系统中且格式相同的文件。

schema

DatasetSchema。如果为 NULL(默认值),则将从数据源推断 schema。

partitioning

sources 是目录路径/URI 时,以下之一

  • 一个 Schema,在这种情况下,将解析相对于 sources 的文件路径,并将路径段与 schema 字段匹配。

  • 定义与这些路径段对应的字段名称的字符向量(即,您提供与 Schema 对应的名称,但类型将自动检测)

  • hive_partition() 返回的 PartitioningPartitioningFactory

  • NULL 表示无分区

默认情况下,除非 hive_style = FALSE,否则将自动检测 Hive 样式的分区。有关详细信息,请参阅“分区”部分。当 sources 不是目录路径/URI 时,将忽略 partitioning

hive_style

逻辑值:partitioning 是否应解释为 Hive 样式?默认为 NA,这意味着检查文件路径中是否存在 Hive 样式分区并相应地进行操作。

unify_schemas

逻辑值:是否应扫描所有数据片段(文件、Dataset)以便从中创建统一的 schema?如果为 FALSE,则仅检查第一个片段的 schema。当您知道并信任所有片段都具有相同的 schema 时,请使用此快速路径。从目录路径/URI 或文件路径/URI 向量创建数据集时,默认值为 FALSE(因为可能有很多文件并且扫描可能会很慢),但当 sourcesDataset 列表时,默认值为 TRUE(因为列表中应该只有少量 Dataset 并且它们的 Schema 已经在内存中)。

format

FileFormat 对象,或 x 中文件格式的字符串标识符。当 sourcesDataset 对象列表时,将忽略此参数。当前支持的值

  • "parquet"

  • "ipc"/"arrow"/"feather",彼此的别名;对于 Feather,请注意仅支持版本 2 文件

  • "csv"/"text",相同内容的别名(因为逗号是文本文件的默认分隔符)

  • "tsv",等效于传递 format = "text", delimiter = "\t"

  • "json",用于 JSON 格式的数据集 注意:目前仅支持 newline-delimited JSON (aka ND-JSON) 数据集 默认值为 "parquet",除非还指定了 delimiter,在这种情况下,假定为 "text"。

factory_options

可选 FileSystemFactoryOptions 列表

  • partition_base_dir:使用 DirectoryPartitioning 发现分区信息时要忽略的字符串路径段前缀。对于 HivePartitioning 没有意义(将被忽略并发出警告),在提供文件路径向量时也无效。

  • exclude_invalid_files:逻辑值:是否应排除无效的数据文件?默认值为 FALSE,因为预先检查所有文件会产 I/O,因此速度会变慢,尤其是在远程文件系统上。如果为 false 并且存在无效文件,则扫描时会出现错误。这是唯一一个在提供发现文件的目录路径和提供文件路径向量时都有效的 FileSystemFactoryOption。

  • selector_ignore_prefixes:在目录中发现文件时要忽略的文件前缀的字符向量。如果可以通过这种方式通过公共文件名

...

传递给 dataset_factory() 的其他参数(当 sources 是目录路径/URI 或文件路径/URI 向量时),否则将被忽略。这些参数可能包括指示文件格式的 format 或其他特定于格式的选项(有关如何指定这些选项,请参阅 read_csv_arrow()read_parquet()read_feather())。

返回值

Dataset R6 对象。对其使用 dplyr 方法来查询数据,或调用 $NewScan() 直接构造查询。

分区

数据通常根据数据中一列或多列的值拆分到多个文件中,并嵌套在子目录中。例如,它可能是在查询中 häufig 引用的列,也可能是基于时间的列。以这种方式划分的数据被称为“分区”,这些分区列的值被编码到文件路径段中。这些路径段实际上是数据集中的虚拟列,由于它们的值在读取文件本身之前就已知,我们可以通过完全跳过某些文件来极大地加快过滤查询的速度。

Arrow 支持以两种形式从文件路径读取分区信息

  • “Hive 样式”,源自 Apache Hive 项目,在某些数据库系统中很常见。分区在路径段中编码为“key=value”,例如 “year=2019/month=1/file.parquet”。虽然它们作为文件名可能很尴尬,但它们的优点是能够自我描述。

  • “目录”分区,即没有键名的 Hive,例如 “2019/01/file.parquet”。为了使用这些分区,我们需要至少知道路径段生成的虚拟列应该使用什么名称。

open_dataset() 中的默认行为是检查提供的目录中包含的文件路径,如果它们看起来像 Hive 样式,则将它们解析为 Hive。如果您的数据集在文件路径中具有 Hive 样式的分区,则无需在 open_dataset()partitioning 参数中提供任何内容即可使用它们。如果您确实提供了分区列名称的字符向量,如果它们与检测到的内容匹配,则将忽略它们,如果它们不匹配,则会出现错误。(如果要重命名分区列,请在打开数据集后使用 select()rename() 进行重命名。)。如果您提供一个 Schema 并且名称与检测到的内容匹配,它将使用 Schema 定义的类型。在上面的示例文件路径中,您可以提供一个 Schema 来指定“month”应该是 int8() 而不是默认解析的 int32()

如果您的文件路径看起来不像 Hive 样式,或者您传递了 hive_style = FALSE,则将使用 partitioning 参数创建目录分区。创建分区需要一个名称字符向量;您也可以提供一个 Schema 将这些名称映射到所需的列类型,如上所述。如果两者均未提供,则不会从文件路径中获取任何分区信息。

另请参阅

示例

# Set up directory for examples
tf <- tempfile()
dir.create(tf)
on.exit(unlink(tf))

write_dataset(mtcars, tf, partitioning = "cyl")

# You can specify a directory containing the files for your dataset and
# open_dataset will scan all files in your directory.
open_dataset(tf)
#> FileSystemDataset with 3 Parquet files
#> 11 columns
#> mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> cyl: int32
#> 
#> See $metadata for additional Schema metadata

# You can also supply a vector of paths
open_dataset(c(file.path(tf, "cyl=4/part-0.parquet"), file.path(tf, "cyl=8/part-0.parquet")))
#> FileSystemDataset with 2 Parquet files
#> 10 columns
#> mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> 
#> See $metadata for additional Schema metadata

## You must specify the file format if using a format other than parquet.
tf2 <- tempfile()
dir.create(tf2)
on.exit(unlink(tf2))
write_dataset(mtcars, tf2, format = "ipc")
# This line will results in errors when you try to work with the data
if (FALSE) { # \dontrun{
open_dataset(tf2)
} # }
# This line will work
open_dataset(tf2, format = "ipc")
#> FileSystemDataset with 1 Feather file
#> 11 columns
#> mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> 
#> See $metadata for additional Schema metadata

## You can specify file partitioning to include it as a field in your dataset
# Create a temporary directory and write example dataset
tf3 <- tempfile()
dir.create(tf3)
on.exit(unlink(tf3))
write_dataset(airquality, tf3, partitioning = c("Month", "Day"), hive_style = FALSE)

# View files - you can see the partitioning means that files have been written
# to folders based on Month/Day values
tf3_files <- list.files(tf3, recursive = TRUE)

# With no partitioning specified, dataset contains all files but doesn't include
# directory names as field names
open_dataset(tf3)
#> FileSystemDataset with 153 Parquet files
#> 4 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> 
#> See $metadata for additional Schema metadata

# Now that partitioning has been specified, your dataset contains columns for Month and Day
open_dataset(tf3, partitioning = c("Month", "Day"))
#> FileSystemDataset with 153 Parquet files
#> 6 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> Month: int32
#> Day: int32
#> 
#> See $metadata for additional Schema metadata

# If you want to specify the data types for your fields, you can pass in a Schema
open_dataset(tf3, partitioning = schema(Month = int8(), Day = int8()))
#> FileSystemDataset with 153 Parquet files
#> 6 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> Month: int8
#> Day: int8
#> 
#> See $metadata for additional Schema metadata