golang timeoutHandler解析及kubernetes中的變種


Golang里的http request timeout比較簡單,但是稍不留心就容易出現錯誤,最近在kubernetes生產環境中出現了的一個問題讓我有機會好好捋一捋golang中關於timeout中的所有相關的東西。

Basic

golang中timeout有關的設置, 資料已經比較多, 其中必須閱讀的就是The complete guide to Go net/http timeouts,里面詳述了關於http中各個timeou字段及其影響, 寫的很詳細, 本文就不在重復造輪子了。 所以我們在生產環境中的代碼絕對不能傻傻的使用http.Get("www.baidu.com")了, 很容易造成client hang死, 默認的http client的timeout值為0, 也就是沒有超時。具體的血淚教訓可以參見Don’t use Go’s default HTTP client (in production)。對於http package中default的設置最后還是仔細review一遍再使用。

Advanced

golang http.TimeoutHandler

了解了基本的使用方式后,筆者帶領大家解析一下其中的http.TimeoutHandlerTimeoutHandler顧名思義是一個handler wrapper, 用來限制ServeHttp的最大時間,也就是除去讀寫socket外真正執行服務器邏輯的時間,如果ServeHttp運行時間超過了設定的時間, 將返回一個"503 Service Unavailable" 和一個指定的message。 (golang net中各個結構體中各種timeout的不盡相同,但是並沒有直接設置ServeHttp timeout的方法, TimeoutHandler是唯一一個方法)。
我們來一起探究一下他的實現, 首先是函數定義:  

// TimeoutHandler returns a Handler that runs h with the given time limit.
//
// The new Handler calls h.ServeHTTP to handle each request, but if a
// call runs for longer than its time limit, the handler responds with
// a 503 Service Unavailable error and the given message in its body.
// (If msg is empty, a suitable default message will be sent.)
// After such a timeout, writes by h to its ResponseWriter will return
// ErrHandlerTimeout.
//
// TimeoutHandler buffers all Handler writes to memory and does not
// support the Hijacker or Flusher interfaces.
func TimeoutHandler(h Handler, dt time.Duration, msg string) Handler {
	return &timeoutHandler{
		handler: h,
		body:    msg,
		dt:      dt,
	}
}

可以看到典型的handler wrapper的函數signature, 接收一個handler並返回一個hander, 返回的timeout handler中ServeHttp方法如下:

func (h *timeoutHandler) ServeHTTP(w ResponseWriter, r *Request) {
	ctx := h.testContext
	if ctx == nil {
		var cancelCtx context.CancelFunc
		ctx, cancelCtx = context.WithTimeout(r.Context(), h.dt)
		defer cancelCtx()
	}
	r = r.WithContext(ctx)
	done := make(chan struct{})
	tw := &timeoutWriter{
		w: w,
		h: make(Header),
	}
	panicChan := make(chan interface{}, 1)
	go func() {
		defer func() {
			if p := recover(); p != nil {
				panicChan <- p
			}
		}()
		h.handler.ServeHTTP(tw, r)
		close(done)
	}()
	select {
	case p := <-panicChan:
		panic(p)
	case <-done:
		tw.mu.Lock()
		defer tw.mu.Unlock()
		dst := w.Header()
		for k, vv := range tw.h {
			dst[k] = vv
		}
		if !tw.wroteHeader {
			tw.code = StatusOK
		}
		w.WriteHeader(tw.code)
		w.Write(tw.wbuf.Bytes())
	case <-ctx.Done():
		tw.mu.Lock()
		defer tw.mu.Unlock()
		w.WriteHeader(StatusServiceUnavailable)
		io.WriteString(w, h.errorBody())
		tw.timedOut = true
	}
}

整體流程為:

  1. 首先初始化context的timeout
  2. 初始化一個timeoutWriter, 該timeoutWriter實現了http.ResponseWriter接口, 內部結構體中有一個bytes.Buffer, 所有的Write方法都是寫入到該buffer中。
  3. 異步goroutine調用serveHttp方法, timeoutWriter作為serveHttp的參數, 所以此時寫入的數據並沒有發送給用戶, 而是緩存到了timeoutWriter的buffer中
  4. 最后select監聽各個channel:
    1. 如果子groutine panic,則捕獲該panic並在主grouinte中panic進行propagate
    2. 如果請求正常完成則開始寫入header並將buffer中的內容寫給真正的http writer
    3. 如果請求超時則返回用戶503

為什么需要先寫入buffer, 然后在寫給真正的writer吶? 因為我們無法嚴格意義上的cancel掉一個請求。如果我們已經往一個http writer中寫了部分數據(例如已經寫了hedaer), 而此時因為某些邏輯處理較慢, 並且發現已經過了timeout閾值, 想要cancel該請求。此時已經沒有辦法真正意義上取消了,可能對端已經讀取了部分數據了。一個典型的場景是HTTP/1.1中的分塊傳輸, 我們先寫入header, 然后依次寫入各個chunk, 如果后面的chunk還沒寫已經超時了, 那此時就陷入了兩難的情況。
此時就需要使用golang內置的TimeoutHandler了,它提供了兩個優勢:

  1. 首先是提供了一個buffer, 等到所有的數據寫入完成, 如果此時沒有超時再統一發送給對端。 並且timeoutWriter在每次Write的時候都會判斷此時是否超時, 如果超時就馬上返回錯誤。
  2. 給用戶返回一個友好的503提示

實現上述兩點的代價就是需要維護一個buffer來緩存所有的數據。有些情況下是這個buffer會導致一定的問題,設想一下對於一個高吞吐的server, 每個請求都維護一個buffer勢必是不可接受的, 以kubernete為例, 每次list pods時可能有好幾M的數據, 如果每個請求都寫緩存勢必會占用過多內存, 那kubernetes是如何實現timeout的吶?

kubernetes timeout Handler

kubernetes 為了防止某個請求hang死之后一直占用連接, 所以會對每個請求進行timeout的處理, 這部分邏輯是在一個handler chain中WithTimeoutForNonLongRunningRequests handler實現。其中返回的WithTimeout的實現如下:

// WithTimeout returns an http.Handler that runs h with a timeout
// determined by timeoutFunc. The new http.Handler calls h.ServeHTTP to handle
// each request, but if a call runs for longer than its time limit, the
// handler responds with a 504 Gateway Timeout error and the message
// provided. (If msg is empty, a suitable default message will be sent.) After
// the handler times out, writes by h to its http.ResponseWriter will return
// http.ErrHandlerTimeout. If timeoutFunc returns a nil timeout channel, no
// timeout will be enforced. recordFn is a function that will be invoked whenever
// a timeout happens.
func WithTimeout(h http.Handler, timeoutFunc func(*http.Request) (timeout <-chan time.Time, recordFn func(), err *apierrors.StatusError)) http.Handler {
	return &timeoutHandler{h, timeoutFunc}
}

其中主要是timeoutHandler, 實現如下:

type timeoutHandler struct {
	handler http.Handler
	timeout func(*http.Request) (<-chan time.Time, func(), *apierrors.StatusError)
}

func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	after, recordFn, err := t.timeout(r)
	if after == nil {
		t.handler.ServeHTTP(w, r)
		return
	}

	result := make(chan interface{})
	tw := newTimeoutWriter(w)
	go func() {
		defer func() {
			result <- recover()
		}()
		t.handler.ServeHTTP(tw, r)
	}()
	select {
	case err := <-result:
		if err != nil {
			panic(err)
		}
		return
	case <-after:
		recordFn()
		tw.timeout(err)
	}
}

如上, 在ServeHTTP中主要做了幾件事情:

  1. 調用timeoutHandler.timeout設置一個timer, 如果timeout時間到到達會通過after這個channel傳遞過來, 后面會監聽該channel
  2. 創建timeoutWriter對象, 該timeoutWriter中有一個timeout方法, 該方法會在超時之后會被調用
  3. 異步調用ServeHTTP並將timeoutWriter傳遞進去,如果該groutine panic則進行捕獲並通過channel傳遞到調用方groutine, 因為我們不能因為一個groutine panic導致整個進程退出,而且調用方groutine對這些panic信息比較感興趣,需要傳遞過去。
  4. 監聽定時器channel

如果定時器channel超時會調用timeoutWrite.timeout方法,該方法如下:

func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) {
	tw.mu.Lock()
	defer tw.mu.Unlock()

	tw.timedOut = true

	// The timeout writer has not been used by the inner handler.
	// We can safely timeout the HTTP request by sending by a timeout
	// handler
	if !tw.wroteHeader && !tw.hijacked {
		tw.w.WriteHeader(http.StatusGatewayTimeout)
		enc := json.NewEncoder(tw.w)
		enc.Encode(&err.ErrStatus)
	} else {
		// The timeout writer has been used by the inner handler. There is
		// no way to timeout the HTTP request at the point. We have to shutdown
		// the connection for HTTP1 or reset stream for HTTP2.
		//
		// Note from: Brad Fitzpatrick
		// if the ServeHTTP goroutine panics, that will do the best possible thing for both
		// HTTP/1 and HTTP/2. In HTTP/1, assuming you're replying with at least HTTP/1.1 and
		// you've already flushed the headers so it's using HTTP chunking, it'll kill the TCP
		// connection immediately without a proper 0-byte EOF chunk, so the peer will recognize
		// the response as bogus. In HTTP/2 the server will just RST_STREAM the stream, leaving
		// the TCP connection open, but resetting the stream to the peer so it'll have an error,
		// like the HTTP/1 case.
		panic(errConnKilled)
	}
}

可以看到, 如果此時還沒有寫入任何數據, 則直接返回504狀態碼, 否則直接panic。 上面有一大段注釋說明為什么panic, 這段注釋的出處在kubernetes issue:
API server panics when writing response #29001
。 引用的是golang http包作者 Brad Fitzpatrick的話, 意思是: 如果我們已經往一個writer中寫入了部分數據,我們是沒有辦法timeout, 此時goroutine panic或許是最好的選擇, 無論是對於HTTP/1.1還是HTTP/2.0, 如果是HTTP/1.1, 他不會發送任何數據,直接斷開tcp連接, 此時對端就能夠識別出來server異常,如果是HTTP/2.0 此時srever會RST_STREAM該stream, 並且不會影響connnection, 對端也能夠很好的處理。 這部分代碼還是很有意思的, 很難想象kubernetes會以panic掉groutine的方式來處理一個request的超時。

panic掉一個groutine, 如果你上層沒有任何recover機制的話, 整個程序都會退出, 對於kubenernetes apiserver肯定是不能接受的, kubernetes在每個request的handler chain中會有一個genericfilters.WithPanicRecovery進行捕獲這樣的panic, 避免整個進程崩潰。

Other

談完TimeoutHandler, 再回到golang timeout,有時雖然我們正常timeout返回, 但並不意味整個groutine就正常返回了。此時調用返回也只是上層返回了, 異步調用的底層邏輯沒有辦法撤回的。 因為我們沒辦法cancel掉另一個grouine,只能是groutine主動退出, 主動退出的實現思路大部分是通過傳遞一個context或者close channel給該groutine, 該groutine監聽到退出信號就終止, 但是目前很多調用是不支持接收一個context或close channle作為參數的。
例如下面這段代碼:因為在主邏輯中sleep了4s是沒有辦法中斷的, 即時此時request已經返回,但是server端該groutine還是沒有被釋放, 所以golang timeout這塊還是非常容易leak grouine的, 使用的時候需要小心。

package main

import (
	"fmt"
	"net/http"
	"runtime"
	"time"
)

func main() {
	go func() {
		for {
			time.Sleep(time.Second)
			fmt.Printf("groutine num: %d\n", runtime.NumGoroutine())
		}
	}()

	handleFunc := func(w http.ResponseWriter, r *http.Request) {
		fmt.Printf("request %v\n", r.URL)
		time.Sleep(4 * time.Second)
		_, err := fmt.Fprintln(w, "ok")
		if err != nil {
			fmt.Printf("write err: %v\n", err)
		}
	}
	err := http.ListenAndServe("localhost:9999", http.TimeoutHandler(http.HandlerFunc(handleFunc), 2*time.Second, "err: timeout"))
	if err != nil {
		fmt.Printf("%v", err)
	}
}

寫在最后

golang timeout 簡單但是比較繁瑣,只有明白其原理才能真正防患於未然

2020/4/13 更新: 上述代碼存在資源泄露的問題,已經被社區修復,參加 http://likakuli.com/post/2019/12/06/apiserver_goroutine_leak/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM