golang官方包限流器使用和原理(golang.org/x/time/rate)


限流器模型

golang.org/x/time/rate 限流器目前提供了一種令牌桶算法的的限流器。

  • 請求需要拿到令牌才能接着往下執行, 邏輯上有一個令牌桶,桶的最大容量是固定的。

  • 當桶內令牌數 小於 桶的最大容量時, 以固定的頻率向桶內增加令牌直至令牌數滿。

  • 每個請求理論上消耗一個令牌(實際上提供了的方法每次可以消耗大於一個的令牌)

  • 限流器初始化時 令牌桶是滿的

簡單例子

package main

import (
    "fmt"
    "golang.org/x/time/rate"
    "time"
)

func main() {
    limiter := rate.NewLimiter(rate.Every(time.Millisecond * 31), 2)
    //time.Sleep(time.Second)
    for i := 0; i < 10; i++ {
        var ok bool
        if limiter.Allow() {
            ok = true
        }
        time.Sleep(time.Millisecond * 20)
        fmt.Println(ok, limiter.Burst())
    }
}

輸出

true 2
true 2
true 2
true 2
false 2
true 2
true 2
false 2
true 2
true 2

可以看出 一開始桶內令牌數是滿的。 由於生成令牌的間隔比請求的間隔多了11ms, 所以到后面每兩個請求后就會失敗一次。

限流器實現原理

限流器的數據結構和New函數如下:

type Limiter struct {
    limit Limit
    burst int

    mu     sync.Mutex
    tokens float64
    // last is the last time the limiter's tokens field was updated
    last time.Time
    // lastEvent is the latest time of a rate-limited event (past or future)
    lastEvent time.Time
}

func NewLimiter(r Limit, b int) *Limiter {
    return &Limiter{
        limit: r,
        burst: b,
    }
}

可見, 雖然在邏輯上, 令牌桶在沒滿的情況下, 是不斷往里面以一定時間間隔添加令牌, 但代碼實現上並沒有這樣的一個協程。 實際上桶剩余令牌的更新是推遲在每次消費的時候進行計算的。

核心函數

func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation

當調用 limiter.Allow()的時候, 底層調用實際即使該函數, 入參now=當前時間, n=1, maxFutrueReserve=0

maxFutrueReserve在Wait()方法內調用時, 值會大於0。

函數前半段: 當limit變量無限大(該變量是新增令牌的時間間隔的倒數, 無限大即表示無間隔), 直接返回TRUE即可

    lim.mu.Lock()

    if lim.limit == Inf {
        lim.mu.Unlock()
        return Reservation{
            ok:        true,
            lim:       lim,
            tokens:    n,
            timeToAct: now,
        }
    }

之調用lim.advance(now) 函數, 獲取上次令牌桶更新時間和此時桶內令牌數應該是多少(這一步就是上文提到的桶剩余令牌的延遲計算)

  • 先檢查傳入的時間是否先於數據結構內記錄的上次令牌數更新時間(並發的情況下, now變量的初始化未在鎖內執行, 這種情況很有可能發生), 如果是則將上次令牌時更新時間替換為當前時間
  • 獲取上次令牌數更新時間到現在的時間間隔, 如果超過讓令牌桶加到滿的時間間隔,就使用讓令牌桶加到滿的時間間隔。這個變量用於計算增加令牌桶的令牌數, 由於有容量限制,過大沒有意義
  • 計算令牌桶此時的個數, 返回
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    last := lim.last
    if now.Before(last) {
        last = now
    }

    // Avoid making delta overflow below when last is very old.
    maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    elapsed := now.Sub(last)
    if elapsed > maxElapsed {
        elapsed = maxElapsed
    }

    // Calculate the new number of tokens, due to time that passed.
    delta := lim.limit.tokensFromDuration(elapsed)
    tokens := lim.tokens + delta
    if burst := float64(lim.burst); tokens > burst {
        tokens = burst
    }

    return now, last, tokens
}

函數后半段:

  • 計算這次消耗的令牌數, 但是不是直接簡單比較消耗后剩余令牌數大於0, 而是計算還需要多少時間令牌桶能生成滿足需要的令牌數(如果消耗后令牌數仍大於0 則需要時間就是0)。 將該值和入參maxFutureReserve比較再得出結果。即當前令牌數可能是負的。 這么做的原因是限流器還提供了Wait方法, 當前令牌不足時可以阻塞等待至有令牌為止(提前消費令牌 然后阻塞一段時間), 而不是像Allow方法一樣立即返回False
   // Calculate the remaining number of tokens resulting from the request.
    tokens -= float64(n)

    // Calculate the wait duration
    var waitDuration time.Duration
    if tokens < 0 {
        waitDuration = lim.limit.durationFromTokens(-tokens)
    }

    // Decide result
    ok := n <= lim.burst && waitDuration <= maxFutureReserve

    // Prepare reservation
    r := Reservation{
        ok:    ok,
        lim:   lim,
        limit: lim.limit,
    }
    if ok {
        r.tokens = n
      	r.timeToAct = now.Add(waitDuration)
    }

    // Update state
    if ok {
        lim.last = now
        lim.tokens = tokens
        lim.lastEvent = r.timeToAct
    } else {
        lim.last = last
    }

    lim.mu.Unlock()
    return r
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM