- 什么是context
- context底层原理
- 传递共享数据
- [超时取消 goroutine](#超时取消 goroutine)
- 设置截止时间,超时触发
- 调用cancel,关闭goroutine
- **多任务时间
什么是context
goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息。 context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。最重要的是它是并发安全的。
Context的底层实现是mutex与channel的结合,前者用于初始部分参数,后者用于通信
context底层原理
类型 | 名称 | 作用 |
---|---|---|
Context | 接口 | 定义了 Context 接口的四个方法 |
emptyCtx | 结构 | 实现了 Context 接口,它其实是个空的 context |
CancelFunc | 函数 | 取消函数 |
canceler | 接口 | context 取消接口,定义了两个方法 |
cancelCtx | 结构体 | 可以被取消 |
timerCtx | 结构体 | 超时会被取消 |
valueCtx | 结构体 | 可以存储 k-v 对 |
Background | 函数 | 返回一个空的 context,常作为根 context |
TODO | 函数 | 返回一个空的 context,常用于重构时期,没有合适的 context 可用 |
WithCancel | 函数 | 基于父 context,生成一个可以取消的 context |
newCancelCtx | 函数 | 创建一个可取消的 context |
propagateCancel | 函数 | 向下传递 context 节点间的取消关系 |
parentCancelCtx | 函数 | 找到第一个可取消的父节点 |
removeChild | 函数 | 去掉父节点的孩子节点 |
init | 函数 | 包初始化 |
WithDeadline | 函数 | 创建一个有 deadline 的 context (在某个时间点) |
WithTimeout | 函数 | 创建一个有 timeout 的 context (在多长时间) |
WithValue | 函数 | 创建一个存储 k-v 对的 context |
整体类图
接口
Context
type Context interface {
// 当 context 被取消或者到了 deadline,返回一个被关闭的 channel
Done() <-chan struct{}
// 在 channel Done 关闭后,返回 context 取消原因
Err() error
// 返回 context 是否会被取消以及自动取消时间(即 deadline)
Deadline() (deadline time.Time, ok bool)
// 获取 key 对应的 value
Value(key interface{}) interface{}
}
Context
是一个接口,定义了 4 个方法,它们都是幂等
的。也就是说连续多次调用同一个方法,得到的结果都是相同的。
Done()
返回一个 channel,可以表示 context 被取消的信号:当这个 channel 被关闭时,说明 context 被取消了。注意,这是一个只读的channel。 我们又知道,读一个关闭的 channel 会读出相应类型的零值。并且源码里没有地方会向这个 channel 里面塞入值。换句话说,这是一个 receive-only
的 channel。因此在子协程里读这个 channel,除非被关闭,否则读不出来任何东西。也正是利用了这一点,子协程从 channel 里读出了值(零值)后,就可以做一些收尾工作,尽快退出。
Err()
返回一个错误,表示 channel 被关闭的原因。例如是被取消,还是超时。
Deadline()
返回 context 的截止时间,通过此时间,函数就可以决定是否进行接下来的操作,如果时间太短,就可以不往下做了,否则浪费系统资源。当然,也可以用这个 deadline 来设置一个 I/O 操作的超时时间。
Value()
获取之前设置的 key 对应的 value
canceler
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
实现了上面定义的两个方法的 Context,就表明该 Context 是可取消的。源码中有两个类型实现了 canceler 接口:*cancelCtx
和 *timerCtx
。注意是加了 *
号的,是这两个结构体的指针实现了 canceler 接口。
Context 接口设计成这个样子的原因:
-
“取消”操作应该是建议性,而非强制性
caller 不应该去关心、干涉 callee 的情况,决定如何以及何时 return 是 callee 的责任。caller 只需发送“取消”信息,callee 根据收到的信息来做进一步的决策,因此接口并没有定义 cancel 方法。 -
“取消”操作应该可传递
“取消”某个函数时,和它相关联的其他函数也应该“取消”。因此,Done()
方法返回一个只读的 channel,所有相关函数监听此 channel。一旦 channel 关闭,通过 channel 的“广播机制”,所有监听者都能收到。
结构体
emptyCtx
是Context接口的实现
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
这实际上是一个空的 context,永远不会被 cancel,没有存储值,也没有 deadline。
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
func Background() Context {
return background
}
func TODO() Context {
return todo
}
background 通常用在 main 函数中,作为所有 context 的根节点。
todo 通常用在并不知道传递什么 context的情形。例如,调用一个需要传递 context 参数的函数,你手头并没有其他 context 可以传递,这时就可以传递 todo。这常常发生在重构进行中,给一些函数添加了一个 Context 参数,但不知道要传什么,就用 todo “占个位子”,最终要换成其他 context。
cancelCtx
type cancelCtx struct {
Context
// 保护之后的字段
mu sync.Mutex
done chan struct{}
children map[canceler]struct{}
err error
}
这是一个可以取消的 Context,实现了 canceler 接口.它直接将接口 Context 作为它的一个匿名字段,这样,它就可以被看成一个 Context
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}
c.done 是“懒汉式”创建,只有调用了 Done() 方法的时候才会被创建。再次说明,函数返回的是一个只读的 channel,而且没有地方向这个 channel 里面写数据。所以,直接调用读这个 channel,协程会被 block 住。一般通过搭配 select 来使用。一旦关闭,就会立即读出零值。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 必须要传 err
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已经被其他协程取消
}
// 给 err 字段赋值
c.err = err
// 关闭 channel,通知其他协程
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
// 遍历它的所有子节点
for child := range c.children {
// 递归地取消所有子节点
child.cancel(false, err)
}
// 将子节点置空
c.children = nil
c.mu.Unlock()
if removeFromParent {
// 从父节点中移除自己
removeChild(c.Context, c)
}
}
cancel() 方法的功能就是关闭 channel:c.done;递归地取消它的所有子节点;从父节点从删除自己。达到的效果是通过关闭 channel,将取消信号传递给了它的所有子节点。goroutine 接收到取消信号的方式就是 select 语句中的读 c.done 被选中
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }//返回的cannel函数
}
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}
这是一个暴露给用户的方法,传入一个父 Context(这通常是一个 background,作为根节点),返回新建的 context,新 context 的 done channel 是新建的.
当 WithCancel 函数返回的 CancelFunc 被调用或者是父节点的 done channel 被关闭(父节点的 CancelFunc 被调用),此 context(子节点) 的 done channel 也会被关闭
WithCancel返回的cannel函数中调用了cancelCtx.cancel
参数前者是 true,也就是说取消的时候,需要将自己从父节点里删除。第二个参数则是一个固定的取消错误类型
var Canceled = errors.New("context canceled"
还注意到一点,调用子节点 cancel 方法的时候,传入的第一个参数 removeFromParent 是 false。
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
...
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err) //?什么时候传true,什么时候传false
}
...
}
调用 WithCancel() 方法的时候,也就是新创建一个可取消的 context 节点时,返回的 cancelFunc 函数会传入 true。这样做的结果是:当调用返回的 cancelFunc 时,会将这个 context 从它的父节点里“除名”,因为父节点可能有很多子节点,你自己取消了,所以我要和你断绝关系,对其他人没影响
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
代表一棵 context 树。当调用左图中标红 context 的 cancel 方法后,该 context 从它的父 context 中去除掉了:实线箭头变成了虚线。且虚线圈框出来的 context 都被取消了,圈内的 context 间的父子关系都荡然无存了
timeCtx
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
timerCtx 首先是一个 cancelCtx,所以它能取消。看下 cancel() 方法
func (c *timerCtx) cancel(removeFromParent bool, err error) {
// 直接调用 cancelCtx 的取消方法
c.cancelCtx.cancel(false, err)
if removeFromParent {
// 从父节点中删除子节点
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
// 关掉定时器,这样,在deadline 到来时,不会再次取消
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
创建 timerCtx
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
WithTimeout 函数直接调用了 WithDeadline,传入的 deadline 是当前时间加上 timeout 的时间,也就是从现在开始再经过 timeout 时间就算超时。也就是说,WithDeadline 需要用的是绝对时间。重点来看它
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// 如果父节点 context 的 deadline 早于指定时间。直接构建一个可取消的 context。
// 原因是一旦父节点超时,自动调用 cancel 函数,子节点也会随之取消。
// 所以不用单独处理子节点的计时器时间到了之后,自动调用 cancel 函数
return WithCancel(parent)
}
// 构建 timerCtx
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
// 挂靠到父节点上
propagateCancel(parent, c)
// 计算当前距离 deadline 的时间
d := time.Until(deadline)
if d <= 0 {
// 直接取消
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
// d 时间后,timer 会自动调用 cancel 函数。自动取消
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
valueCtx
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
对 key 的要求是可比较,因为之后需要通过 key 取出 context 中的值,可比较是必须的。
通过层层传递 context,最终形成这样一棵树
和链表有点像,只是它的方向相反:Context 指向它的父节点,链表则指向下一个节点。通过 WithValue 函数,可以创建层层的 valueCtx,存储 goroutine 间可以共享的变量。
取值的过程,实际上是一个递归查找的过程
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}
传递共享数据
package main
import (
"context"
"fmt"
)
type TraceId struct{}
func main() {
ctx := context.Background()
process(ctx)
//key不使用字符串
//ctx = context.WithValue(ctx, "traceId", "qcrao-2019")
ctx = context.WithValue(ctx, TraceId{}, "qcrao-2019")
process(ctx)
}
func process(ctx context.Context) {
traceId, ok := ctx.Value(TraceId{}).(string)
if ok {
fmt.Printf("process over. trace_id=%s\n", traceId)
} else {
fmt.Printf("process over. no trace_id\n")
}
}
超时取消 goroutine
func ctxTimeout() {
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10)
// defer cancel()
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
// return
case <-time.After(time.Millisecond * 100):
fmt.Println("Time out")
}
}(ctx)
time.Sleep(time.Second)
}
设置截止时间,超时触发
func ctxDeadline() {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Millisecond))
defer cancel()
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
// return
case <-time.After(time.Millisecond * 100):
fmt.Println("Time out")
}
}(ctx)
time.Sleep(time.Second)
}
调用cancel,关闭goroutine
func ctxCancel() {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
select {
case <-ctx.Done():
fmt.Println(ctx.Err())
return
case <-time.After(time.Millisecond * 100):
fmt.Println("Time out")
}
}(ctx)
cancel()
}
多任务时间
type paramKey struct{}
func main() {
//设置共享数据
c := context.WithValue(context.Background(),
paramKey{}, "abc")
//设置最多任务执行时长,子任务可根据总时长分配子任务时长
c, cancel := context.WithTimeout(c, 10*time.Second)
defer cancel()
go mainTask(c)
var cmd string
for {
fmt.Scan(&cmd)
if cmd == "c" {
cancel()
}
}
}
func mainTask(c context.Context) {
fmt.Printf("main task started with param %q\n", c.Value(paramKey{}))
go func() {
c1, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
smallTask(c1, "background_task", 9*time.Second)
}()
go func() {
//基于父context,创建子context
c1, cancel := context.WithTimeout(c, 10*time.Second)
defer cancel()
smallTask(c1, "sub_task", 9*time.Second)
}()
smallTask(c, "same_task", 8*time.Second)
}
func smallTask(c context.Context, name string, d time.Duration) {
fmt.Printf("%s started with param %q\n", name, c.Value(paramKey{}))
select {
//定时到时触发
case <-time.After(d):
fmt.Printf("%s done\n", name)
//context设定到时触发
case <-c.Done():
fmt.Printf("%s cancelled\n", name)
}
}