读取和写入 CSV 文件#

Arrow 支持从 CSV 文件读取和向 CSV 文件写入列式数据。目前提供的功能如下:

  • 多线程或单线程读取

  • 输入文件的自动解压缩(基于文件名扩展名,例如 my_data.csv.gz

  • 从 CSV 文件的第一行获取列名

  • 按列进行类型推断并转换为 nullint64float64date32time32[s]timestamp[s]timestamp[ns]duration(来自数字字符串)、stringbinary 数据中的一种

  • stringbinary 列进行机会性字典编码(默认为禁用)

  • 检测各种拼写的 null 值,例如 NaN#N/A

  • 写入 CSV 文件,并提供配置确切输出格式的选项

用法#

CSV 读写功能通过 pyarrow.csv 模块提供。在许多情况下,您只需调用 read_csv() 函数并传入要读取的文件路径即可

>>> from pyarrow import csv
>>> fn = 'tips.csv.gz'
>>> table = csv.read_csv(fn)
>>> table
pyarrow.Table
total_bill: double
tip: double
sex: string
smoker: string
day: string
time: string
size: int64
>>> len(table)
244
>>> df = table.to_pandas()
>>> df.head()
   total_bill   tip     sex smoker  day    time  size
0       16.99  1.01  Female     No  Sun  Dinner     2
1       10.34  1.66    Male     No  Sun  Dinner     3
2       21.01  3.50    Male     No  Sun  Dinner     3
3       23.68  3.31    Male     No  Sun  Dinner     2
4       24.59  3.61  Female     No  Sun  Dinner     4

要写入 CSV 文件,只需调用 write_csv() 并传入 pyarrow.RecordBatchpyarrow.Table 以及路径或类文件对象

>>> import pyarrow as pa
>>> import pyarrow.csv as csv
>>> csv.write_csv(table, "tips.csv")
>>> with pa.CompressedOutputStream("tips.csv.gz", "gzip") as out:
...     csv.write_csv(table, out)

注意

写入器尚不支持所有 Arrow 类型。

自定义解析#

要更改默认解析设置,以应对读取结构不寻常的 CSV 文件,您应该创建一个 ParseOptions 实例并将其传递给 read_csv()

import pyarrow as pa
import pyarrow.csv as csv

table = csv.read_csv('tips.csv.gz', parse_options=csv.ParseOptions(
   delimiter=";",
   invalid_row_handler=skip_handler
))

可用的解析选项有

delimiter(分隔符)

CSV 数据中分隔单个单元格的字符。

quote_char(引用字符)

可选用于引用 CSV 值的字符(如果不允许引用,则为 False)。

double_quote(双引号)

带引号的 CSV 值中的两个引号是否表示数据中的一个引号。

escape_char(转义字符)

可选用于转义特殊字符的字符(如果不允许转义,则为 False)。

newlines_in_values(值中的换行符)

CSV 值中是否允许换行符。

ignore_empty_lines(忽略空行)

CSV 输入中是否忽略空行。

invalid_row_handler(无效行处理器)

无效行的可选处理器。

另请参阅

更多示例请参见 ParseOptions

自定义转换#

要更改 CSV 数据如何转换为 Arrow 类型和数据,您应该创建一个 ConvertOptions 实例并将其传递给 read_csv()

import pyarrow as pa
import pyarrow.csv as csv

table = csv.read_csv('tips.csv.gz', convert_options=csv.ConvertOptions(
    column_types={
        'total_bill': pa.decimal128(precision=10, scale=2),
        'tip': pa.decimal128(precision=10, scale=2),
    }
))

注意

要将列指定为 duration,CSV 值必须是与预期单位匹配的数字字符串(例如,当使用 duration[ms] 时,60000 表示 60 秒)。

可用的转换选项有

check_utf8(检查 UTF8)

是否检查字符串列的 UTF8 有效性。

column_types(列类型)

显式将列名映射到列类型。

null_values(空值)

表示数据中空值的一系列字符串。

true_values(真值)

表示数据中布尔真值的一系列字符串。

false_values(假值)

表示数据中布尔假值的一系列字符串。

decimal_point(小数点)

浮点数和十进制数据中用作小数点的字符。

timestamp_parsers(时间戳解析器)

一系列与 strptime() 兼容的格式字符串,在尝试推断或转换时间戳值时按顺序尝试(也可以给定特殊值 ISO8601())。

strings_can_be_null(字符串可以为空)

字符串/二进制列是否可以有空值。

quoted_strings_can_be_null(引用字符串可以为空)

引用值是否可以为空。

auto_dict_encode(自动字典编码)

是否尝试自动字典编码字符串/二进制数据。

auto_dict_max_cardinality(自动字典最大基数)

auto_dict_encode 的最大字典基数。

include_columns(包含列)

要包含在表中的列名。

include_missing_columns(包含缺失列)

如果为 false,则 include_columns 中存在但在 CSV 文件中不存在的列将报错。

另请参阅

更多示例请参见 ConvertOptions

增量读取#

对于内存受限的环境,也可以使用 open_csv() 一次读取一个批次的 CSV 文件。

有一些注意事项:

  1. 目前,增量读取器始终是单线程的(无论 ReadOptions.use_threads 如何设置)

  2. 类型推断在第一个块上完成,此后类型将被冻结;为确保推断出正确的数据类型,可以设置足够大的 ReadOptions.block_size 值,或者使用 ConvertOptions.column_types 显式设置所需的数据类型。

字符编码#

默认情况下,CSV 文件应采用 UTF8 编码。非 UTF8 数据适用于 binary 列。可以使用 ReadOptions 类更改编码

import pyarrow as pa
import pyarrow.csv as csv

table = csv.read_csv('tips.csv.gz', read_options=csv.ReadOptions(
   column_names=["animals", "n_legs", "entry"],
   skip_rows=1
))

可用的读取选项有

use_threads(使用线程)

是否使用多线程加速读取。

block_size(块大小)

每次从输入流中处理的字节数。

skip_rows(跳过行数)

在列名(如果有)和 CSV 数据之前要跳过的行数。

skip_rows_after_names(列名后跳过的行数)

列名后要跳过的行数。

column_names(列名)

目标表的列名。

autogenerate_column_names(自动生成列名)

如果 column_names 为空,是否自动生成列名。

encoding(编码)

编码:对象

另请参阅

更多示例请参见 ReadOptions

自定义写入#

要更改默认写入设置,以应对写入具有不同约定的 CSV 文件,您可以创建一个 WriteOptions 实例并将其传递给 write_csv()

>>> import pyarrow as pa
>>> import pyarrow.csv as csv
>>> # Omit the header row (include_header=True is the default)
>>> options = csv.WriteOptions(include_header=False)
>>> csv.write_csv(table, "data.csv", options)

增量写入#

要一次写入一个批次的 CSV 文件,请创建一个 CSVWriter。这需要输出(路径或类文件对象)、要写入数据的模式以及可选的写入选项,如上所述

>>> import pyarrow as pa
>>> import pyarrow.csv as csv
>>> with csv.CSVWriter("data.csv", table.schema) as writer:
>>>     writer.write_table(table)

性能#

由于 CSV 文件的结构,不能期望获得与读取 Parquet 等专用二进制格式相同的性能水平。尽管如此,Arrow 仍致力于减少读取 CSV 文件的开销。在性能良好的台式机或笔记本电脑上,每个核心至少 100 MB/s 的合理预期(以源 CSV 字节而不是目标 Arrow 数据字节衡量)。

性能选项可以通过 ReadOptions 类进行控制。多线程读取是默认设置,以实现最高性能,并有效分配工作负载到所有可用核心。

注意

并发线程数由 Arrow 自动推断。您可以使用 cpu_count()set_cpu_count() 函数分别检查和更改它。