Arrow JDBC 适配器

Arrow Java JDBC 模块将 JDBC ResultSet 转换为 Arrow VectorSchemaRoot。

ResultSet 到 VectorSchemaRoot 的转换

将 ResultSet 转换为 VectorSchemaRoot 的主要类是 JdbcToArrow。

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;

// Read a JDBC query result through the Arrow JDBC adapter and print every
// batch as tab-separated text. All closeable resources are declared in
// try-with-resources so they are released even when an exception is thrown.
try (BufferAllocator allocator = new RootAllocator();
     Connection connection = DriverManager.getConnection(
             "jdbc:h2:mem:h2-jdbc-adapter")) {
    ScriptRunner runnerDDLDML = new ScriptRunner(connection);
    runnerDDLDML.setLogWriter(null);
    // Each script reader is closed as soon as its script has been run,
    // instead of leaving the file handle open.
    try (BufferedReader ddlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))) {
        runnerDDLDML.runScript(ddlScript);
    }
    try (BufferedReader dmlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))) {
        runnerDDLDML.runScript(dmlScript);
    }
    // The Statement is a resource of its own so that it is closed explicitly
    // (after the iterator and the ResultSet, in reverse declaration order).
    try (java.sql.Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(
                 "SELECT int_field1, bool_field2, bigint_field5 FROM TABLE1");
         ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                 resultSet, allocator)) {
        while (iterator.hasNext()) {
            // Close each root once its contents have been printed.
            try (VectorSchemaRoot root = iterator.next()) {
                System.out.print(root.contentToTSVString());
            }
        }
    }
} catch (SQLException | IOException e) {
    e.printStackTrace();
}
INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5
101    true    1000000000300
102    true    100000000030
103    true    10000000003

配置数组子类型

JdbcToArrow 通过 JdbcToArrowConfig 接受配置。例如,数组列元素的类型可以通过 setArraySubTypeByColumnNameMap 指定。

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;

// Read a query result that contains an array column. The element type of the
// array column is supplied through JdbcToArrowConfig, keyed by column name.
// All closeable resources are declared in try-with-resources.
try (BufferAllocator allocator = new RootAllocator();
     Connection connection = DriverManager.getConnection(
             "jdbc:h2:mem:h2-jdbc-adapter")) {
    ScriptRunner runnerDDLDML = new ScriptRunner(connection);
    runnerDDLDML.setLogWriter(null);
    // Each script reader is closed as soon as its script has been run.
    try (BufferedReader ddlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))) {
        runnerDDLDML.runScript(ddlScript);
    }
    try (BufferedReader dmlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))) {
        runnerDDLDML.runScript(dmlScript);
    }
    // A plain HashMap is used instead of double-brace initialization, which
    // would create an anonymous HashMap subclass just to hold one entry.
    HashMap<String, JdbcFieldInfo> arraySubTypes = new HashMap<>();
    arraySubTypes.put("LIST_FIELD19", new JdbcFieldInfo(Types.INTEGER));
    JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
            JdbcToArrowUtils.getUtcCalendar())
            .setArraySubTypeByColumnNameMap(arraySubTypes)
            .build();
    // The Statement is a resource of its own so that it is closed explicitly.
    try (java.sql.Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(
                 "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1");
         ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                 resultSet, config)) {
        while (iterator.hasNext()) {
            // Close each root once its contents have been printed.
            try (VectorSchemaRoot root = iterator.next()) {
                System.out.print(root.contentToTSVString());
            }
        }
    }
} catch (SQLException | IOException e) {
    e.printStackTrace();
}
INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
101    true    1000000000300    some char text      [1,2,3]
102    true    100000000030    some char text      [1,2]
103    true    10000000003    some char text      [1]

配置批处理大小

默认情况下,适配器每个批次最多读取 1024 行。可以通过 setTargetBatchSize 自定义该值。

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;

// Read a query result in batches of at most two rows by setting the target
// batch size on JdbcToArrowConfig; each batch is printed separately.
// All closeable resources are declared in try-with-resources.
try (BufferAllocator allocator = new RootAllocator();
     Connection connection = DriverManager.getConnection(
             "jdbc:h2:mem:h2-jdbc-adapter")) {
    ScriptRunner runnerDDLDML = new ScriptRunner(connection);
    runnerDDLDML.setLogWriter(null);
    // Each script reader is closed as soon as its script has been run.
    try (BufferedReader ddlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))) {
        runnerDDLDML.runScript(ddlScript);
    }
    try (BufferedReader dmlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))) {
        runnerDDLDML.runScript(dmlScript);
    }
    // A plain HashMap is used instead of double-brace initialization, which
    // would create an anonymous HashMap subclass just to hold one entry.
    HashMap<String, JdbcFieldInfo> arraySubTypes = new HashMap<>();
    arraySubTypes.put("LIST_FIELD19", new JdbcFieldInfo(Types.INTEGER));
    JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
            JdbcToArrowUtils.getUtcCalendar())
            .setTargetBatchSize(2)
            .setArraySubTypeByColumnNameMap(arraySubTypes)
            .build();
    // The Statement is a resource of its own so that it is closed explicitly.
    try (java.sql.Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(
                 "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1");
         ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                 resultSet, config)) {
        while (iterator.hasNext()) {
            // Close each root once its contents have been printed.
            try (VectorSchemaRoot root = iterator.next()) {
                System.out.print(root.contentToTSVString());
            }
        }
    }
} catch (SQLException | IOException e) {
    e.printStackTrace();
}
INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
101    true    1000000000300    some char text      [1,2,3]
102    true    100000000030    some char text      [1,2]
INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
103    true    10000000003    some char text      [1]

配置数值(小数)精度和标度(scale)

默认情况下,任何小数值的标度(scale)必须与列的 Arrow 类型所定义的标度完全一致,否则将抛出 UnsupportedOperationException,并给出类似 BigDecimal scale must equal that in the Arrow vector 的消息。

出现这种情况可能是因为 Arrow 根据 ResultSet 元数据推断类型,而该元数据并非对所有数据库驱动程序都准确。JDBC 适配器允许您通过覆盖小数标度(scale),或通过 setBigDecimalRoundingMode 提供 RoundingMode 将值转换为预期标度,来避免此问题。

在此示例中,我们有一个 BigInt 列。默认情况下,推断出的标度(scale)为 0。我们将标度覆盖为 7,然后设置 RoundingMode,以便将值转换为给定的标度。

import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcFieldInfo;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfigBuilder;
import org.apache.arrow.adapter.jdbc.JdbcToArrowUtils;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.ibatis.jdbc.ScriptRunner;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.math.RoundingMode;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.HashMap;

// Read a query result while overriding the type of BIGINT_FIELD5 to
// DECIMAL(20, 7) and supplying a RoundingMode through JdbcToArrowConfig.
// All closeable resources are declared in try-with-resources.
try (BufferAllocator allocator = new RootAllocator();
     Connection connection = DriverManager.getConnection(
             "jdbc:h2:mem:h2-jdbc-adapter")) {
    ScriptRunner runnerDDLDML = new ScriptRunner(connection);
    runnerDDLDML.setLogWriter(null);
    // Each script reader is closed as soon as its script has been run.
    try (BufferedReader ddlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-ddl.sql"))) {
        runnerDDLDML.runScript(ddlScript);
    }
    try (BufferedReader dmlScript = new BufferedReader(
            new FileReader("./thirdpartydeps/jdbc/h2-dml.sql"))) {
        runnerDDLDML.runScript(dmlScript);
    }
    // Plain HashMaps are used instead of double-brace initialization, which
    // would create an anonymous HashMap subclass for each single-entry map.
    HashMap<String, JdbcFieldInfo> arraySubTypes = new HashMap<>();
    arraySubTypes.put("LIST_FIELD19", new JdbcFieldInfo(Types.INTEGER));
    HashMap<String, JdbcFieldInfo> explicitTypes = new HashMap<>();
    explicitTypes.put("BIGINT_FIELD5", new JdbcFieldInfo(Types.DECIMAL, 20, 7));
    JdbcToArrowConfig config = new JdbcToArrowConfigBuilder(allocator,
            JdbcToArrowUtils.getUtcCalendar())
            .setTargetBatchSize(2)
            .setArraySubTypeByColumnNameMap(arraySubTypes)
            .setExplicitTypesByColumnName(explicitTypes)
            .setBigDecimalRoundingMode(RoundingMode.UNNECESSARY)
            .build();
    // The Statement is a resource of its own so that it is closed explicitly.
    try (java.sql.Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(
                 "SELECT int_field1, bool_field2, bigint_field5, char_field16, list_field19 FROM TABLE1");
         ArrowVectorIterator iterator = JdbcToArrow.sqlToArrowVectorIterator(
                 resultSet, config)) {
        while (iterator.hasNext()) {
            // Close each root once its contents have been printed.
            try (VectorSchemaRoot root = iterator.next()) {
                System.out.print(root.contentToTSVString());
            }
        }
    }
} catch (SQLException | IOException e) {
    e.printStackTrace();
}
INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
101    true    1000000000300.0000000    some char text      [1,2,3]
102    true    100000000030.0000000    some char text      [1,2]
INT_FIELD1    BOOL_FIELD2    BIGINT_FIELD5    CHAR_FIELD16    LIST_FIELD19
103    true    10000000003.0000000    some char text      [1]