client-go和golang源碼中的技巧


client-go中有很多比較有意思的實現,如定時器,同步機制等,可以作為移植使用。下面就遇到的一些技術講解,首先看第一個:

  • sets.String(k8s.io/apimachinery/pkg/util/sets/string.go)

實現了對golang map的key的處理,如計算交集,並集等。實際中可能會遇到需要判斷兩個map的key是否重合的場景,此時可以使用下述方式實現,sets.StringKeySet函數將入參的map的key抽取成一個String類型,這樣就可以使用String的方法操作key

ps:更多功能參見源碼

package main

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/sets"
)

func main(){
    map1 := map[string]int{"aaa":1,"bbb":2,"ccc":3}
    map2 := map[string]int{"ccc":1,"ddd":2,"eee":3}
    newmap1 := sets.StringKeySet(map1)
    newmap2 := sets.StringKeySet(map2)
    fmt.Println(newmap1.List(),newmap2.List())
    fmt.Println(newmap1.HasAny(newmap2.List()...)) //3個點用於把數組打散為單個元素
}

結果:true
  • 同步機制
    • sync.Mutex(golang 內置方法),用於數據同步

有2個方法:

func (m *Mutex) Lock()
func (m *Mutex) Unlock()

類似C語言線程的互斥鎖,用於對數據進行加解鎖操作。當數據被加鎖后,未獲得該鎖的程序將無法讀取被加鎖的數據。從下面例子可以看出在數據被解鎖前其他協程無法對該數據進行讀寫操作。

ps: read data的數據也可能為“data

package main

import (
    "fmt"
    "sync"
)

type LockTest struct {
    l sync.Mutex
    data string
}

func main(){
    lockTest := LockTest{sync.Mutex{},"data"}
    go func() {
        lockTest.l.Lock()
        fmt.Println("sleep begin")
        time.Sleep(time.Second*2)
        fmt.Println("sleep end")
        lockTest.l.Unlock()
    }()
    
    time.Sleep(time.Second*1)
    
    go func() {
        lockTest.l.Lock()
        fmt.Println("read data:",lockTest.data)
        lockTest.l.Unlock()
    }()

    go func() {
        lockTest.l.Lock()
        fmt.Println("write data begin")
        lockTest.data="new data"
        fmt.Println("write data end")
        lockTest.l.Unlock()
    }()

    time.Sleep(time.Second*5)
}

結果: sleep begin sleep end write data begin write data end read data: new data
    • sync.RWMutex(golang 內置方法),用於數據同步

讀寫鎖,含4個方法,前2個為讀鎖,后2個為寫鎖,使用時要一一對應。寫鎖會阻塞讀寫操作,讀鎖不會阻塞寫操作,讀鎖可以有多個,讀鎖之間不會相互阻塞,適用於讀多寫少的場景。因此如果單純使用RWMutex.Lock/RWMutex.UnLock與使用Mutex.Lock/Mutex.UnLock效果相同

func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()

讀寫鎖一般是讀鎖和寫鎖結合使用的。在有寫鎖的時候,讀鎖會被阻塞,等待寫鎖釋放后才能進行讀操作。

ps:sync.Mutex和sync.RWMutex一般都是內置在結構體中使用,用於保護本結構體的數據

package main

import (
    "fmt"
    "sync"
)
type LockTest struct {
    l sync.RWMutex
    data string
}

func main(){
    lockTest := LockTest{sync.RWMutex{},"data"}
    go func() {
        lockTest.l.Lock()
        fmt.Println("write data begin")
        lockTest.data="new data"
        time.Sleep(time.Second*3)
        fmt.Println("write data end")
        lockTest.l.Unlock()
    }()

    time.Sleep(time.Second*1)

    go func() {
        lockTest.l.RLock()  //阻塞等待寫鎖釋放
        fmt.Println("read begin")
        fmt.Println("read data:",lockTest.data)
        fmt.Println("read begin")
        lockTest.l.RUnlock()
    }()

    time.Sleep(time.Second*5)
}

結果:
write data begin write data end read begin read data:
new data read begin
    • sync.Cond(golang 內置方法),用於條件變量

sync.Cond用於條件等待,在滿足某些條件時程序才能繼續執行。它包含如下3個方法:Wait()會掛起其所在的協程等待Signal()或Broadcast()的喚醒。

func (c *Cond) Wait() 
func (c *Cond) Signal()
func (c *Cond) Broadcast() 

官方推薦的典型用法如下。由於喚醒協程並不意味着條件已就緒,因此在喚醒后需要檢測是否本協程的條件已經滿足。

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

使用Signal()喚醒的方式如下,Signal()用於當次喚醒一個協程。如果注釋掉下例中的Signal(),那么兩個協程會一直Wait(),並不會繼續執行。

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
go func() { c.L.Lock() for !condition1 { c.Wait() } fmt.Println("condition1=true,run1") c.L.Unlock() }() go func() { c.L.Lock() for !condition2 { c.Wait() } fmt.Println("condition2=true,run2") c.L.Unlock() }()
time.Sleep(time.Second
*1) fmt.Println("signal-1") condition1=true c.Signal() time.Sleep(time.Second*1) fmt.Println("signal-2") condition2=true c.Signal() time.Sleep(time.Second*10) } 結果: signal-1 condition1=true,run1 signal-2 condition2=true,run2

使用Signal()喚醒協程時需要注意,在多個協程等待時,該函數並沒有指定需要喚醒哪一個協程。下面程序的輸出可能為“condition1=true,run1”也可能為“condition2=true,run2”。因此Signal一般適用於僅有一個協程等待的情況,否則可能造成混亂。

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
    go func() {
        c.L.Lock()
        for !condition1 {
            c.Wait()
        }
        fmt.Println("condition1=true,run1")
        c.L.Unlock()
    }()

    go func() {
        c.L.Lock()
        for !condition2 {
            c.Wait()
        }
        fmt.Println("condition2=true,run2")
        c.L.Unlock()
    }()
    time.Sleep(time.Second*1)
    condition1=true
    condition2=true
    c.Signal()
    time.Sleep(time.Second*10)
}

Broadcast()比較簡單,即喚醒所有等待的協程

package main

import (
    "fmt"
    "sync"
)

func main(){
    l := sync.Mutex{}
    c := sync.NewCond(&l)
    condition1 := false
    condition2 := false
    go func() {
        c.L.Lock()
        for !condition1 {
            c.Wait()
        }
        fmt.Println("condition1=true,run1")
        c.L.Unlock()
    }()

    go func() {
        c.L.Lock()
        for !condition2 {
            c.Wait()
        }
        fmt.Println("condition2=true,run2")
        c.L.Unlock()
    }()
    time.Sleep(time.Second*1)
    condition1=true
    condition2=true
    c.Broadcast()
    time.Sleep(time.Second*10)
}
 結果: condition1=true,run1 condition2=true,run2
    • sync.waitgroup,用於等待協程執行完成

sync.waitgroup有如下3個方法,Add(delta int)入參表示需要等待的協程的個數,如2表示需要等待2個協程完成;Done()表示該協程結束;Wait()用於阻塞主協程,等待所有協程結束后釋放。

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait() 

舉例如下,啟動10個協程,Wait()會阻塞,直到所有的協程執行Done()。

ps: Add(delta int)函數的入參很重要,入參大於實際需要等待的協程會導致主協程一致阻塞,小於需要等待的協程會導致某些協程提前退出

import (
    "fmt"
    "sync"
)

func main(){
    wg := sync.WaitGroup{}
    wg.Add(10)

    for i := 0; i < 10; i++ {
        go func(i int) {
            defer wg.Done()
            fmt.Print(i, " ")
        }(i)
    }

    wg.Wait()
}
 結果: 9 4 0 1 2 3 6 5 7 8 
    • 協程間使用chan進行同步

下例中使用chan實現主協程控制write,並使用write控制read。協程關閉使用close()函數

ps:使用chan進行協程同步一般將chan作為入參傳入,或在函數內部實現協程間的同步。為方便驗證,下面例子將所有chan作為全局變量

package main

import (
    "fmt"
    "sync"
)
var speakCh = make(chan string)
var stopReadChan = make(chan struct{})
var stopWriteChan = make(chan struct{})

func readChan(stopCh <-chan struct{}){
    for {
        select {
        case words := <- speakCh:
            fmt.Println("received:",words)
        case <- stopCh:
            fmt.Println("stop read!")
            return
        }
    }
}

func writeChan(stopCh <-chan struct{}){
    for {
        select {
        case <- stopCh:
            fmt.Println("stop write!")
            close(stopReadChan)
            return
        default:
        }
        speakCh <- "hi"
        time.Sleep(time.Second*2)
    }
}

func main(){
    go readChan(stopReadChan)
    go writeChan(stopWriteChan)

    time.Sleep(time.Second*6)
    close(stopWriteChan)
    time.Sleep(time.Second*6)
}

結果: received: hi received: hi received: hi stop write! stop read!
    • 協程間使用context進行同步

context用於對協程進行管理,如主動退出協程,超時退出協程等,可以看作是使用chan管理協程的擴展。在使用時首先創建一個context,使用cancel()可以取消context,並使用Done()返回的chan管理協程。

官方推薦的用法如下:

func Stream(ctx context.Context, out chan<- Value) error {
    for {
        v, err := DoSomething(ctx)
        if err != nil {
            return err
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case out <- v:
        }
    }
}

下例中使用context.WithCancel創建一個context,使用cancel()給這一組context發送信號,在協程中使用Done()處理退出事件。

package main

import (
    "fmt"
    "context"
)

func main(){
    ctx,cancel := context.WithCancel(context.Background())
    go testCtx(ctx,"ctx1")
    go testCtx(ctx,"ctx2")
    go testCtx(ctx,"ctx3")
    time.Sleep(time.Second*3)
    cancel()

    time.Sleep(time.Second*5)
}

func testCtx(ctx context.Context, name string) error{
    for {
        select {
        case <-ctx.Done():
            fmt.Println("ctx.Done:",name)
            return ctx.err()
        default:
            fmt.Println("default:",name)
            time.Sleep(time.Second*2)
        }
    }
}

結果: default: ctx1 default: ctx3 default: ctx2 default: ctx3 default: ctx1 default: ctx2 ctx.Done: ctx1 ctx.Done: ctx3 ctx.Done: ctx2

創建context的方式如下,其余三個可以看作是WithCancel的擴展

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)              //需要主動取消context
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)   //在deadline時間點后取消context
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) //在超時后取消context
func WithValue(parent Context, key, val interface{}) Context

再看一個WithTimeout的例子,下面設置context的超時時間為3s且沒有主動cancel(),3s超時后可以看到該context對應的協程正常退出

func main(){
    ctx,_ := context.WithTimeout(context.Background(),time.Second*3)
    go testCtx(ctx,"ctx1")
    go testCtx(ctx,"ctx2")
    go testCtx(ctx,"ctx3")
    time.Sleep(time.Second*5)
}

結果: default: ctx3 default: ctx1 default: ctx2 default: ctx3 default: ctx1 default: ctx2 ctx.Done: ctx3 ctx.Done: ctx2 ctx.Done: ctx1 

context可以看作是一個樹,當cancel一個context時,會同時cancle它的子context。下面首先創建一個ctx,然后在此ctx下面創建一個subctx。當執行cancle() ctx時會同時cancel() 該的subctx。

context.Background()就是已經實現的首個context。

func main(){
    ctx,cancel := context.WithCancel(context.Background())
    subctx,_ := context.WithCancel(ctx)
    go testCtx(ctx,"ctx1")
    go testCtx(subctx,"subctx1")
    go testCtx(subctx,"subctx2")
    time.Sleep(time.Second*3)
    canclel()

    time.Sleep(time.Second*10)
}

結果: default: subctx2 default: ctx1 default: subctx1 default: subctx2 default: ctx1 default: subctx1 timeout ctx.Done: ctx1 ctx.Done: subctx1 ctx.Done: subctx2

下例中僅cancel() subctx,可以看到並沒有影響subctx的parent。

func main(){
    ctx, _:= context.WithCancel(context.Background())
    subctx,subcancel := context.WithCancel(ctx)
    go testCtx(ctx,"ctx1")
    go testCtx(subctx,"subctx1")
    go testCtx(subctx,"subctx2")
    time.Sleep(time.Second*3)
    subcancel()

    time.Sleep(time.Second*10)
}

結果: default: subctx1 default: subctx2 default: ctx1 default: ctx1 default: subctx1 default: subctx2 timeout ctx.Done: subctx2 default: ctx1 ctx.Done: subctx1 default: ctx1 default: ctx1 default: ctx1 default: ctx1
    • wait.Group(k8s.io/apimachinery/pkg/util/wait/wait.go)

client-go中的wait.Group創造性地將sync.WaitGroup與chan和ctx結合,實現了協程間同步和等待全部Group中的協程結束的功能。由於StartWithChannel和StartWithContext的入參函數類型比較固定,因此使用上並不通用,但可以作為參考,在實際中擴展使用。下例中給出了簡單用法。

func (g *Group) Wait() 
func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{}))
func (g *Group) StartWithContext(ctx context.Context, f func(context.Context))
func main(){
    f1:= func(ctx context.Context) {
        for {
            select {
            case <- ctx.Done():
                return
            default:
                fmt.Println("hi11")
                time.Sleep(time.Second)
            }
        }
    }
    wg := wait.Group{}
    ctx, cancel := context.WithCancel(context.Background())
    wg.StartWithContext(ctx,f1)
    time.Sleep(time.Second*3)
    cancel()
    wg.Wait()
}

結果:
hi
hi
hi

 

  •  定時器
    • ticker定時器

首先看一下一般使用的定時器,client-go中比較復雜的定時器也是在此基礎上封裝的。下面例子中給出的是ticker定時器,它會按照一定的時間頻率往Ticker.C中發time.Time類型的數據,可以在協程中通過判斷Ticker.C來執行定時任務。下例來自官方,實現每秒執行一次打印,

import (
    "fmt"
    "time"
)

func main(){
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            fmt.Println("Current time: ", t)
        }
    }
}

結果: Current time: 2019-07-04 14:30:37.9088968 +0800 CST m=+5.328291301 Current time: 2019-07-04 14:30:38.9089349 +0800 CST m=+6.328328801 Current time: 2019-07-04 14:30:39.9101415 +0800 CST m=+7.329534901 Current time: 2019-07-04 14:30:40.9095174 +0800 CST m=+8.328910201 Current time: 2019-07-04 14:30:41.9092961 +0800 CST m=+9.328688301 Current time: 2019-07-04 14:30:42.9087682 +0800 CST m=+10.328159801 Current time: 2019-07-04 14:30:43.9088604 +0800 CST m=+11.328251401 Current time: 2019-07-04 14:30:44.909609 +0800 CST m=+12.328999501 Current time: 2019-07-04 14:30:45.9094782 +0800 CST m=+13.328868101 Current time: 2019-07-04 14:30:46.909006 +0800 CST m=+14.328395401 Done!

需要注意的是使用ticker並不能保證程序被精確性調度,如果程序的執行時間大於ticker的調度周期,那么程序的觸發周期會發生偏差(可能由於系統cpu占用過高,網絡延遲等原因)。如下面例子中,ticker觸發周期為1s,但程序執行大於2s,此時會出現程序執行頻率不一致的情況。適用於周期性觸發一個任務。

func main(){
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()
    done := make(chan bool)
    go func() {
        time.Sleep(10 * time.Second)
        done <- true
    }()
    for {
        select {
        case <-done:
            fmt.Println("Done!")
            return
        case t := <-ticker.C:
            time.Sleep(time.Second*2)
            fmt.Println("Current time: ", t)
        }
    }
}

結果: Current time: 2019-07-04 14:56:52.5446526 +0800 CST m=+5.281916601 Current time: 2019-07-04 14:56:53.5452488 +0800 CST m=+6.282512201 //和上一條相差1s,但和下一條相差2s Current time: 2019-07-04 14:56:55.5443528 +0800 CST m=+8.281615101 Current time: 2019-07-04 14:56:57.5449183 +0800 CST m=+10.282179401 Current time: 2019-07-04 14:56:59.5448671 +0800 CST m=+12.282127101 Done!
    • timer定時器

timer的機制和ticker相同,在定時器超時后往一個chan中發送time.Time數據。不同的是ticker可以周期性調度,timer只會執行一次,如果需要重復調度,需要使用Reset函數重置timer。利用該機制,可以在同一個timer上以不同間隔調度程序。

func main(){
    timer := time.NewTimer(time.Second)
    defer timer.Stop()
    t := <-timer.C
    fmt.Println("Current time: ", t)
    timer.Reset(time.Second*2)
    t = <-timer.C
    fmt.Println("Current time: ", t)
    timer.Reset(time.Second*3)
    t = <-timer.C
    fmt.Println("Current time: ", t)
}

結果: Current time: 2019-07-04 15:47:01.7518201 +0800 CST m=+5.312710501 Current time: 2019-07-04 15:47:03.7766692 +0800 CST m=+7.337558501 Current time: 2019-07-04 15:47:06.7770913 +0800 CST m=+10.337978901

使用timer需要注意Reset函數只能在timer超時后使用,否則將無效。因為Timer.C的長度只有1,如果前面一個定時器結束前執行了Reset,那么前面的定時器會被取消。具體可以參見這里

func NewTimer(d Duration) *Timer {
    c := make(chan Time, 1)
    ...
}

下面例子中可以看出,多次執行Reset並不會多次觸發定時任務,在前一個定時器超時前執行Reset,會取消前一個定時器並以Reset中的duration開始計時。

func main(){
    fmt.Println("now time: "time.Now())
    timer := time.NewTimer(time.Second*5)
    
    defer timer.Stop()
    timer.Reset(time.Second*2)
    timer.Reset(time.Second*2)
    timer.Reset(time.Second*2)


    go func() {
        for ; ;  {
            select {
            case t:=<- timer.C:
                fmt.Println("Current time: ", t)
            }
        }
    }()
    
    time.Sleep(time.Second*10)
}

結果: now time: 2019-07-04 16:16:31.7246084 +0800 CST m=+4.281414201 Current time: 2019-07-04 16:16:33.7505395 +0800 CST m=+6.307344201

官方推薦的用法如下,由於沒有加鎖,此方法不能在多個協程中同時使用。

if !t.Stop() {
    <-t.C
}
t.Reset(d)

func AfterFunc(d Duration, f func()) *Timer函數用於在d時間超時后,執行f函數。注意返回的timer需要手動stop

timer := time.AfterFunc(time.Second*5, func() {
    fmt.Println("timeout")
})

time.Sleep(time.Second*6)
timer.Stop()

更多timer的用法可以參見官方文檔

  • wait實現(k8s.io/apimachinery/pkg/util/wait/wait.go)
    • wait中實現了很多與定時相關的函數,首先來看第一組:
func Forever(f func(), period time.Duration) 
func Until(f func(), period time.Duration, stopCh <-chan struct{}) 
func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 
func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) 
func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 

Until函數每period會調度f函數,如果stopCh中有停止信號,則退出。當程序運行時間超過period時,也不會退出調度循環,該特性和Ticker相同。底層使用Timer實現。

Until和NonSlidingUntil為一對,UntilWithContext和NonSlidingUntilWithContext為一對,區別只是定時器啟動時間點不同,可以簡單用下圖表示:

這兩種(帶“NonSliding”前綴的)函數在處理正常程序時沒有什么區別,但在一些場景下會有不同的地方。下面例子中使用wait.NonSlidingUntil處理的程序中sleep了2s,這可以表示程序因為某種原因導致超出正常處理時間。此時可以看到結果中的“num 1”和“num 2”是同時調用的

func main(){
    first := true
    num := 0
    stopCh:=make(chan struct{} )
    
    go func() {
        time.Sleep(time.Second*10)
        close(stopCh)
        fmt.Println("done")
    }()

    go wait.NonSlidingUntil(func(){
        if true == first{
            time.Sleep(time.Second*2)
            first=false
        }
num = num + 1 fmt.Println(
"num:",num,"time",time.Now()) },time.Second*1,stopCh) time.Sleep(time.Second*100) } 結果: num: 1 time 2019-07-04 21:05:59.5298524 +0800 CST m=+26.277103101 num: 2 time 2019-07-04 21:05:59.554999 +0800 CST m=+26.302249701 num: 3 time 2019-07-04 21:06:00.5559679 +0800 CST m=+27.303218601 num: 4 time 2019-07-04 21:06:01.5566608 +0800 CST m=+28.303911501

將上述程序的wait.NonSlidingUntil替換為wait.Until,得到如下結果,可以看到首次(異常)和第二次(正常)的間隔正好是wait.Until中設置的調度周期,即1s。

ps:大部分場景下兩者使用上並沒有什么不同,畢竟正常情況下程序運行時間必然小於程序調度周期。如果需要在程序處理延時的情況下盡快進行下一次調度,則選擇帶”NonSliding“前綴的函數

結果:
num: 1 time 2019-07-04 21:09:14.9643889 +0800 CST m=+2.010865201 num: 2 time 2019-07-04 21:09:15.9935285 +0800 CST m=+3.040004801 num: 3 time 2019-07-04 21:09:16.9956846 +0800 CST m=+4.042160901
    • func Forever(f func(), period time.Duration)

該函數比較簡單,就是取消了用於控制Until停止的stopCh。以永遠不停止的方式周期性執行f函數

    •  func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error 

ExponentialBackoff可以實現在函數執行錯誤后實現以指數退避方式的延時重試。ExponentialBackoff內部使用的是time.Sleep

ExponentialBackoff的首個入參Backoff如下:

  • Duration:表示初始的延時時間
  • Factor:指數退避的因子
  • Jitter:可以看作是偏差因子,該值越大,每次重試的延時的可選區間越大
  • Steps:指數退避的步數,可以看作程序的最大重試次數
  • Cap:用於在Factor非0時限制最大延時時間和最大重試次數,為0表示不限制最大延時時間
type Backoff struct {
    // The initial duration.
    Duration time.Duration
    // Duration is multiplied by factor each iteration. Must be greater
    // than or equal to zero.
    Factor float64
    // The amount of jitter applied each iteration. Jitter is applied after
    // cap.
    Jitter float64
    // The number of steps before duration stops changing. If zero, initial
    // duration is always used. Used for exponential backoff in combination
    // with Factor.
    Steps int
    // The returned duration will never be greater than cap *before* jitter
    // is applied. The actual maximum cap is `cap * (1.0 + jitter)`.
    Cap time.Duration
}

第二個參數ConditionFunc表示運行的函數,返回的bool值表示該函數是否執行成功,如果執行成功則會退出指數退避

type ConditionFunc func() (done bool, err error)

下面做幾組測試:

=> 當Factor和Jitter都為0時,可以看到調度周期是相同的,即Duration的值(1s)。

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "time"
)


func main(){
    var DefaultRetry = wait.Backoff{
        Steps:    5,
        Duration: 1 * time.Second,
        Factor:   0,
        Jitter:   0,
    }

    fmt.Println(wait.ExponentialBackoff(DefaultRetry,func() (bool, error){
        fmt.Println(time.Now())
        return false,nil
    }))
}

結果: 2019-07-05 10:17:33.9610108 +0800 CST m=+0.079831101 2019-07-05 10:17:34.961132 +0800 CST m=+1.079952301 2019-07-05 10:17:35.961512 +0800 CST m=+2.080332301 2019-07-05 10:17:36.9625144 +0800 CST m=+3.081334701 2019-07-05 10:17:37.9636334 +0800 CST m=+4.082453701 timed out waiting for the condition

=> 先看Jitter對duration的影響,Jitter(duration, b.Jitter)的計算方式如下,如果入參的Factor為0,而Jitter非0,則將Factor調整為1。rand.Float64()為[0.0,1.0)的偽隨機數。

將Jitter調整為0.5,根據下面計算方式預期duration為[1s,1.5s)。運行程序得出如下結果,觀察可以發現,duration大概是1.4s

if maxFactor <= 0.0 {
    maxFactor = 1.0 } wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   0,
    Jitter:   0.5,
}

結果: 2019-07-05 10:21:49.5993445 +0800 CST m=+2.382669101 2019-07-05 10:21:50.9026701 +0800 CST m=+3.685994701 2019-07-05 10:21:52.3759019 +0800 CST m=+5.159226401 2019-07-05 10:21:53.7086265 +0800 CST m=+6.491951001 2019-07-05 10:21:54.9283913 +0800 CST m=+7.711715901 timed out waiting for the condition

=> Factor非0且Jitter為0時,對duration的調整如下

if b.Factor != 0 {
    b.Duration = time.Duration(float64(b.Duration) * b.Factor)
    if b.Cap > 0 && b.Duration > b.Cap {
        b.Duration = b.Cap
        b.Steps = 0
    }
}

從公式中可以得出,Factor對程序執行的延的影響如下,可以看到Factor為1時並沒有什么作用

duration(1) = duration
duration(2) = Factor * duration(1) duration(3) = Factor * duration(2) ... duration(n) = Factor * duration(n-1)

Factor為1時,可以看到函數執行間隔均為1s

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   1,
    Jitter:   0,
}

結果: 2019-07-05 10:28:50.8481017 +0800 CST m=+2.363983901 2019-07-05 10:28:51.8482274 +0800 CST m=+3.364109601 2019-07-05 10:28:52.8482359 +0800 CST m=+4.364118201 2019-07-05 10:28:53.848687 +0800 CST m=+5.364569301 2019-07-05 10:28:54.849409 +0800 CST m=+6.365291201 timed out waiting for the condition

調整Factor為3,預期延時時間為1s,3s,9s,27s,從測試結果看與預期相符

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0,
}
 結果: 2019-07-05 10:35:06.9030165 +0800 CST m=+0.077746101 2019-07-05 10:35:07.9038392 +0800 CST m=+1.078568701 2019-07-05 10:35:10.9038733 +0800 CST m=+4.078602901 2019-07-05 10:35:19.9042141 +0800 CST m=+13.078943601 2019-07-05 10:35:46.904647 +0800 CST m=+40.079376501 timed out waiting for the condition

=> 當Factor和Jitter非0時的延遲計算方式如下:

    save_duration(0) = duration
duration(1) =  Jitter(save_duration(0) , b.Jitter)
    save_duration(1) = Factor * save_duration(0) 

duration(2) = Jitter(save_duration(1), b.Jitter)
    save_duration(2) = Factor * save_duration(1)

duration(3) = Jitter(save_duration(2), b.Jitter)
    save_duration = Factor * save_duration(2)
...
duration(n) = Jitter(save_duration(n-1), b.Jitter)

設置Backoff參數如下,按照上述公式得出的期望延時為[1,1.1),[3,3.3),  [9,9.9), [27,29.7)。實際運行如下,小數點一位后四舍五入得出實際延時為1.1, 3.3, 9.6, 28.2,與預期相符。

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0.1,
}

結果: 2019-07-05 11:42:54.8779046 +0800 CST m=+0.135740401 2019-07-05 11:42:55.9399737 +0800 CST m=+1.197782901 2019-07-05 11:42:59.2240904 +0800 CST m=+4.481817401 2019-07-05 11:43:08.8232438 +0800 CST m=+14.080730501 2019-07-05 11:43:37.0058953 +0800 CST m=+42.262752301 timed out waiting for the condition

=> 最后看下Backoff.Cap的影響。設置Cap為10s,預期會比上面不帶Cap的少執行2次(不帶Cap限制的在Step為0時還會執行一次)。實際執行上也是如此

var DefaultRetry = wait.Backoff{
    Steps:    5,
    Duration: 1 * time.Second,
    Factor:   3,
    Jitter:   0.1,
    Cap:      time.Second*10,
}
 結果: 2019-07-05 12:02:43.8678742 +0800 CST m=+0.120673901 2019-07-05 12:02:44.9294079 +0800 CST m=+1.182202101 2019-07-05 12:02:48.2125558 +0800 CST m=+4.465333301

ExponentialBackoff借鑒了TCP協議的指數退避算法,適用於可能會產生資源競爭的場景。指數退避可以有效地在沒有緩存處理或緩存不足的場景下減小服務端的壓力。

    •  wait庫的第二組
func Poll(interval, timeout time.Duration, condition ConditionFunc) error 
func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error 
func PollInfinite(interval time.Duration, condition ConditionFunc) error 
func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error 
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error 

Poll表示以interval的周期執行condition函數,直到timeout超時或condition返回true/err非空。

wait.Poll和wait.Until使用上還是有些類似的,區別在於一個使用timeout限制超時時間,一個使用chan提供主動停止調度。

import (
    "fmt"
    "k8s.io/apimachinery/pkg/util/wait"
    "time"
)


func main(){

    wait.Poll(time.Second, time.Second*5, func() (done bool, err error) {
        fmt.Println(time.Now())
        return false,nil
    })

結果: 2019-07-05 13:43:31.2622405 +0800 CST m=+1.069324901 2019-07-05 13:43:32.2619663 +0800 CST m=+2.069050701 2019-07-05 13:43:33.2626114 +0800 CST m=+3.069695801 2019-07-05 13:43:34.2626876 +0800 CST m=+4.069772001 2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201 2019-07-05 13:43:35.2624168 +0800 CST m=+5.069501201

PollInfinite相比Poll取消了timeout的限制。

PollUntil相比Until來說,PollUntil在condition函數返回true或error的時候會退出調度。

Poll和PollImmediate為一組,PollInfinite和PollImmediateInfinite為一組,PollUntil和PollImmediateUntil為一組,它們的細微差別在於前者在執行condition函數前會等待interval時間,后者則會首先運行condition函數,然后再檢查是否需要等待(condition返回true或err非空時不會再等待)。如果不關注這點差異,用哪個都可以。

    •  heap 堆(k8s.io/client-go/tools/cache)

實現heap需要實現下面Interface接口,heap使用隊列實現了一個完全二叉樹

// heap.Interface
type Interface interface {
    sort.Interface
    Push(x interface{}) // add x as element Len()
    Pop() interface{}   // remove and return element Len() - 1.
}

// sort.Interface
type Interface interface {
    // Len is the number of elements in the collection.
    Len() int
    // Less reports whether the element with
    // index i should sort before the element with index j.
    Less(i, j int) bool
    // Swap swaps the elements with indexes i and j.
    Swap(i, j int)
}

heap對外提供的方法為如下:

func Init(h Interface)
func Push(h Interface, x interface{})
func Pop(h Interface) interface{}
func Remove(h Interface, i int) interface{}
func Fix(h Interface, i int) // 當修改完隊列中的index=i的元素后,重新排序

例子如下:

import (
    "container/heap"
    "fmt"
)
    
func GetAllHeapItems(t Heap_t,name string){
    items := []interface{}{}
    for t.Len() != 0{
        items = append(items, heap.Pop(&t))
    }
    fmt.Println(name,":",items)
}

type Heap_t []int
func (h Heap_t)Len() int{return len(h)}
func (h Heap_t)Less(i,j int)bool {return h[i]<h[j]}
func (h Heap_t)Swap(i,j int){h[i], h[j] = h[j], h[i]}
func (h *Heap_t)Push(x interface{}){*h = append(*h,x.(int))}
func (h *Heap_t)Pop() interface{}{
    if h.Len() == 0{
        return nil
    }
    x := (*h)[len(*h)-1]
    *h = (*h)[0:(len(*h) - 1)]
    return x
}

func main(){
    h := &Heap_t{4,2,6,80,100,45} //[1 2 4 8 80 45 6 23 56 100]
    heap.Init(h)
    GetAllHeapItems(*h,"h")

    h1 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h1)
    h1.Push(3)
    GetAllHeapItems(*h1,"h1")

    h2 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h2)
    GetAllHeapItems(*h2,"h2")

    h3 := &Heap_t{4,2,6,80,100,45}
    heap.Init(h3)
    (*h3)[2] = 200
    fmt.Println(1111,h3)
    heap.Fix(h3,2)
    fmt.Println(2222,h3)
    GetAllHeapItems(*h3,"h3")
}

結果: h : [2 4 6 45 80 100] h1 : [2 3 4 6 45 80 100] h2 : [2 4 6 45 80 100] 1111 &[2 4 200 80 100 45] 2222 &[2 4 45 80 100 200] h3 : [2 4 45 80 100 200]

heap的實現比較巧妙,使用隊列實現了完全二叉樹,比較適用於查詢頻繁的場景,原理解析可以參見這里

更多使用和例子參見官方文檔

  • klog(k8s.io/klog) 實現執行日志打印
  • 使用select{}實現主協程不退出
func main(){
    ...
    select{}
}
  •  可以使用switch對地址進行判斷
package main

import (
    "fmt"
)

func main(){
    type emptyCtx int
    background := new(emptyCtx)
    todo       := new(emptyCtx)
    typeSwitch := func (i interface{}) {
        switch i {
        case background:
            fmt.Println("background")
        case todo:
            fmt.Println("todo")
        default:
            fmt.Println("default")
        }
    }

    typeSwitch(background)
}

結果: true
  •  限流("golang.org/x/time/rate")

rate.Limiter使用令牌桶實現限流,它共有3組對外方法,多數場景下使用Wait,用於等待令牌。更多解析可以參見這里

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

Limiter使用如下函數初始化,b為令牌桶大小,初始是滿的;r為每秒往桶里面放token的速率

func NewLimiter(r Limit, b int) *Limiter

下面是Limiter的簡單用法,其最終耗費時間為5s。計算方式為:

    • 需要處理20個事件
    • 由於桶一開始是滿的,所以立即可以處理已有的10個token
    • 還剩下10個事件,此時桶已經空了,每秒往桶里面放token的速率為每秒2個,因此每秒可以處理2個事件,處理10個事件需要5秒,這就是5s的由來,即(20-b)/r

ps:NewLimiter的入參r可以大於b,但其實此時大於的部分並沒有意義,受限於桶的大小,多余的token會被丟棄

import (
    "context"
    "fmt"
    "golang.org/x/time/rate"
    "time"
)

func main(){
    l := rate.NewLimiter(2, 10)
    ctx,cancel := context.WithCancel(context.Background())
    defer cancel()
    
    f:= func(ctx context.Context) error{
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:            
        }
        //do something
        return nil
    }

    start := time.Now()
    for i := 0; i < 20; i++ {
        err := l.Wait(ctx)
        if nil != err{
            fmt.Println(err)
            fmt.Println(time.Since(start))
            return
        }
        go f(ctx)
    }
    fmt.Println(time.Since(start))
}
 結果: 5.0000404s

下例中,如果每秒處理的令牌小於2,調度頻率為實際執行頻率(每秒一次)

func main(){
    l := rate.NewLimiter(2, 10)
    ctx,cancel := context.WithCancel(context.Background())
    defer cancel()

    f:= func(ctx context.Context) error{
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
  time.Sleep(1*time.Second) return nil
    }

    start := time.Now()
    for i := 0; i < 20; i++ {
        err := l.Wait(ctx)
        if nil != err{
            fmt.Println(err)
            fmt.Println(time.Since(start))
            return
        }
        f(ctx)
    }
    fmt.Println(time.Since(start))
}

結果: 20.0107691s

WaitN用於判斷是否可以同時執行n個事件,每次消耗n個令牌。如下例子的總時間算法為:(5*6-10)/2=10

import (
    "context"
    "fmt"
    "golang.org/x/time/rate"
    "time"

)

func main(){
    l := rate.NewLimiter(2, 10)
    ctx,cancel := context.WithCancel(context.Background())
    defer cancel()

    f:= func(ctx context.Context) error{
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }
        return nil
    }

    start := time.Now()
    for i := 0; i < 6; i++ {
        err := l.WaitN(ctx,5)
        if nil != err{
            fmt.Println(err)
            fmt.Println(time.Since(start))
            return
        }
        f(ctx)
    }
    fmt.Println(time.Since(start))
}
 結果: 10.0011304s

讀取yaml文件 

假設yaml文件內容如下,yaml文件中的每個首字母必須小寫。PS:yaml文件中的所有字符建議都小寫

scrapeInterval: 60
endpoints:
  - endpoint: cn-shanghai.log.aliyuncs.com
    accessKeyID: LTAI2KSu0MDauu2r
    accessKeySecret: D3m0j7vDmrAWf33SFUh3LJRF1QGgTu
    project: avacar-slb-sls
    logstore: avacar-sls
    consumerGroupName: endCursor
    consumerName: endConsumer1
    cursorPosition: END_CURSOR

在代碼中定義對應的解析結構體,結構體成員首字母大寫,注意每個元素后面的yaml對應的字符串需要與yaml文件中的元素對應,大小寫一致。

type Config struct {
    ScrapeInterval int32
    Endpoints []Endpoints`yaml: "endpoints"`
}

type Endpoints struct {
    Endpoint string `yaml:"endpoint"`
    AccessKeyID string `yaml:"accessKeyID"`
    AccessKeySecret string `yaml:"accessKeySecret"`
    Project string `yaml:"project"`
    Logstore string `yaml:"logstore"`
    ConsumerGroupName string `yaml:"consumerGroupName"`
    ConsumerName string `yaml:"consumerName"`
    CursorPosition string `yaml:"cursorPosition"`
}

使用如下方式即可將yaml文件的內容提取出來,即config

var config Config
configContent, err := ioutil.ReadFile("D:\\test.yaml")
if err != nil {
    log.Panic("open file failed")
    return
}

yaml.Unmarshal(configContent,&config)

sync.Pool設計的目的是用來保存和復用臨時對象,以減少內存分配,降低CG壓力,在大量復用變量的場景下能顯著提高運行效率

type S struct {
    num int
}

func BenchmarkWithPool(b *testing.B) {
    var s *S
    var pool = sync.Pool{
        New: func() interface{} { return new(S) },
    }
    for i := 0; i < b.N; i++ {
        for j := 0; j < 10000; j++ {
            s = pool.Get().(*S)
            s.num = 1
            s.num++
            pool.Put(s)
        }
    }
}

func BenchmarkWithNoPool(b *testing.B) {
    var s *S
    for i := 0; i < b.N; i++ {
        for j := 0; j < 10000; j++ {
            s = &S{num: 1}
            s.num++
        }
    }
}

func main(){
    t1 := time.Now().Nanosecond()
    BenchmarkWithPool(&testing.B{N:10})
    t2 := time.Now().Nanosecond() - t1
    fmt.Println("t2 =",t2)

    t3 := time.Now().Nanosecond()
    BenchmarkWithNoPool(&testing.B{N:10})
    t4 := time.Now().Nanosecond() - t3
    fmt.Println("t4 =",t4)}
}

結果:

t2 = 1992800
t4 = 999000

從下面可以看出,put和get是按照順序一對一的,如果get完,則調用New函數創建一個新的元素返回

// 建立對象
var pipe = &sync.Pool{New:func()interface{}{return "Hello,BeiJing"}}

// 放入
pipe.Put("Hello,World1")
pipe.Put("Hello,World2")
pipe.Put("Hello,World3")
// 取出
log.Println(pipe.Get())
log.Println(pipe.Get())
log.Println(pipe.Get())
// 再取就沒有了,會自動調用NEW
log.Println(pipe.Get())

結果:

2019/12/02 15:24:47 Hello,World1
2019/12/02 15:24:47 Hello,World2
2019/12/02 15:24:47 Hello,World3
2019/12/02 15:24:47 Hello,BeiJing

下圖來自golang sync.Pool 分析

 

 

 

 參考:

https://www.flysnow.org/2017/05/12/go-in-action-go-context.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM