Substrait#
arrow-dataset 模块可以通过 Acero 查询引擎执行 Substrait 计划。
使用 Substrait 计划执行查询#
计划可以通过 URI 引用文件中的数据,或者引用必须与计划一起提供的“命名表”。
以下 Java 程序示例使用 Substrait 查询 Parquet 文件(本示例借助 Substrait Java 项目将 SQL 查询编译为 Substrait 计划):
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlToSubstrait;
import io.substrait.proto.Plan;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
 * Example client that compiles a SQL query to a Substrait plan and runs it with
 * {@link AceroSubstraitConsumer} against a Parquet file exposed as the named table
 * {@code NATION}.
 */
public class ClientSubstrait {
  /**
   * Scans the Parquet file, registers the resulting reader under the table name
   * {@code NATION}, runs the Substrait plan and prints every result batch as TSV.
   *
   * <p>Any failure is printed to standard error via {@code printStackTrace()}; the
   * method then returns normally.
   *
   * @param args command-line arguments (unused)
   */
  public static void main(String[] args) {
    String uri = "file:///data/tpch_parquet/nation.parquet";
    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
    try (
        BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
            FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader reader = scanner.scanBatches()
    ) {
      // Map the table name referenced by the plan to the reader that supplies its data.
      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
      mapTableToArrowReader.put("NATION", reader);
      // Serialize the plan exactly once, then copy the bytes into a direct buffer.
      byte[] planBytes = getPlan().toByteArray();
      ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
      substraitPlan.put(planBytes);
      // Run the query and print each batch.
      try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
          substraitPlan,
          mapTableToArrowReader
      )) {
        while (arrowReader.loadNextBatch()) {
          System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Builds the Substrait plan for {@code SELECT * from nation}, using a
   * {@code CREATE TABLE NATION} statement as the table definition.
   *
   * @return the plan produced by {@link SqlToSubstrait#execute}
   * @throws SqlParseException propagated from the SQL-to-Substrait conversion
   */
  static Plan getPlan() throws SqlParseException {
    String sql = "SELECT * from nation";
    String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
        "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
    SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
    return sqlToSubstrait.execute(sql, ImmutableList.of(nation));
  }
}
// Results example:
FieldPath(0) FieldPath(1) FieldPath(2) FieldPath(3)
0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon
使用扩展表达式执行投影和过滤器#
Dataset 还支持使用 Substrait 的 扩展表达式 进行投影和过滤。这需要 substrait-java 库。
此 Java 程序:
- 加载包含 TPC-H 基准测试中“nation”表的 Parquet 文件。
- 应用过滤器 N_NATIONKEY > 18
- 投影两个新列:N_REGIONKEY + 10 和 N_NAME || ' - ' || N_COMMENT
import com.google.common.collect.ImmutableList;
import io.substrait.isthmus.SqlExpressionToSubstrait;
import io.substrait.proto.ExtendedExpression;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.calcite.sql.parser.SqlParseException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
/**
 * Example client that scans a Parquet file while applying a filter and a projection,
 * both supplied as serialized Substrait extended expressions.
 */
public class ClientSubstraitExtendedExpressionsCookbook {
  /**
   * Entry point; runs the projection-and-filter example.
   *
   * @param args command-line arguments (unused)
   * @throws SqlParseException propagated from building the Substrait expressions
   */
  public static void main(String[] args) throws SqlParseException {
    projectAndFilterDataset();
  }

  /**
   * Scans the Parquet file with the filter {@code N_NATIONKEY > 18} and two projected
   * expressions, printing every batch as TSV.
   *
   * @throws SqlParseException propagated from building the Substrait expressions
   * @throws RuntimeException wrapping any exception raised while opening or scanning the dataset
   */
  private static void projectAndFilterDataset() throws SqlParseException {
    String uri = "file:///Users/data/tpch_parquet/nation.parquet";
    ScanOptions options =
        new ScanOptions.Builder(/*batchSize*/ 32768)
            .columns(Optional.empty())
            .substraitFilter(getByteBuffer(new String[]{"N_NATIONKEY > 18"}))
            .substraitProjection(getByteBuffer(new String[]{"N_REGIONKEY + 10",
                "N_NAME || CAST(' - ' as VARCHAR) || N_COMMENT"}))
            .build();
    try (BufferAllocator allocator = new RootAllocator();
        DatasetFactory datasetFactory =
            new FileSystemDatasetFactory(
                allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
        Dataset dataset = datasetFactory.finish();
        Scanner scanner = dataset.newScan(options);
        ArrowReader reader = scanner.scanBatches()) {
      while (reader.loadNextBatch()) {
        System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Converts SQL expressions into a serialized Substrait extended expression held in a
   * direct {@link ByteBuffer}.
   *
   * @param sqlExpression SQL expressions evaluated against the {@code NATION} table definition
   * @return a direct buffer containing the serialized {@link ExtendedExpression}
   * @throws SqlParseException propagated from the SQL-to-Substrait conversion
   */
  private static ByteBuffer getByteBuffer(String[] sqlExpression) throws SqlParseException {
    String schema =
        "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME VARCHAR, "
            + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
    SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
    ExtendedExpression expression =
        expressionToSubstrait.convert(sqlExpression, ImmutableList.of(schema));
    // Use the serialized bytes directly; a Base64 encode/decode round-trip would
    // yield the same bytes at extra cost.
    byte[] expressionToByte = expression.toByteArray();
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
    byteBuffer.put(expressionToByte);
    return byteBuffer;
  }
}
column-1 column-2
13 ROMANIA - ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account
14 SAUDI ARABIA - ts. silent requests haggle. closely express packages sleep across the blithely
12 VIETNAM - hely enticingly express accounts. even, final
13 RUSSIA - requests against the platelets use never according to the quickly regular pint
13 UNITED KINGDOM - eans boost carefully special requests. accounts are. carefull
11 UNITED STATES - y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be