文件系统接口#

PyArrow 带有一个抽象的文件系统接口,以及针对各种存储类型的具体实现。

文件系统接口提供输入和输出流以及目录操作。它公开了底层数据存储的简化视图。数据路径表示为抽象路径,即使在 Windows 上也是用 / 分隔,并且不应包含 ... 等特殊路径组件。如果底层存储支持符号链接,则会自动取消引用。仅提供有关文件条目的基本元数据,例如文件大小和修改时间。

核心接口由基类FileSystem表示。

Pyarrow 本地实现了以下文件系统子类

也可以使用您自己的符合fsspec的文件系统,并使用Arrow的功能,如使用与Arrow兼容的fsspec文件系统部分所述。

用法#

实例化文件系统#

可以使用以下构造函数之一创建FileSystem对象(并检查相应的构造函数以了解其选项)

>>> from pyarrow import fs
>>> local = fs.LocalFileSystem()

或者从URI推断

>>> s3, path = fs.FileSystem.from_uri("s3://my-bucket")
>>> s3
<pyarrow._s3fs.S3FileSystem at 0x7f6760cbf4f0>
>>> path
'my-bucket'

读取和写入文件#

PyArrow中的一些与IO相关的函数要么接受URI(并推断文件系统),要么接受显式的filesystem参数来指定要从中读取或写入的文件系统。例如,pyarrow.parquet.read_table()函数可以用以下方式使用

import pyarrow.parquet as pq

# using a URI -> filesystem is inferred
pq.read_table("s3://my-bucket/data.parquet")
# using a path and filesystem
s3 = fs.S3FileSystem(..)
pq.read_table("my-bucket/data.parquet", filesystem=s3)

文件系统接口还允许直接打开文件进行读取(输入)或写入(输出),这可以与处理类文件对象的函数结合使用。例如

import pyarrow as pa

local = fs.LocalFileSystem()

with local.open_output_stream("test.arrow") as file:
   with pa.RecordBatchFileWriter(file, table.schema) as writer:
      writer.write_table(table)

列出文件#

可以使用FileSystem.get_file_info()方法检查文件系统上的目录和文件。要列出目录的内容,请使用FileSelector对象指定选择

>>> local.get_file_info(fs.FileSelector("dataset/", recursive=True))
[<FileInfo for 'dataset/part=B': type=FileType.Directory>,
 <FileInfo for 'dataset/part=B/data0.parquet': type=FileType.File, size=1564>,
 <FileInfo for 'dataset/part=A': type=FileType.Directory>,
 <FileInfo for 'dataset/part=A/data0.parquet': type=FileType.File, size=1564>]

这将返回一个FileInfo对象列表,其中包含有关类型(文件或目录)、大小、上次修改日期等信息。

您也可以获取单个显式路径(或路径列表)的此信息

>>> local.get_file_info('test.arrow')
<FileInfo for 'test.arrow': type=FileType.File, size=3250>

>>> local.get_file_info('non_existent')
<FileInfo for 'non_existent': type=FileType.NotFound>

本地文件系统#

LocalFileSystem允许您访问本地机器上的文件。

写入磁盘并读取回磁盘的示例

>>> from pyarrow import fs
>>> local = fs.LocalFileSystem()
>>> with local.open_output_stream('/tmp/pyarrowtest.dat') as stream:
        stream.write(b'data')
4
>>> with local.open_input_stream('/tmp/pyarrowtest.dat') as stream:
        print(stream.readall())
b'data'

S3#

PyArrow原生实现了用于S3兼容存储的S3文件系统。

S3FileSystem构造函数有几个选项可以配置S3连接(例如凭据、区域、端点覆盖等)。此外,构造函数还将检查AWS支持的已配置S3凭据(例如AWS_ACCESS_KEY_IDAWS_SECRET_ACCESS_KEY环境变量、AWS配置文件以及EC2实例元数据服务,用于EC2节点)。

如何从S3存储桶读取内容的示例

>>> from pyarrow import fs
>>> s3 = fs.S3FileSystem(region='eu-west-3')

# List all contents in a bucket, recursively
>>> s3.get_file_info(fs.FileSelector('my-test-bucket', recursive=True))
[<FileInfo for 'my-test-bucket/File1': type=FileType.File, size=10>,
 <FileInfo for 'my-test-bucket/File5': type=FileType.File, size=10>,
 <FileInfo for 'my-test-bucket/Dir1': type=FileType.Directory>,
 <FileInfo for 'my-test-bucket/Dir2': type=FileType.Directory>,
 <FileInfo for 'my-test-bucket/EmptyDir': type=FileType.Directory>,
 <FileInfo for 'my-test-bucket/Dir1/File2': type=FileType.File, size=11>,
 <FileInfo for 'my-test-bucket/Dir1/Subdir': type=FileType.Directory>,
 <FileInfo for 'my-test-bucket/Dir2/Subdir': type=FileType.Directory>,
 <FileInfo for 'my-test-bucket/Dir2/Subdir/File3': type=FileType.File, size=10>]

# Open a file for reading and download its contents
>>> f = s3.open_input_stream('my-test-bucket/Dir1/File2')
>>> f.readall()
b'some data'

请注意,务必使用正确的区域配置S3FileSystem,以用于正在使用的存储桶。如果未设置region,则AWS SDK将选择一个值,如果SDK版本<1.8,则默认为‘us-east-1’。否则,它将尝试使用各种启发式方法(环境变量、配置文件、EC2元数据服务器)来解析区域。

也可以通过使用pyarrow.fs.resolve_s3_region()pyarrow.fs.S3FileSystem.from_uri()从存储桶名称解析区域,以用于S3FileSystem

以下是一些代码示例

>>> from pyarrow import fs
>>> s3 = fs.S3FileSystem(region=fs.resolve_s3_region('my-test-bucket'))

# Or via URI:
>>> s3, path = fs.S3FileSystem.from_uri('s3://[access_key:secret_key@]bucket/path]')

另请参阅

有关配置AWS凭据的不同方法,请参阅AWS文档

pyarrow.fs.resolve_s3_region()用于从存储桶名称解析区域。

故障排除#

使用S3FileSystem时,仅对致命错误或打印返回值时才生成输出。对于故障排除,可以使用环境变量ARROW_S3_LOG_LEVEL设置日志级别。必须在运行与S3交互的任何代码之前设置日志级别。可能的值包括FATAL(默认值)、ERRORWARNINFODEBUG(推荐)、TRACEOFF

Google Cloud Storage文件系统#

PyArrow原生实现了用于GCS存储的Google Cloud Storage (GCS)支持的文件系统。

如果不在Google Cloud Platform (GCP)上运行,这通常需要GOOGLE_APPLICATION_CREDENTIALS环境变量指向包含凭据的JSON文件。或者,使用gcloud CLI在默认位置生成凭据文件

gcloud auth application-default login

要连接到不使用任何凭据的公共存储桶,必须将anonymous=True传递给GcsFileSystem。否则,文件系统将报告Couldn't resolve host name,因为经过身份验证和公共访问的主机名不同。

显示如何从GCS存储桶读取内容的示例

>>> from datetime import timedelta
>>> from pyarrow import fs
>>> gcs = fs.GcsFileSystem(anonymous=True, retry_time_limit=timedelta(seconds=15))

# List all contents in a bucket, recursively
>>> uri = "gcp-public-data-landsat/LC08/01/001/003/"
>>> file_list = gcs.get_file_info(fs.FileSelector(uri, recursive=True))

# Open a file for reading and download its contents
>>> f = gcs.open_input_stream(file_list[0].path)
>>> f.read(64)
b'GROUP = FILE_HEADER\n  LANDSAT_SCENE_ID = "LC80010032013082LGN03"\n  S'

另请参阅

GcsFileSystem构造函数默认使用GCS文档中描述的过程来解析凭据。

Hadoop分布式文件系统 (HDFS)#

PyArrow附带了Hadoop文件系统的绑定(基于使用libhdfs的C++绑定,这是Java Hadoop客户端的基于JNI的接口)。您可以使用HadoopFileSystem构造函数进行连接

from pyarrow import fs
hdfs = fs.HadoopFileSystem(host, port, user=user, kerb_ticket=ticket_cache_path)

libhdfs库是在**运行时**加载的(而不是在链接/库加载时加载,因为库可能不在您的LD_LIBRARY_PATH中),并且依赖于一些环境变量。

  • HADOOP_HOME:已安装的Hadoop发行版的根目录。通常包含lib/native/libhdfs.so

  • JAVA_HOME:Java SDK安装的位置。

  • ARROW_LIBHDFS_DIR(可选):libhdfs.so的显式位置,如果它安装在$HADOOP_HOME/lib/native以外的其他位置。

  • CLASSPATH:必须包含Hadoop jar包。您可以使用以下方法设置它们

    export CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath --glob`
    # or on Windows
    %HADOOP_HOME%/bin/hadoop classpath --glob > %CLASSPATH%
    

    与使用pa.hdfs.connect的旧版HDFS文件系统相反,设置CLASSPATH不是可选的(pyarrow不会尝试推断它)。

使用与Arrow兼容的fsspec文件系统#

上面提到的文件系统由Arrow C++/PyArrow原生支持。但是,Python生态系统还有一些文件系统包。这些遵循fsspec接口的包也可以在PyArrow中使用。

接受文件系统对象的函数也将接受fsspec子类。例如

# creating an fsspec-based filesystem object for Google Cloud Storage
import gcsfs
fs = gcsfs.GCSFileSystem(project='my-google-project')

# using this to read a partitioned dataset
import pyarrow.dataset as ds
ds.dataset("data/", filesystem=fs)

Azure Blob存储也是如此

import adlfs
# ... load your credentials and configure the filesystem
fs = adlfs.AzureBlobFileSystem(account_name=account_name, account_key=account_key)

import pyarrow.dataset as ds
ds.dataset("mycontainer/data/", filesystem=fs)

在后台,fsspec文件系统对象被包装到基于python的PyArrow文件系统(PyFileSystem)中,使用FSSpecHandler。您也可以手动执行此操作以获取具有PyArrow文件系统接口的对象

from pyarrow.fs import PyFileSystem, FSSpecHandler
pa_fs = PyFileSystem(FSSpecHandler(fs))

然后所有FileSystem的功能都可访问

# write data
with pa_fs.open_output_stream('mycontainer/pyarrowtest.dat') as stream:
   stream.write(b'data')

# read data
with pa_fs.open_input_stream('mycontainer/pyarrowtest.dat') as stream:
   print(stream.readall())
#b'data'

# read a partitioned dataset
ds.dataset("data/", filesystem=pa_fs)

使用Arrow文件系统与fsspec#

Arrow文件系统接口的API表面有限,面向开发人员。这足以满足基本交互以及与Arrow的IO功能一起使用。另一方面,fsspec接口提供了非常大的API,其中包含许多辅助方法。如果您想使用这些方法,或者如果您需要与期望fsspec兼容文件系统对象的包进行交互,则可以使用fsspec包装Arrow文件系统对象。

fsspec版本2021.09开始,可以使用ArrowFSWrapper来实现此目的

>>> from pyarrow import fs
>>> local = fs.LocalFileSystem()
>>> from fsspec.implementations.arrow import ArrowFSWrapper
>>> local_fsspec = ArrowFSWrapper(local)

现在,结果对象具有与fsspec兼容的接口,同时在后台由Arrow文件系统支持。创建目录和文件以及列出内容的示例用法

>>> local_fsspec.mkdir("./test")
>>> local_fsspec.touch("./test/file.txt")
>>> local_fsspec.ls("./test/")
['./test/file.txt']

有关更多信息,请参阅fsspec文档。