Go Http包解析:為什么需要response.Body.Close()


 

簡單來講就是:為了提高效率,http.Get 等請求的 TCP 連接是不會關閉的(再次向同一個域名請求時,復用連接),所以必須要手動關閉。

2019-01-24 10:43:32 更新

不管是否使用 Resp 的內容都需要手動關閉,否則會導致進程打開的 fd 一直變多,最終系統殺掉進程,報錯類似: http: Accept error: accept tcp [::]:4200: accept4: too many open files; retrying in 1s

參考: http://www.zhangjiee.com/blog/2018/go-http-get-close-body.html

 

最近線上的一個項目遇到了內存泄露的問題,查了heap之后,發現 http包的 dialConn函數竟然占了內存使用的大頭,這個有點懵逼了,后面在網上查詢資料的時候無意間發現一句話

10次內存泄露,有9次是goroutine泄露。

結果發現,正是我認為的不可能的goroutine泄露導致了這次的內存泄露,而goroutine泄露的原因就是 沒有調用 response.Body.Close()

實驗

既然發現是 response.Body.Close() 惹的禍,那就做個實驗證實一下

不close response.Body

func main() { for true { requestWithNoClose() time.Sleep(time.Microsecond * 100) } } func requestWithNoClose() { _, err := http.Get("https://www.baidu.com") if err != nil { fmt.Printf("error occurred while fetching page, error: %s", err.Error()) } fmt.Println("ok") }

close response.Body

func main() { for true { requestWithClose() time.Sleep(time.Microsecond * 10) } } func requestWithClose() { resp, err := http.Get("https://www.baidu.com") if err != nil { fmt.Printf("error occurred while fetching page, error: %s", err.Error()) return } defer resp.Body.Close() fmt.Println("ok") }

結果

同樣的代碼,區別只有 是否resp.Body.Close() 是否被調用,我們運行一段時間后,發現內存差距如此之大

后面,我們就帶着問題,深入一下Http包的底層實現來找出具體原因

結構體

只分析我們可能用會用到的

Transport

type Transport struct { idleMu sync.Mutex wantIdle bool // user has requested to close all idle conns // 空閑的連接 緩存的地方 idleConn map[connectMethodKey][]*persistConn // most recently used at end // connectMethodKey => 空閑連接的chan 形成的map // 有空閑連接放入的時候,首先嘗試放入這個chan,方便另一個可能需要連接的goroutine直接使用,如果沒有goroutine需要連接,就放入到上面的idleConn里面,便於后面的請求連接復用 idleConnCh map[connectMethodKey]chan *persistConn // DisableKeepAlives, if true, disables HTTP keep-alives and // will only use the connection to the server for a single // HTTP request. // // This is unrelated to the similarly named TCP keep-alives. // 是否開啟 keepAlive,為true的話,連接不會被復用 DisableKeepAlives bool // MaxIdleConns controls the maximum number of idle (keep-alive) // connections across all hosts. Zero means no limit. // 所有hosts對應的最大的連接總數 MaxIdleConns int // 每一個host對應的最大的空閑連接數 MaxIdleConnsPerHost int // 每一個host對應的最大連接數 MaxConnsPerHost int }

pconnect

type persistConn struct { // alt optionally specifies the TLS NextProto RoundTripper. // This is used for HTTP/2 today and future protocols later. // If it's non-nil, the rest of the fields are unused. alt RoundTripper t *Transport cacheKey connectMethodKey conn net.Conn tlsState *tls.ConnectionState br *bufio.Reader // from conn bw *bufio.Writer // to conn nwrite int64 // bytes written // roundTrip 往 這個chan 里寫入request,readLoop從這個 chan 讀取request reqch chan requestAndChan // written by roundTrip; read by readLoop // roundTrip 往 這個chan 里寫入request 和 writeErrCh,writeLoop從這個 chan 讀取request寫入大盤 連接 里,並寫入 err 到 writeErrCh chan writech chan writeRequest // written by roundTrip; read by writeLoop closech chan struct{} // closed when conn closed // 判斷body是否讀取完 sawEOF bool // whether we've seen EOF from conn; owned by readLoop // writeErrCh passes the request write error (usually nil) // from the writeLoop goroutine to the readLoop which passes // it off to the res.Body reader, which then uses it to decide // whether or not a connection can be reused. Issue 7569. // writeLoop 寫入 err的 chan writeErrCh chan error // writeLoop 結束的時候關閉 writeLoopDone chan struct{} // closed when write loop ends }

writeRequest

type writeRequest struct { req *transportRequest ch chan<- error // Optional blocking chan for Expect: 100-continue (for receive). // If not nil, writeLoop blocks sending request body until // it receives from this chan. continueCh <-chan struct{} }

requestAndChan

type requestAndChan struct { req *Request ch chan responseAndError // unbuffered; always send in select on callerGone // whether the Transport (as opposed to the user client code) // added the Accept-Encoding gzip header. If the Transport // set it, only then do we transparently decode the gzip. addedGzip bool // Optional blocking chan for Expect: 100-continue (for send). // If the request has an "Expect: 100-continue" header and // the server responds 100 Continue, readLoop send a value // to writeLoop via this chan. continueCh chan<- struct{} callerGone <-chan struct{} // closed when roundTrip caller has returned }

請求流程

這里的函數沒有太多的邏輯,貼出來主要是為了追蹤過程

這里用一個簡單的例子表示

func main() { // 調用 Http 包的 Get 函數處理 resp, err := http.Get("https://www.baidu.com") if err != nil { panic(err) } resp.Body.Close() }

client.Get

var DefaultClient = &Client{} .... // 使用默認的 client, 調用 client.Get 來處理 func Get(url string) (resp *Response, err error) { return DefaultClient.Get(url) }
func (c *Client) Get(url string) (resp *Response, err error) { // request這里的創建忽略 req, err := NewRequest("GET", url, nil) if err != nil { return nil, err } // 調用 client.Do 函數來處理, 然后 client.Do 調用 client.do 來處理,不懂為啥非要多一層嵌套 return c.Do(req) }

client.do

func (c *Client) do(req *Request) (retres *Response, reterr error) { // URL 及 hook檢測,忽略... var ( deadline = c.deadline() reqs []*Request resp *Response copyHeaders = c.makeHeadersCopier(req) reqBodyClosed = false // have we closed the current req.Body? // Redirect behavior: redirectMethod string includeBody bool ) // 錯誤自定義處理,忽略.... for { // 省略無關的代碼.... reqs = append(reqs, req) var err error var didTimeout func() bool // 調用 client.send 方法來獲取response,主要邏輯 if resp, didTimeout, err = c.send(req, deadline); err != nil { // c.send() always closes req.Body reqBodyClosed = true if !deadline.IsZero() && didTimeout() { err = &httpError{ // TODO: early in cycle: s/Client.Timeout exceeded/timeout or context cancelation/ err: err.Error() + " (Client.Timeout exceeded while awaiting headers)", timeout: true, } } return nil, uerr(err) } // 判斷是否需要跳轉,進而進一步請求 var shouldRedirect bool redirectMethod, shouldRedirect, includeBody = redirectBehavior(req.Method, resp, reqs[0]) if !shouldRedirect { return resp, nil } req.closeBody() } 

client.send

func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { if c.Jar != nil { for _, cookie := range c.Jar.Cookies(req.URL) { req.AddCookie(cookie) } } // 調用 send 方法來獲取 response resp, didTimeout, err = send(req, c.transport(), deadline) if err != nil { return nil, didTimeout, err } if c.Jar != nil { if rc := resp.Cookies(); len(rc) > 0 { c.Jar.SetCookies(req.URL, rc) } } return resp, nil, nil }

send

func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { req := ireq // req is either the original request, or a modified fork // URL Hader 等判斷及請求fork,忽略.... stopTimer, didTimeout := setRequestCancel(req, rt, deadline) // 調用 Transport.RoundTrip 來處理請求 resp, err = rt.RoundTrip(req) if err != nil { stopTimer() if resp != nil { log.Printf("RoundTripper returned a response & error; ignoring response") } if tlsErr, ok := err.(tls.RecordHeaderError); ok { // If we get a bad TLS record header, check to see if the // response looks like HTTP and give a more helpful error. // See golang.org/issue/11111. if string(tlsErr.RecordHeader[:]) == "HTTP/" { err = errors.New("http: server gave HTTP response to HTTPS client") } } return nil, didTimeout, err } if !deadline.IsZero() { resp.Body = &cancelTimerBody{ stop: stopTimer, rc: resp.Body, reqDidTimeout: didTimeout, } } return resp, nil, nil }

Transport.roundTrip

這里開始接近重點區域了

這個函數主要就是湖區連接,然后獲取response返回

func (t *Transport) roundTrip(req *Request) (*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx := req.Context() trace := httptrace.ContextClientTrace(ctx) // URL, header schema 等判斷,與主流程無關,忽略... for { // 判斷context 是否完成,超時等 select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // treq gets modified by roundTrip, so we need to recreate for each retry. // treq會被 roundTrip 方法修改,所有每一次循環需要創建一個新的 treq := &transportRequest{Request: req, trace: trace} // 根據當前的請求獲取 connectMethod,包含schema和address,方便請求的復用,這里不重要,不做詳細分析 cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() return nil, err } // Get the cached or newly-created connection to either the // host (for http or https), the http proxy, or the http proxy // pre-CONNECTed to https server. In any case, we'll be ready // to send it requests. // 根據請求和connectMethod獲取一個可用的連接,重要,后面會具體分析 pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(req, nil) req.closeBody() return nil, err } var resp *Response if pconn.alt != nil { // HTTP/2 path. // http2 使用,這里不展開 t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host t.setReqCanceler(req, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { // 獲取response,這里是重點,后面展開 resp, err = pconn.roundTrip(treq) } // 判斷獲取response是否有誤及錯誤處理等操作,無關緊要,忽略 } }

接下來,進入重點分析了 getConn persistConn.roundTrip Transport.dialConn 以及內存泄露的罪魁禍首 persistConn.readLoop persistConn.writeLoop

Transport.getConn

這個方法根據connectMethod,也就是 schema和addr(忽略proxy代理),復用連接或者創建一個新的連接,同時開啟了兩個goroutine,分別 讀取response 和 寫request

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) { // trace相關的忽略... req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } // 從idleConn里面獲取一個 connectMethod對應的空閑的 連接,獲取到了直接返回 if pc, idleSince := t.getIdleConn(cm); pc != nil { if trace != nil && trace.GotConn != nil { trace.GotConn(pc.gotIdleConnTrace(idleSince)) } // set request canceler to some non-nil function so we // can detect whether it was cleared between now and when // we enter roundTrip t.setReqCanceler(req, func(error) {}) return pc, nil } type dialRes struct { pc *persistConn err error } // 沒有獲取到空閑連接,定義一個 dialRes 結構體,用於接收 自身創建的另一個goroutine創建的 連接 dialc := make(chan dialRes) cmKey := cm.key() // Copy these hooks so we don't race on the postPendingDial in // the goroutine we launch. Issue 11136. testHookPrePendingDial := testHookPrePendingDial testHookPostPendingDial := testHookPostPendingDial // 處理 dialc 中暫時用不到的連接的方法,后面會講到為什么有可能創建的連接是沒有人使用的 handlePendingDial := func() { testHookPrePendingDial() go func() { if v := <-dialc; v.err == nil { t.putOrCloseIdleConn(v.pc) } else { t.decHostConnCount(cmKey) } testHookPostPendingDial() }() } cancelc := make(chan error, 1) t.setReqCanceler(req, func(err error) { cancelc <- err }) // 忽略部分判斷... // 開啟一個goroutine,去創建一個連接,dialConn 是重點,后面深入分析 go func() { pc, err := t.dialConn(ctx, cm) dialc <- dialRes{pc, err} }() // 獲取 idleChan中對應connectMethod的 channel idleConnCh := t.getIdleConnCh(cm) // 從多個chan中獲取連接,獲取取消信號,先來的先處理 select { case v := <-dialc: // 上面 goroutine首先創建完成了一個連接,使用這個鏈接 // Our dial finished. if v.pc != nil { if trace != nil && trace.GotConn != nil && v.pc.alt == nil { trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn}) } return v.pc, nil } // Our dial failed. See why to return a nicer error // value. t.decHostConnCount(cmKey) select { case <-req.Cancel: // It was an error due to cancelation, so prioritize that // error value. (Issue 16049) return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err default: // It wasn't an error due to cancelation, so // return the original error message: return nil, v.err } case pc := <-idleConnCh: // 另一個goroutine的request首先完成了,然后會把這個鏈接首先嘗試放入對應connectMethod對應的 chan,如果放入不了,則放入idleConns的map中,進入這里說明,另一個goroutine把連接放入了chan,並被當前goroutine捕獲了,那么上面 // go func() { // pc, err := t.dialConn(ctx, cm) // dialc <- dialRes{pc, err} // }() // 生成的連接就暫時沒用了,這時候就用到上面 handlePendingDial 定義的方法,去處理這個多余的連接 // Another request finished first and its net.Conn // became available before our dial. Or somebody // else's dial that they didn't use. // But our dial is still going, so give it away // when it finishes: handlePendingDial() if trace != nil && trace.GotConn != nil { trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()}) } return pc, nil case <-req.Cancel: handlePendingDial() return nil, errRequestCanceledConn case <-req.Context().Done(): handlePendingDial() return nil, req.Context().Err() case err := <-cancelc: handlePendingDial() if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err } }

上面的 handlePendingDial 方法中,調用了 putOrCloseIdleConn,這個方法到底干了什么,跟 idleConnCh 和 idleConn 有什么關系?

Transport.putOrCloseIdleConn

func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { if err := t.tryPutIdleConn(pconn); err != nil { pconn.close(err) } }
Transport.tryPutIdleConn
func (t *Transport) tryPutIdleConn(pconn *persistConn) error { if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 { return errKeepAlivesDisabled } if pconn.isBroken() { return errConnBroken } // http2判斷 if pconn.alt != nil { return errNotCachingH2Conn } // 標記為 reused pconn.markReused() // cacheKey是由 connectMethod得到的 key := pconn.cacheKey t.idleMu.Lock() defer t.idleMu.Unlock() // 獲取connectMethod對應的 idleConnCh waitingDialer := t.idleConnCh[key] select { case waitingDialer <- pconn: // 在這里嘗試將 連接 放到 connectMethod對應的chan里面,如果沒有另一個goroutine接收就算了 // We're done with this pconn and somebody else is // currently waiting for a conn of this type (they're // actively dialing, but this conn is ready // first). Chrome calls this socket late binding. See // https://insouciant.org/tech/connection-management-in-chromium/ return nil default: // 沒有另一個goroutine接收的chan,從map中刪除,便於垃圾回收 if waitingDialer != nil { // They had populated this, but their dial won // first, so we can clean up this map entry. delete(t.idleConnCh, key) } } if t.wantIdle { return errWantIdle } if t.idleConn == nil { t.idleConn = make(map[connectMethodKey][]*persistConn) } idles := t.idleConn[key] // 設定了每個 connectMethod對應的最大空閑連接數,超過就不再往里面填充 if len(idles) >= t.maxIdleConnsPerHost() { return errTooManyIdleHost } for _, exist := range idles { if exist == pconn { log.Fatalf("dup idle pconn %p in freelist", pconn) } } // 后面就是清理多有的連接,及重置timer等操作,與主流程無關,不展開分析 t.idleConn[key] = append(idles, pconn) t.idleLRU.add(pconn) if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns { oldest := t.idleLRU.removeOldest() oldest.close(errTooManyIdle) t.removeIdleConnLocked(oldest) } if t.IdleConnTimeout > 0 { if pconn.idleTimer != nil { pconn.idleTimer.Reset(t.IdleConnTimeout) } else { pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle) } } pconn.idleAt = time.Now() return nil }

Transport.dialConn

跑偏了一會,現在接着 getConn分析 dialConn 這個函數

這個函數主要就是創建了一個 連接,然后 創建了兩個goroutine,分別去往這個連接寫入請求(writeLoop函數)和讀取響應(readLoop函數)

而這兩個函數,又會與 persistConn.roundTrip 通過chan進行關聯,這里先對函數進行分析,分析完成后,再畫出對應的關聯圖示

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) { pconn := &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } trace := httptrace.ContextClientTrace(ctx) wrapErr := func(err error) error { if cm.proxyURL != nil { // Return a typed error, per Issue 16997 return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err} } return err } // 構建一個 TLS的連接 if cm.scheme() == "https" && t.DialTLS != nil { var err error pconn.conn, err = t.DialTLS("tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } if pconn.conn == nil { return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)")) } if tc, ok := pconn.conn.(*tls.Conn); ok { // Handshake here, in case DialTLS didn't. TLSNextProto below // depends on it for knowing the connection state. if trace != nil && trace.TLSHandshakeStart != nil { trace.TLSHandshakeStart() } if err := tc.Handshake(); err != nil { go pconn.conn.Close() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(tls.ConnectionState{}, err) } return nil, err } cs := tc.ConnectionState() if trace != nil && trace.TLSHandshakeDone != nil { trace.TLSHandshakeDone(cs, nil) } pconn.tlsState = &cs } } else { // 構建一個普通的tcp連接 conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn if cm.scheme() == "https" { var firstTLSHost string if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil { return nil, wrapErr(err) } if err = pconn.addTLS(firstTLSHost, trace); err != nil { return nil, wrapErr(err) } } } // proxy 設置、 協議轉換等,忽略... if t.MaxConnsPerHost > 0 { pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey} } pconn.br = bufio.NewReader(pconn) pconn.bw = bufio.NewWriter(persistConnWriter{pconn}) go pconn.readLoop() go pconn.writeLoop() return pconn, nil }
persistConn.readLoop

readLoop 這里從連接中讀取 response,然后通過chan發送給persistConn.roundTrip,最后等待結束

func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below defer func() { // 關閉這個鏈接,這里的關閉函數 相當於 close(pc.closech),然后 writeLoop 的 <-pc.closech 就不會阻塞,而正常退出了,這樣就可以理解,為什么readLoop函數退出后,writeLoop函數也就退出了 pc.close(closeErr) pc.t.removeIdleConn(pc) }() // 這個函數同上面分析的 Transport.tryPutIdleConn tryPutIdleConn := func(trace *httptrace.ClientTrace) bool { if err := pc.t.tryPutIdleConn(pc); err != nil { closeErr = err if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled { trace.PutIdleConn(err) } return false } if trace != nil && trace.PutIdleConn != nil { trace.PutIdleConn(nil) } return true } // eofc is used to block caller goroutines reading from Response.Body // at EOF until this goroutines has (potentially) added the connection // back to the idle pool. eofc := make(chan struct{}) defer close(eofc) // unblock reader on errors // 省略部分... alive := true for alive { pc.readLimit = pc.maxHeaderResponseSize() _, err := pc.br.Peek(1) pc.mu.Lock() if pc.numExpectedResponses == 0 { pc.readLoopPeekFailLocked(err) pc.mu.Unlock() return } pc.mu.Unlock() // 從當前連接中獲取request, 這里標記為 m1 rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { // 從請求中獲取response,就是那么簡單 resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } // 如果讀取response失敗,則包裝錯誤返回給 上層,即 persistConn.roundTrip 函數 if err != nil { if pc.readLimit <= 0 { err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize()) } select { case rc.ch <- responseAndError{err: err}: case <-rc.callerGone: return } return } pc.readLimit = maxInt64 // effictively no limit for response bodies pc.mu.Lock() pc.numExpectedResponses-- pc.mu.Unlock() hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0 if resp.Close || rc.req.Close || resp.StatusCode <= 199 { // Don't do keep-alive on error if either party requested a close // or we get an unexpected informational (1xx) response. // StatusCode 100 is already handled above. alive = false } // body為空的處理,忽略.... waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ body: resp.Body, // resp.Body.Close() 的最終調用的函數, Close()影響readLoop 和 writeLoop 兩個goroutine 這兩個goroutine的關閉,在后面講close的時候具體介紹 earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, // 上面函數出錯后,會調用這個函數,這個函數影響readLoop 和 writeLoop 兩個goroutine的形式,與上面的邏輯大致相同 fn: func(err error) error { isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } // 組裝resp resp.Body = body if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") { resp.Body = &gzipReader{body: body} resp.Header.Del("Content-Encoding") resp.Header.Del("Content-Length") resp.ContentLength = -1 resp.Uncompressed = true } // 將resp通過chan返回給 persistConn.roundTrip 函數 select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancelation or death) // 阻塞在這里,等待 請求 body close 或 請求cancel 或 context done 或 pc.closech select { case bodyEOF := <-waitForBodyRead: pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): alive = false pc.t.cancelRequest(rc.req, rc.req.Context().Err()) case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() } }
persistConn.writeLoop

相對於persistConn.readLoop, 這個函數就簡單很多,其主要功能也就是往連接里面寫request請求

func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) for { select { // 首先通過pc.writech chan 從 persistConn.roundTrip 函數中獲取 writeRequest, 可以簡單理解為 request case wr := <-pc.writech: startBytesWritten := pc.nwrite err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) if bre, ok := err.(requestBodyReadError); ok { err = bre.error // Errors reading from the user's // Request.Body are high priority. // Set it here before sending on the // channels below or calling // pc.close() which tears town // connections and causes other // errors. wr.req.setError(err) } if err == nil { err = pc.bw.Flush() } if err != nil { wr.req.Request.closeBody() if pc.nwrite == startBytesWritten { err = nothingWrittenError{err} } } // 把 err 通過 chan 返回給 persistConn.roundTrip 函數,persistConn.roundTrip 函數判斷 err是否為 nil及相應的處理 pc.writeErrCh <- err // to the body reader, which might recycle us wr.ch <- err // to the roundTrip function if err != nil { // 如果 寫入請求出現錯誤,這里關閉,pc.closech chan,readLoop的第151行就會停止阻塞,將alive設為false,進而結束循環,終止 readLoop的goroutine pc.close(err) return } case <-pc.closech: // 這里結束阻塞,是由 readLoop 結束是,調用 第3行的 defer函數,關閉 pc.closech chan 導致的 return } } }

persistConn.roundTrip

無論是 persistConn.readLoop 還是 persistConn.writeLoop 都避免不了和這個函數交互,這個函數的重要性也就不言而喻了

但是 這個函數的主要邏輯就是 創建個連接的 writeRequest chan, 也就是 writeLoop 用到的chan,然后把request 通過這個 chan 傳給 persistConn.writeLoop ,然后 在創建一個 responseAndError chan,也就是 readLoop 用到的chan,從 這個chan中獲取 persistConn.readLoop 獲取到的 response,最后把 response返回給上層函數

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { // 設置 request的頭信息,cancel函數,省略.... var continueCh chan struct{} if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() { continueCh = make(chan struct{}, 1) } gone := make(chan struct{}) defer close(gone) defer func() { if err != nil { pc.t.setReqCanceler(req.Request, nil) } }() const debugRoundTrip = false // Write the request concurrently with waiting for a response, // in case the server decides to reply before reading our full // request body. startBytesWritten := pc.nwrite writeErrCh := make(chan error, 1) // 把 request 放入 writech chan 里面,這樣, `persistConn.writeLoop` 的第6行就可以拿到 request,往 連接 里面寫入請求信息了 pc.writech <- writeRequest{req, writeErrCh, continueCh} // 定義responseAndError chan,並放入 reqch chan 里面,這樣 `persistConn.readLoop` 的第46行,也就是 m1標記的地方,就可以解除阻塞,開始獲取 response的邏輯了 resc := make(chan responseAndError) pc.reqch <- requestAndChan{ req: req.Request, ch: resc, addedGzip: requestedGzip, continueCh: continueCh, callerGone: gone, } var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() // 阻塞在這里,直到 獲取 writeLoop 返回的寫入錯誤或 pc.closech的關閉信息,連接超時的信息或 readLoop的 resp或cancel或ctx done的信息 for { testHookWaitResLoop() select { // 這里獲取到 writeLoop的寫入信息,可能是err,也可能不是,下面做對應的處理 case err := <-writeErrCh: if debugRoundTrip { req.logf("writeErrCh resv: %T/%#v", err, err) } if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } // 寫入request沒有問題,判斷是否有超時 if d := pc.t.ResponseHeaderTimeout; d > 0 { if debugRoundTrip { req.logf("starting timer for %v", d) } timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C } // 到此獲取到了 writeLoop的信息,但是並沒有return,進入下一個循環 case <-pc.closech: // closech的關閉信息 if debugRoundTrip { req.logf("closech recv: %T %#v", pc.closed, pc.closed) } return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) case <-respHeaderTimer: // 等待超時的信息 if debugRoundTrip { req.logf("timeout waiting for response headers.") } pc.close(errTimeout) return nil, errTimeout case re := <-resc: // 從 readLoop獲取到 response if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } // 返回 response,這里結束循環 return re.res, nil case <-cancelChan: pc.t.CancelRequest(req.Request) cancelChan = nil case <-ctxDoneChan: pc.t.cancelRequest(req.Request, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } } }

交互圖示

上面 persistConn.roundTrip persistConn.readLoop persistConn.writeLoop 之間的數據交互,可能靠語言比較蒼白,這里畫一下圖示

小結

綜上分析,可以發現,readLoop和 writeLoop 兩個goroutine在 寫入請求並獲取response返回后,並沒有跳出for循環,而繼續阻塞在 下一次for循環的select 語句里面,所以,兩個函數所在的goroutine並沒有運行結束,導致了最開的現象: goroutine持續增加導致內存持續增加

 

 

轉自: https://segmentfault.com/a/1190000020086816


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM