Substrait

Arrow 可以使用 Substrait 与其他语言集成。

查询数据集

Arrow 中的 Substrait 支持结合了 Dataset 和 substrait-java,使用 Acero 作为后端来查询数据集。

Acero 目前支持:

  • 读取 Arrow、CSV、ORC 和 Parquet 文件

  • 过滤器

  • 投影

  • 连接

  • 聚合

以下是一个查询 Parquet 文件的 Java 程序示例:

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Builds the Substrait plan for a single-table query against {@code NATION}.
 *
 * <p>The SQL text and the {@code CREATE TABLE} statement describing the table are handed to
 * {@code SqlToSubstrait.execute}, and the plan it produces is returned unchanged.
 *
 * @return the plan produced by {@code SqlToSubstrait} for the query
 * @throws SqlParseException declared by the conversion call; presumably raised when the SQL
 *     text cannot be parsed
 */
static Plan queryTableNation() throws SqlParseException {
   final String nationDdl = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), "
           + "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
   final String query = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
   return new SqlToSubstrait().execute(query, ImmutableList.of(nationDdl));
}

/**
 * Runs the plan from {@code queryTableNation()} against the {@code nation.parquet} file and
 * prints every resulting batch to standard output as tab-separated text.
 *
 * <p>The Parquet file is located under {@code thirdpartydeps/tpch} relative to the
 * {@code user.dir} system property. All Arrow resources (allocator, dataset factory, dataset,
 * scanner and readers) are opened in try-with-resources blocks so they are closed on exit.
 *
 * <p>Any exception raised while scanning, planning or executing is caught and its stack trace
 * is printed; nothing is propagated to the caller.
 */
static void queryDatasetThruSubstraitPlanDefinition() {
   String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
   ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
   try (
       BufferAllocator allocator = new RootAllocator();
       DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
               FileFormat.PARQUET, uri);
       Dataset dataset = datasetFactory.finish();
       Scanner scanner = dataset.newScan(options);
       ArrowReader reader = scanner.scanBatches()
   ) {
       // map the table name referenced by the plan to the reader that supplies its data
       Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
       mapTableToArrowReader.put("NATION", reader);
       // get binary plan: serialize once and reuse the bytes for both sizing and copying
       Plan plan = queryTableNation();
       byte[] planBytes = plan.toByteArray();
       ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
       substraitPlan.put(planBytes);
       // run query
       try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
           substraitPlan,
           mapTableToArrowReader
       )) {
           while (arrowReader.loadNextBatch()) {
               System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
           }
       }
   } catch (Exception e) {
       // example code: report the failure instead of propagating it
       e.printStackTrace();
   }
}

// Run the example; the lines that follow show what it prints.
queryDatasetThruSubstraitPlanDefinition();
N_NATIONKEY    N_NAME    N_REGIONKEY    N_COMMENT
17    PERU    1    platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun

也可以查询多个数据集,并根据某些条件将它们连接起来。例如,我们可以连接 TPC-H 基准测试中的 nation 和 customer 表:

import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
 * Builds the Substrait plan for a query that joins {@code NATION} with {@code CUSTOMER} and
 * counts customers per nation name.
 *
 * <p>The SQL text and the two {@code CREATE TABLE} statements describing the tables are handed
 * to {@code SqlToSubstrait.execute}, and the plan it produces is returned unchanged.
 *
 * @return the plan produced by {@code SqlToSubstrait} for the join query
 * @throws SqlParseException declared by the conversion call; presumably raised when the SQL
 *     text cannot be parsed
 */
static Plan queryTableNationJoinCustomer() throws SqlParseException {
    final String nationDdl = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, "
        + "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
    final String customerDdl = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, "
        + "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, "
        + "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), "
        + "C_COMMENT VARCHAR(117) )";
    final String query = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c "
        + "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 "
        + "GROUP BY n.n_name";
    return new SqlToSubstrait().execute(query, ImmutableList.of(nationDdl, customerDdl));
}

/**
 * Runs the plan from {@code queryTableNationJoinCustomer()} against the {@code nation.parquet}
 * and {@code customer.parquet} files and prints every resulting batch to standard output as
 * tab-separated text.
 *
 * <p>Both Parquet files are located under {@code thirdpartydeps/tpch} relative to the
 * {@code user.dir} system property. Each file gets its own dataset factory, dataset, scanner
 * and reader; all of them share one allocator and are opened in try-with-resources blocks so
 * they are closed on exit.
 *
 * <p>Any exception raised while scanning, planning or executing is caught and its stack trace
 * is printed; nothing is propagated to the caller.
 */
static void queryTwoDatasetsThruSubstraitPlanDefinition() {
    String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
    String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uriNation);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader readerNation = scanner.scanBatches();
        DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
            allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uriCustomer);
        Dataset datasetCustomer = datasetFactoryCustomer.finish();
        Scanner scannerCustomer = datasetCustomer.newScan(options);
        ArrowReader readerCustomer = scannerCustomer.scanBatches()
    ) {
        // map each table name referenced by the plan to the reader that supplies its data
        Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
        mapTableToArrowReader.put("NATION", readerNation);
        mapTableToArrowReader.put("CUSTOMER", readerCustomer);
        // get binary plan: serialize once and reuse the bytes for both sizing and copying
        Plan plan = queryTableNationJoinCustomer();
        byte[] planBytes = plan.toByteArray();
        ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
        substraitPlan.put(planBytes);
        // run query
        try (ArrowReader arrowReader = new AceroSubstraitConsumer(
            allocator).runQuery(
            substraitPlan,
            mapTableToArrowReader
        )) {
            while (arrowReader.loadNextBatch()) {
                System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
            }
        }
    } catch (Exception e) {
        // example code: report the failure instead of propagating it
        e.printStackTrace();
    }
}

// Run the example; the lines that follow show what it prints.
queryTwoDatasetsThruSubstraitPlanDefinition();
N_NAME    NUMBER_CUSTOMER
PERU    573