
Source Code Analysis of the Basic JDBC Flow

 3 years ago
source link: https://segmentfault.com/a/1190000040580622

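Recently, in preparation for reading the MyBatis source code, I took a rough look at the JDBC code to understand how Java runs SQL queries without an ORM framework.

JDBC is defined as follows:

JDBC (Java DataBase Connectivity) is a Java API for executing SQL statements. It provides uniform access to many relational databases and consists of a set of classes and interfaces written in Java. JDBC provides a baseline on which higher-level tools and interfaces can be built, enabling database developers to write database applications.

In other words, JDBC is a set of specifications put forward by Sun. It only defines the interfaces; the concrete implementations are left to each database vendor.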

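1. Usage

The JDBC flow:

  1. Load the Driver class through SPI.
  2. Establish the database connection and obtain a Connection object.
  3. Create a Statement object from the connection, execute the SQL statement, and get the result.
  4. Release the resources.

Add the MySQL driver dependency in Maven: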

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.23</version>
</dependency>

Database table:
[Figure: student table]

1.1 Plain query

@Test
public void query() throws Exception {
    // Print DriverManager logs to the console
    DriverManager.setLogWriter(new PrintWriter(System.out));
    // Establish the connection
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");
    // Run the SQL query and get the result
    Statement statement = conn.createStatement();
    statement.setQueryTimeout(60);
    ResultSet resultSet = statement.executeQuery("select * from t_student");
    while (resultSet.next()) {
        System.out.println("id:" + resultSet.getInt(1) + " address:" + resultSet.getString(2) + " name:" + resultSet.getString(4));
    }
}

Output:

DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb")
    trying com.mysql.cj.jdbc.Driver
getConnection returning com.mysql.cj.jdbc.Driver
id:1 address:hunan name:zhangsan
id:2 address:guangzhou name:lisi
id:3 address:四川 name:大大

1.2 Parameterized query

@Test
public void queryByParam() throws SQLException {
    DriverManager.setLogWriter(new PrintWriter(System.out));
    // Establish the connection
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");
    // Run the SQL query and get the result
    PreparedStatement preparedStatement = conn.prepareStatement("select * from t_student where id = ? and age = ?");
    preparedStatement.setString(1, "1");
    preparedStatement.setInt(2, 11);
    ResultSet resultSet = preparedStatement.executeQuery();
    while (resultSet.next()) {
        System.out.println("id:" + resultSet.getInt(1) + " address:" + resultSet.getString(2) + " name:" + resultSet.getString(4));
    }
}

1.3 Batch update

@Test
public void updateBatch() throws SQLException {
    DriverManager.setLogWriter(new PrintWriter(System.out));
    Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");
    PreparedStatement preparedStatement = conn.prepareStatement("update t_student set age = ? where id = ?");

    preparedStatement.setInt(1, 10);
    preparedStatement.setString(2, "1");
    preparedStatement.addBatch();

    preparedStatement.setInt(1, 10);
    preparedStatement.setString(2, "2");
    preparedStatement.addBatch();

    int[] result = preparedStatement.executeBatch();
    System.out.println("result = " + result.length);
}

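2. Source code walkthrough

2.1 Loading the driver

Main steps:

  1. Load the Driver class from the MySQL driver package through SPI.
  2. Register the Driver class with JDBC's DriverManager.

2.1.1 Loading the driver class via SPI

DriverManager defines a static initializer block that loads the database driver classes through SPI.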

java.sql.DriverManager

/**
 * Load the initial JDBC drivers by checking the System property
 * jdbc.properties and then use the {@code ServiceLoader} mechanism
 */
static {
    loadInitialDrivers();
    println("JDBC DriverManager initialized");
}

java.sql.DriverManager#loadInitialDrivers

private static void loadInitialDrivers() {
    // ...
    AccessController.doPrivileged(new PrivilegedAction<Void>() {
        public Void run() {

            // Load the database drivers via the SPI mechanism
            ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class); 
            Iterator<Driver> driversIterator = loadedDrivers.iterator();

            /* Load these drivers, so that they can be instantiated.
             * It may be the case that the driver class may not be there
             * i.e. there may be a packaged driver with the service class
             * as implementation of java.sql.Driver but the actual class
             * may be missing. In that case a java.util.ServiceConfigurationError
             * will be thrown at runtime by the VM trying to locate
             * and load the service.
             *
             * Adding a try catch block to catch those runtime errors
             * if driver not available in classpath but it's
             * packaged as service and that service is there in classpath.
             */
            try{
                while(driversIterator.hasNext()) {
                    driversIterator.next();
                }
            } catch(Throwable t) {
            // Do nothing
            }
            return null;
        }
    });
    // ...
}

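Through SPI, the configuration file META-INF/services/java.sql.Driver is loaded. Because the MySQL driver jar is on the project's classpath, the file that gets read here is:
mysql-connector-java-8.0.23.jar!\META-INF\services\java.sql.Driver
Its content is com.mysql.cj.jdbc.Driver. That class is obtained via reflection and instantiated.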
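
The discovery step above can be tried in isolation. The following is a minimal sketch, not the DriverManager code itself: the class name SpiDriverScan and the method discoverDriverClassNames are made up for illustration. It lists whatever java.sql.Driver providers are visible on the current classpath, so the result is empty when no driver jar is present.

```java
import java.sql.Driver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceLoader;

public class SpiDriverScan {

    /** Returns the class names of the java.sql.Driver providers found through SPI. */
    public static List<String> discoverDriverClassNames() {
        List<String> names = new ArrayList<>();
        // ServiceLoader.load(Class) looks up META-INF/services/java.sql.Driver
        // using the thread context class loader.
        ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
        Iterator<Driver> it = loader.iterator();
        try {
            while (it.hasNext()) {
                // next() instantiates the provider, which also runs its static initializer.
                names.add(it.next().getClass().getName());
            }
        } catch (Throwable t) {
            // Same idea as DriverManager: a provider that is listed but
            // cannot be loaded must not break the scan.
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(discoverDriverClassNames());
    }
}
```

With the MySQL driver jar from section 1 on the classpath, the list would be expected to contain com.mysql.cj.jdbc.Driver.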

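2.1.2 Registering the driver class

When the MySQL driver class is loaded and initialized, its static block registers a driver instance through the DriverManager#registerDriver method.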

com.mysql.cj.jdbc.Driver

public class Driver extends NonRegisteringDriver implements java.sql.Driver {
    //
    // Register ourselves with the DriverManager
    //
    static {
        try {
            java.sql.DriverManager.registerDriver(new Driver()); // register the driver
        } catch (SQLException E) {
            throw new RuntimeException("Can't register driver!");
        }
    }

    /**
     * Construct a new driver and register it with DriverManager
     * 
     * @throws SQLException
     *             if a database error occurs.
     */
    public Driver() throws SQLException {
        // Required for Class.forName().newInstance()
    }
}

Under the hood, the MySQL Driver object is wrapped in a DriverInfo and stored in the DriverManager#registeredDrivers collection.

java.sql.DriverManager#registerDriver

// List of registered JDBC drivers
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();

public static synchronized void registerDriver(java.sql.Driver driver)
    throws SQLException {

    registerDriver(driver, null);
}

public static synchronized void registerDriver(java.sql.Driver driver,
        DriverAction da)
    throws SQLException {

    /* Register the driver if it has not already been added to our list */
    if(driver != null) {
        registeredDrivers.addIfAbsent(new DriverInfo(driver, da)); // register the driver
    } else {
        // This is for compatibility with the original DriverManager
        throw new NullPointerException();
    }

    println("registerDriver: " + driver);

}
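
To see the addIfAbsent behaviour from the outside, here is a small sketch under stated assumptions: NoopDriver is a hypothetical driver written only for this example and never opens a connection. It registers the same instance twice and counts how often DriverManager reports it.

```java
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Collections;
import java.util.Properties;
import java.util.logging.Logger;

public class RegisterDriverDemo {

    /** Hypothetical no-op driver used only for this illustration. */
    static class NoopDriver implements Driver {
        @Override public Connection connect(String url, Properties info) { return null; }
        @Override public boolean acceptsURL(String url) { return false; }
        @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
            return new DriverPropertyInfo[0];
        }
        @Override public int getMajorVersion() { return 1; }
        @Override public int getMinorVersion() { return 0; }
        @Override public boolean jdbcCompliant() { return false; }
        @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            throw new SQLFeatureNotSupportedException();
        }
    }

    /** Counts how many times this exact driver instance is registered. */
    static int countRegistered(Driver driver) {
        int count = 0;
        for (Driver each : Collections.list(DriverManager.getDrivers())) {
            if (each == driver) {
                count++;
            }
        }
        return count;
    }

    /** Registers one instance twice and returns how often it ends up registered. */
    public static int registerTwice() {
        Driver driver = new NoopDriver();
        try {
            DriverManager.registerDriver(driver);
            DriverManager.registerDriver(driver); // ignored: the instance is already in the list
            int count = countRegistered(driver);
            DriverManager.deregisterDriver(driver); // clean up after the demo
            return count;
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("registered " + registerTwice() + " time(s)");
    }
}
```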

2.2 Establishing a connection

The DriverManager#getConnection method is used to establish a connection to the MySQL server.

Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/testdb", "test", "test");

It iterates over the registered database drivers in CopyOnWriteArrayList<DriverInfo> registeredDrivers; here that is the com.mysql.cj.jdbc.Driver instance.

java.sql.DriverManager#getConnection

for(DriverInfo aDriver : registeredDrivers) { // iterate over the registered database drivers
    // If the caller does not have permission to load the driver then
    // skip it.
    if(isDriverAllowed(aDriver.driver, callerCL)) {
        try {
            println("    trying " + aDriver.driver.getClass().getName());
            Connection con = aDriver.driver.connect(url, info); // open the database connection and return the connection object
            if (con != null) {
                // Success!
                println("getConnection returning " + aDriver.driver.getClass().getName());
                return (con);
            }
        } catch (SQLException ex) {
            if (reason == null) {
                reason = ex;
            }
        }

    } else {
        println("    skipping: " + aDriver.getClass().getName());
    }

}
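
The loop above relies on a convention of java.sql.Driver#connect: a driver returns null for a URL it does not handle, and DriverManager moves on to the next one. The sketch below illustrates this with two hypothetical drivers (PrefixDriver, with the invented URL prefixes jdbc:alpha: and jdbc:beta:) and a stub Connection built with a dynamic proxy. Nothing here talks to a real database.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.Properties;
import java.util.logging.Logger;

public class DriverLoopDemo {

    /** Hypothetical driver that only accepts URLs starting with its prefix. */
    static class PrefixDriver implements Driver {
        private final String prefix;

        PrefixDriver(String prefix) { this.prefix = prefix; }

        @Override public Connection connect(String url, Properties info) {
            if (!acceptsURL(url)) {
                return null; // "not my URL": DriverManager tries the next driver
            }
            // Stub connection: only toString(), isClosed() and close() are meaningful.
            return (Connection) Proxy.newProxyInstance(
                    DriverLoopDemo.class.getClassLoader(),
                    new Class<?>[] {Connection.class},
                    (proxy, method, args) -> {
                        switch (method.getName()) {
                            case "toString": return "StubConnection[" + prefix + "]";
                            case "isClosed": return false;
                            default: return null; // sufficient for close()
                        }
                    });
        }
        @Override public boolean acceptsURL(String url) { return url != null && url.startsWith(prefix); }
        @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) {
            return new DriverPropertyInfo[0];
        }
        @Override public int getMajorVersion() { return 1; }
        @Override public int getMinorVersion() { return 0; }
        @Override public boolean jdbcCompliant() { return false; }
        @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
            throw new SQLFeatureNotSupportedException();
        }
    }

    /** Registers both stub drivers and reports which one served the URL. */
    public static String connectVia(String url) {
        Driver alpha = new PrefixDriver("jdbc:alpha:");
        Driver beta = new PrefixDriver("jdbc:beta:");
        try {
            DriverManager.registerDriver(alpha);
            DriverManager.registerDriver(beta);
            try (Connection conn = DriverManager.getConnection(url)) {
                return conn.toString();
            } finally {
                DriverManager.deregisterDriver(alpha);
                DriverManager.deregisterDriver(beta);
            }
        } catch (SQLException e) {
            return "no suitable driver: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(connectVia("jdbc:beta:demo"));
        System.out.println(connectVia("jdbc:gamma:demo"));
    }
}
```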

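The MySQL driver establishes the connection as follows:

  1. From the database address, user name, password, and other parameters, build a ConnectionUrl object. By default this object is of type SINGLE_CONNECTION.
  2. From the ConnectionUrl object, create the connection instance ConnectionImpl and open the Socket connection.

2.2.1 Building the ConnectionUrl object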

com.mysql.cj.jdbc.NonRegisteringDriver#connect

public java.sql.Connection connect(String url, Properties info) throws SQLException {
    // ...
    ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info);
    switch (conStr.getType()) {
        // Single host
        case SINGLE_CONNECTION:
            return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost());
        // Failover
        case FAILOVER_CONNECTION:
        case FAILOVER_DNS_SRV_CONNECTION:
            return FailoverConnectionProxy.createProxyInstance(conStr);
        // Load balancing
        case LOADBALANCE_CONNECTION:
        case LOADBALANCE_DNS_SRV_CONNECTION:
            return LoadBalancedConnectionProxy.createProxyInstance(conStr);
        // Replication
        case REPLICATION_CONNECTION:
        case REPLICATION_DNS_SRV_CONNECTION:
            return ReplicationConnectionProxy.createProxyInstance(conStr);
        default:
            return null;
    }
    // ...
}
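
How the connection type might follow from the URL can be sketched with plain string checks. This is a deliberately simplified, hypothetical classifier (UrlTypeSketch is not a driver class): it assumes the type is decided by the scheme prefix and, for plain jdbc:mysql:// URLs, by whether more than one host is listed. It ignores the DNS SRV variants and credentials embedded in the URL.

```java
public class UrlTypeSketch {

    /** Simplified subset of the connection types handled in connect(). */
    public enum Type {
        SINGLE_CONNECTION, FAILOVER_CONNECTION, LOADBALANCE_CONNECTION, REPLICATION_CONNECTION, UNKNOWN
    }

    /** Classifies a URL by its scheme and, for jdbc:mysql://, by the number of hosts. */
    public static Type classify(String url) {
        if (url.startsWith("jdbc:mysql:loadbalance://")) {
            return Type.LOADBALANCE_CONNECTION;
        }
        if (url.startsWith("jdbc:mysql:replication://")) {
            return Type.REPLICATION_CONNECTION;
        }
        String plain = "jdbc:mysql://";
        if (url.startsWith(plain)) {
            String rest = url.substring(plain.length());
            // The host list ends at the first '/' (database) or '?' (properties).
            int end = rest.length();
            for (char stop : new char[] {'/', '?'}) {
                int i = rest.indexOf(stop);
                if (i >= 0 && i < end) {
                    end = i;
                }
            }
            String hosts = rest.substring(0, end);
            return hosts.contains(",") ? Type.FAILOVER_CONNECTION : Type.SINGLE_CONNECTION;
        }
        return Type.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(classify("jdbc:mysql://localhost:3306/testdb"));
        System.out.println(classify("jdbc:mysql://db1:3306,db2:3306/testdb"));
    }
}
```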

2.2.2 MySQL driver URL protocols

1. Failover

jdbc:mysql://[primary-host]:[port],[secondary-host]:[port],.../[database]?[property=<value>]&[property=<value>]

When the client's connection fails, the driver tries to connect to another host; this is transparent to the application. Reads and writes always go to a single host at a time.

2. Load Balancing

jdbc:mysql:loadbalance://[host]:[port],[host]:[port],...[/database]?[property=<value>]&[property=<value>]

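The format is similar to failover and replication. All hosts are peers with no primary/secondary distinction, and behavior can be tuned through properties.

The load-balancing policy is defined by the BalanceStrategy interface. The implementations shipped with the MySQL driver are:

  • BestResponseTimeBalanceStrategy: picks the host with the fastest transaction response time
  • RandomBalanceStrategy: (default) picks a host at random
  • SequentialBalanceStrategy: picks randomly the first time, then moves to the next host in order, wrapping around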
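
The difference between the random and the sequential strategy can be shown with a few lines of plain Java. This is only a sketch of the two selection rules as described in the list above; BalanceSketch and its members are invented names and do not implement the driver's BalanceStrategy interface.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class BalanceSketch {

    /** Random rule: every pick is independent of the previous one. */
    public static String pickRandom(List<String> hosts, Random rnd) {
        return hosts.get(rnd.nextInt(hosts.size()));
    }

    /** Sequential rule: random start, then one host further per pick, wrapping around. */
    public static class Sequential {
        private final List<String> hosts;
        private final Random rnd;
        private int current = -1; // -1 means "nothing picked yet"

        public Sequential(List<String> hosts, Random rnd) {
            this.hosts = hosts;
            this.rnd = rnd;
        }

        public String pick() {
            current = (current < 0) ? rnd.nextInt(hosts.size()) : (current + 1) % hosts.size();
            return hosts.get(current);
        }
    }

    public static void main(String[] args) {
        List<String> hosts = Arrays.asList("db1:3306", "db2:3306", "db3:3306");
        Sequential sequential = new Sequential(hosts, new Random());
        for (int i = 0; i < 4; i++) {
            System.out.println(sequential.pick());
        }
    }
}
```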

3. Replication

jdbc:mysql:replication://[master-host]:[port],[slave-host]:[port],.../database?[property=<value>]

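The format is similar to failover. The main difference is that the first host is the master, used in read/write mode, and the hosts after it are slaves, used in read-only mode. It can also be configured through properties.

The replication protocol builds on failover and loadbalance and targets replication architectures, addressing read/write splitting and load balancing.
In read-only transaction mode, requests are routed to a slave host; with multiple slaves a round-robin strategy is used.
All non-read-only requests (read/write) are routed to the master host.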
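
The routing rule just described fits in a few lines. The sketch below is hypothetical (ReplicationRoutingSketch is not a driver class) and covers only the single-master case; falling back to the master when no slave is configured is an assumption of this example, not something the article states.

```java
import java.util.Arrays;
import java.util.List;

public class ReplicationRoutingSketch {
    private final String master;
    private final List<String> slaves;
    private int next = 0; // index of the slave to use for the next read-only request

    public ReplicationRoutingSketch(String master, List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    /** Read-only requests rotate over the slaves; everything else goes to the master. */
    public String route(boolean readOnly) {
        if (!readOnly || slaves.isEmpty()) {
            return master; // assumption: without slaves, reads also use the master
        }
        String host = slaves.get(next);
        next = (next + 1) % slaves.size();
        return host;
    }

    public static void main(String[] args) {
        ReplicationRoutingSketch router =
                new ReplicationRoutingSketch("master:3306", Arrays.asList("slave1:3306", "slave2:3306"));
        System.out.println(router.route(false)); // write
        System.out.println(router.route(true));  // read
        System.out.println(router.route(true));  // read
    }
}
```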

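Versions 5.1.27 and later support multiple masters; with multiple masters a load-balancing strategy is used, see the loadbalance protocol described above.
Versions 5.1.28 and later also support adding nodes dynamically, that is, adding new hosts to the URL while the program is running, without restarting the server. This is the dynamic data source scenario that is often discussed.

References for this section: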
mysql驱动协议之loadbalance和replication
Chapter 8 Multi-Host Connections

2.2.3 Building the ConnectionImpl instance

Building the ConnectionImpl instance creates the Socket connection.

com.mysql.cj.jdbc.ConnectionImpl#getInstance
com.mysql.cj.jdbc.ConnectionImpl#ConnectionImpl(com.mysql.cj.conf.HostInfo)

public ConnectionImpl(HostInfo hostInfo) throws SQLException {
    // ...
    createNewIO(false); // the key call
    // ...
}

com.mysql.cj.jdbc.ConnectionImpl#createNewIO
com.mysql.cj.jdbc.ConnectionImpl#connectOneTryOnly

Establish the session. Blocking I/O (BIO) is used here.

com.mysql.cj.NativeSession#connect
com.mysql.cj.protocol.a.NativeSocketConnection#connect
com.mysql.cj.protocol.StandardSocketFactory#connect

@SuppressWarnings("unchecked")
public <T extends Closeable> T connect(String hostname, int portNumber, PropertySet pset, int loginTimeout) throws IOException {

    this.loginTimeoutCountdown = loginTimeout;

    if (pset != null) {
        this.host = hostname;

        this.port = portNumber;

        String localSocketHostname = pset.getStringProperty(PropertyKey.localSocketAddress).getValue();
        InetSocketAddress localSockAddr = null;
        if (localSocketHostname != null && localSocketHostname.length() > 0) {
            localSockAddr = new InetSocketAddress(InetAddress.getByName(localSocketHostname), 0);
        }

        int connectTimeout = pset.getIntegerProperty(PropertyKey.connectTimeout).getValue();

        if (this.host != null) {
            InetAddress[] possibleAddresses = InetAddress.getAllByName(this.host);

            if (possibleAddresses.length == 0) {
                throw new SocketException("No addresses for host");
            }

            // save last exception to propagate to caller if connection fails
            SocketException lastException = null;

            // Need to loop through all possible addresses. Name lookup may return multiple addresses including IPv4 and IPv6 addresses. Some versions of
            // MySQL don't listen on the IPv6 address so we try all addresses.
            for (int i = 0; i < possibleAddresses.length; i++) {
                try {
                    this.rawSocket = createSocket(pset);

                    configureSocket(this.rawSocket, pset);

                    InetSocketAddress sockAddr = new InetSocketAddress(possibleAddresses[i], this.port);
                    // bind to the local port if not using the ephemeral port
                    if (localSockAddr != null) {
                        this.rawSocket.bind(localSockAddr);
                    }

                    this.rawSocket.connect(sockAddr, getRealTimeout(connectTimeout)); // open the Socket connection

                    break;
                } catch (SocketException ex) {
                    lastException = ex;
                    resetLoginTimeCountdown();
                    this.rawSocket = null;
                }
            }

            if (this.rawSocket == null && lastException != null) {
                throw lastException;
            }

            resetLoginTimeCountdown();

            this.sslSocket = this.rawSocket;
            return (T) this.rawSocket;
        }
    }

    throw new SocketException("Unable to create socket");
}
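
The core of the method above is "resolve the host name, then try each address until one connects". The following is a stripped-down sketch of that idea using only java.net; ConnectAllAddresses and its methods are invented for this example, and socket configuration, local binding and the login timeout countdown are left out. The helper canReachLocalServer starts a throwaway server on 127.0.0.1 so the sketch can be run without MySQL.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class ConnectAllAddresses {

    /** Tries every address the host name resolves to and returns the first socket that connects. */
    public static Socket connect(String host, int port, int timeoutMillis) throws IOException {
        IOException last = null;
        // A name may resolve to several addresses (for example IPv4 and IPv6).
        for (InetAddress address : InetAddress.getAllByName(host)) {
            Socket socket = new Socket();
            try {
                socket.connect(new InetSocketAddress(address, port), timeoutMillis); // blocking connect
                return socket;
            } catch (IOException e) {
                last = e; // remember the failure and try the next address
                try {
                    socket.close();
                } catch (IOException ignored) {
                    // nothing useful to do if closing a failed socket fails
                }
            }
        }
        throw last != null ? last : new IOException("No addresses for host " + host);
    }

    /** Demo helper: opens a local server socket and connects to it. */
    public static boolean canReachLocalServer() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
             Socket client = connect("127.0.0.1", server.getLocalPort(), 2000)) {
            return client.isConnected();
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + canReachLocalServer());
    }
}
```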

2.3 Executing statements

2.3.1 Statement

Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery("select * from t_student");

Issue the SQL query and get the result:

com.mysql.cj.jdbc.StatementImpl#executeQuery

public java.sql.ResultSet executeQuery(String sql) throws SQLException {
    // ...
    this.results = ((NativeSession) locallyScopedConn.getSession()).execSQL(this, sql, this.maxRows, null, 
        createStreamingResultSet(), getResultSetFactory(), cachedMetaData, false);
}

A SQL query can be issued in two ways: 1. as a string; 2. as a binary packet.

com.mysql.cj.NativeSession#execSQL

public <T extends Resultset> T execSQL(Query callingQuery, String query, int maxRows, NativePacketPayload packet, boolean streamResults, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory, ColumnDefinition cachedMetadata, boolean isBatch) {
    // ...
    return packet == null
            ? ((NativeProtocol) this.protocol).sendQueryString(callingQuery, query, this.characterEncoding.getValue(), maxRows, streamResults, cachedMetadata, resultSetFactory)
            : ((NativeProtocol) this.protocol).sendQueryPacket(callingQuery, packet, maxRows, streamResults, cachedMetadata, resultSetFactory);
}

Because the SQL statement is the string select * from t_student, sendQueryString() runs first and converts the string into a binary packet (a NativePacketPayload instance), then sendQueryPacket() is called to send the data.

com.mysql.cj.protocol.a.NativeProtocol#sendQueryString
com.mysql.cj.protocol.a.NativeProtocol#sendQueryPacket

public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults, ColumnDefinition cachedMetadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
    // ...
    // Send query command and sql query string
    NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
    T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory);
    return rs;
}

Send the packet to the MySQL server.

com.mysql.cj.protocol.a.NativeProtocol#sendCommand(com.mysql.cj.protocol.Message, boolean, int)
com.mysql.cj.protocol.a.NativeProtocol#send(com.mysql.cj.protocol.Message, int)
com.mysql.cj.protocol.a.TimeTrackingPacketSender#send(byte[], int, byte)
com.mysql.cj.protocol.a.SimplePacketSender#send(byte[], int, byte)

public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException {
    PacketSplitter packetSplitter = new PacketSplitter(packetLen);
    while (packetSplitter.nextPacket()) {
        this.outputStream.write(NativeUtils.encodeMysqlThreeByteInteger(packetSplitter.getPacketLen()));
        this.outputStream.write(packetSequence++);
        this.outputStream.write(packet, packetSplitter.getOffset(), packetSplitter.getPacketLen());
    }
    this.outputStream.flush();
}
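
What send() writes for each packet is a 4-byte header followed by the payload: the payload length as a 3-byte little-endian integer, then a 1-byte sequence number. For a text query the payload starts with the COM_QUERY command byte (0x03) followed by the SQL text. The sketch below builds such a frame in memory; PacketFrameSketch is an invented name, and the sketch assumes the payload fits into a single packet and that the SQL is encoded as UTF-8.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class PacketFrameSketch {
    static final int COM_QUERY = 0x03; // command byte for a text query

    /** Builds header + payload for a query that fits into one packet. */
    public static byte[] frameQuery(String sql, int sequenceId) {
        byte[] text = sql.getBytes(StandardCharsets.UTF_8);
        int payloadLength = 1 + text.length; // command byte + SQL text
        ByteArrayOutputStream out = new ByteArrayOutputStream(4 + payloadLength);
        // 3-byte little-endian payload length
        out.write(payloadLength & 0xff);
        out.write((payloadLength >> 8) & 0xff);
        out.write((payloadLength >> 16) & 0xff);
        // 1-byte sequence number
        out.write(sequenceId & 0xff);
        // payload
        out.write(COM_QUERY);
        out.write(text, 0, text.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte b : frameQuery("select * from t_student", 0)) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```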

2.3.2 PreparedStatement

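PreparedStatement works like Statement, except that it supports placeholders: placeholders mark where input data is needed, and the values are then filled in one by one. A PreparedStatement can of course also execute SQL without placeholders.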

PreparedStatement preparedStatement = conn.prepareStatement("select * from t_student where id = ? and age = ?");
preparedStatement.setString(1, "1");
preparedStatement.setInt(2, 11);
ResultSet resultSet = preparedStatement.executeQuery();

The MySQL driver's implementation of the java.sql.PreparedStatement interface is com.mysql.cj.jdbc.ClientPreparedStatement.

Querying with PreparedStatement:

com.mysql.cj.jdbc.ClientPreparedStatement#executeQuery

public java.sql.ResultSet executeQuery() throws SQLException {
    // ...
    Message sendPacket = ((PreparedQuery<?>) this.query).fillSendPacket();
    this.results = executeInternal(this.maxRows, sendPacket, createStreamingResultSet(), true, cachedMetadata, false);
    return this.results;
}

First the placeholders in the SQL are filled in, then the result is packed into a binary packet and sent:

com.mysql.cj.AbstractPreparedQuery#fillSendPacket

sendPacket.writeBytes(StringLengthDataType.STRING_FIXED, bindValues[i].getByteValue());

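Take the original SQL: select * from t_student where id = ? and age = ?
It is split at the placeholders into three fragments, and the parameters are bound while iterating over the fragments. The first two fragments are listed below; the third, after the last placeholder, is empty.

  1. select * from t_student where id =
  2. and age =

After the parameters are filled in, the result is packed into a single binary packet and sent to the database server.
The rest of the flow is the same as with Statement: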

com.mysql.cj.jdbc.ClientPreparedStatement#executeInternal
com.mysql.cj.NativeSession#execSQL
com.mysql.cj.protocol.a.NativeProtocol#sendQueryPacket
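
The split-and-interleave idea behind the placeholder filling can be sketched on plain strings. This is an illustration only, not the driver's fillSendPacket: PlaceholderFillSketch is an invented name, the sketch works on text rather than bytes, it would be confused by a ? inside a string literal, and its quote helper uses simple quote doubling rather than the driver's escaping rules.

```java
public class PlaceholderFillSketch {

    /** Interleaves the static SQL fragments with already rendered parameter values. */
    public static String fill(String sql, String... renderedParams) {
        // limit -1 keeps the trailing empty fragment after the last '?'
        String[] fragments = sql.split("\\?", -1);
        if (fragments.length - 1 != renderedParams.length) {
            throw new IllegalArgumentException("expected " + (fragments.length - 1) + " parameters");
        }
        StringBuilder filled = new StringBuilder();
        for (int i = 0; i < renderedParams.length; i++) {
            filled.append(fragments[i]).append(renderedParams[i]);
        }
        filled.append(fragments[fragments.length - 1]);
        return filled.toString();
    }

    /** Renders a string parameter as a quoted literal (simplified escaping). */
    public static String quote(String value) {
        return "'" + value.replace("'", "''") + "'";
    }

    public static void main(String[] args) {
        String sql = "select * from t_student where id = ? and age = ?";
        System.out.println(fill(sql, quote("1"), "11"));
    }
}
```

For the query from section 1.2 this prints select * from t_student where id = '1' and age = 11.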

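2.4 Reading the response

After the data has been sent to MySQL, the response is read.

  1. First read the number of columns in the response (how many fields there are).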

com.mysql.cj.protocol.a.NativeProtocol#sendQueryPacket

public final <T extends Resultset> T sendQueryPacket(Query callingQuery, NativePacketPayload queryPacket, int maxRows, boolean streamResults, ColumnDefinition cachedMetadata, ProtocolEntityFactory<T, NativePacketPayload> resultSetFactory) throws IOException {
    // ...
    // Send query command and sql query string
    NativePacketPayload resultPacket = sendCommand(queryPacket, false, 0);
    T rs = readAllResults(maxRows, streamResults, resultPacket, false, cachedMetadata, resultSetFactory); // read the column count
    return rs;
}

  2. Then read the data of each row (column format and column content).

com.mysql.cj.protocol.a.NativeProtocol#readAllResults
com.mysql.cj.protocol.a.NativeProtocol#read
com.mysql.cj.protocol.a.TextResultsetReader#read

public Resultset read(int maxRows, boolean streamResults, NativePacketPayload resultPacket, ColumnDefinition metadata, ProtocolEntityFactory<Resultset, NativePacketPayload> resultSetFactory) throws IOException {
    Resultset rs = null;
    long columnCount = resultPacket.readInteger(IntegerDataType.INT_LENENC); // get the number of columns
    if (columnCount > 0) {
        // Build a result set with rows.

        // Read in the column information // read the type information of all columns
        ColumnDefinition cdef = this.protocol.read(ColumnDefinition.class, new ColumnDefinitionFactory(columnCount, metadata));

        // There is no EOF packet after fields when CLIENT_DEPRECATE_EOF is set
        if (!this.protocol.getServerSession().isEOFDeprecated()) {
            this.protocol.skipPacket();
            //this.protocol.readServerStatusForResultSets(this.protocol.readPacket(this.protocol.getReusablePacket()), true);
        }

        ResultsetRows rows = null;
        if (!streamResults) {
            TextRowFactory trf = new TextRowFactory(this.protocol, cdef, resultSetFactory.getResultSetConcurrency(), false);
            ArrayList<ResultsetRow> rowList = new ArrayList<>();

            // Read each row in turn, getting the values of all columns (raw bytes)
            ResultsetRow row = this.protocol.read(ResultsetRow.class, trf);
            while (row != null) {
                if ((maxRows == -1) || (rowList.size() < maxRows)) {
                    rowList.add(row);
                }
                row = this.protocol.read(ResultsetRow.class, trf);
            }
            rows = new ResultsetRowsStatic(rowList, cdef); // wrapping: ResultsetRow -> ResultsetRows

        } else {
            rows = new ResultsetRowsStreaming<>(this.protocol, cdef, false, resultSetFactory);
            this.protocol.setStreamingData(rows);
        }
        /*
         * Build ResultSet from ResultsetRows
         */
        rs = resultSetFactory.createFromProtocolEntity(rows); // wrapping: ResultsetRows -> Resultset
    }
    return rs;
}        

2.4.1 Reading column metadata

Loop over every column, reading and parsing its type information.

com.mysql.cj.protocol.a.NativeProtocol#read
com.mysql.cj.protocol.a.ColumnDefinitionReader#read

@Override
public ColumnDefinition read(ProtocolEntityFactory<ColumnDefinition, NativePacketPayload> sf) {

    ColumnDefinitionFactory cdf = (ColumnDefinitionFactory) sf;

    long columnCount = cdf.getColumnCount();
    ColumnDefinition cdef = cdf.getColumnDefinitionFromCache();

    if (cdef != null && !cdf.mergeColumnDefinitions()) {
        for (int i = 0; i < columnCount; i++) {
            this.protocol.skipPacket();
        }
        return cdef;
    }

    /* read the metadata from the server */
    Field[] fields = null;
    boolean checkEOF = !this.protocol.getServerSession().isEOFDeprecated();

    // Read in the column information

    fields = new Field[(int) columnCount];

    for (int i = 0; i < columnCount; i++) { // loop over every column, reading and parsing its type information
        NativePacketPayload fieldPacket = this.protocol.readMessage(null); // read the binary packet
        // next check is needed for SSPS
        if (checkEOF && fieldPacket.isEOFPacket()) {
            break;
        }
        fields[i] = unpackField(fieldPacket, this.protocol.getServerSession().getCharacterSetMetadata()); // parse the binary packet
    }

    return cdf.createFromFields(fields);
}

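For example, the metadata read for the id column of table t_student in database testdb is wrapped in a Field object as follows:
[Figure: Field]

2.4.2 Reading column values

Multiple column values are parsed out of the binary packet.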

com.mysql.cj.protocol.a.ResultsetRowReader#read

@Override
public ResultsetRow read(ProtocolEntityFactory<ResultsetRow, NativePacketPayload> sf) throws IOException {
    AbstractRowFactory rf = (AbstractRowFactory) sf;
    NativePacketPayload rowPacket = null;
    NativePacketHeader hdr = this.protocol.getPacketReader().readHeader();

    // read the entire packet(s) // one complete row has been read (raw bytes)
    rowPacket = this.protocol.getPacketReader()
            .readMessage(rf.canReuseRowPacketForBufferRow() ? Optional.ofNullable(this.protocol.getReusablePacket()) : Optional.empty(), hdr);
    this.protocol.checkErrorMessage(rowPacket);
    // Didn't read an error, so re-position to beginning of packet in order to read result set data
    rowPacket.setPosition(rowPacket.getPosition() - 1);

    // exit early with null if there's an EOF packet
    if (!this.protocol.getServerSession().isEOFDeprecated() && rowPacket.isEOFPacket()
            || this.protocol.getServerSession().isEOFDeprecated() && rowPacket.isResultSetOKPacket()) {
        this.protocol.readServerStatusForResultSets(rowPacket, true);
        return null;
    }

    return sf.createFromMessage(rowPacket); // parse the data
}

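For example, the table row with id 1 is as follows:

[Figure: student table]
The bytes read here are shown on the left in hex, with their ASCII rendering on the right: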

01 31 05 68 75 6e 61 6e     . 1 . h u n a n
02 31 31 08 7a 68 61 6e     . 1 1 . z h a n
67 73 61 6e                 g s a n
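
In the dump above, each value is preceded by its length: 01 "1", 05 "hunan", 02 "11", 08 "zhangsan". The following sketch decodes exactly these bytes. TextRowDecodeSketch is an invented name, and the sketch only handles single-byte length prefixes and the NULL marker 0xfb; longer values use multi-byte length prefixes, which are rejected here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TextRowDecodeSketch {

    /** Splits a text-protocol row into its length-prefixed column values. */
    public static List<String> decode(byte[] row) {
        List<String> values = new ArrayList<>();
        int pos = 0;
        while (pos < row.length) {
            int length = row[pos++] & 0xff;
            if (length == 0xfb) { // NULL marker
                values.add(null);
                continue;
            }
            if (length > 0xfb) {
                throw new UnsupportedOperationException("multi-byte length prefix not handled in this sketch");
            }
            values.add(new String(row, pos, length, StandardCharsets.UTF_8));
            pos += length;
        }
        return values;
    }

    public static void main(String[] args) {
        // The bytes from the dump above.
        byte[] row = {
            0x01, 0x31, 0x05, 0x68, 0x75, 0x6e, 0x61, 0x6e,
            0x02, 0x31, 0x31, 0x08, 0x7a, 0x68, 0x61, 0x6e,
            0x67, 0x73, 0x61, 0x6e
        };
        System.out.println(decode(row)); // prints [1, hunan, 11, zhangsan]
    }
}
```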

Parsing the binary packet:

com.mysql.cj.protocol.a.TextRowFactory#createFromMessage

@Override
public ResultsetRow createFromMessage(NativePacketPayload rowPacket) {

    // use a buffer row for reusable packets (streaming results), blobs and long strings
    // or if we're over the threshold
    boolean useBufferRow = this.canReuseRowPacketForBufferRow || this.columnDefinition.hasLargeFields()
            || rowPacket.getPayloadLength() >= this.useBufferRowSizeThreshold.getValue();

    if (this.resultSetConcurrency == Concurrency.UPDATABLE || !useBufferRow) {
        byte[][] rowBytes = new byte[this.columnDefinition.getFields().length][];

        for (int i = 0; i < this.columnDefinition.getFields().length; i++) { // split the packet according to the number of columns
            rowBytes[i] = rowPacket.readBytes(StringSelfDataType.STRING_LENENC);
        }

        return new ByteArrayRow(rowBytes, this.exceptionInterceptor); // store the split byte arrays in a ByteArrayRow object
    }

    return new TextBufferRow(rowPacket, this.columnDefinition, this.exceptionInterceptor, this.valueDecoder);
}

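2.4.3 Building the Resultset

Finally, back in the method com.mysql.cj.protocol.a.TextResultsetReader#read.
The ResultsetRow objects obtained by parsing each row are collected into a list and stored in a ResultsetRows object. A Resultset object is then built from the ResultsetRows object.

That is: ResultsetRow -> ResultsetRows -> Resultset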

com.mysql.cj.jdbc.result.ResultSetFactory#createFromProtocolEntity
com.mysql.cj.jdbc.result.ResultSetFactory#createFromResultsetRows

public ResultSetImpl createFromResultsetRows(int resultSetConcurrency, int resultSetType, ResultsetRows rows) throws SQLException {

    ResultSetImpl rs;

    StatementImpl st = this.stmt;

    if (rows.getOwner() != null) {
        st = ((ResultSetImpl) rows.getOwner()).getOwningStatement();
    }

    switch (resultSetConcurrency) {
        case java.sql.ResultSet.CONCUR_UPDATABLE:
            rs = new UpdatableResultSet(rows, this.conn, st);
            break;

        default:
            // CONCUR_READ_ONLY
            rs = new ResultSetImpl(rows, this.conn, st); // wrapping: ResultsetRows -> ResultSet
            break;
    }

    rs.setResultSetType(resultSetType);
    rs.setResultSetConcurrency(resultSetConcurrency);

    if (rows instanceof ResultsetRowsCursor && st != null) {
        rs.setFetchSize(st.getFetchSize());
    }
    return rs;
}
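
3. Summary

  1. JDBC loads database drivers through the SPI mechanism. This works around the fact that the bootstrap class loader cannot load third-party classes: loading them is delegated to the thread context class loader.
  2. Obtaining a Connection object really means opening a Socket connection to the database. A connection pool can be used to reuse Connection objects.
  3. Querying through a Statement boils down to writing binary data to the Socket and then reading binary data back from it, which is wrapped in a Resultset object.

Author: Sumkor
Link: https://segmentfault.com/a/11...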

