Arrow JDBC 适配器¶
The Arrow Java JDBC 模块 将 JDBC ResultSet 转换为 Arrow VectorSchemaRoots。
ResultSet 到 VectorSchemaRoot 的转换¶
帮助我们转换 ResultSet 到 VectorSchemaRoot 的主要类是 JdbcToArrow
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
try (BufferAllocator allocator = new RootAllocator();
Connection connection = DriverManager.getConnection(
"jdbc:h2:mem:h2-jdbc-adapter")) {
ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql")));
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-dml.sql")));
try (ResultSet resultSet = connection.createStatement().executeQuery(
"SELECT int_field1, bool_field2, bigint_field5 FROM TABLE1");
ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSet, allocator)) {
while (iterator.hasNext()) {
try (VectorSchemaRoot root = iterator.next()) {
System.out.print(root.contentToTSVString());
}
}
}
} catch (SQLException | IOException e) {
e.printStackTrace();
}
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5
101 true 1000000000300
102 true 100000000030
103 true 10000000003
配置数组子类型¶
JdbcToArrow 通过 JdbcToArrowConfig 接受配置。例如,数组列元素的类型可以通过 setArraySubTypeByColumnNameMap
指定。
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
try (BufferAllocator allocator = new RootAllocator();
Connection connection = DriverManager.getConnection(
"jdbc:h2:mem:h2-jdbc-adapter")) {
ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql")));
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-dml.sql")));
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
JdbcToArrowUtils.getUtcCalendar())
.setArraySubTypeByColumnNameMap(
new HashMap<>() {{
put("LIST_FIELD19",
new JdbcFieldInfo(Types.INTEGER));
}}
)
.build();
try (ResultSet resultSet = connection.createStatement().executeQuery(
"SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1");
ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSet, config)) {
while (iterator.hasNext()) {
try (VectorSchemaRoot root = iterator.next()) {
System.out.print(root.contentToTSVString());
}
}
}
} catch (SQLException | IOException e) {
e.printStackTrace();
}
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
101 true 1000000000300 some char text [1,2,3]
102 true 100000000030 some char text [1,2]
103 true 10000000003 some char text [1]
配置批处理大小¶
默认情况下,适配器将以批处理方式读取最多 1024 行。这可以通过 setTargetBatchSize
自定义。
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
try (BufferAllocator allocator = new RootAllocator();
Connection connection = DriverManager.getConnection(
"jdbc:h2:mem:h2-jdbc-adapter")) {
ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql")));
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-dml.sql")));
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
JdbcToArrowUtils.getUtcCalendar())
.setTargetBatchSize(2)
.setArraySubTypeByColumnNameMap(
new HashMap<>() {{
put("LIST_FIELD19",
new JdbcFieldInfo(Types.INTEGER));
}}
)
.build();
try (ResultSet resultSet = connection.createStatement().executeQuery(
"SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1");
ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSet, config)) {
while (iterator.hasNext()) {
try (VectorSchemaRoot root = iterator.next()) {
System.out.print(root.contentToTSVString());
}
}
}
} catch (SQLException | IOException e) {
e.printStackTrace();
}
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
101 true 1000000000300 some char text [1,2,3]
102 true 100000000030 some char text [1,2]
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
103 true 10000000003 some char text [1]
配置数值(小数)精度和比例¶
默认情况下,任何小数值的比例必须与列的 Arrow 类型定义的比例完全匹配,否则将抛出 UnsupportedOperationException,并显示类似 BigDecimal scale must equal that in the Arrow vector
的消息。
这可能是因为 Arrow 从 ResultSet 元数据推断类型,而这对于所有数据库驱动程序来说并不准确。JDBC 适配器允许您通过覆盖小数比例或通过 setBigDecimalRoundingMode
提供 RoundingMode 来避免这种情况,以将值转换为预期比例。
在此示例中,我们有一个 BigInt 列。默认情况下,推断的比例为 0。我们覆盖比例为 7,然后设置 RoundingMode 以将值转换为给定比例。
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.math.RoundingMode;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;
try (BufferAllocator allocator = new RootAllocator();
Connection connection = DriverManager.getConnection(
"jdbc:h2:mem:h2-jdbc-adapter")) {
ScriptRunner runnerDDLDML = new ScriptRunner(connection);
runnerDDLDML.setLogWriter(null);
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql")));
runnerDDLDML.runScript(new BufferedReader(
new FileReader("./thirdpartydeps/jdbc/h2-dml.sql")));
JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
JdbcToArrowUtils.getUtcCalendar())
.setTargetBatchSize(2)
.setArraySubTypeByColumnNameMap(
new HashMap<>() {{
put("LIST_FIELD19",
new JdbcFieldInfo(Types.INTEGER));
}}
)
.setExplicitTypesByColumnName(
new HashMap<>() {{
put("BIGINT_FIELD5",
new JdbcFieldInfo(Types.DECIMAL, 20, 7));
}}
)
.setBigDecimalRoundingMode(RoundingMode.UNNECESSARY)
.build();
try (ResultSet resultSet = connection.createStatement().executeQuery(
"SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1");
ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
resultSet, config)) {
while (iterator.hasNext()) {
try (VectorSchemaRoot root = iterator.next()) {
System.out.print(root.contentToTSVString());
}
}
}
} catch (SQLException | IOException e) {
e.printStackTrace();
}
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
101 true 1000000000300.0000000 some char text [1,2,3]
102 true 100000000030.0000000 some char text [1,2]
INT_FIELD1 BOOL_FIELD2 BIGINT_FIELD5 CHAR_FIELD16 LIST_FIELD19
103 true 10000000003.0000000 some char text [1]