一次golang fasthttp踩坑經驗


一個簡單的系統,結構如下:

 

 

 我們的服務A接受外部的HTTP請求,然後通過golang的fasthttp將請求轉發給服務B,流程非常簡單。線上運行一段時間之後,發現服務B完全不再接收任何請求,查看服務A的日志,發現大量如下錯誤(no free connections available to host):

 

 

 

 

   從錯誤原因看是因為連接被占滿導致的。進入服務A的容器中(服務A和服務B都是通過docker啟動的),通過netstat -anlp查看,發現有大量的TCP連接處於ESTABLISHED狀態。我們采用的是長連接的方式,此時心裡非常疑惑:1. fasthttp是能夠復用連接的,為什麼還會有如此多的TCP連接?2. 為什麼這些連接不能夠使用了,以致出現上述異常?

  從fasthttp client的源碼出發,我們轉發請求時調用的是

f.Client.DoTimeout(req, resp, f.ExecTimeout),其中f.Client是一個fasthttp.HostClient,f.ExecTimeout設置的是5s。
追查代碼,直到client.go中的這個方法
// doNonNilReqResp performs one request/response exchange over a single
// connection obtained from c.acquireConn.
//
// It panics if req or resp is nil. resp is reset before the request is sent.
//
// The first result is a retry flag: it is false when no connection could be
// acquired, when the exchange succeeded, and when the response body exceeded
// the size limit (ErrBodyTooLarge); it is true for the other write/read
// failures. How callers act on the flag is not visible in this block.
//
// On every failure after a connection was acquired, that connection is closed
// rather than released.
func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
	if req == nil {
		panic("BUG: req cannot be nil")
	}
	if resp == nil {
		panic("BUG: resp cannot be nil")
	}

	// Record the last-use time as seconds elapsed since startTimeUnix.
	atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))

	// Free up resources occupied by response before sending the request,
	// so the GC may reclaim these resources (e.g. response body).
	resp.Reset()

	// If we detected a redirect to another schema, re-derive the TLS flag and
	// target address from the request, drop the cached address list, and ask
	// for this request's connection to be closed afterwards.
	if req.schemaUpdate {
		c.IsTLS = bytes.Equal(req.URI().Scheme(), strHTTPS)
		c.Addr = addMissingPort(string(req.Host()), c.IsTLS)
		c.addrIdx = 0
		c.addrs = nil
		req.schemaUpdate = false
		req.SetConnectionClose()
	}

	// acquireConn may fail (e.g. with ErrNoFreeConns); no retry is requested
	// in that case.
	cc, err := c.acquireConn()
	if err != nil {
		return false, err
	}
	conn := cc.c

	resp.parseNetConn(conn)

	if c.WriteTimeout > 0 {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		currentTime := time.Now()
		if err = conn.SetWriteDeadline(currentTime.Add(c.WriteTimeout)); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}

	// If the connection has outlived MaxConnDuration and the request is not
	// already marked as connection-close, mark it for this write only;
	// resetConnection remembers that the mark must be undone afterwards and
	// that the connection must be closed instead of released.
	resetConnection := false
	if c.MaxConnDuration > 0 && time.Since(cc.createdTime) > c.MaxConnDuration && !req.ConnectionClose() {
		req.SetConnectionClose()
		resetConnection = true
	}

	// Fall back to the client's name when the request carries no User-Agent.
	// NOTE(review): userAgentOld is only used for the emptiness check; the
	// header is not restored later in this block.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		req.Header.userAgent = c.getClientName()
	}
	bw := c.acquireWriter(conn)
	err = req.Write(bw)

	// Undo the temporary connection-close mark set above so the caller's
	// request is left as it was.
	if resetConnection {
		req.Header.ResetConnectionClose()
	}

	// Flush only if the write itself succeeded; either failure closes the
	// connection and requests a retry.
	if err == nil {
		err = bw.Flush()
	}
	if err != nil {
		c.releaseWriter(bw)
		c.closeConn(cc)
		return true, err
	}
	c.releaseWriter(bw)

	if c.ReadTimeout > 0 {
		// Set Deadline every time, since golang has fixed the performance issue
		// See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
		currentTime := time.Now()
		if err = conn.SetReadDeadline(currentTime.Add(c.ReadTimeout)); err != nil {
			c.closeConn(cc)
			return true, err
		}
	}
	// When ReadTimeout is not set, no read deadline is applied here, so the
	// read below is not bounded by this block.

	// Skip the response body for HEAD requests. The !IsGet() term is logically
	// implied by IsHead(); presumably it is a cheap fast path for GET requests.
	if !req.Header.IsGet() && req.Header.IsHead() {
		resp.SkipBody = true
	}
	if c.DisableHeaderNamesNormalizing {
		resp.Header.DisableNormalizing()
	}

	br := c.acquireReader(conn)
	if err = resp.ReadLimitBody(br, c.MaxResponseBodySize); err != nil {
		c.releaseReader(br)
		c.closeConn(cc)
		// Don't retry in case of ErrBodyTooLarge since we will just get the same again.
		retry := err != ErrBodyTooLarge
		return retry, err
	}
	c.releaseReader(br)

	// Close the connection if we forced a close above or either side asked
	// for it; otherwise hand it back via releaseConn.
	if resetConnection || req.ConnectionClose() || resp.ConnectionClose() {
		c.closeConn(cc)
	} else {
		c.releaseConn(cc)
	}

	// err is nil here: the last assignment was the successful ReadLimitBody.
	return false, err
}

  請注意c.acquireConn()這個方法,這個方法即從連接池中獲取連接,如果沒有可用連接,則創建新的連接,該方法實現如下

// acquireConn returns a connection for the next request.
//
// Under connsLock it either pops the most recently stored entry from c.conns
// or, when c.conns is empty and connsCount is below the limit (MaxConns, or
// DefaultMaxConnsPerHost when MaxConns <= 0), reserves a slot by incrementing
// connsCount. With no stored connection and no free slot it returns
// ErrNoFreeConns.
//
// A reserved slot is filled by dialing outside the lock; if the dial fails the
// reservation is undone via decConnsCount. The first reservation also starts
// the connsCleaner goroutine.
func (c *HostClient) acquireConn() (*clientConn, error) {
	var (
		pooled       *clientConn
		mayDial      bool
		spawnCleaner bool
	)

	c.connsLock.Lock()
	if idle := len(c.conns); idle > 0 {
		// Take the last entry and clear its slot so the slice does not keep
		// a reference to it.
		last := idle - 1
		pooled = c.conns[last]
		c.conns[last] = nil
		c.conns = c.conns[:last]
	} else {
		limit := c.MaxConns
		if limit <= 0 {
			limit = DefaultMaxConnsPerHost
		}
		if c.connsCount < limit {
			c.connsCount++
			mayDial = true
			if !c.connsCleanerRun {
				c.connsCleanerRun = true
				spawnCleaner = true
			}
		}
	}
	c.connsLock.Unlock()

	switch {
	case pooled != nil:
		return pooled, nil
	case !mayDial:
		return nil, ErrNoFreeConns
	}

	if spawnCleaner {
		go c.connsCleaner()
	}

	netConn, err := c.dialHostHard()
	if err != nil {
		// Give back the slot reserved under the lock.
		c.decConnsCount()
		return nil, err
	}
	return acquireClientConn(netConn), nil
}

其中 ErrNoFreeConns 即為errors.New("no free connections available to host"),該錯誤就是我們服務中出現的錯誤。那原因很明顯就是因為!createConn,即無法創建新的連接,為什么無法創建新的連接,是因為連接數已經達到了 maxConns =  DefaultMaxConnsPerHost = 512(默認值)。連接數達到最大值了,但是為什么連接沒有回收也沒有復用,從這塊看,還是沒有看出來。又仔細的查了一下業務代碼,發現很多服務A到服務B的請求,都是因為超時了而結束的,即達到了 f.ExecTimeout = 5s。

又從頭查看源碼,終於發現了玄機:DoTimeout內部調用clientDoTimeout,後者把超時時間換算成截止時間後,交給下面的clientDoDeadline處理。

// clientDoDeadline runs c.Do on copies of req and resp in a separate goroutine
// and waits for it until deadline.
//
// It returns ErrTimeout immediately if deadline has already passed, and
// ErrTimeout if the timer fires before the goroutine reports a result;
// otherwise it returns the error from c.Do. On timeout the goroutine is not
// interrupted: it keeps running until c.Do returns, and its result is
// discarded.
func clientDoDeadline(req *Request, resp *Response, deadline time.Time, c clientDoer) error {
	// Remaining time until deadline; non-positive means it already passed.
	timeout := -time.Since(deadline)
	if timeout <= 0 {
		return ErrTimeout
	}

	// Reuse a pooled error channel, or create one with capacity 1 so the
	// goroutine's single send never blocks.
	var ch chan error
	chv := errorChPool.Get()
	if chv == nil {
		chv = make(chan error, 1)
	}
	ch = chv.(chan error)

	// Make req and resp copies, since on timeout they no longer
	// may be accessed.
	reqCopy := AcquireRequest()
	req.copyToSkipBody(reqCopy)
	swapRequestBody(req, reqCopy)
	respCopy := AcquireResponse()
	if resp != nil {
		// Not calling resp.copyToSkipBody(respCopy) here to avoid
		// unexpected messing with headers
		respCopy.SkipBody = resp.SkipBody
	}

	// Note that the request continues execution on ErrTimeout until
	// client-specific ReadTimeout exceeds. This helps limiting load
	// on slow hosts by MaxConns* concurrent requests.
	//
	// Without this 'hack' the load on slow host could exceed MaxConns*
	// concurrent requests, since timed out requests on client side
	// usually continue execution on the host.

	// mu guards timedout, so the goroutine and the waiter agree on whether
	// the result is still wanted.
	var mu sync.Mutex
	var timedout bool
	// This goroutine performs the actual request (connection handling and
	// sending) on the copies.
	go func() {
		errDo := c.Do(reqCopy, respCopy)
		mu.Lock()
		{
			// Only touch the caller's req/resp and report the result if the
			// waiter has not already given up.
			if !timedout {
				if resp != nil {
					respCopy.copyToSkipBody(resp)
					swapResponseBody(resp, respCopy)
				}
				swapRequestBody(reqCopy, req)
				ch <- errDo
			}
		}
		mu.Unlock()

		// The copies are released in both the success and the timeout case.
		ReleaseResponse(respCopy)
		ReleaseRequest(reqCopy)
	}()
	// This part handles the timeout: wait for either the goroutine's result
	// or the timer, whichever comes first.
	tc := AcquireTimer(timeout)
	var err error
	select {
	case err = <-ch:
	case <-tc.C:
		mu.Lock()
		{
			timedout = true
			err = ErrTimeout
		}
		mu.Unlock()
	}
	ReleaseTimer(tc)

	// Drain a result that may have been sent just before timedout was set,
	// so the channel goes back to the pool empty.
	select {
	case <-ch:
	default:
	}
	errorChPool.Put(chv)

	return err
}

  從這段代碼可以看到請求超時是如何處理的:當請求超時後,主流程直接返回超時錯誤,而此時goroutine裡仍在等待請求的返回。偏偏服務B在某些情況下會拋出異常,也就是不對這個請求進行返回,導致這個連接一直得不到釋放。這就解答了為什麼會有大量連接一直被占用,最終無連接可用。

  最后,當我心里還在腹誹為什么fasthttp這么優秀的框架會有這種問題,如果服務端拋異常(不對請求進行返回)就會把連接打滿?又自己看了一下代碼,原來,

// DoTimeout performs the given request and waits for response during
// the given timeout duration.
//
// Request must contain at least non-zero RequestURI with full url (including
// scheme and host) or non-zero Host header + RequestURI.
//
// The function doesn't follow redirects. Use Get* for following redirects.
//
// Response is ignored if resp is nil.
//
// ErrTimeout is returned if the response wasn't returned during
// the given timeout.
//
// ErrNoFreeConns is returned if all HostClient.MaxConns connections
// to the host are busy.
//
// It is recommended obtaining req and resp via AcquireRequest
// and AcquireResponse in performance-critical code.
//
// Warning: DoTimeout does not terminate the request itself. The request will
// continue in the background and the response will be discarded.
// If requests take too long and the connection pool gets filled up please
// try setting a ReadTimeout.
//
// DoTimeout is a thin wrapper: it forwards req, resp and timeout to
// clientDoTimeout, passing c as the client that executes the request.
func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
	return clientDoTimeout(req, resp, timeout, c)
}

  人家這個方法的注釋早就說明了:看最後一段注釋,大意是超時之後請求依然會在後台繼續等待返回值,只是返回值會被丟棄;如果請求耗時太長,就會把連接池占滿,這正是我們遇到的問題。解決辦法是設置ReadTimeout字段。我個人的理解是:請求發出之後,如果超過ReadTimeout還沒有讀到返回值,客戶端就會關閉該連接,從而釋放連接池中的名額。

   以上就是這次經驗之談,切記,使用fasthttp的時候,加上ReadTimeout字段。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM