DataFusion:一个用于 Apache Arrow 的 Rust 原生查询引擎


已发布 2019年02月04日
作者 Andy Grove (agrove)

我们很高兴地宣布 DataFusion 已捐赠给 Apache Arrow 项目。DataFusion 是 Apache Arrow 的 Rust 实现的一个内存中查询引擎。

尽管 DataFusion 在两年前就已经启动,但它最近被重写为 Arrow 原生,目前功能有限,但它支持针对 RecordBatch 迭代器的 SQL 查询,并支持 CSV 文件。我们计划添加对 Parquet 文件的支持。

SQL 支持仅限于投影(SELECT)、选择(WHERE)和简单的聚合(MIN, MAX, SUM),并带有可选的 GROUP BY 子句。

支持的表达式包括标识符、字面量、简单的数学运算(+, -, *, /)、二元表达式(AND, OR)、相等和比较运算符(=, !=, <, <=, >=, >)以及 CAST(expr AS type)

示例

以下示例演示了针对 CSV 文件运行简单的聚合 SQL 查询。

// create execution context
let mut ctx = ExecutionContext::new();

// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
    Field::new("c1", DataType::Utf8, false),
    Field::new("c2", DataType::UInt32, false),
    Field::new("c3", DataType::Int8, false),
    Field::new("c4", DataType::Int16, false),
    Field::new("c5", DataType::Int32, false),
    Field::new("c6", DataType::Int64, false),
    Field::new("c7", DataType::UInt8, false),
    Field::new("c8", DataType::UInt16, false),
    Field::new("c9", DataType::UInt32, false),
    Field::new("c10", DataType::UInt64, false),
    Field::new("c11", DataType::Float32, false),
    Field::new("c12", DataType::Float64, false),
    Field::new("c13", DataType::Utf8, false),
]));

// register csv file with the execution context
let csv_datasource =
    CsvDataSource::new("test/data/aggregate_test_100.csv", schema.clone(), 1024);
ctx.register_datasource("aggregate_test_100", Rc::new(RefCell::new(csv_datasource)));

let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1";

// execute the query
let relation = ctx.sql(&sql).unwrap();
let mut results = relation.borrow_mut();

// iterate over the results
while let Some(batch) = results.next().unwrap() {
    println!(
        "RecordBatch has {} rows and {} columns",
        batch.num_rows(),
        batch.num_columns()
    );

    let c1 = batch
        .column(0)
        .as_any()
        .downcast_ref::<BinaryArray>()
        .unwrap();

    let min = batch
        .column(1)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();

    let max = batch
        .column(2)
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap();

    for i in 0..batch.num_rows() {
        let c1_value: String = String::from_utf8(c1.value(i).to_vec()).unwrap();
        println!("{}, Min: {}, Max: {}", c1_value, min.value(i), max.value(i),);
    }
}

路线图

DataFusion 的路线图将取决于 Rust 社区的兴趣,但以下是一些计划中的短期目标:

  • 扩展现有功能的测试覆盖率
  • 添加对 Parquet 数据源的支持
  • 实现更多的 SQL 功能,例如 JOINORDER BYLIMIT
  • 实现一个 DataFrame API 作为 SQL 的替代方案
  • 利用 Rust 的异步(async/await)功能添加对分区和并行查询执行的支持
  • 创建一个 Docker 镜像,以便可以轻松地将 DataFusion 作为独立的查询工具用于交互式和批处理查询

欢迎贡献者!

如果您对能够使用 Rust 进行数据科学感到兴奋,并希望为这项工作做出贡献,那么有很多方式可以参与进来。最简单的入门方法是针对您自己的数据源试用 DataFusion 并报告您发现的任何问题。您还可以查看当前的问题列表并尝试解决一个。您也可以加入用户邮件列表提问。