二维数据集#

记录批次(Record Batches)#

class RecordBatch#

符合特定 Schema(模式) 的等长数组集合。

记录批次是一种表状数据结构,在语义上是一系列字段的序列,每个字段都是一个连续的 Arrow 数组。

公共函数

Result<std::shared_ptr<StructArray>> ToStructArray() const#

将记录批次转换为结构体数组。

创建一个结构体数组,其子数组为记录批次的各列。请注意,记录批次的顶级字段元数据无法在生成的结构体数组中体现。

Result<std::shared_ptr<Tensor>> ToTensor(bool null_to_nan = false, bool row_major = true, MemoryPool *pool = default_memory_pool()) const#

将具有单一数据类型的记录批次转换为 Tensor(张量)

创建一个形状为 (行数, 列数) 且步幅为 (类型字节大小, 类型字节大小 * 行数) 的 Tensor 对象。生成的 Tensor 将采用列优先布局。

参数:
  • null_to_nan[输入] 若为 true,将空值转换为 NaN

  • row_major[输入] 若为 true,则创建行优先的 Tensor,否则创建列优先的 Tensor

  • pool[输入] 用于分配张量缓冲区的内存池

返回:

生成的 Tensor

bool Equals(const RecordBatch &other, bool check_metadata = false, const EqualOptions &opts = EqualOptions::Defaults()) const#

确定两个记录批次是否相等。

参数:
  • other[输入] 要比较的 RecordBatch

  • check_metadata[输入] 若为 true,则比较模式元数据,无论 EqualOptions::use_metadata 中的值如何

  • opts[输入] 相等性比较的选项

返回:

若批次相等则返回 true

bool Equals(const RecordBatch &other, const EqualOptions &opts) const#

确定两个记录批次是否相等。

参数:
  • other[输入] 要比较的 RecordBatch

  • opts[输入] 相等性比较的选项

返回:

若批次相等则返回 true

inline bool ApproxEquals(const RecordBatch &other, const EqualOptions &opts = EqualOptions::Defaults()) const#

确定两个记录批次是否近似相等。

参数:
  • other[输入] 要比较的 RecordBatch

  • opts[输入] 相等性比较的选项

返回:

若批次近似相等则返回 true

inline const std::shared_ptr<Schema> &schema() const#
返回:

记录批次的模式

Result<std::shared_ptr<RecordBatch>> ReplaceSchema(std::shared_ptr<Schema> schema) const#

替换模式,使用另一个具有相同类型、但字段名称和/或元数据可能不同的模式。

virtual const std::vector<std::shared_ptr<Array>> &columns() const = 0#

一次获取所有列。

virtual std::shared_ptr<Array> column(int i) const = 0#

从记录批次中获取一个数组。

参数:

i[输入] 字段索引,不进行边界检查

返回:

一个 Array 对象

std::shared_ptr<Array> GetColumnByName(const std::string &name) const#

从记录批次中获取一个数组。

参数:

name[输入] 字段名称

返回:

一个 Array,若未找到字段则返回空

virtual std::shared_ptr<ArrayData> column_data(int i) const = 0#

从记录批次中获取数组的内部数据。

参数:

i[输入] 字段索引,不进行边界检查

返回:

内部 ArrayData 对象

virtual const ArrayDataVector &column_data() const = 0#

从记录批次中获取所有数组的内部数据。

virtual Result<std::shared_ptr<RecordBatch>> AddColumn(int i, const std::shared_ptr<Field> &field, const std::shared_ptr<Array> &column) const = 0#

将列添加到记录批次中,生成一个新的 RecordBatch

参数:
  • i[输入] 字段索引,将进行边界检查

  • field[输入] 要添加的字段

  • column[输入] 要添加的列

virtual Result<std::shared_ptr<RecordBatch>> AddColumn(int i, std::string field_name, const std::shared_ptr<Array> &column) const#

将新的可空列添加到记录批次中,生成一个新的 RecordBatch

对于不可空列,请使用此方法的基于字段(Field)的版本。

参数:
  • i[输入] 字段索引,将进行边界检查

  • field_name[输入] 要添加的字段名称

  • column[输入] 要添加的列

virtual Result<std::shared_ptr<RecordBatch>> SetColumn(int i, const std::shared_ptr<Field> &field, const std::shared_ptr<Array> &column) const = 0#

替换记录批次中的列,生成一个新的 RecordBatch

参数:
  • i[输入] 字段索引,进行边界检查

  • field[输入] 要替换的字段

  • column[输入] 要替换的列

virtual Result<std::shared_ptr<RecordBatch>> RemoveColumn(int i) const = 0#

从记录批次中移除列,生成一个新的 RecordBatch

参数:

i[输入] 字段索引,进行边界检查

const std::string &column_name(int i) const#

第 i 列的名称。

int num_columns() const#
返回:

表中列的数量

inline int64_t num_rows() const#
返回:

行数(即每一列对应的长度)

Result<std::shared_ptr<RecordBatch>> CopyTo(const std::shared_ptr<MemoryManager> &to) const#

将整个 RecordBatch 复制到目标 MemoryManager

此方法在记录批次的每一列上使用 Array::CopyTo 来创建一个新的记录批次,其中列的所有底层缓冲区都已复制到目标 MemoryManager。它底层使用 MemoryManager::CopyBuffer

Result<std::shared_ptr<RecordBatch>> ViewOrCopyTo(const std::shared_ptr<MemoryManager> &to) const#

将整个 RecordBatch 视图化或复制到目标 MemoryManager

此方法在记录批次的每一列上使用 Array::ViewOrCopyTo 来创建一个新的记录批次,其中列的所有底层缓冲区都在目标 MemoryManager 上以零拷贝方式进行视图化,如果无法进行零拷贝视图化,则退化为复制。它底层使用 Buffer::ViewOrCopy

virtual std::shared_ptr<RecordBatch> Slice(int64_t offset) const#

对记录批次中的每个数组进行切片。

参数:

offset[输入] 开始切片的偏移量,一直到批次末尾

返回:

新的记录批次

virtual std::shared_ptr<RecordBatch> Slice(int64_t offset, int64_t length) const = 0#

对记录批次中的每个数组进行切片。

参数:
  • offset[输入] 开始切片的偏移量

  • length[输入] 从偏移量开始切取的元素数量

返回:

新的记录批次

std::string ToString() const#
返回:

适合调试的 PrettyPrint(格式化输出)表示

std::vector<std::string> ColumnNames() const#

返回所有列的名称。

Result<std::shared_ptr<RecordBatch>> RenameColumns(const std::vector<std::string> &names) const#

使用提供的名称重命名列。

Result<std::shared_ptr<RecordBatch>> SelectColumns(const std::vector<int> &indices) const#

返回带有指定列的新记录批次。

virtual Status Validate() const#

执行简单的验证检查,以确定记录批次的模式和内部数据中是否存在明显的矛盾。

时间复杂度为 O(k),其中 k 是字段和数组后代的总数。

返回:

Status(状态)

virtual Status ValidateFull() const#

执行全面的验证检查,以确定记录批次的模式和内部数据中是否存在矛盾。

潜在时间复杂度为 O(k*n),其中 n 是行数。

返回:

Status(状态)

virtual const std::shared_ptr<Device::SyncEvent> &GetSyncEvent() const = 0#

实验性功能:返回此记录批次的顶级同步事件对象。

如果此记录批次的所有数据都在 CPU 内存中,则此方法将返回 null。如果此批次的数据在设备上,且在访问数据前需要同步,则返回的同步事件将允许进行同步。

返回:

null 或一个 Device::SyncEvent

Result<std::shared_ptr<Array>> MakeStatisticsArray(MemoryPool *pool = default_memory_pool()) const#

创建此记录批次的统计信息数组。

创建的数组遵循 C 数据接口统计规范。详细信息请参阅 https://arrow.apache.org/docs/format/StatisticsSchema.html

参数:

pool[输入] 用于从中分配内存的内存池

返回:

此记录批次的统计信息数组

公共静态函数

static std::shared_ptr<RecordBatch> Make(std::shared_ptr<Schema> schema, int64_t num_rows, std::vector<std::shared_ptr<Array>> columns, std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR)#
参数:
  • schema[输入] 记录批次模式

  • num_rows[输入] 记录批次中字段的长度。每个数组的长度应与 num_rows 相同

  • columns[输入] 以数组向量形式表示的记录批次字段

  • sync_event[输入] 缓冲区所使用的非 CPU 设备内存的可选同步事件

static std::shared_ptr<RecordBatch> Make(std::shared_ptr<Schema> schema, int64_t num_rows, std::vector<std::shared_ptr<ArrayData>> columns, DeviceAllocationType device_type = DeviceAllocationType::kCPU, std::shared_ptr<Device::SyncEvent> sync_event = NULLPTR)#

使用内部数据结构的向量构造记录批次。

此类旨在供内部使用或供高级用户使用。

0.5.0

参数:
  • schema – 记录批次模式

  • num_rows – 记录批次中语义上的行数。此值应等于每个字段的长度

  • columns – 批次列的数据

  • device_type – Arrow 列所分配设备的类型

  • sync_event – 缓冲区所使用的非 CPU 设备内存的可选同步事件

static Result<std::shared_ptr<RecordBatch>> MakeEmpty(std::shared_ptr<Schema> schema, MemoryPool *pool = default_memory_pool())#

创建一个具有给定模式的空 RecordBatch

输出的 RecordBatch 将使用给定模式中的 DataTypes(数据类型)创建。

参数:
  • schema[输入]RecordBatch 的模式

  • pool[输入] 用于从中分配内存的内存池

返回:

生成的 RecordBatch

static Result<std::shared_ptr<RecordBatch>> FromStructArray(const std::shared_ptr<Array> &array, MemoryPool *pool = default_memory_pool())#

从结构体数组构造记录批次。

这使用给定数组的子数组构造记录批次,该数组必须是结构体数组。

此操作通常是零拷贝的。但是,如果结构体数组具有偏移量或有效性位图,则需要将它们推入子数组中。推入偏移量是零拷贝的,但推入有效性位图则不是。

参数:
  • array[输入] 源数组,必须是 StructArray

  • pool[输入] 用于分配新有效性位图的内存池

class RecordBatchReader#

用于读取记录批次流的抽象接口。

子类包括 arrow::TableBatchReader, arrow::csv::StreamingReader, arrow::flight::sql::example::SqliteStatementBatchReader, arrow::flight::sql::example::SqliteTablesWithSchemaBatchReader, arrow::ipc::RecordBatchStreamReader, arrow::json::StreamingReader

公共函数

virtual std::shared_ptr<Schema> schema() const = 0#
返回:

流中记录批次的共享模式

virtual Status ReadNext(std::shared_ptr<RecordBatch> *batch) = 0#

读取流中的下一个记录批次。

到达流末尾时,批次返回 null

示例

while (true) {
  std::shared_ptr<RecordBatch> batch;
  ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (!batch) {
    break;
  }
  // handling the `batch`, the `batch->num_rows()`
  // might be 0.
}
参数:

batch[输出] 下一个加载的批次,流末尾为 null。返回一个空批次并不意味着流结束,因为空批次也是有效数据。

返回:

Status(状态)

inline Result<std::shared_ptr<RecordBatch>> Next()#

Iterator(迭代器) 接口。

inline virtual Status Close()#

结束读取器

inline virtual DeviceAllocationType device_type() const#

实验性功能:获取此读取器生成的记录批次的设备类型。

默认实现是返回 DeviceAllocationType::kCPU

inline RecordBatchReaderIterator begin()#

返回指向流中第一个记录批次的迭代器。

inline RecordBatchReaderIterator end()#

返回指向流末尾的迭代器。

Result<RecordBatchVector> ToRecordBatches()#

将整个流作为记录批次向量消费。

Result<std::shared_ptr<Table>> ToTable()#

读取所有批次并连接为 arrow::Table

公共静态函数

static Result<std::shared_ptr<RecordBatchReader>> Make(RecordBatchVector batches, std::shared_ptr<Schema> schema = NULLPTR, DeviceAllocationType device_type = DeviceAllocationType::kCPU)#

RecordBatch 向量创建一个 RecordBatchReader

参数:
  • batches[in] 用于读取的 RecordBatch 向量

  • schema[in] 需遵循的模式 (schema)。如果未提供,将从第一个元素推断。

  • device_type[in] 批处理所分配的设备类型

static Result<std::shared_ptr<RecordBatchReader>> MakeFromIterator(Iterator<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<Schema> schema, DeviceAllocationType device_type = DeviceAllocationType::kCPU)#

RecordBatchIterator 创建一个 RecordBatchReader

参数:
  • batches[in] 用于读取的 RecordBatch 迭代器。

  • schema[in] 迭代器中每个记录批处理需遵循的模式 (schema)。

  • device_type[in] 批处理所分配的设备类型

class RecordBatchReaderIterator#
class TableBatchReader : public arrow::RecordBatchReader#

从一个(可能是分块的)Table 计算记录批处理流。

该转换是零拷贝的:每个记录批处理都是表列切片的一个视图。

在使用批处理读取器之前,表必须是有效的。

公共函数

explicit TableBatchReader(const Table &table)#

为给定的表构造一个 TableBatchReader

virtual std::shared_ptr<Schema> schema() const override#
返回:

流中记录批次的共享模式

virtual Status ReadNext(std::shared_ptr<RecordBatch> *out) override#

读取流中的下一个记录批次。

到达流末尾时,批次返回 null

示例

while (true) {
  std::shared_ptr<RecordBatch> batch;
  ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (!batch) {
    break;
  }
  // handling the `batch`, the `batch->num_rows()`
  // might be 0.
}
参数:

batch[输出] 下一个加载的批次,流末尾为 null。返回一个空批次并不意味着流结束,因为空批次也是有效数据。

返回:

Status(状态)

void set_chunksize(int64_t chunksize)#

设置记录批处理所需的行数上限。

每个记录批处理中的实际行数可能会更少,具体取决于每个表列的实际分块特性。

表 (Tables)#

class Table#

作为分块数组序列的逻辑表。

公共函数

inline const std::shared_ptr<Schema> &schema() const#

返回表模式 (schema)。

virtual std::shared_ptr<ChunkedArray> column(int i) const = 0#

通过索引返回一列。

virtual const std::vector<std::shared_ptr<ChunkedArray>> &columns() const = 0#

返回表中所有列的向量。

inline std::shared_ptr<Field> field(int i) const#

通过索引返回列的字段。

std::vector<std::shared_ptr<Field>> fields() const#

返回表中所有字段的向量。

virtual std::shared_ptr<Table> Slice(int64_t offset, int64_t length) const = 0#

构造一个具有指定偏移量和长度的表零拷贝切片。

参数:
  • offset[in] 切片中第一行的索引

  • length[in] 切片的行数。如果表中的行数不足,长度将进行相应调整

返回:

返回一个新的被 std::shared_ptr<Table> 包装的对象

inline std::shared_ptr<Table> Slice(int64_t offset) const#

从偏移量处的第一行到表末尾进行切片。

inline std::shared_ptr<ChunkedArray> GetColumnByName(const std::string &name) const#

通过名称返回一列。

参数:

name[输入] 字段名称

返回:

一个 Array,若未找到字段则返回空

virtual Result<std::shared_ptr<Table>> RemoveColumn(int i) const = 0#

从表中移除一列,生成一个新的 Table

virtual Result<std::shared_ptr<Table>> AddColumn(int i, std::shared_ptr<Field> field_arg, std::shared_ptr<ChunkedArray> column) const = 0#

向表中添加一列,生成一个新的 Table

virtual Result<std::shared_ptr<Table>> SetColumn(int i, std::shared_ptr<Field> field_arg, std::shared_ptr<ChunkedArray> column) const = 0#

替换表中的一列,生成一个新的 Table

std::vector<std::string> ColumnNames() const#

返回所有列的名称。

Result<std::shared_ptr<Table>> RenameColumns(const std::vector<std::string> &names) const#

使用提供的名称重命名列。

Result<std::shared_ptr<Table>> SelectColumns(const std::vector<int> &indices) const#

返回带有指定列的新表。

virtual std::shared_ptr<Table> ReplaceSchemaMetadata(const std::shared_ptr<const KeyValueMetadata> &metadata) const = 0#

用新元数据替换模式的键值元数据。

0.5.0

参数:

metadata[in] 新的 KeyValueMetadata

返回:

返回一个新的 Table

virtual Result<std::shared_ptr<Table>> Flatten(MemoryPool *pool = default_memory_pool()) const = 0#

展平表,生成一个新的 Table

任何具有结构类型的列都将被展平为多个列。

参数:

pool[in] 用于缓冲区分配的内存池(如果有)

std::string ToString() const#
返回:

适合调试的 PrettyPrint(格式化输出)表示

virtual Status Validate() const = 0#

执行轻量级验证检查,以确定表模式和内部数据中是否存在明显的不一致。

其复杂度为 O(k*m),其中 k 是字段后代的总数,m 是块的数量。

返回:

Status(状态)

virtual Status ValidateFull() const = 0#

执行广泛的验证检查,以确定表模式和内部数据中是否存在不一致。

其复杂度为 O(k*n),其中 k 是字段后代的总数,n 是行数。

返回:

Status(状态)

inline int num_columns() const#

返回表中的列数。

inline int64_t num_rows() const#

返回行数(等于每列的逻辑长度)

bool Equals(const Table &other, const EqualOptions &opts) const#

确定两个表是否相等。

参数:
  • other[in] 要进行比较的表

  • opts[输入] 相等性比较的选项

返回:

如果两个表相等,则返回 true

inline bool Equals(const Table &other, bool check_metadata = false, const EqualOptions &opts = EqualOptions::Defaults()) const#

确定两个表是否相等。

参数:
  • other[in] 要进行比较的表

  • check_metadata[输入] 若为 true,则比较模式元数据,无论 EqualOptions::use_metadata 中的值如何

  • opts[输入] 相等性比较的选项

返回:

如果两个表相等,则返回 true

Result<std::shared_ptr<Table>> CombineChunks(MemoryPool *pool = default_memory_pool()) const#

通过组合该表的块来创建一个新表。

每列 ChunkedArray 中的所有基础块被连接为零个或一个块。

为避免缓冲区溢出,二进制列可能会被合并为多个块。块将具有尽可能大的长度。

参数:

pool[in] 用于缓冲区分配的内存池

Result<std::shared_ptr<RecordBatch>> CombineChunksToBatch(MemoryPool *pool = default_memory_pool()) const#

通过组合该表的块来创建一个新记录批处理。

每列 ChunkedArray 中的所有基础块被连接为一个块。

参数:

pool[in] 用于缓冲区分配的内存池

公共静态函数

static std::shared_ptr<Table> Make(std::shared_ptr<Schema> schema, std::vector<std::shared_ptr<ChunkedArray>> columns, int64_t num_rows = -1)#

从模式和列构造一个 Table

如果列的长度为零,则表的行数为零。

参数:
  • schema[in] 表模式(列类型)

  • columns[in] 作为分块数组的表列

  • num_rows[in] 表中的行数,-1(默认值)表示从列中推断

static std::shared_ptr<Table> Make(std::shared_ptr<Schema> schema, const std::vector<std::shared_ptr<Array>> &arrays, int64_t num_rows = -1)#

从模式和数组构造一个 Table

参数:
  • schema[in] 表模式(列类型)

  • arrays[in] 作为数组的表列

  • num_rows[in] 表中的行数,-1(默认值)表示从列中推断

static Result<std::shared_ptr<Table>> MakeEmpty(std::shared_ptr<Schema> schema, MemoryPool *pool = default_memory_pool())#

创建一个给定模式的空 Table

输出的 Table 将为每列创建一个单一的空块。

参数:
  • schema[in]Table 的模式

  • pool[输入] 用于从中分配内存的内存池

返回:

生成的 Table

static Result<std::shared_ptr<Table>> FromRecordBatchReader(RecordBatchReader *reader)#

RecordBatchReader 构造一个 Table

参数:

reader[in] 产生批处理的 arrow::RecordBatchReader

static Result<std::shared_ptr<Table>> FromRecordBatches(const std::vector<std::shared_ptr<RecordBatch>> &batches)#

RecordBatch 构造一个 Table,使用由第一个 RecordBatch 提供的模式。

参数:

batches[in] 一个记录批处理的 std::vector

static Result<std::shared_ptr<Table>> FromRecordBatches(std::shared_ptr<Schema> schema, const std::vector<std::shared_ptr<RecordBatch>> &batches)#

RecordBatch 构造一个 Table,使用提供的模式。

可以有零个记录批处理

参数:
  • schema[in] 每个批处理的 arrow::Schema

  • batches[in] 一个记录批处理的 std::vector

static Result<std::shared_ptr<Table>> FromChunkedStructArray(const std::shared_ptr<ChunkedArray> &array)#

从分块的 StructArray 构造一个 Table

对于 StructArray 的每个字段都将产生一列。

参数:

array[in] 分块的 StructArray

Result<std::shared_ptr<Table>> arrow::ConcatenateTables(const std::vector<std::shared_ptr<Table>> &tables, ConcatenateTablesOptions options = ConcatenateTablesOptions::Defaults(), MemoryPool *memory_pool = default_memory_pool())#

从多个输入表构造一个新表。

如果模式完全相同,新表将直接从现有的列块组装,无需拷贝。如果模式不完全匹配且选项中启用了 unify_schemas(默认关闭),将尝试统一它们,然后将列块转换为各自统一的数据类型,这可能会产生拷贝。使用 :func:arrow::PromoteTableToSchema 来统一模式。

表将按提供的顺序连接,并且表中行的顺序将被保留。

参数:
  • tables[in] 要连接的表的 std::vector

  • options[in] 指定如何统一输入表的模式

  • memory_pool[in] 如果需要创建填充 null 的数组,或者现有的列块需要进行类型转换时使用的 MemoryPool

返回:

返回一个新的 Table

Result<std::shared_ptr<Table>> arrow::PromoteTableToSchema(const std::shared_ptr<Table> &table, const std::shared_ptr<Schema> &schema, MemoryPool *pool = default_memory_pool())#

将表提升至遵循给定的模式。

如果模式中的某个字段在表中没有对应的列,则会将一列 null 添加到生成的表中。如果对应的列是 Null 类型,它将被提升为模式指定的类型,并填充 null 值。列将被转换为模式指定的类型。

如果出现以下情况,则返回错误:

  • 如果对应列的类型与模式不兼容。

  • 如果表中存在模式中不存在的列。

  • 如果转换失败,或者需要转换但无法提供。

参数:
  • table[in] 输入的 Table

  • schema[in] 要提升至的目标模式

  • pool[in] 如果需要创建填充 null 的数组,则使用的内存池。

Result<std::shared_ptr<Table>> arrow::PromoteTableToSchema(const std::shared_ptr<Table> &table, const std::shared_ptr<Schema> &schema, const compute::CastOptions &options, MemoryPool *pool = default_memory_pool())#

将表提升至遵循给定的模式。

如果模式中的某个字段在表中没有对应的列,则会将一列 null 添加到生成的表中。如果对应的列是 Null 类型,它将被提升为模式指定的类型,并填充 null 值。列将被转换为模式指定的类型。

如果出现以下情况,则返回错误:

  • 如果对应列的类型与模式不兼容。

  • 如果表中存在模式中不存在的列。

  • 如果转换失败,或者需要转换但无法提供。

参数:
  • table[in] 输入的 Table

  • schema[in] 要提升至的目标模式

  • options[in] 允许类型提升的转换选项

  • pool[in] 如果需要创建填充 null 的数组,则使用的内存池。