詳解golang net之transport


關於golang http transport的講解,網上有很多文章進行了解讀,但都比較粗,很多代碼實現並沒有講清楚。故給出更加詳細的實現說明。整體看下來細節實現層面還是比較難懂的。

本次使用golang版本1.12.9

transport實現了RoundTripper接口,該接口只有一個方法RoundTrip(),故transport的入口函數就是RoundTrip()。transport的主要功能其實就是緩存了長連接,用於大量http請求場景下的連接復用,減少發送請求時TCP(TLS)連接建立的時間損耗,同時transport還能對連接做一些限制,如連接超時時間,每個host的最大連接數等。transport對長連接的緩存和控制僅限於TCP+(TLS)+HTTP1,不對HTTP2做緩存和限制。

tranport包含如下幾個主要概念:

  • 連接池:在idleConn中保存了不同類型(connectMethodKey)的請求連接(persistConn)。當發生請求時,首先會嘗試從連接池中取一條符合其請求類型的連接使用
  • readLoop/writeLoop:連接之上的功能,循環處理該類型的請求(發送request,返回response)
  • roundTrip:請求的真正入口,接收到一個請求后會交給writeLoop和readLoop處理。

一對readLoop/writeLoop只能處理一條連接,如果這條連接上沒有更多的請求,則關閉連接,退出循環,釋放系統資源

下述代碼都來自golang源碼的src/net/httptransport.go文件

type RoundTripper interface {
    // RoundTrip executes a single HTTP transaction, returning
    // a Response for the provided Request.
    //
    // RoundTrip should not attempt to interpret the response. In
    // particular, RoundTrip must return err == nil if it obtained
    // a response, regardless of the response's HTTP status code.
    // A non-nil err should be reserved for failure to obtain a
    // response. Similarly, RoundTrip should not attempt to
    // handle higher-level protocol details such as redirects,
    // authentication, or cookies.
    //
    // RoundTrip should not modify the request, except for
    // consuming and closing the Request's Body. RoundTrip may
    // read fields of the request in a separate goroutine. Callers
    // should not mutate or reuse the request until the Response's
    // Body has been closed.
    //
    // RoundTrip must always close the body, including on errors,
    // but depending on the implementation may do so in a separate
    // goroutine even after RoundTrip returns. This means that
    // callers wanting to reuse the body for subsequent requests
    // must arrange to wait for the Close call before doing so.
    //
    // The Request's URL and Header fields must be initialized.
    RoundTrip(*Request) (*Response, error)
}

Transport結構體中的主要成員如下(沒有列出所有成員):

wantIdle                                                要求關閉所有idle的persistConn
reqCanceler map[*Request]func(error)                   用於取消request
idleConn   map[connectMethodKey][]*persistConn          idle狀態的persistConn連接池,最大值受maxIdleConnsPerHost限制
idleConnCh map[connectMethodKey]chan *persistConn       用於給調用者傳遞persistConn
connPerHostCount     map[connectMethodKey]int 表示一類連接上的host數目,最大值受MaxConnsPerHost限制
connPerHostAvailable map[connectMethodKey]chan struct{} 與connPerHostCount配合使用,判斷該類型的連接數目是否已經達到上限
idleLRU    connLRU                                      長度受MaxIdleConns限制,隊列方式保存所有idle的pconn
altProto   atomic.Value                                 nil or map[string]RoundTripper,key為URI scheme,表示處理該scheme的RoundTripper實現。注意與TLSNextProto的不同,前者表示URI的scheme,后者表示tls之上的協議。如前者不會體現http2,后者會體現http2
Proxy func(*Request) (*url.URL, error)                  為request返回一個代理的url
DisableKeepAlives bool 是否取消長連接
DisableCompression bool 是否取消HTTP壓縮
MaxIdleConns int 所有host的idle狀態的最大連接數目,即idleConn中所有連接數
MaxIdleConnsPerHost int 每個host的idle狀態的最大連接數目,即idleConn中的key對應的連接數
MaxConnsPerHost                                         每個host上的最大連接數目,含dialing/active/idle狀態的connections。http2時,每個host只允許有一條idle的conneciton
DialContext func(ctx context.Context, network, addr string) (net.Conn, error) 創建未加密的tcp連接,比Dial函數增加了context控制
Dial func(network, addr string) (net.Conn, error)       創建未加密的tcp連接,廢棄,使用DialContext
DialTLS func(network, addr string) (net.Conn, error)    為非代理模式的https創建連接的函數,如果該函數非空,則不會使用Dial函數,且忽略TLSClientConfig和TLSHandshakeTimeout;反之使用Dila和TLSClientConfig。即有限使用DialTLS進行tls協商
TLSClientConfig *tls.Config                             tls client用於tls協商的配置
IdleConnTimeout                                       連接保持idle狀態的最大時間,超時關閉pconn
TLSHandshakeTimeout time.Duration                       tls協商的超時時間
ResponseHeaderTimeout time.Duration                     發送完request后等待serve response的時間
TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper 在tls協商帶NPN/ALPN的擴展后,transport如何切換到其他協議。指tls之上的協議(next指的就是tls之上的意思)
ProxyConnectHeader Header                               在CONNECT請求時,配置request的首部信息,可選
MaxResponseHeaderBytes                                  指定server響應首部的最大字節數

Transport.roundTrip是主入口,它通過傳入一個request參數,由此選擇一個合適的長連接來發送該request並返回response。整個流程主要分為兩步:

使用getConn函數來獲得底層TCP(TLS)連接;調用roundTrip函數進行上層協議(HTTP)處理。

func (t *Transport) roundTrip(req *Request) (*Response, error) {
    t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
    ctx := req.Context()
    trace := httptrace.ContextClientTrace(ctx)

    if req.URL == nil {
        req.closeBody()
        return nil, errors.New("http: nil Request.URL")
    }
    if req.Header == nil {
        req.closeBody()
        return nil, errors.New("http: nil Request.Header")
    }
    scheme := req.URL.Scheme
    isHTTP := scheme == "http" || scheme == "https"
// 下面判斷request首部的有效性 if isHTTP { for k, vv := range req.Header { if !httpguts.ValidHeaderFieldName(k) { return nil, fmt.Errorf("net/http: invalid header field name %q", k) } for _, v := range vv { if !httpguts.ValidHeaderFieldValue(v) { return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k) } } } } // 判斷是否使用注冊的RoundTrip來處理對應的scheme。對於使用tcp+tls+http1(wss協議升級)的場景
// 不能使用注冊的roundTrip。后續代碼對tcp+tls+http1或tcp+http1進行了roundTrip處理
if t.useRegisteredProtocol(req) { altProto, _ := t.altProto.Load().(map[string]RoundTripper) if altRT := altProto[scheme]; altRT != nil { if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol { return resp, err } } }

// 后續僅處理URL scheme為http或https的連接
if !isHTTP { req.closeBody() return nil, &badStringError{"unsupported protocol scheme", scheme} } if req.Method != "" && !validMethod(req.Method) { return nil, fmt.Errorf("net/http: invalid method %q", req.Method) } if req.URL.Host == "" { req.closeBody() return nil, errors.New("http: no Host in request URL") }
// 下面for循環用於在request出現錯誤的時候進行請求重試。但不是所有的請求失敗都會被嘗試,如請求被取消(errRequestCanceled)
// 的情況是不會進行重試的。具體參見shouldRetryRequest函數
for { select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // treq gets modified by roundTrip, so we need to recreate for each retry. treq := &transportRequest{Request: req, trace: trace}
// connectMethodForRequest函數通過輸入一個request返回一個connectMethod(簡稱cm),該類型通過
// {proxyURL,targetScheme,tartgetAddr,onlyH1},即{代理URL,server端的scheme,server的地址,是否HTTP1}
// 來表示一個請求。一個符合connectMethod描述的request將會在Transport.idleConn中匹配到一類長連接。
cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() return nil, err }
// 獲取一條長連接,如果連接池中有現成的連接則直接返回,否則返回一條新建的連接。該連接可能是HTTP2格式的,存放在persistCnn.alt中,
//
使用其自注冊的RoundTrip處理。該函數描述參見下面內容。
// 從getConn的實現中可以看到,一個請求只能在idle的連接上執行,反之一條連接只能同時處理一個請求。
        pconn, err := t.getConn(treq, cm)
// 如果獲取底層連接失敗,無法繼續上層協議的請求,直接返回錯誤
if err != nil {
// 每個request都會在getConn中設置reqCanceler,獲取連接失敗,清空設置
t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response
// pconn.alt就是從Transport.TLSNextProto中獲取的,它表示TLS之上的協議,如HTTP2。從persistConn.alt的注釋中可以看出
// 目前alt僅支持HTTP2協議,后續可能會支持更多協議。
if pconn.alt != nil { // HTTP2處理,使用HTTP2時,由於不緩存HTTP2連接,不對其做限制 t.decHostConnCount(cm.key())
// 清除getConn中設置的標記。具體參見getConn
t.setReqCanceler(req, nil) resp, err = pconn.alt.RoundTrip(req) } else {
// pconn.roundTrip中做了比較復雜的處理,該函數用於發送request並返回response。
// 通過writeLoop發送request,通過readLoop返回response
resp, err = pconn.roundTrip(treq) }
// 如果成功返回response,則整個處理結束.
if err == nil { return resp, nil }
// 判斷該request是否滿足重試條件,大部分場景是不支持重試的,僅有少部分情況支持,如errServerClosedIdle
// err 非nil時實際並沒有在原來的連接上重試,且pconn沒有關閉,提了issue
if !pconn.shouldRetryRequest(req, err) { // Issue 16465: return underlying net.Conn.Read error from peek, // as we've historically done. if e, ok := err.(transportReadFromServerError); ok { err = e.err } return nil, err } testHookRoundTripRetried() // Rewind the body if we're able to.
// 用於重定向場景
if req.GetBody != nil { newReq := *req var err error newReq.Body, err = req.GetBody() if err != nil { return nil, err } req = &newReq } } }

 getConn用於返回一條長連接。長連接的來源有2種路徑:連接池中獲取;當連接池中無法獲取到時會新建一條連接

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
    req := treq.Request
    trace := treq.trace
    ctx := req.Context()
    if trace != nil && trace.GetConn != nil {
        trace.GetConn(cm.addr())
    }
// 從連接池中找一條合適的連接,如果找到則返回該連接,否則新建連接
if pc, idleSince := t.getIdleConn(cm); pc != nil { if trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(idleSince)) }
// 此處設置transport.reqCanceler比較難理解,主要功能是做一個標記,用於判斷當前到執行pconn.roundTrip
// 期間,request有沒有被(如Request.Cancel,Request.Context().Done())取消,被取消的request將無需繼續roundTrip處理
t.setReqCanceler(req, func(error) {}) return pc, nil } type dialRes struct { pc *persistConn err error }
// 該chan中用於存放通過dialConn函數新創建的長連接persistConn(后續簡稱pconn),表示一條TCP(TLS)的底層連接. dialc :
= make(chan dialRes)
// cmKey實際就是把connectMethod中的元素全部字符串化。cmKey作為一類連接的標識,如Transport.idleConn[cmKey]就表示一類特定的連接 cmKey :
= cm.key() // Copy these hooks so we don't race on the postPendingDial in // the goroutine we launch. Issue 11136. testHookPrePendingDial := testHookPrePendingDial testHookPostPendingDial := testHookPostPendingDial // 在嘗試獲取連接的時候,如果此時正在創建一條連接,但最后沒有選擇這條新建的連接(有其它調用者釋放了一條連接),
// 此時,handlePendingDial負責將這條新創建的連接放到Transport.idleConn連接池中
handlePendingDial :
= func() { testHookPrePendingDial() go func() { if v := <-dialc; v.err == nil {
// 將一條連接放入連接池中,描述見下文--tryPutIdleConn t.putOrCloseIdleConn(v.pc) }
else { t.decHostConnCount(cmKey) } testHookPostPendingDial() }() } cancelc := make(chan error, 1)
// 為request設置ReqCanceler。transport代碼中不會主動調用該ReqCanceler函數(會在
// roundTrip中調用replaceReqCanceler將其覆蓋),可能的原因是transport提供了一個對外API CancelRequest,
// 用戶可以調用該函數取消連接,此時會調用該ReqCanceler。需要注意的是從CancelRequest的注釋中可以看出,該API
// 已經被廢棄,這段代碼后面可能會被刪除(如果有不同看法,請指出)
t.setReqCanceler(req, func(err error) { cancelc
<- err }) // 如果對host上建立的連接有限制 if t.MaxConnsPerHost > 0 { select {
// incHostConnCount會根據主機已經建立的連接是否達到t.MaxConnsPerHost來返回一個未關閉
// 的chan(連接數達到MaxConnsPerHost)或關閉的chan(連接數未達到MaxConnsPerHost),
// 返回未關閉的chan時會阻塞等待其他請求釋放連接,不能新創建pconn;反之可以使用新創建的pconn
case <-t.incHostConnCount(cmKey): // 等待獲取某一類連接對應的chan。tryPutIdleConn函數中會嘗試將新建或釋放的連接放入到該chan中 case pc := <-t.getIdleConnCh(cm): if trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) } return pc, nil
// 下面2個case都表示request被取消,其中Cancel被廢棄,建議使用Context來取消request
case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } } go func() {
// 新建連接,創建好后將其放入dialc chan中 pc, err :
= t.dialConn(ctx, cm) dialc <- dialRes{pc, err} }()
// 下面會通過兩種途徑來獲得連接:從dialc中獲得通過dialConn新建的連接;通過idleConnCh獲得其他request釋放的連接
// 如果首先獲取到的是dialConn新建的連接,直接返回該連接即可;如果首先獲取到的是其他request釋放的連接,在返回該連接前
// 需要調用handlePendingDial來處理dialConn新建的連接。
idleConnCh :
= t.getIdleConnCh(cm) select {
// 獲取dialConn新建的連接
case v := <-dialc: // Our dial finished. if v.pc != nil { if trace != nil && trace.GotConn != nil && v.pc.alt == nil { trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) } return v.pc, nil } // 僅針對MaxConnsPerHost>0有效,對應上面的incHostConnCount() t.decHostConnCount(cmKey)
// 下面用於返回更易讀的錯誤信息
select { case <-req.Cancel: // It was an error due to cancelation, so prioritize that // error value. (Issue 16049) return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err default: // It wasn't an error due to cancelation, so // return the original error message: return nil, v.err }
// 獲取其他request釋放的連接
case pc := <-idleConnCh: // Another request finished first and its net.Conn // became available before our dial. Or somebody // else's dial that they didn't use. // But our dial is still going, so give it away // when it finishes: handlePendingDial() if trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) } return pc, nil
// 如果request取消,也需要調用handlePendingDial處理新建的連接
case <-req.Cancel: handlePendingDial() return nil, errRequestCanceledConn case <-req.Context().Done(): handlePendingDial() return nil, req.Context().Err() case err := <-cancelc: handlePendingDial() if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } }

tryPutIdleConn函數用來將一條新創建或回收的連接放回連接池中,以便后續使用。與getIdleConnCh配合使用,后者用於獲取一類連接對應的chan。在如下場景會將一個連接放回idleConn中

    • 在readLoop成功之后(當然還有其他判斷,如底層鏈路沒有返回EOF錯誤);
    • 創建一個新連接且新連接沒有被使用時;
    • roundTrip一開始發現request被取消時
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
// 當不使用長連接或該主機上的連接數小於0(即不允許緩存任何連接)時,返回錯誤並關閉創建的連接(此處沒有做關閉處理,
// 但存在不適用的連接時必須關閉,如使用putOrCloseIdleConn)。
// 可以看出當不使用長連接時,Transport不能緩存連接
if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { return errKeepAlivesDisabled } if pconn.isBroken() { return errConnBroken }
// 如果是HTTP2連接,則直接返回,不緩存該連接
if pconn.alt != nil { return errNotCachingH2Conn }
// 為新連接標記可重用狀態,新創建的連接肯定是可以重用的,用於在Transport.roundTrip
// 中的shouldRetryRequest函數中判斷連接是否可以重用
pconn.markReused()
// 該key對應Transport.idleConn中的key,標識特定的連接 key :
= pconn.cacheKey t.idleMu.Lock() defer t.idleMu.Unlock() // idleConnCh中的chan元素用於存放可用的連接pconn,每類連接都有一個chan waitingDialer := t.idleConnCh[key] select {
// 如果此時有調用者等待一個連接,則直接將該連接傳遞出去,不進行保存,這種做法有利於提高效率
case waitingDialer <- pconn: // We're done with this pconn and somebody else is // currently waiting for a conn of this type (they're // actively dialing, but this conn is ready // first). Chrome calls this socket late binding. See // https://insouciant.org/tech/connection-management-in-chromium/ return nil default:
// 如果沒有調用者等待連接,則清除該chan。刪除map中的chan直接會關閉該chan
if waitingDialer != nil { // They had populated this, but their dial won // first, so we can clean up this map entry. delete(t.idleConnCh, key) } }
// 與DisableKeepAlives有點像,當用戶需要關閉所有idle的連接時,不會再緩存連接
if t.wantIdle { return errWantIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } idles := t.idleConn[key]
// 當主機上該類連接數超過Transport.MaxIdleConnsPerHost時,不能再保存新的連接,返回錯誤並關閉連接
if len(idles) >= t.maxIdleConnsPerHost() { return errTooManyIdleHost }
// 需要緩存的連接與連接池中已有的重復,系統退出(這種情況下系統已經發生了混亂,直接退出)
for _, exist := range idles { if exist == pconn { log.Fatalf("dup idle pconn %p in freelist", pconn) } }
// 添加待緩存的連接 t.idleConn[key]
= append(idles, pconn) t.idleLRU.add(pconn)
// 受MaxIdleConns的限制,添加策略變為:添加新的連接,刪除最老的連接。
// MaxIdleConns限制了所有類型的idle狀態的最大連接數目,而MaxIdleConnsPerHost限制了host上單一類型的最大連接數目
// idleLRU中保存了所有的連接,此處的作用為,找出最老的連接並移除
if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { oldest := t.idleLRU.removeOldest() oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) }
// 為新添加的連接設置超時時間
if t.IdleConnTimeout > 0 { if pconn.idleTimer != nil {
// 如果該連接是被釋放的,則重置超時時間 pconn.idleTimer.Reset(t.IdleConnTimeout) }
else {
// 如果該連接時新建的,則設置超時時間並設置超時動作pconn.closeConnIfStillIdle
// closeConnIfStillIdle用於釋放連接,從Transport.idleLRU和Transport.idleConn中移除並關閉該連接
pconn.idleTimer
= time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle) } }
pconn.idleAt
= time.Now() return nil }

 dialConn用於新創建一條連接,並為該連接啟動readLoop和writeLoop

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
pconn :
= &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } trace := httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { // Return a typed error, per Issue 16997 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} } return err }
// 調用注冊的DialTLS處理tls。使用自注冊的TLS處理函數時,transport的TLSClientConfig和TLSHandshakeTimeout
// 參數會被忽略
if cm.scheme() == "https" && t.DialTLS != nil { var err error
// 調用注冊的連接函數創建一條連接,注意cm.addr()的實現,如果該連接存在proxy,則此處是與proxy建立TLS連接;否則直接連server。
// 存在proxy時,與server建立連接分為2步:與proxy建立TLP(TLS)連接;與server建立HTTP(HTTPS)連接
// func (cm *connectMethod) addr() string { // if cm.proxyURL != nil { // return canonicalAddr(cm.proxyURL) // } // return cm.targetAddr // }
pconn.conn, err = t.DialTLS("tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } if pconn.conn == nil { return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)")) }
// 如果連接類型是TLS的,則需要處理TLS協商
if tc, ok := pconn.conn.(*tls.Conn); ok { // Handshake here, in case DialTLS didn't. TLSNextProto below // depends on it for knowing the connection state. if trace != nil && trace.TLSHandshakeStart != nil { trace.TLSHandshakeStart() }
// 啟動TLS協商,如果協商失敗需要關閉連接
if err := tc.Handshake(); err != nil { go pconn.conn.Close() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(tls.ConnectionState{}, err) } return nil, err } cs := tc.ConnectionState() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(cs, nil) }
// 保存TLS協商結果
pconn.tlsState = &cs } } else {
// 使用默認方式創建連接,此時會用到transport的TLSClientConfig和TLSHandshakeTimeout參數。同樣注意cm.addr()
conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn
// 如果scheme是需要TLS協商的,則處理TLS協商,否則為普通的HTTP連接
if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) }
// 進行TLS協商,具體參見下文addTLS
if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // 處理proxy的情況 switch {
// 不存在proxy 直接跳過
case cm.proxyURL == nil: case cm.proxyURL.Scheme == "socks5": conn := pconn.conn d := socksNewDialer("tcp", conn.RemoteAddr().String()) if u := cm.proxyURL.User; u != nil { auth := &socksUsernamePassword{ Username: u.Username(), } auth.Password, _ = u.Password() d.AuthMethods = []socksAuthMethod{ socksAuthMethodNotRequired, socksAuthMethodUsernamePassword, } d.Authenticate = auth.Authenticate } if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil { conn.Close() return nil, err }
// 如果存在proxy,且server的scheme為"http",如果需要代理認證,則設置認證信息
case cm.targetScheme == "http": pconn.isProxy = true if pa := cm.proxyAuth(); pa != "" { pconn.mutateHeaderFunc = func(h Header) { h.Set("Proxy-Authorization", pa) } }
// 如果存在proxy,且server的scheme為"https"。與"http"不同,在與server進行tls協商前,會給proxy
// 發送一個method為"CONNECT"的HTTP請求,如果請求通過(返回200),則可以繼續與server進行TLS協商
case cm.targetScheme == "https":
// 該conn表示與proxy建立的連接 conn :
= pconn.conn hdr := t.ProxyConnectHeader if hdr == nil { hdr = make(Header) } connectReq := &Request{ Method: "CONNECT", URL: &url.URL{Opaque: cm.targetAddr}, Host: cm.targetAddr, Header: hdr, } if pa := cm.proxyAuth(); pa != "" { connectReq.Header.Set("Proxy-Authorization", pa) }
// 發送"CONNECT" http請求 connectReq.Write(conn)
// Read response. // Okay to use and discard buffered reader here, because // TLS server will not speak until spoken to. br := bufio.NewReader(conn) resp, err := ReadResponse(br, connectReq) if err != nil { conn.Close() return nil, err }
// proxy返回非200,表示無法建立連接,可能情況如proxy認證失敗
if resp.StatusCode != 200 { f := strings.SplitN(resp.Status, " ", 2) conn.Close() if len(f) < 2 { return nil, errors.New("unknown status code") } return nil, errors.New(f[1]) } } // 與proxy建立連接后,再與server進行TLS協商 if cm.proxyURL != nil && cm.targetScheme == "https" { if err := pconn.addTLS(cm.tlsHost(), trace); err != nil { return nil, err } } // 后續進行TLS之上的協議處理,如果TLS之上的協議為注冊協議,則使用注冊的roundTrip進行處理
// TLS之上的協議為TLS協商過程中使用NPN/ALPN擴展協商出的協議,如HTTP2(參見golang.org/x/net/http2)
if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil } } if t.MaxConnsPerHost > 0 { pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey} }
// 創建讀寫通道,writeLoop用於發送request,readLoop用於接收響應。roundTrip函數中會通過chan給writeLoop發送
// request,通過chan從readLoop接口response。每個連接都有一個readLoop和writeLoop,連接關閉后,這2個Loop也會退出。
// pconn.br給readLoop使用,pconn.bw給writeLoop使用,注意此時已經建立了tcp連接。
pconn.br
= bufio.NewReader(pconn) pconn.bw = bufio.NewWriter(persistConnWriter{pconn}) go pconn.readLoop() go pconn.writeLoop() return pconn, nil }

 addTLS用於進行非注冊協議下的TLS協商

func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
    // Initiate TLS and check remote host name against certificate.
    cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
    if cfg.ServerName == "" {
        cfg.ServerName = name
    }
    if pconn.cacheKey.onlyH1 {
        cfg.NextProtos = nil
    }
    plainConn := pconn.conn
// 配置TLS client,包含一個TCP連接和TLC配置 tlsConn :
= tls.Client(plainConn, cfg) errc := make(chan error, 2) var timer *time.Timer
// 設置TLS超時時間,並在超時后往errc中寫入一個tlsHandshakeTimeoutError{} if d := pconn.t.TLSHandshakeTimeout; d != 0 { timer = time.AfterFunc(d, func() { errc <- tlsHandshakeTimeoutError{} }) } go func() { if trace != nil && trace.TLSHandshakeStart != nil { trace.TLSHandshakeStart() }
// 執行TLS協商,如果協商沒有超時,則將協商結果err放入errc中 err :
= tlsConn.Handshake() if timer != nil { timer.Stop() } errc <- err }()
// 阻塞等待TLS協商結果,如果協商失敗或協商超時,關閉底層連接
if err := <-errc; err != nil { plainConn.Close() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(tls.ConnectionState{}, err) } return err }
// 獲取協商結果並設置到pconn.tlsState cs :
= tlsConn.ConnectionState() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(cs, nil) } pconn.tlsState = &cs pconn.conn = tlsConn return nil }

 在獲取到底層TCP(TLS)連接后在roundTrip中處理上層協議:即發送HTTP request,返回HTTP response。roundTrip給writeLoop提供request,從readLoop獲取response。

一個roundTrip用於處理一類request。

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    testHookEnterRoundTrip()
// 此處與getConn中的"t.setReqCanceler(req, func(error) {})"相對應,用於判斷request是否被取消
// 返回false表示request被取消,不必繼續后續請求,關閉連接並返回錯誤
if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) { pc.t.putOrCloseIdleConn(pc) return nil, errRequestCanceled } pc.mu.Lock()
// 與readLoop配合使用,表示期望的響應的個數 pc.numExpectedResponses
++
// dialConn中定義的函數,設置了proxy的認證信息 headerFn := pc.mutateHeaderFunc pc.mu.Unlock() if headerFn != nil { headerFn(req.extraHeaders()) } // Ask for a compressed version if the caller didn't set their // own value for Accept-Encoding. We only attempt to // uncompress the gzip stream if we were the layer that // requested it. requestedGzip := false
// 如果需要在request中設置可接受的解碼方法,則在request中添加對應的首部。僅支持gzip方式且
// 僅在調用者沒有設置這些首部時設置
if !pc.t.DisableCompression && req.Header.Get("Accept-Encoding") == "" && req.Header.Get("Range") == "" && req.Method != "HEAD" { // Request gzip only, not deflate. Deflate is ambiguous and // not as universally supported anyway. // See: https://zlib.net/zlib_faq.html#faq39 // // Note that we don't request this for HEAD requests, // due to a bug in nginx: // https://trac.nginx.org/nginx/ticket/358 // https://golang.org/issue/5522 // // We don't request gzip if the request is for a range, since // auto-decoding a portion of a gzipped document will just fail // anyway. See https://golang.org/issue/8923 requestedGzip = true req.extraHeaders().Set("Accept-Encoding", "gzip") }
// 用於處理首部含"Expect: 100-continue"的request,客戶端使用該首部探測服務器是否能夠
// 處理request首部中的規格要求(如長度過大的request)。
var continueCh chan struct{} if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() { continueCh = make(chan struct{}, 1) } // HTTP1.1默認使用長連接,當transport設置DisableKeepAlives時會導致處理每個request時都會
// 新建一個連接。此處的處理邏輯是:如果transport設置了DisableKeepAlives,而request沒有設置
// "Connection: close",則為request設置該首部。將底層表現與上層協議保持一致。
if pc.t.DisableKeepAlives && !req.wantsClose() { req.extraHeaders().Set("Connection", "close") }
// 用於在異常場景(如request取消)下通知readLoop,roundTrip是否已經退出,防止ReadLoop發送response阻塞 gone :
= make(chan struct{}) defer close(gone) defer func() { if err != nil { pc.t.setReqCanceler(req.Request, nil) } }() const debugRoundTrip = false // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body.
// 表示發送了多少個字節的request,debug使用 startBytesWritten := pc.nwrite
// 給writeLoop封裝並發送信息,注意此處的先后順序。首先給writeLoop發送數據,阻塞等待writeLoop
// 接收,待writeLoop接收后才能發送數據給readLoop,因此發送request總會優先接收response writeErrCh :
= make(chan error, 1) pc.writech <- writeRequest{req, writeErrCh, continueCh}
// 給readLoop封裝並發送信息 resc :
= make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done()
// 該循環主要用於處理獲取response超時和request取消時的條件跳轉。正常情況下收到reponse
// 退出roundtrip函數
for { testHookWaitResLoop() select {
// writeLoop返回發送request后的結果
case err := <-writeErrCh: if debugRoundTrip { req.logf("writeErrCh resv: %T/%#v", err, err) } if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) }
// 設置一個接收response的定時器,如果在這段時間內沒有接收到response(即沒有進入下面代碼
// 的"case re := <-resc:"分支),超時后進入""case <-respHeaderTimer:分支,關閉連接,
// 防止readLoop一直等待讀取response,導致處理阻塞;沒有超時則關閉定時器
if d := pc.t.ResponseHeaderTimeout; d > 0 { if debugRoundTrip { req.logf("starting timer for %v", d) } timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C }
// 處理底層連接關閉。"case <-cancelChan:"和”case <-ctxDoneChan:“為request關閉,request
// 關閉也會導致底層連接關閉,但必須處理非上層協議導致底層連接關閉的情況。
case <-pc.closech: if debugRoundTrip { req.logf("closech recv: %T %#v", pc.closed, pc.closed) } return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
// 等待獲取response超時,關閉連接
case <-respHeaderTimer: if debugRoundTrip { req.logf("timeout waiting for response headers.") } pc.close(errTimeout) return nil, errTimeout
// 接收到readLoop返回的response結果
case re := <-resc:
// 極異常情況,直接程序panic
if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil
// request取消
case <-cancelChan: pc.t.CancelRequest(req.Request)
// 將關閉之后的chan置為nil,用來防止select一直進入該case(close的chan不會阻塞讀,讀取的數據為0) cancelChan
= nil case <-ctxDoneChan: pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }

 writeLoop用於發送request請求

func (pc *persistConn) writeLoop() {
    defer close(pc.writeLoopDone)
    for {
// writeLoop會阻塞等待兩個IO case:
// 循環等待並處理roundTrip發來的writeRequest數據,此時需要發送request;
// 如果底層連接關閉,則退出writeLoop
select { case wr := <-pc.writech: startBytesWritten := pc.nwrite
// 構造request並發送request請求。waitForContinue用於處理首部含"Expect: 100-continue"的request err :
= wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error // Errors reading from the user's // Request.Body are high priority. // Set it here before sending on the // channels below or calling // pc.close() which tears town // connections and causes other // errors. wr.req.setError(err) }
if err == nil { err = pc.bw.Flush() }
// 請求失敗時,需要關閉request和底層連接
if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } }
// 將結果發送給readLoop的
pc.wroteRequest()函數處理
            pc.writeErrCh <- err // to the body reader, which might recycle us
// 將結果返回給roundTrip處理,防止響應超時 wr.ch <- err
// 如果發送request失敗,需要關閉連接。writeLoop退出時會關閉pc.conn和pc.closech,
// 同時會導致readLoop退出
if err != nil { pc.close(err) return } case <-pc.closech: return } } }

readLoop循環接收response響應,成功獲得response后會將連接返回連接池,便於后續復用。當readLoop正常處理完一個response之后,會將連接重新放入到連接池中;

當readloop退出后,該連接會被關閉移除。

func (pc *persistConn) readLoop() {
    closeErr := errReadLoopExiting // default value, if not changed below
// 當writeLoop或readLoop(異常)跳出循環后,都需要關閉底層連接。即一條連接包含writeLoop和readLoop兩個
// 處理,任何一個loop退出(協議升級除外)則該連接不可用
// readLoo跳出循環的正常原因是連接上沒有待處理的請求,此時關閉連接,釋放資源 defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }()
// 嘗試將連接放回連接池 tryPutIdleConn :
= func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { closeErr = err if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { trace.PutIdleConn(err) } return false } if trace != nil && trace.PutIdleConn != nil { trace.PutIdleConn(nil) } return true } // eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool.
// 從上面注釋可以看出該變量主要用於阻塞調用者協程讀取EOF的resp.body,
// 直到該連接重新放入連接池中。處理邏輯與上面先嘗試放入連接池,然后返回response一樣,
// 便於連接快速重用 eofc := make(chan struct{})
// 出現錯誤時也需要釋放讀取resp.Body的協程,防止調用者協程掛死 defer close(eofc)
// unblock reader on errors // Read this once, before loop starts. (to avoid races in tests) testHookMu.Lock() testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead testHookMu.Unlock() alive := true for alive {
// 獲取允許的response首部的最大字節數 pc.readLimit
= pc.maxHeaderResponseSize()
// 從接收buffer中peek一個字節來判斷底層是否接收到response。roundTrip保證了request先於response發送。
// 此處peek會阻塞等待response(這也是roundtrip中設置response超時定時器的原因)。goroutine中的read/write
// 操作都是阻塞模式。 _, err :
= pc.br.Peek(1) pc.mu.Lock()
// 如果期望的response為0,則直接退出readLoop並關閉連接此時連接上沒有需要處理的數據,
// 關閉連接,釋放系統資源。
if pc.numExpectedResponses == 0 { pc.readLoopPeekFailLocked(err) pc.mu.Unlock() return } pc.mu.Unlock() // 阻塞等待roundTrip發來的數據 rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response
// 如果有response數據,則讀取並解析為Response格式
if err == nil { resp, err = pc.readResponse(rc, trace) } else {
// 可能的錯誤如server端關閉,發送EOF err
= transportReadFromServerError{err} closeErr = err }
// 底層沒有接收到server的任何數據,斷開該連接,可能原因是在client發出request的同時,server關閉
// 了連接。參見transportReadFromServerError的注釋。
if err != nil { if pc.readLimit <= 0 { err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize()) } // 傳遞錯誤信息給roundTrip並退出loop select { case rc.ch <- responseAndError{err: err}: case <-rc.callerGone: return } return } pc.readLimit = maxInt64 // effictively no limit for response bodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock()
// 判斷response是否可寫,在使用101 Switching Protocol進行協議升級時需要返回一個可寫的resp.body
// 如果使用了101 Switching Protocol,升級完成后就與transport沒有關系了(后續使用http2或websocket等)
bodyWritable :
= resp.bodyIsWritable()
// 判斷response的body是否為空,如果body為空,則不必讀取body內容(HEAD的resp.body沒有數據) hasBody :
= rc.req.Method != "HEAD" && resp.ContentLength != 0 // 如果server關閉連接或client關閉連接或非預期的響應碼或使用了協議升級,這幾種情況下不能在該連接上繼續
// 接收響應,退出readLoop if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false }
// 此處用於處理body為空或協議升級場景,會嘗試將連接放回連接池,對於后者,連接由調用者管理,退出readLoop
if !hasBody || bodyWritable { pc.t.setReqCanceler(rc.req, nil)
// 在返回response前將連接放回連接池,快速回收利用。回收連接需要按順序滿足:
// 1.alive 為true
// 2.接收到EOF錯誤,此時底層連接關閉,該連接不可用
// 3.成功發送request;

// 此處的執行順序很重要,將連接返回連接池的操作放到最后,即在協議升級的場景,服務端不再

// 發送數據的場景,以及request發送失敗的場景下都不會將連接放回連接池,這些情況會導致
// alive為false,readLoop退出並關閉該連接(協議升級后的連接不能關閉)
alive = alive && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyWritable {
// 協議升級之后還是會使用同一條連接,設置closeErr為errCallerOwnsConn,這樣在readLoop
// return后不會被pc.close(closeErr)關閉連接 closeErr
= errCallerOwnsConn } select {
// 1:將response成功返回后繼續等待下一個response;
// 2:如果roundTrip退出,(此時無法返回給roundTrip response)則退出readLoop。
// 即roundTrip接收完response后退出不會影響readLoop繼續運行
case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Now that they've read from the unbuffered channel, they're safely // out of the select that also waits on this goroutine to die, so // we're allowed to exit now if needed (if alive is false) testHookReadLoopBeforeNextRead() continue }
// 下面處理response body存在數據的場景,邏輯與body不存在數據的場景類似 waitForBodyRead :
= make(chan bool, 2)
// 初始化body的處理函數,讀取完response會返回EOF,這類連接是可重用的 body :
= &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil },
fn: func(err error) error { isEOF :
= err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, }
//返回的resp.Body類型變為了bodyEOFSignal,如果調用者在讀取resp.Body后沒有關閉,會導致
// readLoop阻塞在下面"case bodyEOF := <-waitForBodyRead:"中
resp.Body
= body if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") { resp.Body = &gzipReader{body: body} resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") resp.ContentLength = -1 resp.Uncompressed = true } // 此處與處理不帶resp.body的場景相同 select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancelation or death) select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive &&
// 如果讀取完response的數據,則該連接可以被重用,否則直接釋放。釋放一個未讀取完數據的連接會導致數據丟失。
//
注意區分bodyEOF和pc.sawEOF的區別,一個是上層通道(http response.Body)關閉,一個是底層通道(TCP)關閉。 bodyEOF
&& !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace)
// 釋放阻塞的讀操作
if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM