文件系统接口#
PyArrow 提供了一个抽象的文件系统接口,以及各种存储类型的具体实现。
文件系统接口提供输入和输出流以及目录操作。它公开了底层数据存储的简化视图。数据路径表示为抽象路径,这些路径使用 /
分隔,即使在 Windows 上也是如此,并且不应包含特殊的路径组件,例如 .
和 ..
。如果底层存储支持符号链接,则会自动取消引用。只有关于文件条目的基本 metadata
(例如文件大小和修改时间)可用。
核心接口由基类 FileSystem
表示。
Pyarrow 本地实现了以下文件系统子类:
Azure Storage 文件系统 (
AzureFileSystem
)
也可以将您自己的 fsspec 兼容文件系统与 pyarrow 功能一起使用,如 将 fsspec 兼容的文件系统与 Arrow 一起使用 部分所述。
用法#
实例化文件系统#
可以使用其中一个构造函数创建一个 FileSystem 对象(并检查相应构造函数的选项):
>>> from pyarrow import fs
>>> local = fs.LocalFileSystem()
或者,也可以从 URI 推断:
>>> s3, path = fs.FileSystem.from_uri("s3://my-bucket")
>>> s3
<pyarrow._s3fs.S3FileSystem at 0x7f6760cbf4f0>
>>> path
'my-bucket'
读取和写入文件#
PyArrow 中的几个与 IO 相关的函数接受 URI(并推断文件系统)或显式的 filesystem
参数,以指定要读取或写入的文件系统。例如,可以按以下方式使用 pyarrow.parquet.read_table()
函数:
import pyarrow.parquet as pq
# using a URI -> filesystem is inferred
pq.read_table("s3://my-bucket/data.parquet")
# using a path and filesystem
s3 = fs.S3FileSystem(..)
pq.read_table("my-bucket/data.parquet", filesystem=s3)
文件系统接口进一步允许直接打开文件进行读取(输入)或写入(输出),这可以与使用类文件对象的功能结合使用。例如:
import pyarrow as pa
local = fs.LocalFileSystem()
with local.open_output_stream("test.arrow") as file:
with pa.RecordBatchFileWriter(file, table.schema) as writer:
writer.write_table(table)
列出文件#
使用 FileSystem.get_file_info()
方法可以检查文件系统上的目录和文件。要列出目录的内容,请使用 FileSelector
对象来指定选择:
>>> local.get_file_info(fs.FileSelector("dataset/", recursive=True))
[<FileInfo for 'dataset/part=B': type=FileType.Directory>,
<FileInfo for 'dataset/part=B/data0.parquet': type=FileType.File, size=1564>,
<FileInfo for 'dataset/part=A': type=FileType.Directory>,
<FileInfo for 'dataset/part=A/data0.parquet': type=FileType.File, size=1564>]
这将返回 FileInfo
对象的列表,其中包含有关类型(文件或目录)、大小、上次修改日期等的信息。
您还可以获取单个显式路径(或路径列表)的此信息:
>>> local.get_file_info('test.arrow')
<FileInfo for 'test.arrow': type=FileType.File, size=3250>
>>> local.get_file_info('non_existent')
<FileInfo for 'non_existent': type=FileType.NotFound>
本地文件系统#
LocalFileSystem
允许您访问本地计算机上的文件。
示例:如何写入磁盘并将其读回:
>>> from pyarrow import fs
>>> local = fs.LocalFileSystem()
>>> with local.open_output_stream('/tmp/pyarrowtest.dat') as stream:
stream.write(b'data')
4
>>> with local.open_input_stream('/tmp/pyarrowtest.dat') as stream:
print(stream.readall())
b'data'
S3#
PyArrow 本地实现了 S3 兼容存储的 S3 文件系统。
S3FileSystem
构造函数有几个选项来配置 S3 连接(例如,凭据、区域、端点覆盖等)。此外,构造函数还将检查 AWS 支持的已配置的 S3 凭据(例如 AWS_ACCESS_KEY_ID
和 AWS_SECRET_ACCESS_KEY
环境变量、AWS 配置文件以及 EC2 节点的 EC2 实例元数据服务)。
示例:如何从 S3 存储桶中读取内容:
>>> from pyarrow import fs
>>> s3 = fs.S3FileSystem(region='eu-west-3')
# List all contents in a bucket, recursively
>>> s3.get_file_info(fs.FileSelector('my-test-bucket', recursive=True))
[<FileInfo for 'my-test-bucket/File1': type=FileType.File, size=10>,
<FileInfo for 'my-test-bucket/File5': type=FileType.File, size=10>,
<FileInfo for 'my-test-bucket/Dir1': type=FileType.Directory>,
<FileInfo for 'my-test-bucket/Dir2': type=FileType.Directory>,
<FileInfo for 'my-test-bucket/EmptyDir': type=FileType.Directory>,
<FileInfo for 'my-test-bucket/Dir1/File2': type=FileType.File, size=11>,
<FileInfo for 'my-test-bucket/Dir1/Subdir': type=FileType.Directory>,
<FileInfo for 'my-test-bucket/Dir2/Subdir': type=FileType.Directory>,
<FileInfo for 'my-test-bucket/Dir2/Subdir/File3': type=FileType.File, size=10>]
# Open a file for reading and download its contents
>>> f = s3.open_input_stream('my-test-bucket/Dir1/File2')
>>> f.readall()
b'some data'
请注意,重要的是为正在使用的存储桶配置正确的区域的 S3FileSystem
。如果未设置 region
,则 AWS SDK 将选择一个值,如果 SDK 版本 <1.8,则默认为“us-east-1”。否则,它将尝试使用各种启发式方法(环境变量、配置文件、EC2 元数据服务器)来解析该区域。
也可以通过使用 pyarrow.fs.resolve_s3_region()
或 pyarrow.fs.S3FileSystem.from_uri()
从 S3FileSystem
的存储桶名称中解析该区域。
以下是一些代码示例:
>>> from pyarrow import fs
>>> s3 = fs.S3FileSystem(region=fs.resolve_s3_region('my-test-bucket'))
# Or via URI:
>>> s3, path = fs.S3FileSystem.from_uri('s3://[access_key:secret_key@]bucket/path]')
故障排除#
使用 S3FileSystem
时,仅针对致命错误或在打印返回值时生成输出。为了进行故障排除,可以使用环境变量 ARROW_S3_LOG_LEVEL
设置日志级别。必须在运行与 S3 交互的任何代码之前设置日志级别。可能的值包括 FATAL
(默认值)、ERROR
、WARN
、INFO
、DEBUG
(推荐)、TRACE
和 OFF
。
Google Cloud Storage 文件系统#
PyArrow 本地实现了 GCS 存储的 Google Cloud Storage (GCS) 支持的文件系统。
如果未在 Google Cloud Platform (GCP) 上运行,则通常需要环境变量 GOOGLE_APPLICATION_CREDENTIALS
指向包含凭据的 JSON 文件。或者,使用 gcloud
CLI 在默认位置生成凭据文件:
gcloud auth application-default login
要连接到公共存储桶而不使用任何凭据,您必须将 anonymous=True
传递给 GcsFileSystem
。否则,文件系统将报告 Couldn't resolve host name
,因为经过身份验证的访问和公共访问的主机名不同。
示例:显示如何从 GCS 存储桶中读取内容:
>>> from datetime import timedelta
>>> from pyarrow import fs
>>> gcs = fs.GcsFileSystem(anonymous=True, retry_time_limit=timedelta(seconds=15))
# List all contents in a bucket, recursively
>>> uri = "gcp-public-data-landsat/LC08/01/001/003/"
>>> file_list = gcs.get_file_info(fs.FileSelector(uri, recursive=True))
# Open a file for reading and download its contents
>>> f = gcs.open_input_stream(file_list[0].path)
>>> f.read(64)
b'GROUP = FILE_HEADER\n LANDSAT_SCENE_ID = "LC80010032013082LGN03"\n S'
另请参阅
GcsFileSystem
构造函数默认使用 GCS 文档中描述的流程来解析凭据。
Hadoop 分布式文件系统 (HDFS)#
PyArrow 附带 Hadoop 文件系统的绑定(基于使用 libhdfs
的 C++ 绑定,这是基于 JNI 的 Java Hadoop 客户端接口)。您可以使用 HadoopFileSystem
构造函数进行连接:
from pyarrow import fs
hdfs = fs.HadoopFileSystem(host, port, user=user, kerb_ticket=ticket_cache_path)
libhdfs
库是在运行时加载的(而不是在链接/库加载时加载,因为该库可能不在您的 LD_LIBRARY_PATH 中),并且依赖于某些环境变量。
HADOOP_HOME
:已安装的 Hadoop 发行版的根目录。通常有lib/native/libhdfs.so
。JAVA_HOME
:Java SDK 安装的位置。ARROW_LIBHDFS_DIR
(可选):如果libhdfs.so
安装在$HADOOP_HOME/lib/native
以外的其他位置,则显式指定其位置。CLASSPATH
:必须包含 Hadoop jars。您可以使用以下命令进行设置:export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob` # or on Windows %HADOOP_HOME%/bin/hadoop classpath --glob > %CLASSPATH%
与使用
pa.hdfs.connect
的旧版 HDFS 文件系统相比,设置CLASSPATH
不是可选的(pyarrow 不会尝试推断它)。
Azure Storage 文件系统#
PyArrow 原生实现了用于 Azure Blob Storage 的 Azure 文件系统,无论是否启用分层命名空间。
AzureFileSystem
构造函数有多个选项来配置 Azure Blob Storage 连接(例如,帐户名、帐户密钥、SAS 令牌等)。
如果既未指定 account_key
也未指定 sas_token
,则使用 DefaultAzureCredential 进行身份验证。 这意味着它会尝试几种身份验证类型,并使用第一个有效的类型。 如果在初始化 FileSystem 时提供了任何身份验证参数,它们将代替默认凭据使用。
以下示例展示了如何从 Azure Blob Storage 帐户读取内容
>>> from pyarrow import fs
>>> azure_fs = fs.AzureFileSystem(account_name='myaccount')
# List all contents in a container, recursively
>>> azure_fs.get_file_info(fs.FileSelector('my-container', recursive=True))
[<FileInfo for 'my-container/File1': type=FileType.File, size=10>,
<FileInfo for 'my-container/File2': type=FileType.File, size=20>,
<FileInfo for 'my-container/Dir1': type=FileType.Directory>,
<FileInfo for 'my-container/Dir1/File3': type=FileType.File, size=30>]
# Open a file for reading and download its contents
>>> f = azure_fs.open_input_stream('my-container/File1')
>>> f.readall()
b'some data'
有关参数和用法的更多详细信息,请参阅 AzureFileSystem
类文档。
另请参阅
有关身份验证和配置选项的更多信息,请参阅 Azure SDK for C++ 文档。
将与 fsspec 兼容的文件系统与 Arrow 结合使用#
以上提到的文件系统由 Arrow C++ / PyArrow 原生支持。 但是,Python 生态系统也有几个文件系统包。 这些遵循 fsspec 接口的包也可以在 PyArrow 中使用。
接受文件系统对象的函数也将接受 fsspec 子类。 例如
# creating an fsspec-based filesystem object for Google Cloud Storage
import gcsfs
fs = gcsfs.GCSFileSystem(project='my-google-project')
# using this to read a partitioned dataset
import pyarrow.dataset as ds
ds.dataset("data/", filesystem=fs)
类似地,对于 Azure Blob Storage
import adlfs
# ... load your credentials and configure the filesystem
fs = adlfs.AzureBlobFileSystem(account_name=account_name, account_key=account_key)
import pyarrow.dataset as ds
ds.dataset("mycontainer/data/", filesystem=fs)
在底层,fsspec 文件系统对象被包装到基于 Python 的 PyArrow 文件系统(PyFileSystem
)中使用 FSSpecHandler
。 您也可以手动执行此操作以获取具有 PyArrow FileSystem 接口的对象
from pyarrow.fs import PyFileSystem, FSSpecHandler
pa_fs = PyFileSystem(FSSpecHandler(fs))
然后可以访问 FileSystem
的所有功能
# write data
with pa_fs.open_output_stream('mycontainer/pyarrowtest.dat') as stream:
stream.write(b'data')
# read data
with pa_fs.open_input_stream('mycontainer/pyarrowtest.dat') as stream:
print(stream.readall())
#b'data'
# read a partitioned dataset
ds.dataset("data/", filesystem=pa_fs)
将 Arrow 文件系统与 fsspec 结合使用#
Arrow FileSystem 接口具有有限的、面向开发人员的 API 界面。 这足以进行基本交互以及与 Arrow 的 IO 功能一起使用。 另一方面,fsspec 接口提供了非常大的 API,其中包含许多辅助方法。 如果你想使用这些方法,或者需要与期望 fsspec 兼容的文件系统对象的软件包进行交互,你可以用 fsspec 包装 Arrow FileSystem 对象。
从 fsspec
版本 2021.09 开始,可以使用 ArrowFSWrapper
来实现此目的
>>> from pyarrow import fs
>>> local = fs.LocalFileSystem()
>>> from fsspec.implementations.arrow import ArrowFSWrapper
>>> local_fsspec = ArrowFSWrapper(local)
生成的对象现在具有与 fsspec 兼容的接口,同时由底层的 Arrow FileSystem 提供支持。 示例用法:创建目录和文件,并列出内容
>>> local_fsspec.mkdir("./test")
>>> local_fsspec.touch("./test/file.txt")
>>> local_fsspec.ls("./test/")
['./test/file.txt']
有关更多信息,请参阅 fsspec 文档。