1.序
除開前面章節講到的令牌桶算法實現的網絡限流外, 還有另外一種常見的限流算法, 漏桶算法
2. 漏桶算法
漏桶算法(Leaky Bucket) 是網絡世界中 流量整形(Traffic Shaping)或速率限制(Rate Limiting)時經常使用的一種算法,它的主要目的是控制數據注入到網絡的速率,平滑網絡上的突發流量。漏桶算法提供了一種機制,通過它,突發流量可以被整形以便為網絡提供一個穩定的流量。
Bursty Flow
在上圖中,水龍頭代表着突發流量(Bursty Flow)。當網絡中存在突發流量,且無任何調控時,就會出現像 Bursty Data
處類似的場景。主機以 12 Mbps
的速率發送數據,時間持續 2s
,總計 24 Mbits
數據。隨后主機暫停發送 5s,然后再以 2 Mbps 的速率發送數據 3s,最終總共發送了 6 Mbits 的數據。
因此主機在 10s 內總共發送了 30 Mbits 的數據。但這里存在一個問題,就是數據的發送並不是平滑的,存在一個較大的波峰。若所有流量都是如此的傳輸方式,將會 “旱的旱死澇的澇死”,對系統並不是特別的友好
Fixed Flow
為了解決 Bursty Flow 場景的問題。漏桶(Leaky Bucket)出現了,漏桶具有固定的流出速率、固定的容量大小。
在上圖中,漏桶在相同的 10s 內以 3 Mbps 的速率持續發送數據來平滑流量。若水(流量)來的過猛,但水流(漏水)不夠快時,其最終結果就是導致水直接溢出,呈現出來就是拒絕請求/排隊等待的表現。另外當 Buckets 空時,是會出現一次性倒入達到 Bucket 容量限制的水的可能性,此時也可能會出現波峰。
簡單來講就是,一個漏桶,水流進來,但漏桶只有固定的流速來流出水,若容量滿即拒絕,否則將持續保持流量流出。
漏桶算法的主要作用就是避免出現有的時候流量很高
Go 中比較常用的漏桶算法的實現就是來自 uber 的 ratelimit,下面我們就會看一下這個庫的使用方式和源碼
2.1 API
type Clock // interface
type Limiter
func New(rate int, opts ...Option) Limiter
func NewUnlimited() Limiter
type Option
func Per(per time.Duration) Option
func WithClock(clock Clock) Option
func WithSlack(slack int) Option
-
Clock 是一個接口,計時器的最小實現,有兩個方法,分別是當前的時間和睡眠
type Clock interface { Now() time.Time Sleep(time.Duration) }
-
Limiter 也是一個接口, 只有一個Take方法, 執行這個方法的時候如果觸發了
rps
限制則會阻塞住type Limiter interface { // Take should block to make sure that the RPS is met. Take() time.Time }
-
NewLimter
和NewUnlimited
會分別初始化一個無鎖的限速器和沒有任何限制的限速器 -
Option 是在初始化的時候的額外參數. Option有三個方法
Per
可以修改時間單位, 默認是秒所以我們默認限制的是rps, 如果改成分鍾那么就是rpm了WithClock
可以修改時鍾,這個用於在測試的時候可以 mock 掉不使用真實的時間WithSlack
用於修改松弛時間,也就是可以允許的突發流量的大小,默認是 Pre / 10 ,這個后面會講到
2.3 基於漏桶算法實現IP限流中間件
-
demo.go
package main import ( "fmt" "net/http" "sync" "github.com/gin-gonic/gin" "go.uber.org/ratelimit" ) // Gin中間件 func NewLimiter(rps int) gin.HandlerFunc { limiters := sync.Map{} return func(c *gin.Context) { // 獲取限速器 // key 除了 ip 之外也可以是其他的,例如 header,user name 等 key := c.ClientIP() l, _ := limiters.LoadOrStore(key, ratelimit.New(rps)) now := l.(ratelimit.Limiter).Take() fmt.Printf("now: %s\n", now) c.Next() } } func main() { e := gin.Default() // 新建一個限速器,允許突發 3 個並發 e.Use(NewLimiter(3)) e.GET("ping", func(c *gin.Context) { c.String(http.StatusOK, "pong") }) err := e.Run(":8080") if err != nil { fmt.Printf("Start server err, %s", err.Error()) } }
-
測試, 使用
go-stress-testing
進行壓測,並發100
go-stress-testing -c 100 -u http://172.20.192.1:8080/ping
-
測試結果
root@failymao:~# go-stress-testing -c 20 -u http://172.20.192.1:8080/ping 開始啟動 並發數:20 請求數:1 請求參數: request: form:http url:http://172.20.192.1:8080/ping method:GET headers:map[] data: verify:statusCode timeout:30s debug:false ─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬──────── 耗時│ 並發數│ 成功數│ 失敗數│ qps │最長耗時│最短耗時│平均耗時│下載字節│字節每秒│ 錯誤碼 ─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼──────── 1s│ 13│ 13│ 0│ 233.84│ 676.26│ 5.00│ 85.53│ 52│ 51│200:13 2s│ 16│ 16│ 0│ 62.13│ 1676.71│ 5.00│ 321.92│ 64│ 31│200:16 3s│ 19│ 19│ 0│ 31.17│ 2676.45│ 5.00│ 641.63│ 76│ 25│200:19 3s│ 20│ 20│ 0│ 26.28│ 3027.85│ 5.00│ 760.94│ 80│ 26│200:20 ************************* 結果 stat **************************** 處理協程數量: 20 請求總數(並發數*請求數 -c * -n): 20 總請求時間: 3.032 秒 successNum: 20 failureNum: 0 ************************* 結果 end ****************************
查看結果發現為什么第一秒的時候完成了 13 個請求,不是限制的 3rps 么?不要慌,我們看看它的實現就知道了
2.4 實現
這個庫有基於互斥鎖的實現和基於 CAS 的無鎖實現,默認使用的是無鎖實現版本,所以我們主要看無鎖實現的源碼
type state struct {
last time.Time
sleepFor time.Duration
}
type atomicLimiter struct {
state unsafe.Pointer
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
atomicLimiter
結構體
state
是一個狀態的指針,用於存儲上一次的執行的時間,以及需要 sleep 的時間padding
是一個無意義的填充數據,為了提高性能,避免 cpu 緩存的 false sharing- Go 並發內存模型 為了能夠最大限度的利用 CPU 的能力,會做很多喪心病狂的優化,其中一種就是 cpu cache
- cpu cache 一般是以 cache line 為單位的,在 64 位的機器上一般是 64 字節
- 所以如果我們高頻並發訪問的數據小於 64 字節的時候就可能會和其他數據一起緩存,其他數據如果出現改變就會導致 cpu 認為緩存失效,這就是 false sharing
- 所以在這里為了盡可能提高性能,填充了 56 字節的無意義數據,因為 state 是一個指針占用了 8 個字節,所以 64 - 8 = 56
- 剩下三個字段和 Option 中的三個方法意義對應
perRequest
就是單位,默認是秒maxSlack
松弛時間,也就是可以允許的突發流量的大小,默認是 Pre / 10 ,這個后面會講到clock
時鍾,這個用於在測試的時候可以 mock 掉不使用真實的時間
Take 方法
func (t *atomicLimiter) Take() time.Time {
var (
// 狀態
newState state
// 用於表示原子操作是否成功
taken bool
// 需要 sleep 的時間
interval time.Duration
)
// 如果 CAS 操作不成功就一直嘗試
for !taken {
// 獲取當前的時間
now := t.clock.Now()
// load 出上一次調用的時間
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// 如果 last 是零值的話,表示之前就沒用過,直接保存返回即可
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor 是需要睡眠的時間,由於引入了松弛時間,所以 sleepFor 可能是一個
// maxSlack ~ 0 之間的一個值,所以這里需要將現在的需要 sleep 的時間和上一次
// sleepFor 的值相加
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// 如果距離上一次調用已經很久了,sleepFor 可能會是一個很小的值
// 最小值只能是 maxSlack 的大小
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果 sleepFor 大於 0 的話,計算出需要 sleep 的時間
// 然后將 state.sleepFor 置零
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
// 保存狀態
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
// sleep interval
t.clock.Sleep(interval)
return newState.last
}
3. 總結
漏桶和令牌桶的最大的區別就是,令牌桶是支持突發流量的,但是漏桶是不支持的。但是 uber 的這個庫通過引入彈性時間的方式也讓漏桶算法有了類似令牌桶能夠應對部分突發流量的能力,並且實現上還非常的簡單,值得學習。
3.1 漏桶 vs 令牌桶
漏桶算法和令牌桶 算法本質上都是為了做流量整形(Traffic Shaping)或速率限制(Rate Limiting),避免系統因為大流量而被打崩,但兩者核心差異在於限流的方向是相反的。
令牌桶限制的是流量的平均流入速率,並且允許一定程度的突然性流量,最大速率為桶的容量和生成 token 的速率。而漏桶限制的是流量的流出速率,是相對固定的。
因此也會相對的帶來一個問題,在某些場景中,漏桶算法並不能有效的使用網絡資源,因為漏桶的漏出速率是相對固定的,所以在網絡情況比較好,沒有擁塞的狀態下,漏桶依然是限制住的,並沒有辦法放開量。而令牌桶算法則不同,其能夠限制平均速率的同時支持一定程度的突發流量。