一、使用場景
大背景是從kafka 中讀取oplog進行增量處理,但是當我想發一條命令將這個增量過程阻塞,然后開始進行一次全量同步之后,在開始繼續增量。
所以需要對多個協程進行控制。
二、使用知識
1. 從一個未初始化的管道讀會阻塞
2.從一個關閉的管道讀不會阻塞
利用兩個管道和select 進行控制
三、上代碼
控制器代碼
package util
import (
"errors"
"sync"
)
const (
//STOP 停止
STOP = iota
//START 開始
START
//PAUSE 暫停
PAUSE
)
//Control 控制器
type Control struct {
ch1 chan struct{}
ch2 chan struct{}
stat int64
lock sync.RWMutex
}
var (
//ErrStat 錯誤狀態
ErrStat = errors.New("stat error")
)
//NewControl 獲得一個新Control
func NewControl() *Control {
return &Control{
ch1: make(chan struct{}),
ch2: nil,
stat: START,
lock: sync.RWMutex{},
}
}
//Stop 停止
func (c *Control) Stop() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.stat == START {
c.ch2 = nil
close(c.ch1)
c.stat = STOP
} else if c.stat == PAUSE {
ch2 := c.ch2
c.ch2 = nil
close(c.ch1)
close(ch2)
c.stat = STOP
} else {
return ErrStat
}
return nil
}
//Pause 暫停
func (c *Control) Pause() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.stat == START {
c.ch2 = make(chan struct{})
close(c.ch1)
c.stat = PAUSE
} else {
return ErrStat
}
return nil
}
//Start 開始
func (c *Control) Start() error {
c.lock.Lock()
defer c.lock.Unlock()
if c.stat == PAUSE {
c.ch1 = make(chan struct{})
close(c.ch2)
c.stat = START
} else {
return ErrStat
}
return nil
}
//C 控制管道
func (c *Control) C() <-chan struct{} {
c.lock.RLock()
defer c.lock.RUnlock()
return c.ch1
}
//Wait 等待
func (c *Control) Wait() bool {
c.lock.RLock()
ch2 := c.ch2
c.lock.RUnlock()
if ch2 == nil { //通過賦值nil 發送停止推出命令
return false
}
<-ch2 //會進行阻塞
return true
}
使用代碼
for {
select {
case part, ok := <-c.Partitions():
if !ok {
conf.Logger.Error("get kafka Partitions not ok", regular.Name)
return
}
go readFromPart(c, part, regular, respChan)
case <-regular.C(): //regular 為Control 類
if !regular.Wait() {
conf.Logger.Debug("Stop! ")
return
}
conf.Logger.Debug("Start! ")
}
}
這樣就可以隨時隨地的控制工程中的協程
regular := util.NewControl() regular.Pause() regular.Start() regular.Stop()
