Arrow 数据集允许您查询已拆分为多个文件的数据。这种数据分片可能表明存在分区,这可以加快仅触及部分分区(文件)的查询。调用 open_dataset() 指向一个数据文件目录并返回一个 Dataset,然后使用 dplyr 方法查询它。
用法
open_dataset(
sources,
schema = NULL,
partitioning = hive_partition(),
hive_style = NA,
unify_schemas = NULL,
format = c("parquet", "arrow", "ipc", "feather", "csv", "tsv", "text", "json"),
factory_options = list(),
...
)参数
- sources
以下之一
包含数据文件的目录的字符串路径或 URI
引用包含数据文件的目录的 FileSystem(例如
s3_bucket()返回的内容)单个文件的字符串路径或 URI
单个数据文件的路径或 URI 的字符向量
此函数创建的
Dataset对象的列表dataset_factory()创建的DatasetFactory对象的列表。
当
sources是文件 URI 的向量时,它们必须都使用相同的协议,并指向位于相同文件系统且具有相同格式的文件。- schema
Dataset的 Schema。如果为NULL(默认值),则将从数据源推断 Schema。- partitioning
当
sources是目录路径/URI 时,以下之一一个
Schema,在这种情况下,将解析相对于sources的文件路径,并将路径段与 Schema 字段匹配。一个字符向量,定义与这些路径段对应的字段名(也就是说,您提供与
Schema对应的名称,但类型将自动检测)一个
Partitioning或PartitioningFactory,例如hive_partition()返回的内容NULL表示无分区
默认情况下,除非
hive_style = FALSE,否则会自动检测 Hive 样式分区。有关详细信息,请参阅“分区”部分。当sources不是目录路径/URI 时,partitioning将被忽略。- hive_style
逻辑值:
partitioning是否应解释为 Hive 样式?默认值为NA,这意味着检查文件路径以确定 Hive 样式分区并相应地处理。- unify_schemas
逻辑值:是否应扫描所有数据片段(文件、
Datasets)以从中创建统一的 Schema?如果为FALSE,则仅检查第一个片段的 Schema。当您知道并相信所有片段都具有相同的 Schema 时,请使用此快速路径。当从目录路径/URI 或文件路径/URI 向量创建数据集时,默认值为FALSE(因为文件可能很多,扫描可能很慢),但当sources是Dataset列表时,默认值为TRUE(因为列表中的Dataset应该很少,并且它们的Schemas 已经在内存中)。- format
一个 FileFormat 对象,或
x中文件格式的字符串标识符。当sources是Dataset对象列表时,此参数将被忽略。当前支持的值"parquet"
"ipc"/"arrow"/"feather",彼此的别名;对于 Feather,请注意仅支持版本 2 文件
"csv"/"text",相同的别名(因为逗号是文本文件的默认分隔符)
"tsv",等效于传递
format = "text", delimiter = "\t""json",用于 JSON 格式数据集。注意:目前仅支持换行符分隔的 JSON(即 ND-JSON)数据集。默认值为 "parquet",除非还指定了
delimiter,在这种情况下,它被假定为 "text"。
- factory_options
FileSystemFactoryOptions 的可选列表
partition_base_dir:字符串路径段前缀,在使用 DirectoryPartitioning 发现分区信息时忽略。对于 HivePartitioning 无意义(会发出警告并忽略),也无法在提供文件路径向量时使用。exclude_invalid_files:逻辑值:是否应排除无效数据文件?默认值为FALSE,因为预先检查所有文件会产生 I/O,因此会更慢,尤其是在远程文件系统上。如果为 false 且存在无效文件,则在扫描时会出错。这是唯一在提供用于发现文件的目录路径和提供文件路径向量时都有效的 FileSystemFactoryOption。selector_ignore_prefixes:在目录中发现文件时要忽略的文件前缀的字符向量。如果可以通过这种方式通过常见文件名后缀排除无效文件,则可以避免exclude_invalid_files的 I/O 成本。在提供文件路径向量时无效(但如果您提供文件列表,则可以自行过滤无效文件)。
- ...
当
sources是目录路径/URI 或文件路径/URI 向量时,传递给dataset_factory()的附加参数,否则忽略。这些参数可能包括format以指示文件格式,或其他特定于格式的选项(请参阅read_csv_arrow()、read_parquet()和read_feather()关于如何指定这些)。
返回值
一个 Dataset R6 对象。对其使用 dplyr 方法查询数据,或调用 $NewScan() 直接构造查询。
分区
数据通常根据数据中一个或多个列的值拆分为多个文件并嵌套在子目录中。它可能是一个在查询中常用到的列,或者可能是基于时间的,例如。以这种方式划分的数据是“分区的”,这些分区列的值被编码到文件路径段中。这些路径段实际上是数据集中的虚拟列,由于它们的值在读取文件本身之前就已经知道,因此我们可以通过完全跳过某些文件来大大加快过滤查询的速度。
Arrow 支持以两种形式从文件路径读取分区信息
"Hive 样式",源自 Apache Hive 项目,并常见于某些数据库系统。分区以“key=value”的形式编码在路径段中,例如
"year=2019/month=1/file.parquet"。虽然它们作为文件名可能有些笨拙,但它们的优点是自描述。"目录" 分区,即没有键名的 Hive 样式,例如
"2019/01/file.parquet"。为了使用这些,我们至少需要知道要给来自路径段的虚拟列起什么名称。
open_dataset() 中的默认行为是检查所提供目录中包含的文件路径,如果它们看起来像 Hive 样式,则将其解析为 Hive 样式。如果您的数据集的文件路径中存在 Hive 样式分区,则无需在 open_dataset() 的 partitioning 参数中提供任何内容即可使用它们。如果您确实提供了分区列名的字符向量,如果它们与检测到的匹配,则会被忽略;如果不匹配,则会收到错误。 (如果要重命名分区列,请在打开数据集后使用 select() 或 rename() 进行操作。)如果您提供了 Schema 并且名称与检测到的匹配,它将使用 Schema 定义的类型。在上面的示例文件路径中,您可以提供 Schema 来指定“month”应为 int8(),而不是默认情况下解析为的 int32()。
如果您的文件路径看起来不是 Hive 样式,或者您传递了 hive_style = FALSE,则 partitioning 参数将用于创建目录分区。需要一个名称的字符向量来创建分区;您可以改为提供一个 Schema 将这些名称映射到所需的列类型,如上所述。如果两者都没有提供,则不会从文件路径中获取分区信息。
示例
# Set up directory for examples
tf <- tempfile()
dir.create(tf)
write_dataset(mtcars, tf, partitioning = "cyl")
# You can specify a directory containing the files for your dataset and
# open_dataset will scan all files in your directory.
open_dataset(tf)
#> FileSystemDataset with 3 Parquet files
#> 11 columns
#> mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#> cyl: int32
#>
#> See $metadata for additional Schema metadata
# You can also supply a vector of paths
open_dataset(c(file.path(tf, "cyl=4/part-0.parquet"), file.path(tf, "cyl=8/part-0.parquet")))
#> FileSystemDataset with 2 Parquet files
#> 10 columns
#> mpg: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#>
#> See $metadata for additional Schema metadata
## You must specify the file format if using a format other than parquet.
tf2 <- tempfile()
dir.create(tf2)
write_dataset(mtcars, tf2, format = "ipc")
# This line will results in errors when you try to work with the data
if (FALSE) { # \dontrun{
open_dataset(tf2)
} # }
# This line will work
open_dataset(tf2, format = "ipc")
#> FileSystemDataset with 1 Feather file
#> 11 columns
#> mpg: double
#> cyl: double
#> disp: double
#> hp: double
#> drat: double
#> wt: double
#> qsec: double
#> vs: double
#> am: double
#> gear: double
#> carb: double
#>
#> See $metadata for additional Schema metadata
## You can specify file partitioning to include it as a field in your dataset
# Create a temporary directory and write example dataset
tf3 <- tempfile()
dir.create(tf3)
write_dataset(airquality, tf3, partitioning = c("Month", "Day"), hive_style = FALSE)
# View files - you can see the partitioning means that files have been written
# to folders based on Month/Day values
tf3_files <- list.files(tf3, recursive = TRUE)
# With no partitioning specified, dataset contains all files but doesn't include
# directory names as field names
open_dataset(tf3)
#> FileSystemDataset with 153 Parquet files
#> 4 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#>
#> See $metadata for additional Schema metadata
# Now that partitioning has been specified, your dataset contains columns for Month and Day
open_dataset(tf3, partitioning = c("Month", "Day"))
#> FileSystemDataset with 153 Parquet files
#> 6 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> Month: int32
#> Day: int32
#>
#> See $metadata for additional Schema metadata
# If you want to specify the data types for your fields, you can pass in a Schema
open_dataset(tf3, partitioning = schema(Month = int8(), Day = int8()))
#> FileSystemDataset with 153 Parquet files
#> 6 columns
#> Ozone: int32
#> Solar.R: int32
#> Wind: double
#> Temp: int32
#> Month: int8
#> Day: int8
#>
#> See $metadata for additional Schema metadata