漏斗限流是最常用的限流方法之一,漏斗流水的速率大於灌水的速率,漏斗就永遠裝不滿,反之水就會溢出。
所以漏斗的剩余空間就代表當前行為可以持續進行的數量,水流出的速率代表系統允許該行為的最大頻率。
import time class Funnel: def __init__(self, capacity, leaking_rate): self.capacity = capacity # 漏斗容量 self.leaking_rate = leaking_rate # 流出速率 self.left_quota = capacity # 漏斗剩余空間 self.leaking_ts = time.time() # 上一次漏水時間 def make_space(self): now_ts = time.time() delta_ts = now_ts - self.leaking_ts # 距離上一次漏水過去了多久 delta_quota = delta_ts * self.leaking_rate # 騰出的空間 if delta_quota < 1: # 騰的空間過小等待下一次 return self.left_quota += delta_quota # 增加剩余空間 self.leaking_ts = now_ts # 記錄漏水時間 if self.left_quota > self.capacity: # 剩余空間不得高於容量 self.left_quota = self.capacity def watering(self, quota): self.make_space() if self.left_quota >= quota: # 判斷剩余空間是否足夠 self.left_quota -= quota return True return False funnels = {} # 所有的漏斗 def is_action_allowed( user_id, action_key, capacity, leaking_rate): key = '{}:{}'.format(user_id, action_key) funnel = funnels.get(key) if not funnel: funnels[key] = Funnel(capacity, leaking_rate) return funnel.watering(1) for i in range(20): print is_action_allowed('tom', 'reply', 15, 0.5)
make_space方法是漏斗算法的核心,其在每次灌水前都會被調用以觸發漏水,給漏斗騰出空間,Funnel占據的空間大小不與行為頻率成正比,其空間占用是一個常量。
問題是分布式漏斗算法如何實現?
Funnel類其實就是一個高級字典,那么我們可以利用Redis中的hash結構來存儲對應字段,灌水時將字段取出進行邏輯運算后再存入hash結構中即可完成一次行為頻度的檢測。但這有個問題就是整個過程的原子性無法保證,意味着要用鎖來控制,但如果加鎖失敗,就要重試或者放棄,這回導致性能下降和影響用戶體驗,同時代碼復雜度也升高了,此時Redis提供了一個插件,Redis-Cell出現了。
Redis-Cell
Redis 4.0提供了一個限流Redis模塊,名稱為redis-cell,該模塊提供漏斗算法,並提供原子的限流指令。
該模塊只有一條指令cl.throttle
,其參數和返回值比較復雜。
> cl.throttle tom:reply 14 30 60 1
1) (integer) 0 # 0表示允許,1表示拒絕 2) (integer) 15 # 漏斗容量capacity 3) (integer) 14 # 漏斗剩余空間left_quota 4) (integer) -1 # 如果拒絕了,需要多長時間后再重試,單位秒 5) (integer) 2 # 多長時間后,漏斗完全空出來,單位秒
該指令意思為,允許用戶tom的reply行為的頻率為每60s最多30次,漏斗初始容量為15(因為是從0開始計數,到14為15個),默認每個行為占據的空間為1(可選參數)。
如果被拒絕,取返回數組的第四個值進行sleep即可作為重試時間,也可以異步定時任務來重試。
注意事項
由於Redis-Cell是基於Rust語言寫的插件,因此在安裝插件前要先安裝rust。
具體方法看GitHub。