- 什么是context
- context底層原理
- 傳遞共享數據
- [超時取消 goroutine](#超時取消 goroutine)
- 設置截止時間,超時觸發
- 調用cancel,關閉goroutine
- **多任務時間
什么是context
goroutine 的上下文,包含 goroutine 的運行狀態、環境、現場等信息。 context 主要用來在 goroutine 之間傳遞上下文信息,包括:取消信號、超時時間、截止時間、k-v 等。最重要的是它是並發安全的。
Context的底層實現是mutex與channel的結合,前者用於初始部分參數,后者用於通信
context底層原理
類型 | 名稱 | 作用 |
---|---|---|
Context | 接口 | 定義了 Context 接口的四個方法 |
emptyCtx | 結構 | 實現了 Context 接口,它其實是個空的 context |
CancelFunc | 函數 | 取消函數 |
canceler | 接口 | context 取消接口,定義了兩個方法 |
cancelCtx | 結構體 | 可以被取消 |
timerCtx | 結構體 | 超時會被取消 |
valueCtx | 結構體 | 可以存儲 k-v 對 |
Background | 函數 | 返回一個空的 context,常作為根 context |
TODO | 函數 | 返回一個空的 context,常用於重構時期,沒有合適的 context 可用 |
WithCancel | 函數 | 基於父 context,生成一個可以取消的 context |
newCancelCtx | 函數 | 創建一個可取消的 context |
propagateCancel | 函數 | 向下傳遞 context 節點間的取消關系 |
parentCancelCtx | 函數 | 找到第一個可取消的父節點 |
removeChild | 函數 | 去掉父節點的孩子節點 |
init | 函數 | 包初始化 |
WithDeadline | 函數 | 創建一個有 deadline 的 context (在某個時間點) |
WithTimeout | 函數 | 創建一個有 timeout 的 context (在多長時間) |
WithValue | 函數 | 創建一個存儲 k-v 對的 context |
整體類圖
接口
Context
type Context interface {
// 當 context 被取消或者到了 deadline,返回一個被關閉的 channel
Done() <-chan struct{}
// 在 channel Done 關閉后,返回 context 取消原因
Err() error
// 返回 context 是否會被取消以及自動取消時間(即 deadline)
Deadline() (deadline time.Time, ok bool)
// 獲取 key 對應的 value
Value(key interface{}) interface{}
}
Context
是一個接口,定義了 4 個方法,它們都是冪等
的。也就是說連續多次調用同一個方法,得到的結果都是相同的。
Done()
返回一個 channel,可以表示 context 被取消的信號:當這個 channel 被關閉時,說明 context 被取消了。注意,這是一個只讀的channel。 我們又知道,讀一個關閉的 channel 會讀出相應類型的零值。並且源碼里沒有地方會向這個 channel 里面塞入值。換句話說,這是一個 receive-only
的 channel。因此在子協程里讀這個 channel,除非被關閉,否則讀不出來任何東西。也正是利用了這一點,子協程從 channel 里讀出了值(零值)后,就可以做一些收尾工作,盡快退出。
Err()
返回一個錯誤,表示 channel 被關閉的原因。例如是被取消,還是超時。
Deadline()
返回 context 的截止時間,通過此時間,函數就可以決定是否進行接下來的操作,如果時間太短,就可以不往下做了,否則浪費系統資源。當然,也可以用這個 deadline 來設置一個 I/O 操作的超時時間。
Value()
獲取之前設置的 key 對應的 value
canceler
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
實現了上面定義的兩個方法的 Context,就表明該 Context 是可取消的。源碼中有兩個類型實現了 canceler 接口:*cancelCtx
和 *timerCtx
。注意是加了 *
號的,是這兩個結構體的指針實現了 canceler 接口。
Context 接口設計成這個樣子的原因:
-
“取消”操作應該是建議性,而非強制性
caller 不應該去關心、干涉 callee 的情況,決定如何以及何時 return 是 callee 的責任。caller 只需發送“取消”信息,callee 根據收到的信息來做進一步的決策,因此接口並沒有定義 cancel 方法。 -
“取消”操作應該可傳遞
“取消”某個函數時,和它相關聯的其他函數也應該“取消”。因此,Done()
方法返回一個只讀的 channel,所有相關函數監聽此 channel。一旦 channel 關閉,通過 channel 的“廣播機制”,所有監聽者都能收到。
結構體
emptyCtx
是Context接口的實現
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
這實際上是一個空的 context,永遠不會被 cancel,沒有存儲值,也沒有 deadline。
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
background 通常用在 main 函數中,作為所有 context 的根節點。
todo 通常用在並不知道傳遞什么 context的情形。例如,調用一個需要傳遞 context 參數的函數,你手頭並沒有其他 context 可以傳遞,這時就可以傳遞 todo。這常常發生在重構進行中,給一些函數添加了一個 Context 參數,但不知道要傳什么,就用 todo “占個位子”,最終要換成其他 context。
cancelCtx
type cancelCtx struct {
Context
// 保護之后的字段
mu sync.Mutex
done chan struct{}
children map[canceler]struct{}
err error
}
這是一個可以取消的 Context,實現了 canceler 接口.它直接將接口 Context 作為它的一個匿名字段,這樣,它就可以被看成一個 Context
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
c.done 是“懶漢式”創建,只有調用了 Done() 方法的時候才會被創建。再次說明,函數返回的是一個只讀的 channel,而且沒有地方向這個 channel 里面寫數據。所以,直接調用讀這個 channel,協程會被 block 住。一般通過搭配 select 來使用。一旦關閉,就會立即讀出零值。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 必須要傳 err
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已經被其他協程取消
}
// 給 err 字段賦值
c.err = err
// 關閉 channel,通知其他協程
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
// 遍歷它的所有子節點
for child := range c.children {
// 遞歸地取消所有子節點
child.cancel(false, err)
}
// 將子節點置空
c.children = nil
c.mu.Unlock()
if removeFromParent {
// 從父節點中移除自己
removeChild(c.Context, c)
}
}
cancel() 方法的功能就是關閉 channel:c.done;遞歸地取消它的所有子節點;從父節點從刪除自己。達到的效果是通過關閉 channel,將取消信號傳遞給了它的所有子節點。goroutine 接收到取消信號的方式就是 select 語句中的讀 c.done 被選中
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }//返回的cannel函數
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
這是一個暴露給用戶的方法,傳入一個父 Context(這通常是一個 background,作為根節點),返回新建的 context,新 context 的 done channel 是新建的.
當 WithCancel 函數返回的 CancelFunc 被調用或者是父節點的 done channel 被關閉(父節點的 CancelFunc 被調用),此 context(子節點) 的 done channel 也會被關閉
WithCancel返回的cannel函數中調用了cancelCtx.cancel
參數前者是 true,也就是說取消的時候,需要將自己從父節點里刪除。第二個參數則是一個固定的取消錯誤類型
var Canceled = errors.New("context canceled"
還注意到一點,調用子節點 cancel 方法的時候,傳入的第一個參數 removeFromParent 是 false。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
...
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err) //?什么時候傳true,什么時候傳false
}
...
}
調用 WithCancel() 方法的時候,也就是新創建一個可取消的 context 節點時,返回的 cancelFunc 函數會傳入 true。這樣做的結果是:當調用返回的 cancelFunc 時,會將這個 context 從它的父節點里“除名”,因為父節點可能有很多子節點,你自己取消了,所以我要和你斷絕關系,對其他人沒影響
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
代表一棵 context 樹。當調用左圖中標紅 context 的 cancel 方法后,該 context 從它的父 context 中去除掉了:實線箭頭變成了虛線。且虛線圈框出來的 context 都被取消了,圈內的 context 間的父子關系都盪然無存了
timeCtx
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
timerCtx 首先是一個 cancelCtx,所以它能取消。看下 cancel() 方法
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 直接調用 cancelCtx 的取消方法
c.cancelCtx.cancel(false, err)
if removeFromParent {
// 從父節點中刪除子節點
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
// 關掉定時器,這樣,在deadline 到來時,不會再次取消
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
創建 timerCtx
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
WithTimeout 函數直接調用了 WithDeadline,傳入的 deadline 是當前時間加上 timeout 的時間,也就是從現在開始再經過 timeout 時間就算超時。也就是說,WithDeadline 需要用的是絕對時間。重點來看它
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// 如果父節點 context 的 deadline 早於指定時間。直接構建一個可取消的 context。
// 原因是一旦父節點超時,自動調用 cancel 函數,子節點也會隨之取消。
// 所以不用單獨處理子節點的計時器時間到了之后,自動調用 cancel 函數
return WithCancel(parent)
}
// 構建 timerCtx
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
// 掛靠到父節點上
propagateCancel(parent, c)
// 計算當前距離 deadline 的時間
d := time.Until(deadline)
if d <= 0 {
// 直接取消
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// d 時間后,timer 會自動調用 cancel 函數。自動取消
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
valueCtx
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
對 key 的要求是可比較,因為之后需要通過 key 取出 context 中的值,可比較是必須的。
通過層層傳遞 context,最終形成這樣一棵樹
和鏈表有點像,只是它的方向相反:Context 指向它的父節點,鏈表則指向下一個節點。通過 WithValue 函數,可以創建層層的 valueCtx,存儲 goroutine 間可以共享的變量。
取值的過程,實際上是一個遞歸查找的過程
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
傳遞共享數據
package main
import (
"context"
"fmt"
)
type TraceId struct{}
func main() {
ctx := context.Background()
process(ctx)
//key不使用字符串
//ctx = context.WithValue(ctx, "traceId", "qcrao-2019")
ctx = context.WithValue(ctx, TraceId{}, "qcrao-2019")
process(ctx)
}
func process(ctx context.Context) {
traceId, ok := ctx.Value(TraceId{}).(string)
if ok {
fmt.Printf("process over. trace_id=%s\n", traceId)
} else {
fmt.Printf("process over. no trace_id\n")
}
}
超時取消 goroutine
func ctxTimeout() {
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10)
// defer cancel()
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
// return
case <-time.After(time.Millisecond * 100):
fmt.Println("Time out")
}
}(ctx)
time.Sleep(time.Second)
}
設置截止時間,超時觸發
func ctxDeadline() {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
defer cancel()
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
// return
case <-time.After(time.Millisecond * 100):
fmt.Println("Time out")
}
}(ctx)
time.Sleep(time.Second)
}
調用cancel,關閉goroutine
func ctxCancel() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
case <-time.After(time.Millisecond * 100):
fmt.Println("Time out")
}
}(ctx)
cancel()
}
多任務時間
type paramKey struct{}
func main() {
//設置共享數據
c := context.WithValue(context.Background(),
paramKey{}, "abc")
//設置最多任務執行時長,子任務可根據總時長分配子任務時長
c, cancel := context.WithTimeout(c, 10*time.Second)
defer cancel()
go mainTask(c)
var cmd string
for {
fmt.Scan(&cmd)
if cmd == "c" {
cancel()
}
}
}
func mainTask(c context.Context) {
fmt.Printf("main task started with param %q\n", c.Value(paramKey{}))
go func() {
c1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
smallTask(c1, "background_task", 9*time.Second)
}()
go func() {
//基於父context,創建子context
c1, cancel := context.WithTimeout(c, 10*time.Second)
defer cancel()
smallTask(c1, "sub_task", 9*time.Second)
}()
smallTask(c, "same_task", 8*time.Second)
}
func smallTask(c context.Context, name string, d time.Duration) {
fmt.Printf("%s started with param %q\n", name, c.Value(paramKey{}))
select {
//定時到時觸發
case <-time.After(d):
fmt.Printf("%s done\n", name)
//context設定到時觸發
case <-c.Done():
fmt.Printf("%s cancelled\n", name)
}
}