限流

限流又稱為流量控制(流控),通常是指限制到達系統的並發請求數。

我們生活中也會經常遇到限流的場景,比如:某景區限制每日進入景區的游客數量為8萬人;沙河地鐵站早高峰通過站外排隊逐一放行的方式限制同一時間進入車站的旅客數量等。

限流雖然會影響部分用戶的使用體驗,但是卻能在一定程度上報障系統的穩定性,不至於崩潰(大家都沒了用戶體驗)。

而互聯網上類似需要限流的業務場景也有很多,比如電商系統的秒殺、微博上突發熱點新聞、雙十一購物節、12306搶票等等。這些場景下的用戶請求量通常會激增,遠遠超過平時正常的請求量,此時如果不加任何限制很容易就會將后端服務打垮,影響服務的穩定性。

此外,一些廠商公開的API服務通常也會限制用戶的請求次數,比如百度地圖開放平台等會根據用戶的付費情況來限制用戶的請求數等。

常用的限流策略

漏桶

漏桶法限流很好理解,假設我們有一個水桶按固定的速率向下方滴落一滴水,無論有多少請求,請求的速率有多大,都按照固定的速率流出,對應到系統中就是按照固定的速率處理請求。

漏桶法的關鍵點在於漏桶始終按照固定的速率運行,但是它並不能很好的處理有大量突發請求的場景,畢竟在某些場景下我們可能需要提高系統的處理效率,而不是一味的按照固定速率處理請求。

關於漏桶的實現,uber團隊有一個開源的github.com/uber-go/ratelimit庫。 這個庫的使用方法比較簡單,Take() 方法會返回漏桶下一次滴水的時間。

import (
	"fmt"
	"time"

	"go.uber.org/ratelimit"
)

func main() {
    rl := ratelimit.New(100) // per second

    prev := time.Now()
    for i := 0; i < 10; i++ {
        now := rl.Take()
        fmt.Println(i, now.Sub(prev))
        prev = now
    }

    // Output:
    // 0 0
    // 1 10ms
    // 2 10ms
    // 3 10ms
    // 4 10ms
    // 5 10ms
    // 6 10ms
    // 7 10ms
    // 8 10ms
    // 9 10ms
}

  

它的源碼實現也比較簡單,這里大致說一下關鍵的地方,有興趣的同學可以自己去看一下完整的源碼。

限制器是一個接口類型,其要求實現一個Take()方法:

type Limiter interface {
	// Take方法應該阻塞已確保滿足 RPS
	Take() time.Time
}

  

實現限制器接口的結構體定義如下,這里可以重點留意下maxSlack字段,它在后面的Take()方法中的處理。

type limiter struct {
	sync.Mutex                // 鎖
	last       time.Time      // 上一次的時刻
	sleepFor   time.Duration  // 需要等待的時間
	perRequest time.Duration  // 每次的時間間隔
	maxSlack   time.Duration  // 最大的富余量
	clock      Clock          // 時鍾
}

  

limiter結構體實現Limiter接口的Take()方法內容如下:

// Take 會阻塞確保兩次請求之間的時間走完
// Take 調用平均數為 time.Second/rate.
func (t *limiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()

	now := t.clock.Now()

	// 如果是第一次請求就直接放行
	if t.last.IsZero() {
		t.last = now
		return t.last
	}

	// sleepFor 根據 perRequest 和上一次請求的時刻計算應該sleep的時間
	// 由於每次請求間隔的時間可能會超過perRequest, 所以這個數字可能為負數,並在多個請求之間累加
	t.sleepFor += t.perRequest - now.Sub(t.last)

	// 我們不應該讓sleepFor負的太多,因為這意味着一個服務在短時間內慢了很多隨后會得到更高的RPS。
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}

	// 如果 sleepFor 是正值那么就 sleep
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}

	return t.last
}

  

上面的代碼根據記錄每次請求的間隔時間和上一次請求的時刻來計算當次請求需要阻塞的時間——sleepFor,這里需要留意的是sleepFor的值可能為負,在經過間隔時間長的兩次訪問之后會導致隨后大量的請求被放行,所以代碼中針對這個場景有專門的優化處理。創建限制器的New()函數中會為maxSlack設置初始值,也可以通過WithoutSlack這個Option取消這個默認值。

func New(rate int, opts ...Option) Limiter {
	l := &limiter{
		perRequest: time.Second / time.Duration(rate),
		maxSlack:   -10 * time.Second / time.Duration(rate),
	}
	for _, opt := range opts {
		opt(l)
	}
	if l.clock == nil {
		l.clock = clock.New()
	}
	return l
}

  

令牌桶

令牌桶其實和漏桶的原理類似,令牌桶按固定的速率往桶里放入令牌,並且只要能從桶里取出令牌就能通過,令牌桶支持突發流量的快速處理。

對於從桶里取不到令牌的場景,我們可以選擇等待也可以直接拒絕並返回。

對於令牌桶的Go語言實現,大家可以參照github.com/juju/ratelimit庫。這個庫支持多種令牌桶模式,並且使用起來也比較簡單。

創建令牌桶的方法:

// 創建指定填充速率和容量大小的令牌桶
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
// 創建指定填充速率、容量大小和每次填充的令牌數的令牌桶
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
// 創建填充速度為指定速率和容量大小的令牌桶
// NewBucketWithRate(0.1, 200) 表示每秒填充20個令牌
func NewBucketWithRate(rate float64, capacity int64) *Bucket

  

取出令牌的方法如下:

// 取token(非阻塞)
func (tb *Bucket) Take(count int64) time.Duration
func (tb *Bucket) TakeAvailable(count int64) int64
// 最多等maxWait時間取token
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)

// 取token(非阻塞)
func (tb *Bucket) Wait(count int64)
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool

  

雖說是令牌桶,但是我們沒有必要真的去生成令牌放到桶里,我們只需要每次來取令牌的時候計算一下,當前是否有足夠的令牌就可以了,具體的計算方式可以總結為下面的公式:

當前令牌數 = 上一次剩余的令牌數 + (本次取令牌的時刻-上一次取令牌的時刻)/放置令牌的時間間隔 * 每次放置的令牌數

  

github.com/juju/ratelimit這個庫中關於令牌數計算的源代碼如下:

func (tb *Bucket) currentTick(now time.Time) int64 {
	return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
func (tb *Bucket) adjustavailableTokens(tick int64) {
	if tb.availableTokens >= tb.capacity {
		return
	}
	tb.availableTokens += (tick - tb.latestTick) * tb.quantum
	if tb.availableTokens > tb.capacity {
		tb.availableTokens = tb.capacity
	}
	tb.latestTick = tick
	return
}

  

獲取令牌的TakeAvailable()函數關鍵部分的源代碼如下:

func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
	if count <= 0 {
		return 0
	}
	tb.adjustavailableTokens(tb.currentTick(now))
	if tb.availableTokens <= 0 {
		return 0
	}
	if count > tb.availableTokens {
		count = tb.availableTokens
	}
	tb.availableTokens -= count
	return count
}

  

大家從代碼中也可以看到其實令牌桶的實現並沒有很復雜。

gin框架中使用限流中間件

在gin框架構建的項目中,我們可以將限流組件定義成中間件。

這里使用令牌桶作為限流策略,編寫一個限流中間件如下:

func RateLimitMiddleware(fillInterval time.Duration, cap int64) func(c *gin.Context) {
	bucket := ratelimit.NewBucket(fillInterval, cap)
	return func(c *gin.Context) {
		// 如果取不到令牌就返回響應
		if bucket.TakeAvailable(1) > 0 {
			c.String(http.StatusOK, "rate limit...")
			c.Abort()
			return
		}
		c.Next()
	}
}

  

對於該限流中間件的注冊位置,我們可以按照不同的限流策略將其注冊到不同的位置,例如:

  1. 如果要對全站限流就可以注冊成全局的中間件。
  2. 如果是某一組路由需要限流,那么就只需將該限流中間件注冊到對應的路由組即可。