Context


什么是context

goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息。 context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。最重要的是它是并发安全的。
Context的底层实现是mutex与channel的结合,前者用于初始部分参数,后者用于通信

context底层原理

类型 名称 作用
Context 接口 定义了 Context 接口的四个方法
emptyCtx 结构 实现了 Context 接口,它其实是个空的 context
CancelFunc 函数 取消函数
canceler 接口 context 取消接口,定义了两个方法
cancelCtx 结构体 可以被取消
timerCtx 结构体 超时会被取消
valueCtx 结构体 可以存储 k-v 对
Background 函数 返回一个空的 context,常作为根 context
TODO 函数 返回一个空的 context,常用于重构时期,没有合适的 context 可用
WithCancel 函数 基于父 context,生成一个可以取消的 context
newCancelCtx 函数 创建一个可取消的 context
propagateCancel 函数 向下传递 context 节点间的取消关系
parentCancelCtx 函数 找到第一个可取消的父节点
removeChild 函数 去掉父节点的孩子节点
init 函数 包初始化
WithDeadline 函数 创建一个有 deadline 的 context (在某个时间点)
WithTimeout 函数 创建一个有 timeout 的 context (在多长时间)
WithValue 函数 创建一个存储 k-v 对的 context

整体类图

接口

Context

type Context interface {
	// 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
	Done() <-chan struct{}

	// 在 channel Done 关闭后,返回 context 取消原因
	Err() error

	// 返回 context 是否会被取消以及自动取消时间(即 deadline)
	Deadline() (deadline time.Time, ok bool)

	// 获取 key 对应的 value
	Value(key interface{}) interface{}
}

Context 是一个接口,定义了 4 个方法,它们都是幂等的。也就是说连续多次调用同一个方法,得到的结果都是相同的。

Done() 返回一个 channel,可以表示 context 被取消的信号:当这个 channel 被关闭时,说明 context 被取消了。注意,这是一个只读的channel。 我们又知道,读一个关闭的 channel 会读出相应类型的零值。并且源码里没有地方会向这个 channel 里面塞入值。换句话说,这是一个 receive-only 的 channel。因此在子协程里读这个 channel,除非被关闭,否则读不出来任何东西。也正是利用了这一点,子协程从 channel 里读出了值(零值)后,就可以做一些收尾工作,尽快退出。

Err() 返回一个错误,表示 channel 被关闭的原因。例如是被取消,还是超时。

Deadline() 返回 context 的截止时间,通过此时间,函数就可以决定是否进行接下来的操作,如果时间太短,就可以不往下做了,否则浪费系统资源。当然,也可以用这个 deadline 来设置一个 I/O 操作的超时时间。

Value() 获取之前设置的 key 对应的 value

canceler

type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

实现了上面定义的两个方法的 Context,就表明该 Context 是可取消的。源码中有两个类型实现了 canceler 接口:*cancelCtx*timerCtx。注意是加了 * 号的,是这两个结构体的指针实现了 canceler 接口。

Context 接口设计成这个样子的原因:

  • “取消”操作应该是建议性,而非强制性
    caller 不应该去关心、干涉 callee 的情况,决定如何以及何时 return 是 callee 的责任。caller 只需发送“取消”信息,callee 根据收到的信息来做进一步的决策,因此接口并没有定义 cancel 方法。

  • “取消”操作应该可传递
    “取消”某个函数时,和它相关联的其他函数也应该“取消”。因此,Done() 方法返回一个只读的 channel,所有相关函数监听此 channel。一旦 channel 关闭,通过 channel 的“广播机制”,所有监听者都能收到。

结构体

emptyCtx

是Context接口的实现

type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

这实际上是一个空的 context,永远不会被 cancel,没有存储值,也没有 deadline。

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)
func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

background 通常用在 main 函数中,作为所有 context 的根节点。

todo 通常用在并不知道传递什么 context的情形。例如,调用一个需要传递 context 参数的函数,你手头并没有其他 context 可以传递,这时就可以传递 todo。这常常发生在重构进行中,给一些函数添加了一个 Context 参数,但不知道要传什么,就用 todo “占个位子”,最终要换成其他 context。

cancelCtx

type cancelCtx struct {
	Context

	// 保护之后的字段
	mu       sync.Mutex
	done     chan struct{}
	children map[canceler]struct{}
	err      error
}

这是一个可以取消的 Context,实现了 canceler 接口.它直接将接口 Context 作为它的一个匿名字段,这样,它就可以被看成一个 Context

func (c *cancelCtx) Done() <-chan struct{} {
	c.mu.Lock()
	if c.done == nil {
		c.done = make(chan struct{})
	}
	d := c.done
	c.mu.Unlock()
	return d
}

c.done 是“懒汉式”创建,只有调用了 Done() 方法的时候才会被创建。再次说明,函数返回的是一个只读的 channel,而且没有地方向这个 channel 里面写数据。所以,直接调用读这个 channel,协程会被 block 住。一般通过搭配 select 来使用。一旦关闭,就会立即读出零值。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    // 必须要传 err
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // 已经被其他协程取消
	}
	// 给 err 字段赋值
	c.err = err
	// 关闭 channel,通知其他协程
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	
	// 遍历它的所有子节点
	for child := range c.children {
	    // 递归地取消所有子节点
		child.cancel(false, err)
	}
	// 将子节点置空
	c.children = nil
	c.mu.Unlock()

	if removeFromParent {
	    // 从父节点中移除自己 
		removeChild(c.Context, c)
	}
}

cancel() 方法的功能就是关闭 channel:c.done;递归地取消它的所有子节点;从父节点从删除自己。达到的效果是通过关闭 channel,将取消信号传递给了它的所有子节点。goroutine 接收到取消信号的方式就是 select 语句中的读 c.done 被选中

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }//返回的cannel函数
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

这是一个暴露给用户的方法,传入一个父 Context(这通常是一个 background,作为根节点),返回新建的 context,新 context 的 done channel 是新建的.
当 WithCancel 函数返回的 CancelFunc 被调用或者是父节点的 done channel 被关闭(父节点的 CancelFunc 被调用),此 context(子节点) 的 done channel 也会被关闭
WithCancel返回的cannel函数中调用了cancelCtx.cancel
参数前者是 true,也就是说取消的时候,需要将自己从父节点里删除。第二个参数则是一个固定的取消错误类型

var Canceled = errors.New("context canceled"

还注意到一点,调用子节点 cancel 方法的时候,传入的第一个参数 removeFromParent 是 false。

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    ...
    for child := range c.children {
	// NOTE: acquiring the child's lock while holding parent's lock.
	child.cancel(false, err) //?什么时候传true,什么时候传false
    }
    ...
}

调用 WithCancel() 方法的时候,也就是新创建一个可取消的 context 节点时,返回的 cancelFunc 函数会传入 true。这样做的结果是:当调用返回的 cancelFunc 时,会将这个 context 从它的父节点里“除名”,因为父节点可能有很多子节点,你自己取消了,所以我要和你断绝关系,对其他人没影响

func removeChild(parent Context, child canceler) {
	p, ok := parentCancelCtx(parent)
	if !ok {
		return
	}
	p.mu.Lock()
	if p.children != nil {
		delete(p.children, child)
	}
	p.mu.Unlock()
}


代表一棵 context 树。当调用左图中标红 context 的 cancel 方法后,该 context 从它的父 context 中去除掉了:实线箭头变成了虚线。且虚线圈框出来的 context 都被取消了,圈内的 context 间的父子关系都荡然无存了

timeCtx

type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

timerCtx 首先是一个 cancelCtx,所以它能取消。看下 cancel() 方法

func (c *timerCtx) cancel(removeFromParent bool, err error) {
        // 直接调用 cancelCtx 的取消方法
	c.cancelCtx.cancel(false, err)
	if removeFromParent {
		// 从父节点中删除子节点
		removeChild(c.cancelCtx.Context, c)
	}
	c.mu.Lock()
	if c.timer != nil {
                // 关掉定时器,这样,在deadline 到来时,不会再次取消
		c.timer.Stop()
		c.timer = nil
	}
	c.mu.Unlock()
}

创建 timerCtx

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

WithTimeout 函数直接调用了 WithDeadline,传入的 deadline 是当前时间加上 timeout 的时间,也就是从现在开始再经过 timeout 时间就算超时。也就是说,WithDeadline 需要用的是绝对时间。重点来看它

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
	if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
		// 如果父节点 context 的 deadline 早于指定时间。直接构建一个可取消的 context。
		// 原因是一旦父节点超时,自动调用 cancel 函数,子节点也会随之取消。
		// 所以不用单独处理子节点的计时器时间到了之后,自动调用 cancel 函数
		return WithCancel(parent)
	}
	
	// 构建 timerCtx
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  deadline,
	}
	// 挂靠到父节点上
	propagateCancel(parent, c)
	
	// 计算当前距离 deadline 的时间
	d := time.Until(deadline)
	if d <= 0 {
		// 直接取消
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(true, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
		// d 时间后,timer 会自动调用 cancel 函数。自动取消
		c.timer = time.AfterFunc(d, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

valueCtx

type valueCtx struct {
	Context
	key, val interface{}
}
func (c *valueCtx) String() string {
	return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}
func WithValue(parent Context, key, val interface{}) Context {
	if key == nil {
		panic("nil key")
	}
	if !reflect.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

对 key 的要求是可比较,因为之后需要通过 key 取出 context 中的值,可比较是必须的。
通过层层传递 context,最终形成这样一棵树

和链表有点像,只是它的方向相反:Context 指向它的父节点,链表则指向下一个节点。通过 WithValue 函数,可以创建层层的 valueCtx,存储 goroutine 间可以共享的变量。

取值的过程,实际上是一个递归查找的过程

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

传递共享数据

package main

import (
	"context"
	"fmt"
)
type TraceId struct{}
func main() {
	ctx := context.Background()
	process(ctx)
        //key不使用字符串
	//ctx = context.WithValue(ctx, "traceId", "qcrao-2019")
          ctx = context.WithValue(ctx, TraceId{}, "qcrao-2019")
	process(ctx)
}

func process(ctx context.Context) {
	traceId, ok := ctx.Value(TraceId{}).(string)
	if ok {
		fmt.Printf("process over. trace_id=%s\n", traceId)
	} else {
		fmt.Printf("process over. no trace_id\n")
	}
}

超时取消 goroutine

func ctxTimeout() {
	ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10)
	// defer cancel()
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			// return
		case <-time.After(time.Millisecond * 100):
			fmt.Println("Time out")
		}
	}(ctx)
	time.Sleep(time.Second)
}

设置截止时间,超时触发

func ctxDeadline() {
	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
	defer cancel()
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			// return
		case <-time.After(time.Millisecond * 100):
			fmt.Println("Time out")
		}
	}(ctx)
	time.Sleep(time.Second)
}

调用cancel,关闭goroutine

func ctxCancel() {
	ctx, cancel := context.WithCancel(context.Background())
	go func(ctx context.Context) {
		select {
		case <-ctx.Done():
			fmt.Println(ctx.Err())
			return
		case <-time.After(time.Millisecond * 100):
			fmt.Println("Time out")
		}
	}(ctx)
	cancel()
}

多任务时间

type paramKey struct{}

func main() {
        //设置共享数据
	c := context.WithValue(context.Background(),
		paramKey{}, "abc")
        //设置最多任务执行时长,子任务可根据总时长分配子任务时长
	c, cancel := context.WithTimeout(c, 10*time.Second)
	defer cancel()
	go mainTask(c)

	var cmd string
	for {
		fmt.Scan(&cmd)
		if cmd == "c" {
			cancel()
		}
	}
}
func mainTask(c context.Context) {
	fmt.Printf("main task started with param %q\n", c.Value(paramKey{}))
	go func() {
		c1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		smallTask(c1, "background_task", 9*time.Second)
	}()
	go func() {
                //基于父context,创建子context
		c1, cancel := context.WithTimeout(c, 10*time.Second)
		defer cancel()
		smallTask(c1, "sub_task", 9*time.Second)
	}()
	smallTask(c, "same_task", 8*time.Second)
}

func smallTask(c context.Context, name string, d time.Duration) {
	fmt.Printf("%s started with param %q\n", name, c.Value(paramKey{}))
	select {
        //定时到时触发
	case <-time.After(d):
		fmt.Printf("%s done\n", name)
        //context设定到时触发
	case <-c.Done():
		fmt.Printf("%s cancelled\n", name)
	}
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM