4

Go内置database/sql连接池 - 源码学习

 2 years ago
source link: https://studygolang.com/articles/35389
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Go内置database/sql连接池 - 源码学习

uuid · 3天之前 · 217 次点击 · 预计阅读时间 8 分钟 · 大约8小时之前 开始浏览    

Go内置了数据库相关的库 - database/sql,实现数据库操作相关的接口,其中还包含一个很重要的功能 - 连接池,用来实现连接的复用,限制连接的数量,从而提高性能,避免连接数量失控,导致资源消耗不可控。

本文借Go内置的database/sql库,来一起学习如何一步步设计包含连接池的数据库组件,包括模型抽象、连接复用,以及如何管理连接数。

首先,我们要对解决领域进行抽象。

我们目标是设计一个数据库连接组件,所以第一个对象模型很明确 - 数据库对象, 我们将数据库对象抽象为一个DB的结构体,一个对象对应的是一个数据库实例,所以DB必须是单例

其次,数据库需要连接,所以可以对连接进行抽象,命名为Conn,这里我们不关心Conn的属性,而是关心行为,所以Conn类型定义成一个interface,包含所需两个方法:预处理方法Prepare和关闭连接方法Close (注:Prepare方法不再继续展开,实际上就是接收一个sql,返回一个实现Stmt接口的预处对象,接着设置一下参数,最后执行数据库操作)。

由于不同的数据库连接的实现方式会有不同,这时就要考虑隔离变化,对连接的方式进行抽象,定义一个连接接口 - Connector,用来创建连接(依赖倒置原则),当初始化DB的时候再将具体实现注入到DB对象中(也就是依赖注入)。

最终我们可以得到以下几个接口和结构体:

// 数据库对象
type DB struct {
    connector Connector
}

type Conn interface {
    Prepare(query string) (Stmt, error)
    Close() error
}

// 数据库连接接口
type Connector interface {
    Connect(context.Context) (Conn, error)
}

最后,我们给DB对象增加一个获取连接的方法Conn,在不考虑连接池的情况下,调用connector.Connect(ctx)直接获取连接:

// 获取一个连接
func (db *DB) Conn(ctx context.Context) (*Conn, error) {
    return db.connector.Connect(ctx)
}

当不考虑到连接池时,上面的实现基本满足需求,但是为了提高性能,连接池还是必须的,接下来我们开始设计连接池的功能。

第一步,需要考虑存储空闲连接,这里采用的是切片来存储:freeConn []*Conn。

接着,需要考虑属性要定义在哪?!空闲的连接需要能被DB实例中的不同方法访问到,所以我们把freeConn定义为DB的一个属性,同时考虑到对freeConn的访问会存在并发安全的问题,需要增加一个锁mu sync.Mutex来保护:

// 数据库对象
type DB struct {
    connector Connector
    mu        sync.Mutex // protects following fields
    freeConn  []*Conn
}

数据库连接获取方法Conn就需要修改为:

func (db *DB) Conn(ctx context.Context) (*Conn, error) {
    db.mu.Lock() // 加锁保护freeConn
    numFree := len(db.freeConn)
    if numFree > 0 {
        conn := db.freeConn[0]
        // 移除第一个连接
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        db.mu.Unlock()
        return conn, nil
    }
    db.mu.Unlock()
    return db.connector.Connect(ctx) // 没有空闲连接,重新创建
}

连接要复用,就不能关闭,用完需要放回连接池中,所以DB需要一个将连接放回连接池的方法 - putConn:

func (db *DB) putConn(dc Conn) {
    db.mu.Lock()
    db.freeConn = append(db.freeConn, dc)
    db.mu.Unlock()
}

但是,putConn方法要在怎么被调用?!不能因为增加了连接池功,让客户端的改变使用方式,所以我们应该考虑将putConn调用放到一个合理的地方 - Conn的Close()方法中。

在没有连接池功能的时候,一个Conn用完了就一定会调用Close()释放资源,有连接池功能后,就不再是直接关闭连接,而是释放到连接池中,提供给后续请求使用。

对此,我们对Conn进行改造,增加一层代理,命名为PConn,将原来的Conn作为PConn的一个属性,同时实现了Conn接口的两个方法:Prepare和Close,这样我们就可以对Close方法进行拦截,修改它的行为:

type PConn interface {
  db        *DB
  ci        Conn
}
func (pc *PConn) Close() error {
    dc.db.putConn(pc)
}
func (pc *PConn) Prepare(query string) (Stmt, error) {
    return pc.ci.Prepare(query)
}

接着我们调整一下Conn的创建方法(也可以重新实现Connector接口,返回的是PConn实例,然后将新的实现注入到DB实例中)

func (db *DB) Conn(ctx context.Context) (Conn, error) {
  ...
  c , err := db.connector.Connect(ctx) // 没有空闲连接,重新创建
  if err!=nil {
    return nil, err
  }
  return &PConn{
      ci: c,
      db: db,
  }
}

这样当PConn使用完后调用Close方法,就不再是关闭连接,而是将连接释放到DB对象的freeConn中,由于客户端定义的类型是Conn接口,并且PConn也实现了Conn接口,所以无需调整(符合开闭原则

到此,连接复用初步完成了,接着就得考虑另外一个核心功能 :连接数量管理

连接数量管理

目前的Conn发现没有连接的时候是调用connector.Connect方法直接创建新连接,没法控制连接的数量,这样很明显是不合理,无限创建连接可能会导致资源耗尽,资源消耗曲线过陡峭,所以我们需要:

  1. 限制连接数量。将连接的数量约束在指定的范围内
  2. 连接请求队列。当连接数量达到最大值时,连接请求需要需要放到等待队列中,等待有连接释放。

首先,需要有地方保存当前连接数量和最大连接数,并且要能被对象中的不同方法访问到,所以我们可以给DB增加两个属性:

type DB struct {
  numOpen      int    // number of opened and pending open connections
  maxOpen      int    // <= 0 means unlimited
}

接着,需要设计当有请求连接时,发现没有空闲连接,并且连接数量等于maxOpen时要怎么办?

Database/sql里是采用一个map来存储(注:这个有点奇怪,为什么不用队列?),在DB结构体增加一个属性:connRequests,类型为:map[uint64]chan connRequest,其中key/value:

  • key :请求唯一标识。调用nextRequestKeyLocked方法生成,实际上就是一个自增的序列,只是为了保持唯一
  • value:等待连接的请求。类型为chan,每个请求封装为一个chan,可以保证并发安全,同时也可以利用其阻塞特性(当chan没有值时阻塞等待),chan接收的数据类型为connRequest格式,当其他协程有释放连接时,会将连接放到一个connRequest对象中发送给该chan,connRequest只包含两个属性:conn和err,用来接收返回连接或是异常

代码如下:

type DB struct {
    ...
  numOpen      int    // number of opened and pending open connections
  maxOpen      int                    // <= 0 means unlimited
  nextRequest  uint64 // Next key to use in connRequests.
  connRequests map[uint64]chan connRequest
}
func (db *DB) nextRequestKeyLocked() uint64 {
    next := db.nextRequest
    db.nextRequest++
    return next
}
// 连接请求
type connRequest struct {
    conn *PConn
    err  error
}

调整获取连接的方法Conn(ctx context.Context) 逻辑:

  1. 判断freeConn是否有空闲连接,有就返回
  2. 判断连接数量numOpen是否大于maxOpen,如果还小于maxOpen,说明还可以创建新连接,创建连接后numOpen++,返回连接
  3. 当numOpen已经是大于等于maxOpen,就不能再创建新连接,这是就把请求放到集合connRequests中,等待连接释放。
func (db *DB) Conn(ctx context.Context) (Conn, error) {
    db.mu.Lock() // 加锁保护freeConn
    numFree := len(db.freeConn)
    if numFree > 0 { // 有空闲连接
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        db.mu.Unlock()
        return conn, nil
    }
    // 连接数已经超出
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        req := make(chan connRequest, 1) // 创建一个chan,接收连接
        reqKey := db.nextRequestKeyLocked() // 生成唯一序号
        db.connRequests[reqKey] = req // 放到全局属性,让其他方法能访问到
        db.mu.Unlock()
        select {
        case <-ctx.Done(): //超时等
            // Remove the connection request and ensure no value has been sent
            // on it after removing.
            db.mu.Lock()
            delete(db.connRequests, reqKey) // 移除
            db.mu.Unlock()
        }
        case ret, ok := <-req: // 收到连接释放
            if !ok {
                return nil, errDBClosed
            }
            return ret.conn, ret.err
        }
    }
  // 连接数没超出,可以创建新连接
  db.numOpen++ // optimistically
  db.mu.Unlock()
    c, err := db.connector.Connect(ctx) // 重新创建
    if err != nil {
        return nil, err
    }
    return &PConn{
        ci: c,
        db: db,
    }, nil
}

接着,我们还需要调整连接释放方法 - putConn:

  1. 增加一个bool返回值,告诉调用方连接是否释放成功(如果失败,客户端可以决定关闭连接)
  2. 如果连接数numOpen大于 maxOpen时,当前连接直接丢弃,返回false
  3. 当len(db.connRequests)大于0时,需要考虑将连接优先给db.connRequests中的请求
  4. 最后才将连接放入空闲列表中。
func (db *DB) putConn(dc Conn) bool{
  db.mu.Lock()
  defer db.mu.Unlock()
  if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 当有等待连接的请求时
    if c := len(db.connRequests); c > 0 {
      var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // Remove from pending requests.
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    }
    // 放入空闲连接池中
    db.freeConn = append(db.freeConn, dc)
  return true
}

PConn的close方法也要稍微调整一下,如果释放连接失败,需要把连接关闭

func (pc *PConn) Close() error {
    ok := dc.db.putConn(pc)
    if !ok {
      dc.ci.Close()
    }
}

到目前为止,一个较完整的包含连接池的数据库组件完成了,上述代码是参照database/sql设计,去除了一些非核心的代码,通过对它的研究,可以学到:

  1. 如何对问题域进行抽象。数据库抽象成DB、连接为Conn,连接器Connector等
  2. 如何命名一些变量。 connRequest(请求连接)、 freeConn(请求连接)等
  3. 如何使用context。实现超时等
  4. 并发编程。chan、mutex、全局属性访问,资源约束等

我的博客https://itart.cn/blogs/2021/explore/database-sql-pool.html


有疑问加站长微信联系(非本文作者))

280

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK