27

Go发起HTTP2.0请求流程分析(前篇)

 3 years ago
source link: https://studygolang.com/articles/31100
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

来自公众号:新世界杂货铺

前言

Go中的HTTP请求之——HTTP1.1请求流程分析 之后,中间断断续续,历时近一月,终于才敢开始码字写下本文。

阅读建议

HTTP2.0在建立TCP连接和安全的TLS传输通道与HTTP1.1的流程基本一致。所以笔者建议没有看过 Go中的HTTP请求之——HTTP1.1请求流程分析 这篇文章的先去补一下课,本文会基于前一篇文章仅介绍和HTTP2.0相关的逻辑。

(*Transport).roundTrip

(*Transport).roundTrip 方法会调用 t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) 初始化 TLSClientConfig 以及 h2transport ,而这两者都和HTTP2.0有着紧密的联系。

TLSClientConfig: 初始化client支持的http协议, 并在tls握手时告知server。

h2transport: 如果本次请求是http2,那么h2transport会接管连接,请求和响应的处理逻辑。

下面看看源码:

func (t *Transport) onceSetNextProtoDefaults() {
    // ...此处省略代码...
    t2, err := http2configureTransport(t)
    if err != nil {
        log.Printf("Error enabling Transport HTTP/2 support: %v", err)
        return
    }
    t.h2transport = t2

    // ...此处省略代码...
}
func http2configureTransport(t1 *Transport) (*http2Transport, error) {
    connPool := new(http2clientConnPool)
    t2 := &http2Transport{
        ConnPool: http2noDialClientConnPool{connPool},
        t1:       t1,
    }
    connPool.t = t2
    if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil {
        return nil, err
    }
    if t1.TLSClientConfig == nil {
        t1.TLSClientConfig = new(tls.Config)
    }
    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
        t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
    }
    if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
        t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
    }
    upgradeFn := func(authority string, c *tls.Conn) RoundTripper {
        addr := http2authorityAddr("https", authority)
        if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
            go c.Close()
            return http2erringRoundTripper{err}
        } else if !used {
            // Turns out we don't need this c.
            // For example, two goroutines made requests to the same host
            // at the same time, both kicking off TCP dials. (since protocol
            // was unknown)
            go c.Close()
        }
        return t2
    }
    if m := t1.TLSNextProto; len(m) == 0 {
        t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{
            "h2": upgradeFn,
        }
    } else {
        m["h2"] = upgradeFn
    }
    return t2, nil
}

笔者将上述的源码简单拆解为以下几个步骤:

  1. 新建一个 http2clientConnPool 并复制给t2,以后http2的请求会优先从该连接池中获取连接。
  2. 初始化 TLSClientConfig ,并将支持的 h2http1.1 协议添加到 TLSClientConfig.NextProtos 中。
  3. 定义一个 h2upgradeFn 存储到 t1.TLSNextProto 里。

鉴于前一篇文章对新建连接前的步骤有了较为详细的介绍,所以这里直接看和server建立连接的部分源码,即 (*Transport).dialConn 方法:

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    // ...此处省略代码...
    if cm.scheme() == "https" && t.hasCustomTLSDialer() {
        // ...此处省略代码...
    } else {
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        pconn.conn = conn
        if cm.scheme() == "https" {
            var firstTLSHost string
            if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
                return nil, wrapErr(err)
            }
            if err = pconn.addTLS(firstTLSHost, trace); err != nil {
                return nil, wrapErr(err)
            }
        }
    }

    // Proxy setup.
    // ...此处省略代码...

    if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
        if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
            return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
        }
    }

    // ...此处省略代码...
}

笔者对上述的源码描述如下:

  1. 调用 t.dial(ctx, "tcp", cm.addr()) 创建TCP连接。
  2. 如果是https的请求, 则对请求建立安全的tls传输通道。
  3. 检查tls的握手状态,如果和server协商的 NegotiatedProtocol 协议不为空,且client的 t.TLSNextProto 有该协议,则返回alt不为空的持久连接(HTTP1.1不会进入if条件里)。

笔者对上述的第三点进行展开。经笔者在本地debug验证,当client和server都支持http2时, s.NegotiatedProtocol 的值为 h2s.NegotiatedProtocolIsMutual 的值为 true

在上面分析 http2configureTransport 函数时,我们知道 TLSNextProto 注册了一个key为 h2 的函数,所以调用 next 实际就是调用前面的 upgradeFn 函数。

upgradeFn 会调用 connPool.addConnIfNeeded 向http2的连接池添加一个tls传输通道,并最终返回前面已经创建好的 t2http2Transport

func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) {
    p.mu.Lock()
    // ...此处省略代码...
    // 主要用于判断是否有必要像连接池添加新的连接
    // 判断连接池中是否已有同host连接,如果有且该链接能够处理新的请求则直接返回
    call, dup := p.addConnCalls[key]
    if !dup {
        // ...此处省略代码...
        call = &http2addConnCall{
            p:    p,
            done: make(chan struct{}),
        }
        p.addConnCalls[key] = call
        go call.run(t, key, c)
    }
    p.mu.Unlock()

    <-call.done
    if call.err != nil {
        return false, call.err
    }
    return !dup, nil
}
func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) {
    cc, err := t.NewClientConn(tc)

    p := c.p
    p.mu.Lock()
    if err != nil {
        c.err = err
    } else {
        p.addConnLocked(key, cc)
    }
    delete(p.addConnCalls, key)
    p.mu.Unlock()
    close(c.done)
}

分析上述的源码我们能够得到两点结论:

  1. 执行完 upgradeFn 之后,(*Transport).dialConn返回的持久化连接中alt字段已经不是nil了。
  2. t.NewClientConn(tc) 新建出来的连接会保存在http2的连接池即 http2clientConnPool 中,下一小结将对NewClientConn展开分析。

最后我们回到(*Transport).roundTrip方法并分析其中的关键源码:

func (t *Transport) roundTrip(req *Request) (*Response, error) {
    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
    // ...此处省略代码...
    for {
        select {
        case <-ctx.Done():
            req.closeBody()
            return nil, ctx.Err()
        default:
        }

        // ...此处省略代码...
        pconn, err := t.getConn(treq, cm)
        if err != nil {
            t.setReqCanceler(req, nil)
            req.closeBody()
            return nil, err
        }

        var resp *Response
        if pconn.alt != nil {
            // HTTP/2 path.
            t.setReqCanceler(req, nil) // not cancelable with CancelRequest
            resp, err = pconn.alt.RoundTrip(req)
        } else {
            resp, err = pconn.roundTrip(treq)
        }
        if err == nil {
            return resp, nil
        }

        // ...此处省略代码...
    }
}

结合前面的分析, pconn.alt 在server和client都支持http2协议的情况下是不为nil的。所以,http2的请求会走 pconn.alt.RoundTrip(req) 分支,也就是说http2的请求流程就被 http2Transport 接管啦。

(*http2Transport).NewClientConn

(*http2Transport).NewClientConn内部会调用 t.newClientConn(c, t.disableKeepAlives())

因为本节内容较多,所以笔者不再一次性贴出源码,而是按关键步骤分析并分块儿贴出源码。

1、初始化一个 http2ClientConn

cc := &http2ClientConn{
    t:                     t,
    tconn:                 c,
    readerDone:            make(chan struct{}),
    nextStreamID:          1,
    maxFrameSize:          16 << 10,           // spec default
    initialWindowSize:     65535,              // spec default
    maxConcurrentStreams:  1000,               // "infinite", per spec. 1000 seems good enough.
    peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead.
    streams:               make(map[uint32]*http2clientStream),
    singleUse:             singleUse,
    wantSettingsAck:       true,
    pings:                 make(map[[8]byte]chan struct{}),
}

上面的源码新建了一个默认的http2ClientConn。

initialWindowSize:初始化窗口大小为65535,这个值之后会初始化每一个数据流可发送的数据窗口大小。

maxConcurrentStreams:表示每个连接上允许最多有多少个数据流同时传输数据。

streams:当前连接上的数据流。

singleUse: 控制http2的连接是否允许多个数据流共享,其值由 t.disableKeepAlives() 控制。

2、创建一个条件锁并且新建Writer&Reader。

cc.cond = sync.NewCond(&cc.mu)
cc.flow.add(int32(http2initialWindowSize))
cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr})
cc.br = bufio.NewReader(c)

新建Writer&Reader没什么好说的,需要注意的是 cc.flow.add(int32(http2initialWindowSize))

cc.flow.add 将当前连接的可写流控制窗口大小设置为 http2initialWindowSize ,即65535。

3、新建一个读写数据帧的Framer。

cc.fr = http2NewFramer(cc.bw, cc.br)
cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
cc.fr.MaxHeaderListSize = t.maxHeaderListSize()

4、向server发送开场白,并发送一些初始化数据帧。

initialSettings := []http2Setting{
    {ID: http2SettingEnablePush, Val: 0},
    {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow},
}
if max := t.maxHeaderListSize(); max != 0 {
    initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max})
}

cc.bw.Write(http2clientPreface)
cc.fr.WriteSettings(initialSettings...)
cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow)
cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize)
cc.bw.Flush()

client向server发送的开场白内容如下:

const (
    // client首先想server发送以PRI开头的一串字符串。
    http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
)
var (
    http2clientPreface = []byte(http2ClientPreface)
)

发送完开场白后,client向server发送 SETTINGS 数据帧。

http2SettingEnablePush: 告知server客户端是否开启push功能。

http2SettingInitialWindowSize:告知server客户端可接受的最大数据窗口是 http2transportDefaultStreamFlow (4M)。

发送完SETTINGS数据帧后,发送WINDOW_UPDATE数据帧, 因为第一个参数为0即streamID为0,则是告知server此连接可接受的最大数据窗口为 http2transportDefaultConnFlow (1G)。

发送完WINDOW_UPDATE数据帧后,将client的可读流控制窗口大小设置为 http2transportDefaultConnFlow + http2initialWindowSize

5、开启读循环并返回

go cc.readLoop()

(*http2Transport).RoundTrip

(*http2Transport).RoundTrip只是一个入口函数,它会调用(*http2Transport). RoundTripOpt方法。

(*http2Transport). RoundTripOpt有两个步骤比较关键:

t.connPool().GetClientConn(req, addr) : 在http2的连接池里面获取一个可用连接,其中连接池的类型为 http2noDialClientConnPool ,参考 http2configureTransport 函数。

cc.roundTrip(req) : 通过获取到的可用连接发送请求并返回响应。

(http2noDialClientConnPool).GetClientConn

根据实际的debug结果(http2noDialClientConnPool).GetClientConn最终会调用 (*http2clientConnPool).getClientConn(req *Request, addr string, dialOnMiss bool)

通过(http2noDialClientConnPool).GetClientConn获取连接时传递给(*http2clientConnPool).getClientConn方法的第三个参数始终为 false ,该参数为false时代表着即使无法正常获取可用连接,也不在这个环节重新发起拨号流程。

在(*http2clientConnPool).getClientConn中会遍历同地址的连接,并判断连接的状态从而获取一个可以处理请求的连接。

for _, cc := range p.conns[addr] {
    if st := cc.idleState(); st.canTakeNewRequest {
        if p.shouldTraceGetConn(st) {
            http2traceGetConn(req, addr)
        }
        p.mu.Unlock()
        return cc, nil
    }
}

cc.idleState() 判断当前连接池中的连接能否处理新的请求:

1、当前连接是否能被多个请求共享,如果仅单个请求使用且已经有一个数据流,则当前连接不能处理新的请求。

if cc.singleUse && cc.nextStreamID > 1 {
    return
}

2、以下几点均为true时,才代表当前连接能够处理新的请求:

maxConcurrentStreams
cc.tooIdleLocked()
st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
        int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
        !cc.tooIdleLocked()

当从链接池成功获取到一个可以处理请求的连接,就可以和server进行数据交互,即 (*http2ClientConn).roundTrip 流程。

(*http2ClientConn).roundTrip

1、在真正开始处理请求前,还要进行header检查,http2对http1.1的某些header是不支持的,笔者就不对这个逻辑进行分析了,直接上源码:

func http2checkConnHeaders(req *Request) error {
    if v := req.Header.Get("Upgrade"); v != "" {
        return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
    }
    if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
        return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
    }
    if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !strings.EqualFold(vv[0], "close") && !strings.EqualFold(vv[0], "keep-alive")) {
        return fmt.Errorf("http2: invalid Connection request header: %q", vv)
    }
    return nil
}
func http2commaSeparatedTrailers(req *Request) (string, error) {
    keys := make([]string, 0, len(req.Trailer))
    for k := range req.Trailer {
        k = CanonicalHeaderKey(k)
        switch k {
        case "Transfer-Encoding", "Trailer", "Content-Length":
            return "", &http2badStringError{"invalid Trailer key", k}
        }
        keys = append(keys, k)
    }
    if len(keys) > 0 {
        sort.Strings(keys)
        return strings.Join(keys, ","), nil
    }
    return "", nil
}

2、调用 (*http2ClientConn).awaitOpenSlotForRequest ,一直等到当前连接处理的数据流小于 maxConcurrentStreams , 如果此函数返回错误,则本次请求失败。

2.1、double check当前连接可用。

if cc.closed || !cc.canTakeNewRequestLocked() {
    if waitingForConn != nil {
        close(waitingForConn)
    }
    return http2errClientConnUnusable
}

2.2、如果当前连接处理的数据流小于 maxConcurrentStreams 则直接返回nil。笔者相信大部分逻辑走到这儿就返回了。

if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) {
    if waitingForConn != nil {
        close(waitingForConn)
    }
    return nil
}

2.3、如果当前连接处理的数据流确实已经达到上限,则开始进入等待流程。

if waitingForConn == nil {
    waitingForConn = make(chan struct{})
    go func() {
        if err := http2awaitRequestCancel(req, waitingForConn); err != nil {
            cc.mu.Lock()
            waitingForConnErr = err
            cc.cond.Broadcast()
            cc.mu.Unlock()
        }
    }()
}
cc.pendingRequests++
cc.cond.Wait()
cc.pendingRequests--

通过上面的逻辑知道,当前连接处理的数据流达到上限后有两种情况,一是等待请求被取消,二是等待其他请求结束。如果有其他数据流结束并唤醒当前等待的请求,则重复2.1、2.2和2.3的步骤。

3、调用 cc.newStream() 在连接上创建一个数据流(创建数据流是线程安全的,因为源码中在调用 awaitOpenSlotForRequest 之前先加锁,直到写入请求的header之后才释放锁)。

func (cc *http2ClientConn) newStream() *http2clientStream {
    cs := &http2clientStream{
        cc:        cc,
        ID:        cc.nextStreamID,
        resc:      make(chan http2resAndError, 1),
        peerReset: make(chan struct{}),
        done:      make(chan struct{}),
    }
    cs.flow.add(int32(cc.initialWindowSize))
    cs.flow.setConnFlow(&cc.flow)
    cs.inflow.add(http2transportDefaultStreamFlow)
    cs.inflow.setConnFlow(&cc.inflow)
    cc.nextStreamID += 2
    cc.streams[cs.ID] = cs
    return cs
}

笔者对上述代码简单描述如下:

  • 新建一个 http2clientStream ,数据流ID为 cc.nextStreamID ,新建数据流后, cc.nextStreamID +=2
  • 数据流通过 http2resAndError 管道接收请求的响应。
  • 初始化当前数据流的可写流控制窗口大小为 cc.initialWindowSize ,并保存连接的可写流控制指针。
  • 初始化当前数据流的可读流控制窗口大小为 http2transportDefaultStreamFlow ,并保存连接的可读流控制指针。
  • 最后将新建的数据流注册到当前连接中。

4、调用 cc.t.getBodyWriterState(cs, body) 会返回一个 http2bodyWriterState 结构体。通过该结构体可以知道请求body是否发送成功。

func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) {
    s.cs = cs
    if body == nil {
        return
    }
    resc := make(chan error, 1)
    s.resc = resc
    s.fn = func() {
        cs.cc.mu.Lock()
        cs.startedWrite = true
        cs.cc.mu.Unlock()
        resc <- cs.writeRequestBody(body, cs.req.Body)
    }
    s.delay = t.expectContinueTimeout()
    if s.delay == 0 ||
        !httpguts.HeaderValuesContainsToken(
            cs.req.Header["Expect"],
            "100-continue") {
        return
    }
    // 此处省略代码,因为绝大部分请求都不会设置100-continue的标头
    return
}

s.fn : 标记当前数据流开始写入数据,并且将请求body的发送结果写入 s.resc 管道(本文暂不对 writeRequestBody 展开分析,下篇文章会对其进行分析)。

5、因为是多个请求共享一个连接,那么向连接写入数据帧时需要加锁,比如加锁写入请求头。

cc.wmu.Lock()
endStream := !hasBody && !hasTrailers
werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
cc.wmu.Unlock()

6、如果有请求body,则开始写入请求body,没有请求body则设置响应header的超时时间(有请求body时,响应header的超时时间需要在请求body写完之后设置)。

if hasBody {
    bodyWriter.scheduleBodyWrite()
} else {
    http2traceWroteRequest(cs.trace, nil)
    if d := cc.responseHeaderTimeout(); d != 0 {
        timer := time.NewTimer(d)
        defer timer.Stop()
        respHeaderTimer = timer.C
    }
}

scheduleBodyWrite 的内容如下:

func (s http2bodyWriterState) scheduleBodyWrite() {
    if s.timer == nil {
        // We're not doing a delayed write (see
        // getBodyWriterState), so just start the writing
        // goroutine immediately.
        go s.fn()
        return
    }
    http2traceWait100Continue(s.cs.trace)
    if s.timer.Stop() {
        s.timer.Reset(s.delay)
    }
}

因为笔者的请求header中没有携带 100-continue 标头,所以在前面的 getBodyWriterState 函数中初始化的s.timer为nil即调用 scheduleBodyWrite 会立即开始发送请求body。

7、轮询管道获取响应结果。

在看轮询源码之前,先看一个简单的函数:

handleReadLoopResponse := func(re http2resAndError) (*Response, bool, error) {
    res := re.res
    if re.err != nil || res.StatusCode > 299 {
        bodyWriter.cancel()
        cs.abortRequestBodyWrite(http2errStopReqBodyWrite)
    }
    if re.err != nil {
        cc.forgetStreamID(cs.ID)
        return nil, cs.getStartedWrite(), re.err
    }
    res.Request = req
    res.TLS = cc.tlsState
    return res, false, nil
}

该函数主要就是判断读到的响应是否正常,并根据响应的结果构造 (*http2ClientConn).roundTrip 的返回值。

了解了 handleReadLoopResponse 之后,下面就看看轮询的逻辑:

for {
    select {
    case re := <-readLoopResCh:
        return handleReadLoopResponse(re)
    // 此处省略代码(包含请求取消,请求超时等管道的轮询)
    case err := <-bodyWriter.resc:
        // Prefer the read loop's response, if available. Issue 16102.
        select {
        case re := <-readLoopResCh:
            return handleReadLoopResponse(re)
        default:
        }
        if err != nil {
            cc.forgetStreamID(cs.ID)
            return nil, cs.getStartedWrite(), err
        }
        bodyWritten = true
        if d := cc.responseHeaderTimeout(); d != 0 {
            timer := time.NewTimer(d)
            defer timer.Stop()
            respHeaderTimer = timer.C
        }
    }
}

笔者仅对上面的第二种情况即请求body发送完成进行描述:

  • 能否读到响应,如果能够读取响应则直接返回。
  • 判断请求body是否发送成功,如果发送失败,直接返回。
  • 如果请求body发送成功,则设置响应header的超时时间。

总结

本文主要描述了两个方面的内容:

  1. 确认client和server都支持http2协议,并构建一个http2的连接,同时开启该连接的读循环。
  2. 通过http2连接池获取一个http2连接,并发送请求和读取响应。

预告

鉴于HTTTP2.0的内容较多,且文章篇幅过长时不易阅读,笔者将后续要分析的内容拆为两个部分:

readLoopResCh

最后,衷心希望本文能够对各位读者有一定的帮助。

注:

  1. 写本文时, 笔者所用go版本为: go1.14.2。
  2. 本文对h2c的情况不予以考虑。
  3. 因为笔者分析的是请求流程,所以没有在本地搭建server,而是使用了一个支持http2连接的图片一步步的debug。eg: https://dss0.bdstatic.com/5aV...

参考

https://developers.google.com...

有疑问加站长微信联系

iiUfA3j.png!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK