前言
RateLimiter是基於令牌桶算法實現的一個多線程限流器,它可以將請求均勻的進行處理,當然他並不是一個分布式限流器,只是對單機進行限流。它可以應用在定時拉取接口數據,
預防單機過大流量使用。
原理
首先先講一下令牌桶的原理,每隔一段時間生產一個令牌放入桶里,請求在執行時需要拿到令牌才可以執行,如果拿不到令牌將等待令牌產生,一個生產者,多個消費者。
但是這樣的令牌桶有一個問題,如果CPU負載過高,生產令牌的線程沒有獲取到時間片生產令牌,那么限制的流量將會比設定值更低。
可能是出於這個原因,guava並沒有這樣做,而是一個惰性生產令牌,每次請求令牌時,通過當前時間和下次產生令牌時間的差值計算出現在有多少個令牌,如果當前時間比發放時間大,會獲得令牌,並且會生成令牌存儲。如果令牌不夠,則讓線程sleep,並且將下次令牌產生時間更新成當前時間+sleep時間
sleep,並且將下次發放令牌的時間,設置成當前時間+線程sleep的時間。這樣說,可能不是很清楚,看圖。
這樣做的好處是什么,如果獲取令牌的線程搶不到cpu,只是這個線程的執行時間會晚,其他線程不會受到影響。
源碼閱讀
public static void main(String[] args) { RateLimiter rateLimiter = RateLimiter.create(10); while (true) { long start = System.currentTimeMillis(); rateLimiter.acquire(); System.out.println(System.currentTimeMillis() - start); } } |
運行可以發現,上面的代碼除了第一次輸出的是0或者1,其他都接近100。下面先看一下RateLimiter.create做了哪些事情
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) { //創建對象,並且賦值,permitsPerSecond這個是我們設置的qps,stopwatch這個相當於一個計時器,記錄相對時間,類似於我上面圖中的10ms,100ms等,下面傳入的1.0就是一秒的意思,設置上速率就是一秒多少次 RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0); rateLimiter.setRate(permitsPerSecond); return rateLimiter; } 看一下setRate public final void setRate(double permitsPerSecond) { checkArgument(permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive"); //從名字就可以看出這是一個互斥鎖,這個互斥鎖采用了double check懶漢單例模式生成, synchronized (mutex()) { doSetRate(permitsPerSecond, stopwatch.readMicros()); } } |
下面看一下doSetRate,真正開始設置速率了
final void doSetRate(double permitsPerSecond, long nowMicros) { //這個方法非常重要里面是nextFreeTicketMicros和storedPermits的設置,在生成對象的時候沒有用,獲取令牌時再講 resync(nowMicros); //這個就是令牌生成間隔,微秒表示 double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond; this.stableIntervalMicros = stableIntervalMicros; 這里又有一個doSetRate doSetRate(permitsPerSecond, stableIntervalMicros); } void doSetRate(double permitsPerSecond, double stableIntervalMicros) { double oldMaxPermits = this.maxPermits; //設置最大令牌 maxPermits = maxBurstSeconds * permitsPerSecond; //設置存儲令牌 if (oldMaxPermits == Double.POSITIVE_INFINITY) { storedPermits = maxPermits; } else { storedPermits = (oldMaxPermits == 0.0) ? 0.0 // initial state : storedPermits * maxPermits / oldMaxPermits; } |
到了這里整個創建過程就結束了,基本上就是一些設置,創建了鎖,設置了生成令牌的間隔時間等等,下面看一下獲取令牌的方法。
//獲取一個令牌 public double acquire() { return acquire(1); } public double acquire(int permits) { //reserve返回等待時間,內部進行了令牌的獲取 long microsToWait = reserve(permits); stopwatch.sleepMicrosUninterruptibly(microsToWait); return 1.0 * microsToWait / SECONDS.toMicros(1L); } final long reserve(int permits) { checkPermits(permits); //創建對象時生成的鎖 synchronized (mutex()) { //stopwatch.readMicros()拿到當前的時間,預訂 return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } final long reserveAndGetWaitLength(int permits, long nowMicros) { //將數據透傳,拿到最早可預訂的時間,如果預訂時間在未來時間,返回一個大於0的值為等待時間 long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } |
下面的代碼就是我圖中的實現
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { //這個方法很重要先看下面這個方法的講解,在從這里往下看 resync(nowMicros); //返回下次發放令牌時間,如果這個時間大於當前時間,在調用的上層會sleep long returnValue = nextFreeTicketMicros; //拿到此次花費的令牌 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 如果令牌不夠,這里就會大於0,下面就會得出一個等待時間 double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); //將下次發放令牌的時間加上等待時間 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this.storedPermits -= storedPermitsToSpend; return returnValue; } void resync(long nowMicros) { if (nowMicros > nextFreeTicketMicros) { //當前時間大於下次令牌發放時間,新的令牌為當前時間減去下次發放令牌時間除以生成令牌的時間間隔 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); //不能超過最大令牌數 storedPermits = min(maxPermits, storedPermits + newPermits); //更新下次發放令牌時間為當前時間 nextFreeTicketMicros = nowMicros; } } |
總結
RateLimiter的原理用語言描述,很容易把人繞暈,上面的圖其實是最好的總結,懂得原理才能更好的使用,在多種限流器中選擇合適的限流器。了解源碼,能更進一步的掌握原理,並且從源碼中可以學到設計思路和
一些設計模式的應用。