38

golang sql 包连接池分析

 5 years ago
source link: https://www.tuicool.com/articles/Yjae2mn
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

golang 在使用 mysql 的时候会用到 database/sql 这个库,每次都在黑盒使用它,有必要来梳理一下整个请求流程和细节,以免以后碰到问题会有解决的思路。

阅读之前的几个问题

  • sql 的连接池的连接怎么维护的?
  • Query / Exec 如何获取查询的连接?
  • 连接池的连接如何释放的?

几个重要的结构

DB struct

先来看看 DB 结构,该结构是 sql 包的核心结构。DB 是表示零个或多个底层连接池的数据库句柄,是并发安全的。

sql 包可以自动创建和释放连接,它还维护一个空闲连接池。如果数据库具有每个连接状态的概念,则只能在事务中可靠地观察到这种状态。调用 DB.Begin 后,返回的 Tx 将绑定到单个连接。在事务上调用 Commit 或 Rollback 后,该事务的连接将返回到 DB 的空闲连接池。SetMaxIdleConns 用来控制连接池大小。

type DB struct {
    driver driver.Driver // 数据库驱动
    dsn    string // 数据库连接参数,比如 username,hostname,password 等等
    numClosed uint64 // numClosed 是一个原子计数器,表示已关闭连接的总数。Stmt.openStmt 在清除 Stmt.css 中的已关闭连接之前对其进行检查。

    mu           sync.Mutex // 保护下面的字段
    freeConn     []*driverConn // 空闲连接
    connRequests map[uint64]chan connRequest // 阻塞请求队列。当达到最大连接数时,后续请求将插入该队列来等待可用连接
    nextRequest  uint64 // connRequests 的下一个 key
    numOpen      int    // 已连接或者正等待连接的数量

    // 一个创建新连接的信号,
    // 运行connectionOpener()的goroutine读取此chan,maybeOpenNewConnections发送此chan(每个需要的连接发送一次)
    // 它在db.Close()时关闭,并通知connectionOpener goroutine退出。
    openerCh    chan struct{} 
    closed      bool
    dep         map[finalCloser]depSet
    lastPut     map[*driverConn]string // 用于 debug
    maxIdle     int                    // 最大空闲连接数, 0等价于 defaultMaxIdleConns 常量(代码中值为2),负数等价于0
    maxOpen     int                    // 数据库的最大连接数,0 等价于不限制最大连接数
    maxLifetime time.Duration          // 连接的最大生命周期
    cleanerCh   chan struct{} // 用于释放连接池中过期的连接的信号
}

driverConn struct

driverConn 使用互斥锁封装一个 driver.Conn 结构,在所有对 Conn 的调用期间保持(包括对通过该 Conn 返回的接口的任何调用,例如对 Tx,Stmt,Result,Rows 的调用)

type driverConn struct {
    db        *DB
    createdAt time.Time

    sync.Mutex  // 保护下面的字段
    ci          driver.Conn
    closed      bool
    finalClosed bool // ci.Close 已经被调用则为 true
    openStmt    map[*driverStmt]bool

    // 下面的字段被 db.mu 保护
    inUse      bool
    onPut      []func() // 下次返回 conn 时运行的代码
    dbmuClosed bool     // 与 closed 字段相同,但由 db.mu 保护,用于 removeClosedStmtLocked
}

// driver.Conn 是具体的接口 用来支持不同的数据库
// Conn 是与数据库的连接,不是 gotoutines 安全的。
// Conn 被认为是有状态的。
type Conn interface {
    // Prepare 返回绑定到该连接的就绪语句 Stmt。
    Prepare(query string) (Stmt, error)

    // Close 使当前就绪的语句和事务无效并可能停止,将此连接标记为不再使用。
    //
    // 因为sql包维护一个空闲的连接池,并且只有在空闲连接过剩时才调用Close,所以驱动不需要做自己的连接缓存。
    Close() error

    // Begin 启动并返回一个新的事务 Tx
    Begin() (Tx, error)
}

// 可以看到driverConn的这个方法,看名字就知道是释放连接的 调用了DB 的 putConn 方法,这里先留个印象
func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err)
}

驱动注册绑定

我们在使用指定数据库时需要使用 import _ "github.com/go-sql-driver/mysql" 来执行 init() 函数。这个 init() 函数主要用来将指定的数据库驱动注册到 sql 的 一个 map 类型的 drivers 变量中。

mysql/driver.go
// 该方法注册到驱动,也就是db.Open的调用,返回的mc是实现的driver.Conn接口的结构,dsn 为连接该数据库的配置
func (d MySQLDriver) Open(dsn string) (driver.Conn, error) {
    // New mysqlConn
    mc := &mysqlConn{
        maxAllowedPacket: maxPacketSize,
        maxWriteSize:     maxPacketSize - 1,
        closech:          make(chan struct{}),
    }
    ...
    return mc, nil
}

// 注册驱动 MySQLDriver 结构实现了 driver.Driver 接口
func init() {
    sql.Register("mysql", &MySQLDriver{})
}

连接

创建连接的过程

NVVnMfF.png!web

创建连接

可以看到,调用sql.Open的时候会启动一个 goroutine 一直阻塞读取 db.openerCh 。当这个openerCh收到信号时,会启动创建连接的流程,调用驱动提供的创建连接的方法创建连接。如果创建成功,优先把改连接给 db.connRequests 中阻塞的请求使用,如果没有阻塞的请求就把这个新连接放入 db.freeConn 中待请求使用。

关键方法

// 调用驱动的 Open 方法创建新连接
func (db *DB) openNewConnection() {
    // 创建新连接
    ci, err := db.driver.Open(db.dsn)
    db.mu.Lock()
    defer db.mu.Unlock()
    if db.closed {
        if err == nil {
            ci.Close()
        }
        db.numOpen--
        return
    }
    if err != nil {
        db.numOpen--
        db.putConnDBLocked(nil, err)
        db.maybeOpenNewConnections()
        return
    }
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    // 直接给阻塞的请求使用 或者 放入连接池
    if db.putConnDBLocked(dc, err) {
        db.addDepLocked(dc, dc)
    } else {
        db.numOpen--
        ci.Close()
    }
}

// 给 阻塞在 connRequest 队列的请求分配连接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
    if db.closed {
        return false
    }
    // 如果超过最大连接,直接返回false,connRequest 队列的请求继续阻塞
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 把连接分配给 connRequests 阻塞的请求
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        // 取第一个
        for reqKey, req = range db.connRequests {
            break
        }
        // 阻塞的请求得到连接 删除 connRequests 的记录
        delete(db.connRequests, reqKey)
        // 标记该连接正在使用
        if err == nil {
            dc.inUse = true
        }
        // 通过 chan 把该连接发送给请求
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    // 如果空闲连接数小于最大连接限制 把该连接放到 freeConn 中
    } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) {
        db.freeConn = append(db.freeConn, dc)
        // 根据 db.maxLifetime 起一个 goroutine 清除freeConn中过期的连接 
        db.startCleanerLocked()
        return true
    }
    return false
}

查询如何获取连接

先来看提供的两个基本的查询的方法 Query / Exec

// 使用方法
db.Query("SELECT * FROM table")
db.Exec("INSERT INTO table VALUES (1)")
  • Query:执行需要返回 rows 的操作,例如 SELECT)不释放连接,但在调用后仍然保持连接,即放回 freeConn。
  • Exec:执行没有返回 rows 的操作,例如 INSERT, UPDATE,DELETE)在调用后自动释放连接。

    yiArAnY.png!web

    Query 查询

关键方法

// conn 返回新打开的连接,或者从连接池freeConn中取
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    ...
    // 检查上下文是否被取消
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime

    // cachedOrNewConn 模式获取连接
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }

    // 如果连接数已经超过限制,将该请求放入connRequest中阻塞,直到有空闲连接
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // Make the connRequest channel. It's buffered so that the
        // connectionOpener doesn't block while waiting for the req to be read.
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.mu.Unlock()

        // 上下文判断请求超时
        select {
        case <-ctx.Done():
            // 删除 connRequests 中阻塞的请求
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            select {
            default:
            case ret, ok := <-req:
                if ok {
                    // 如果收到了连接,由于超时了,回收该连接
                    db.putConn(ret.conn, ret.err)
                }
            }
            return nil, ctx.Err()
        // 获取到了连接,返回处理
        case ret, ok := <-req:
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }

    // 连接池中没有连接,且打开的连接数没有超限,创建新连接
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.driver.Open(db.dsn)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
    }
    db.addDepLocked(dc, dc)
    dc.inUse = true
    db.mu.Unlock()
    return dc, nil
}

连接的回收或释放

被动回收或释放

我们沿着上面的 Query 请求分析下来,在 queryConn 的方法中会看到一个 releaseConn 的方法,它调用了 putConn 方法去处理这个 dc 连接。

func (dc *driverConn) releaseConn(err error) {
    dc.db.putConn(dc, err)
}

再来看看 putConn 方法的定义

// 把 dc 连接放回连接池 freeConn 或者释放
func (db *DB) putConn(dc *driverConn, err error) {
    db.mu.Lock()
    // 回收一个没被使用的连接 会panic
    if !dc.inUse {
        if debugGetPut {
            fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
        }
        panic("sql: connection returned that was never out")
    }
    if debugGetPut {
        db.lastPut[dc] = stack()
    }
    // 将该连接置为 未使用
    dc.inUse = false

    // 执行完该连接的函数
    for _, fn := range dc.onPut {
        fn()
    }
    dc.onPut = nil
    // 不重用无效的连接
    if err == driver.ErrBadConn {
        // 该函数会判断 阻塞在 connRequest 的请求数量,然后在不超限的情况下,通过 openerCh 唤醒 connectionOpener goroutine 创建新连接处理请求。
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        // 释放连接
        dc.Close()
        return
    }
    if putConnHook != nil {
        putConnHook(db, dc)
    }
    // 如果是有效的连接,将该连接给 阻塞在 connRequest 的请求使用,或者放回连接池
    added := db.putConnDBLocked(dc, nil)
    db.mu.Unlock()

    // 改连接没被回收,释放
    if !added {
        dc.Close()
    }
}

主动回收或释放

除了上述的连接回收释放方式,还有没有其他地方回收释放呢。当我们设置 db.SetConnMaxLifetime 也就是设置连接的最大存活时间时,都会调起一个 goroutine 负责处理连接池中过期的连接。同 openerCh 信号一样,释放也用到了一个 cleanerCh 用于通知该 goroutine 处理任务。

func (db *DB) SetConnMaxLifetime(d time.Duration) {
    if d < 0 {
        d = 0
    }
    db.mu.Lock()
    // 当缩小 maxLifetime 的时候,直接清理不符的连接
    if d > 0 && d < db.maxLifetime && db.cleanerCh != nil {
        select {
        case db.cleanerCh <- struct{}{}:
        default:
        }
    }
    db.maxLifetime = d
    // 该方法会起一个 goroutine  负责释放过期连接
    db.startCleanerLocked()
    db.mu.Unlock()
}

// 满足条件开启一个 goroutine 维护过期的连接
func (db *DB) startCleanerLocked() {
    if db.maxLifetime > 0 && db.numOpen > 0 && db.cleanerCh == nil {
        db.cleanerCh = make(chan struct{}, 1)
        go db.connectionCleaner(db.maxLifetime)
    }
}

// 核心的逻辑
func (db *DB) connectionCleaner(d time.Duration) {
    const minInterval = time.Second

    if d < minInterval {
        d = minInterval
    }
    t := time.NewTimer(d)

    // 阻塞等待 cleanerCh 信号
    for {
        select {
        case <-t.C:
        case <-db.cleanerCh: // maxLifetime 修改 或者 db 关闭会发送该信号
        }

        db.mu.Lock()
        d = db.maxLifetime
        if db.closed || db.numOpen == 0 || d <= 0 {
            db.cleanerCh = nil
            db.mu.Unlock()
            return
        }

        expiredSince := nowFunc().Add(-d)
        var closing []*driverConn
        // 从连接池 freeConn 获取过期连接
        for i := 0; i < len(db.freeConn); i++ {
            c := db.freeConn[i]
            if c.createdAt.Before(expiredSince) {
                closing = append(closing, c)
                last := len(db.freeConn) - 1
                db.freeConn[i] = db.freeConn[last]
                db.freeConn[last] = nil
                db.freeConn = db.freeConn[:last]
                i--
            }
        }
        db.mu.Unlock()
        // 释放连接
        for _, c := range closing {
            c.Close()
        }

        if d < minInterval {
            d = minInterval
        }
        t.Reset(d)
    }
}

小结

现在回过头来看看开始的三个问题,基本就有解了。

  • sql 的连接池的连接怎么维护的?
    有效的连接存储在连接池 freeConn 中。启用一个 connectionOpener goroutine 通过接受 openerCh 信号负责调用驱动的 Open 方法创建连接。当用 db.SetConnMaxLifetime 设置 MaxLifetime 或者调用 putConnDBLocked 方法满足条件时候会启用一个 connectionCleaner goroutine 通过接受 cleanerCh 信号负责清理连接。
  • Query / Exec 如何获取查询的连接?
    1. 先查看 freeConn 是否有可用的连接,如果有就从连接池取。如果没有进入下一步。
    2. 判断当前连接数是否超限。如果超限,将该请求放入 connRequests 阻塞等待可用连接。如果没超限进入下一步。
    3. 创建新的连接
  • 连接池的连接如何回收/释放的?
    1. 被动回收/释放。通过查询等操作返回错误的时候会执行 releaseConn 函数回收连接。当满足条件时 会起一个 connectionCleaner goroutine 清理连接池的无效连接。
    2. 主动回收/释放。设置 db.SetConnMaxLifetime 的时候会触发一次 connectionCleaner goroutine 清理连接池。

可以看到sql库的连接池的实现机制其实还是蛮复杂的,生产者 connectionOpener goroutine 阻塞监听 openerCh 创建连接放入连接池。当请求来时,先查询连接池有没有空闲连接,如果没有空闲连接则创建,Query 类的请求用完继续放回连接池重用。当需要清理连接池的连接时调用 connectionCleaner goroutine。分析过一遍,以后遇到问题会更快的处理。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK