Context


什么是context

goroutine 的上下文,包含 goroutine 的運行狀態、環境、現場等信息。 context 主要用來在 goroutine 之間傳遞上下文信息,包括:取消信號、超時時間、截止時間、k-v 等。最重要的是它是並發安全的。
Context的底層實現是mutex與channel的結合,前者用於初始部分參數,后者用於通信

context底層原理

類型 名稱 作用
Context 接口 定義了 Context 接口的四個方法
emptyCtx 結構 實現了 Context 接口,它其實是個空的 context
CancelFunc 函數 取消函數
canceler 接口 context 取消接口,定義了兩個方法
cancelCtx 結構體 可以被取消
timerCtx 結構體 超時會被取消
valueCtx 結構體 可以存儲 k-v 對
Background 函數 返回一個空的 context,常作為根 context
TODO 函數 返回一個空的 context,常用於重構時期,沒有合適的 context 可用
WithCancel 函數 基於父 context,生成一個可以取消的 context
newCancelCtx 函數 創建一個可取消的 context
propagateCancel 函數 向下傳遞 context 節點間的取消關系
parentCancelCtx 函數 找到第一個可取消的父節點
removeChild 函數 去掉父節點的孩子節點
init 函數 包初始化
WithDeadline 函數 創建一個有 deadline 的 context (在某個時間點)
WithTimeout 函數 創建一個有 timeout 的 context (在多長時間)
WithValue 函數 創建一個存儲 k-v 對的 context

整體類圖

接口

Context

type Context interface {
	// 當 context 被取消或者到了 deadline,返回一個被關閉的 channel
	Done() <-chan struct{}

	// 在 channel Done 關閉后,返回 context 取消原因
	Err() error

	// 返回 context 是否會被取消以及自動取消時間(即 deadline)
	Deadline() (deadline time.Time, ok bool)

	// 獲取 key 對應的 value
	Value(key interface{}) interface{}
}

Context 是一個接口,定義了 4 個方法,它們都是冪等的。也就是說連續多次調用同一個方法,得到的結果都是相同的。

Done() 返回一個 channel,可以表示 context 被取消的信號:當這個 channel 被關閉時,說明 context 被取消了。注意,這是一個只讀的channel。 我們又知道,讀一個關閉的 channel 會讀出相應類型的零值。並且源碼里沒有地方會向這個 channel 里面塞入值。換句話說,這是一個 receive-only 的 channel。因此在子協程里讀這個 channel,除非被關閉,否則讀不出來任何東西。也正是利用了這一點,子協程從 channel 里讀出了值(零值)后,就可以做一些收尾工作,盡快退出。

Err() 返回一個錯誤,表示 channel 被關閉的原因。例如是被取消,還是超時。

Deadline() 返回 context 的截止時間,通過此時間,函數就可以決定是否進行接下來的操作,如果時間太短,就可以不往下做了,否則浪費系統資源。當然,也可以用這個 deadline 來設置一個 I/O 操作的超時時間。

Value() 獲取之前設置的 key 對應的 value

canceler

type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

實現了上面定義的兩個方法的 Context,就表明該 Context 是可取消的。源碼中有兩個類型實現了 canceler 接口:*cancelCtx*timerCtx。注意是加了 * 號的,是這兩個結構體的指針實現了 canceler 接口。

Context 接口設計成這個樣子的原因:

  • “取消”操作應該是建議性,而非強制性
    caller 不應該去關心、干涉 callee 的情況,決定如何以及何時 return 是 callee 的責任。caller 只需發送“取消”信息,callee 根據收到的信息來做進一步的決策,因此接口並沒有定義 cancel 方法。

  • “取消”操作應該可傳遞
    “取消”某個函數時,和它相關聯的其他函數也應該“取消”。因此,Done() 方法返回一個只讀的 channel,所有相關函數監聽此 channel。一旦 channel 關閉,通過 channel 的“廣播機制”,所有監聽者都能收到。

結構體

emptyCtx

是Context接口的實現

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

這實際上是一個空的 context,永遠不會被 cancel,沒有存儲值,也沒有 deadline。

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)
func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

background 通常用在 main 函數中,作為所有 context 的根節點。

todo 通常用在並不知道傳遞什么 context的情形。例如,調用一個需要傳遞 context 參數的函數,你手頭並沒有其他 context 可以傳遞,這時就可以傳遞 todo。這常常發生在重構進行中,給一些函數添加了一個 Context 參數,但不知道要傳什么,就用 todo “占個位子”,最終要換成其他 context。

cancelCtx

type cancelCtx struct {
	Context

	// 保護之后的字段
	mu       sync.Mutex
	done     chan struct{}
	children map[canceler]struct{}
	err      error
}

這是一個可以取消的 Context,實現了 canceler 接口.它直接將接口 Context 作為它的一個匿名字段,這樣,它就可以被看成一個 Context

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

c.done 是“懶漢式”創建,只有調用了 Done() 方法的時候才會被創建。再次說明,函數返回的是一個只讀的 channel,而且沒有地方向這個 channel 里面寫數據。所以,直接調用讀這個 channel,協程會被 block 住。一般通過搭配 select 來使用。一旦關閉,就會立即讀出零值。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    // 必須要傳 err
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // 已經被其他協程取消
	}
	// 給 err 字段賦值
	c.err = err
	// 關閉 channel,通知其他協程
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	
	// 遍歷它的所有子節點
	for child := range c.children {
	    // 遞歸地取消所有子節點
		child.cancel(false, err)
	}
	// 將子節點置空
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
	    // 從父節點中移除自己 
		removeChild(c.Context, c)
	}
}

cancel() 方法的功能就是關閉 channel:c.done;遞歸地取消它的所有子節點;從父節點從刪除自己。達到的效果是通過關閉 channel,將取消信號傳遞給了它的所有子節點。goroutine 接收到取消信號的方式就是 select 語句中的讀 c.done 被選中

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }//返回的cannel函數
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

這是一個暴露給用戶的方法,傳入一個父 Context(這通常是一個 background,作為根節點),返回新建的 context,新 context 的 done channel 是新建的.
當 WithCancel 函數返回的 CancelFunc 被調用或者是父節點的 done channel 被關閉(父節點的 CancelFunc 被調用),此 context(子節點) 的 done channel 也會被關閉
WithCancel返回的cannel函數中調用了cancelCtx.cancel
參數前者是 true,也就是說取消的時候,需要將自己從父節點里刪除。第二個參數則是一個固定的取消錯誤類型

var Canceled = errors.New("context canceled"

還注意到一點,調用子節點 cancel 方法的時候,傳入的第一個參數 removeFromParent 是 false。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    ...
    for child := range c.children {
	// NOTE: acquiring the child's lock while holding parent's lock.
	child.cancel(false, err) //?什么時候傳true,什么時候傳false
    }
    ...
}

調用 WithCancel() 方法的時候,也就是新創建一個可取消的 context 節點時,返回的 cancelFunc 函數會傳入 true。這樣做的結果是:當調用返回的 cancelFunc 時,會將這個 context 從它的父節點里“除名”,因為父節點可能有很多子節點,你自己取消了,所以我要和你斷絕關系,對其他人沒影響

func removeChild(parent Context, child canceler) {
	p, ok := parentCancelCtx(parent)
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}


代表一棵 context 樹。當調用左圖中標紅 context 的 cancel 方法后,該 context 從它的父 context 中去除掉了:實線箭頭變成了虛線。且虛線圈框出來的 context 都被取消了,圈內的 context 間的父子關系都盪然無存了

timeCtx

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

timerCtx 首先是一個 cancelCtx,所以它能取消。看下 cancel() 方法

func (c *timerCtx) cancel(removeFromParent bool, err error) {
        // 直接調用 cancelCtx 的取消方法
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// 從父節點中刪除子節點
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
                // 關掉定時器,這樣,在deadline 到來時,不會再次取消
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

創建 timerCtx

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

WithTimeout 函數直接調用了 WithDeadline,傳入的 deadline 是當前時間加上 timeout 的時間,也就是從現在開始再經過 timeout 時間就算超時。也就是說,WithDeadline 需要用的是絕對時間。重點來看它

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
	if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
		// 如果父節點 context 的 deadline 早於指定時間。直接構建一個可取消的 context。
		// 原因是一旦父節點超時,自動調用 cancel 函數,子節點也會隨之取消。
		// 所以不用單獨處理子節點的計時器時間到了之后,自動調用 cancel 函數
		return WithCancel(parent)
	}
	
	// 構建 timerCtx
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  deadline,
	}
	// 掛靠到父節點上
	propagateCancel(parent, c)
	
	// 計算當前距離 deadline 的時間
	d := time.Until(deadline)
	if d <= 0 {
		// 直接取消
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(true, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
		// d 時間后,timer 會自動調用 cancel 函數。自動取消
		c.timer = time.AfterFunc(d, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

valueCtx

type valueCtx struct {
	Context
	key, val interface{}
}
func (c *valueCtx) String() string {
	return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}
func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	if !reflect.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

對 key 的要求是可比較,因為之后需要通過 key 取出 context 中的值,可比較是必須的。
通過層層傳遞 context,最終形成這樣一棵樹

和鏈表有點像,只是它的方向相反:Context 指向它的父節點,鏈表則指向下一個節點。通過 WithValue 函數,可以創建層層的 valueCtx,存儲 goroutine 間可以共享的變量。

取值的過程,實際上是一個遞歸查找的過程

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

傳遞共享數據

package main

import (
	"context"
	"fmt"
)
type TraceId struct{}
func main() {
	ctx := context.Background()
	process(ctx)
        //key不使用字符串
	//ctx = context.WithValue(ctx, "traceId", "qcrao-2019")
          ctx = context.WithValue(ctx, TraceId{}, "qcrao-2019")
	process(ctx)
}

func process(ctx context.Context) {
	traceId, ok := ctx.Value(TraceId{}).(string)
	if ok {
		fmt.Printf("process over. trace_id=%s\n", traceId)
	} else {
		fmt.Printf("process over. no trace_id\n")
	}
}

超時取消 goroutine

func ctxTimeout() {
	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10)
	// defer cancel()
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			// return
		case <-time.After(time.Millisecond * 100):
			fmt.Println("Time out")
		}
	}(ctx)
	time.Sleep(time.Second)
}

設置截止時間,超時觸發

func ctxDeadline() {
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
	defer cancel()
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			// return
		case <-time.After(time.Millisecond * 100):
			fmt.Println("Time out")
		}
	}(ctx)
	time.Sleep(time.Second)
}

調用cancel,關閉goroutine

func ctxCancel() {
	ctx, cancel := context.WithCancel(context.Background())
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			return
		case <-time.After(time.Millisecond * 100):
			fmt.Println("Time out")
		}
	}(ctx)
	cancel()
}

多任務時間

type paramKey struct{}

func main() {
        //設置共享數據
	c := context.WithValue(context.Background(),
		paramKey{}, "abc")
        //設置最多任務執行時長,子任務可根據總時長分配子任務時長
	c, cancel := context.WithTimeout(c, 10*time.Second)
	defer cancel()
	go mainTask(c)

	var cmd string
	for {
		fmt.Scan(&cmd)
		if cmd == "c" {
			cancel()
		}
	}
}
func mainTask(c context.Context) {
	fmt.Printf("main task started with param %q\n", c.Value(paramKey{}))
	go func() {
		c1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		smallTask(c1, "background_task", 9*time.Second)
	}()
	go func() {
                //基於父context,創建子context
		c1, cancel := context.WithTimeout(c, 10*time.Second)
		defer cancel()
		smallTask(c1, "sub_task", 9*time.Second)
	}()
	smallTask(c, "same_task", 8*time.Second)
}

func smallTask(c context.Context, name string, d time.Duration) {
	fmt.Printf("%s started with param %q\n", name, c.Value(paramKey{}))
	select {
        //定時到時觸發
	case <-time.After(d):
		fmt.Printf("%s done\n", name)
        //context設定到時觸發
	case <-c.Done():
		fmt.Printf("%s cancelled\n", name)
	}
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM