golang中Context的使用場景
context在Go1.7之后就進入標准庫中了。它主要的用處如果用一句話來說,是在於控制goroutine的生命周期。當一個計算任務被goroutine承接了之后,由於某種原因(超時,或者強制退出)我們希望中止這個goroutine的計算任務,那么就用得到這個Context了。
關於Context的四種結構,CancelContext,TimeoutContext,DeadLineContext,ValueContext的使用在這一篇快速掌握 Golang context 包已經說的很明白了。
本文主要來盤一盤golang中context的一些使用場景:
場景一:RPC調用
在主goroutine上有4個RPC,RPC2/3/4是並行請求的,我們這里希望在RPC2請求失敗之后,直接返回錯誤,並且讓RPC3/4停止繼續計算。這個時候,就使用的到Context。
這個的具體實現如下面的代碼。
package main
import (
"context"
"sync"
"github.com/pkg/errors"
)
func Rpc(ctx context.Context, url string) error {
result := make(chan int)
err := make(chan error)
go func() {
// 進行RPC調用,並且返回是否成功,成功通過result傳遞成功信息,錯誤通過error傳遞錯誤信息
isSuccess := true
if isSuccess {
result <- 1
} else {
err <- errors.New("some error happen")
}
}()
select {
case <- ctx.Done():
// 其他RPC調用調用失敗
return ctx.Err()
case e := <- err:
// 本RPC調用失敗,返回錯誤信息
return e
case <- result:
// 本RPC調用成功,不返回錯誤信息
return nil
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
// RPC1調用
err := Rpc(ctx, "http://rpc_1_url")
if err != nil {
return
}
wg := sync.WaitGroup{}
// RPC2調用
wg.Add(1)
go func(){
defer wg.Done()
err := Rpc(ctx, "http://rpc_2_url")
if err != nil {
cancel()
}
}()
// RPC3調用
wg.Add(1)
go func(){
defer wg.Done()
err := Rpc(ctx, "http://rpc_3_url")
if err != nil {
cancel()
}
}()
// RPC4調用
wg.Add(1)
go func(){
defer wg.Done()
err := Rpc(ctx, "http://rpc_4_url")
if err != nil {
cancel()
}
}()
wg.Wait()
}
當然我這里使用了waitGroup來保證main函數在所有RPC調用完成之后才退出。
在Rpc函數中,第一個參數是一個CancelContext, 這個Context形象的說,就是一個傳話筒,在創建CancelContext的時候,返回了一個聽聲器(ctx)和話筒(cancel函數)。所有的goroutine都拿着這個聽聲器(ctx),當主goroutine想要告訴所有goroutine要結束的時候,通過cancel函數把結束的信息告訴給所有的goroutine。當然所有的goroutine都需要內置處理這個聽聲器結束信號的邏輯(ctx->Done())。我們可以看Rpc函數內部,通過一個select來判斷ctx的done和當前的rpc調用哪個先結束。
這個waitGroup和其中一個RPC調用就通知所有RPC的邏輯,其實有一個包已經幫我們做好了。errorGroup。具體這個errorGroup包的使用可以看這個包的test例子。
有人可能會擔心我們這里的cancel()會被多次調用,context包的cancel調用是冪等的。可以放心多次調用。
我們這里不妨品一下,這里的Rpc函數,實際上我們的這個例子里面是一個“阻塞式”的請求,這個請求如果是使用http.Get或者http.Post來實現,實際上Rpc函數的Goroutine結束了,內部的那個實際的http.Get卻沒有結束。所以,需要理解下,這里的函數最好是“非阻塞”的,比如是http.Do,然后可以通過某種方式進行中斷。比如像這篇文章Cancel http.Request using Context中的這個例子:
func httpRequest(
ctx context.Context,
client *http.Client,
req *http.Request,
respChan chan []byte,
errChan chan error
) {
req = req.WithContext(ctx)
tr := &http.Transport{}
client.Transport = tr
go func() {
resp, err := client.Do(req)
if err != nil {
errChan <- err
}
if resp != nil {
defer resp.Body.Close()
respData, err := ioutil.ReadAll(resp.Body)
if err != nil {
errChan <- err
}
respChan <- respData
} else {
errChan <- errors.New("HTTP request failed")
}
}()
for {
select {
case <-ctx.Done():
tr.CancelRequest(req)
errChan <- errors.New("HTTP request cancelled")
return
case <-errChan:
tr.CancelRequest(req)
return
}
}
}
它使用了http.Client.Do,然后接收到ctx.Done的時候,通過調用transport.CancelRequest來進行結束。
我們還可以參考net/dail/DialContext
換而言之,如果你希望你實現的包是“可中止/可控制”的,那么你在你包實現的函數里面,最好是能接收一個Context函數,並且處理了Context.Done。
場景二:PipeLine
pipeline模式就是流水線模型,流水線上的幾個工人,有n個產品,一個一個產品進行組裝。其實pipeline模型的實現和Context並無關系,沒有context我們也能用chan實現pipeline模型。但是對於整條流水線的控制,則是需要使用上Context的。這篇文章Pipeline Patterns in Go的例子是非常好的說明。這里就大致對這個代碼進行下說明。
runSimplePipeline的流水線工人有三個,lineListSource負責將參數一個個分割進行傳輸,lineParser負責將字符串處理成int64,sink根據具體的值判斷這個數據是否可用。他們所有的返回值基本上都有兩個chan,一個用於傳遞數據,一個用於傳遞錯誤。(<-chan string, <-chan error)輸入基本上也都有兩個值,一個是Context,用於傳聲控制的,一個是(in <-chan)輸入產品的。
我們可以看到,這三個工人的具體函數里面,都使用switch處理了case <-ctx.Done()。這個就是生產線上的命令控制。
func lineParser(ctx context.Context, base int, in <-chan string) (
<-chan int64, <-chan error, error) {
...
go func() {
defer close(out)
defer close(errc)
for line := range in {
n, err := strconv.ParseInt(line, base, 64)
if err != nil {
errc <- err
return
}
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out, errc, nil
}
場景三:超時請求
我們發送RPC請求的時候,往往希望對這個請求進行一個超時的限制。當一個RPC請求超過10s的請求,自動斷開。當然我們使用CancelContext,也能實現這個功能(開啟一個新的goroutine,這個goroutine拿着cancel函數,當時間到了,就調用cancel函數)。
鑒於這個需求是非常常見的,context包也實現了這個需求:timerCtx。具體實例化的方法是 WithDeadline 和 WithTimeout。
具體的timerCtx里面的邏輯也就是通過time.AfterFunc來調用ctx.cancel的。
官方的例子:
package main
import (
"context"
"fmt"
"time"
)
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
select {
case <-time.After(1 * time.Second):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err()) // prints "context deadline exceeded"
}
}
在http的客戶端里面加上timeout也是一個常見的辦法
uri := "https://httpbin.org/delay/3"
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
log.Fatalf("http.NewRequest() failed with '%s'\n", err)
}
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*100)
req = req.WithContext(ctx)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalf("http.DefaultClient.Do() failed with:\n'%s'\n", err)
}
defer resp.Body.Close()
在http服務端設置一個timeout如何做呢?
package main
import (
"net/http"
"time"
)
func test(w http.ResponseWriter, r *http.Request) {
time.Sleep(20 * time.Second)
w.Write([]byte("test"))
}
func main() {
http.HandleFunc("/", test)
timeoutHandler := http.TimeoutHandler(http.DefaultServeMux, 5 * time.Second, "timeout")
http.ListenAndServe(":8080", timeoutHandler)
}
我們看看TimeoutHandler的內部,本質上也是通過context.WithTimeout來做處理。
func (h *timeoutHandler) ServeHTTP(w ResponseWriter, r *Request) {
...
ctx, cancelCtx = context.WithTimeout(r.Context(), h.dt)
defer cancelCtx()
...
go func() {
...
h.handler.ServeHTTP(tw, r)
}()
select {
...
case <-ctx.Done():
...
}
}
場景四:HTTP服務器的request互相傳遞數據
context還提供了valueCtx的數據結構。
這個valueCtx最經常使用的場景就是在一個http服務器中,在request中傳遞一個特定值,比如有一個中間件,做cookie驗證,然后把驗證后的用戶名存放在request中。
我們可以看到,官方的request里面是包含了Context的,並且提供了WithContext的方法進行context的替換。
package main
import (
"net/http"
"context"
)
type FooKey string
var UserName = FooKey("user-name")
var UserId = FooKey("user-id")
func foo(next http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := context.WithValue(r.Context(), UserId, "1")
ctx2 := context.WithValue(ctx, UserName, "yejianfeng")
next(w, r.WithContext(ctx2))
}
}
func GetUserName(context context.Context) string {
if ret, ok := context.Value(UserName).(string); ok {
return ret
}
return ""
}
func GetUserId(context context.Context) string {
if ret, ok := context.Value(UserId).(string); ok {
return ret
}
return ""
}
func test(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("welcome: "))
w.Write([]byte(GetUserId(r.Context())))
w.Write([]byte(" "))
w.Write([]byte(GetUserName(r.Context())))
}
func main() {
http.Handle("/", foo(test))
http.ListenAndServe(":8080", nil)
}
在使用ValueCtx的時候需要注意一點,這里的key不應該設置成為普通的String或者Int類型,為了防止不同的中間件對這個key的覆蓋。最好的情況是每個中間件使用一個自定義的key類型,比如這里的FooKey,而且獲取Value的邏輯盡量也抽取出來作為一個函數,放在這個middleware的同包中。這樣,就會有效避免不同包設置相同的key的沖突問題了。
參考
快速掌握 Golang context 包
視頻筆記:如何正確使用 Context - Jack Lindamood
Go Concurrency Patterns: Context
Cancel http.Request using Context
Pipeline Patterns in Go