guava--RateLimiter源碼分析


限流是保護高並發系統的三把利器之一,另外兩個是緩存和降級。

限流的目的是通過對並發訪問/請求進行限速或者一個時間窗口內的的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務或進行流量整形

常用的限流方式和場景有:

A、限制總並發數(比如數據庫連接池、線程池)
B、限制瞬時並發數(如nginx的limitconn模塊,用來限制瞬時並發連接數,Java的Semaphore也可以實現)
C、限制時間窗口內的平均速率(如Guava的RateLimiter、nginx的limitreq模塊,限制每秒的平均速率);
D、其他還有如限制遠程接口調用速率、限制MQ的消費速率。另外還可以根據網絡連接數、網絡流量、CPU或內存負載等來限流。
比如說,我們需要限制方法被調用的並發數不能超過100(同一時間並發數),則我們可以用信號量 Semaphore實現。可如果我們要限制方法在一段時間內平均被調用次數不超過100,則需要使用 RateLimiter

Guava是Java領域優秀的開源項目,它包含了Google在Java項目中使用一些核心庫,包含集合(Collections),緩存(Caching),並發編程庫(Concurrency),常用注解(Common annotations),String操作,I/O操作方面的眾多非常實用的函數。 本文重點介紹RateLimiter。

一、RateLimiter源碼

RateLimiter使用的是一種叫令牌桶的流控算法,RateLimiter會按照一定的頻率往桶里扔令牌,線程拿到令牌才能執行,比如你希望自己的應用程序QPS不要超過1000,那么RateLimiter設置1000的速率后,就會每秒往桶里扔1000個令牌。Guava的 RateLimiter提供了令牌桶算法實現:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實現

1.1、RateLimiter包路徑

package com.google.common.util.concurrent;
@ThreadSafe @Beta
public abstract class RateLimiter {

RateLimiter經常用於限制對一些物理資源或者邏輯資源的訪問速率。與Semaphore 相比,Semaphore 限制了並發訪問的數量而不是使用速率。(注意盡管並發性和速率是緊密相關的,比如參考Little定律

通過設置許可證的速率來定義RateLimiter。在默認配置下,許可證會在固定的速率下被分配,速率單位是每秒多少個許可證。為了確保維護配置的速率,許可會被平穩地分配,許可之間的延遲會做調整。
可能存在配置一個擁有預熱期的RateLimiter 的情況,在這段時間內,每秒分配的許可數會穩定地增長直到達到穩定的速率。

舉例來說明如何使用RateLimiter,想象下我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個:

//速率是每秒兩個許可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // 也許需要等待
        executor.execute(task);
    }
}

再舉另外一個例子,想象下我們制造了一個數據流,並希望以每秒5kb的速率處理它。可以通過要求每個字節代表一個許可,然后指定每秒5000個許可來完成:

// 每秒5000個許可
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
    rateLimiter.acquire(packet.length);
    networkService.send(packet);
}

有一點很重要,那就是請求的許可數從來不會影響到請求本身的限制(調用acquire(1) 和調用acquire(1000) 將得到相同的限制效果,如果存在這樣的調用的話),但會影響下一次請求的限制,也就是說,如果一個高開銷的任務抵達一個空閑的RateLimiter,它會被馬上許可,但是下一個請求會經歷額外的限制,從而來償付高開銷任務。注意:RateLimiter 並不提供公平性的保證。

1.2、RateLimiter方法摘要

static RateLimiter create(double permitsPerSecond):根據指定的穩定吞吐率創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)。

static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond)創建平滑突發限流SmoothBursty

static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit):根據指定的穩定吞吐率和預熱期來創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)。

static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit, double coldFactor):創建平滑預熱限流SmoothWarmingUp

final double getRate():返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數。

final void setRate(double permitsPerSecond):更新RateLimiter的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。

double acquire(int permits):從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求。

double  acquire():從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求。此方法等效於acquire(1)。

boolean tryAcquire(int permits, long timeout, TimeUnit unit):從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那么立即返回false (無需等待)。

boolean tryAcquire(long timeout, TimeUnit unit):從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那么立即返回false(無需等待)。

boolean tryAcquire(int permits):如果可以立即獲得許可證,則從該限速器處獲得許可證。此方法等效於tryAcquire(permits,0,anyUnit)。

boolean tryAcquire():如果可以立即獲得許可證,則從該限速器處獲得許可證。等同於tryAcquire(1)。

1.3、RateLimiter創建方法介紹

RateLimiter的類圖如上所示,其中 RateLimiter是入口類,它提供了兩套工廠方法來創建出兩個子類。這很符合《Effective Java》中的用靜態工廠方法代替構造函數的建議,畢竟該書的作者也正是Guava庫的主要維護者,二者配合"食用"更佳。

// RateLimiter提供了兩個工廠方法,最終會調用下面兩個函數,生成RateLimiter的兩個子類。
  @VisibleForTesting
  static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

  @VisibleForTesting
  static RateLimiter create(
      SleepingStopwatch stopwatch,
      double permitsPerSecond,
      long warmupPeriod,
      TimeUnit unit,
      double coldFactor) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }

1.3.1、平滑突發限流

使用 RateLimiter的靜態方法創建一個限流器,設置每秒放置的令牌數為5個。返回的RateLimiter對象可以保證1秒內不會給超過5個令牌,並且以固定速率進行放置,達到平滑輸出的效果。

public void testSmoothBursty() {
 RateLimiter r = RateLimiter.create(5);
 while (true) {
 System.out.println("get 1 tokens: " + r.acquire() + "s");
 }
 /**
     * output: 基本上都是0.2s執行一次,符合一秒發放5個令牌的設定。
     * get 1 tokens: 0.0s 
     * get 1 tokens: 0.182014s
     * get 1 tokens: 0.188464s
     * get 1 tokens: 0.198072s
     * get 1 tokens: 0.196048s
     * get 1 tokens: 0.197538s
     * get 1 tokens: 0.196049s
     */
}

RateLimiter使用令牌桶算法,會進行令牌的累積,如果獲取令牌的頻率比較低,則不會導致等待,直接獲取令牌。

public void testSmoothBursty2() {
 RateLimiter r = RateLimiter.create(2);
 while (true)
 {
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
 try {
    Thread.sleep(2000);
 } catch (Exception e) {}
 System.out.println("get 1 tokens: " + r.acquire(1) + "s");
 System.out.println("get 1 tokens: " + r.acquire(1) + "s");
 System.out.println("get 1 tokens: " + r.acquire(1) + "s");
 System.out.println("end");
 /**
       * output:
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * end
       * get 1 tokens: 0.499796s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       */
 }
}

RateLimiter由於會累積令牌,所以可以應對突發流量。在下面代碼中,有一個請求會直接請求5個令牌,但是由於此時令牌桶中有累積的令牌,足以快速響應。 RateLimiter在沒有足夠令牌發放時,采用滯后處理的方式,也就是前一個請求獲取令牌所需等待的時間由下一次請求來承受,也就是代替前一個請求進行等待。

public void testSmoothBursty3() {
 RateLimiter r = RateLimiter.create(5);
 while (true)
 {
    System.out.println("get 5 tokens: " + r.acquire(5) + "s");
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    System.out.println("end");
 /**
       * output:
       * get 5 tokens: 0.0s
       * get 1 tokens: 0.996766s 滯后效應,需要替前一個請求進行等待
       * get 1 tokens: 0.194007s
       * get 1 tokens: 0.196267s
       * end
       * get 5 tokens: 0.195756s
       * get 1 tokens: 0.995625s 滯后效應,需要替前一個請求進行等待
       * get 1 tokens: 0.194603s
       * get 1 tokens: 0.196866s
       */
 }
}

1.3.2、平滑預熱限流

RateLimiter的 SmoothWarmingUp是帶有預熱期的平滑限流,它啟動后會有一段預熱期,逐步將分發頻率提升到配置的速率。 比如下面代碼中的例子,創建一個平均分發令牌速率為2,預熱期為3秒。由於設置了預熱時間是3秒,令牌桶一開始並不會0.5秒發一個令牌,而是形成一個平滑線性下降的坡度,頻率越來越高,在3秒鍾之內達到原本設置的頻率,以后就以固定的頻率輸出。這種功能適合系統剛啟動需要一點時間來“熱身”的場景。

public void testSmoothwarmingUp() {
 RateLimiter r = RateLimiter.create(2, 3, TimeUnit.SECONDS);
 while (true)
 {
   System.out.println("get 1 tokens: " + r.acquire(1) + "s");
   System.out.println("get 1 tokens: " + r.acquire(1) + "s");
   System.out.println("get 1 tokens: " + r.acquire(1) + "s");
   System.out.println("get 1 tokens: " + r.acquire(1) + "s");
   System.out.println("end");
 /**
       * output:
       * get 1 tokens: 0.0s
       * get 1 tokens: 1.329289s
       * get 1 tokens: 0.994375s
       * get 1 tokens: 0.662888s  上邊三次獲取的時間相加正好為3秒
       * end
       * get 1 tokens: 0.49764s  正常速率0.5秒一個令牌
       * get 1 tokens: 0.497828s
       * get 1 tokens: 0.49449s
       * get 1 tokens: 0.497522s
       */
 }
}

 1.4、RateLimiter設計

考慮一下RateLimiter是如何設計的,並且為什么要這樣設計.

RateLimiter的主要功能就是提供一個穩定的速率,實現方式就是通過限制請求流入的速度,比如計算請求等待合適的時間閾值.

實現QPS速率的最簡單的方式就是記住上一次請求的最后授權時間,然后保證1/QPS秒內不允許請求進入.比如QPS=5,如果我們保證最后一個被授權請求之后的200ms的時間內沒有請求被授權,那么我們就達到了預期的速率.如果一個請求現在過來但是最后一個被授權請求是在100ms之前,那么我們就要求當前這個請求等待100ms.按照這個思路,請求15個新令牌(許可證)就需要3秒.

有一點很重要:上面這個設計思路的RateLimiter記憶非常的淺,它的腦容量非常的小,只記得上一次被授權的請求的時間.如果RateLimiter的一個被授權請求q之前很長一段時間沒有被使用會怎么樣?這個RateLimiter會立馬忘記過去這一段時間的利用不足,而只記得剛剛的請求q.

過去一段時間的利用不足意味着有過剩的資源是可以利用的.這種情況下,RateLimiter應該加把勁(speed up for a while)將這些過剩的資源利用起來.比如在向網絡中發生數據的場景(限流),過去一段時間的利用不足可能意味着網卡緩沖區是空的,這種場景下,我們是可以加速發送來將這些過程的資源利用起來.

另一方面,過去一段時間的利用不足可能意味着處理請求的服務器對即將到來的請求是准備不足的(less ready for future requests),比如因為很長一段時間沒有請求當前服務器的cache是陳舊的,進而導致即將到來的請求會觸發一個昂貴的操作(比如重新刷新全量的緩存).

為了處理這種情況,RateLimiter中增加了一個維度的信息,就是過去一段時間的利用不足(past underutilization),代碼中使用storedPermits變量表示.當沒有利用不足這個變量為0,最大能達到maxStoredPermits(maxStoredPermits表示完全沒有利用).因此,請求的令牌可能從兩個地方來:

1.過去剩余的令牌(stored permits, 可能沒有)
2.現有的令牌(fresh permits,當前這段時間還沒用完的令牌)

我們將通過一個例子來解釋它是如何工作的:

對一個每秒產生一個令牌的RateLimiter,每有一個沒有使用令牌的一秒,我們就將storedPermits加1,如果RateLimiter在10秒都沒有使用,則storedPermits變成10.0.這個時候,一個請求到來並請求三個令牌(acquire(3)),我們將從storedPermits中的令牌為其服務,storedPermits變為7.0.這個請求之后立馬又有一個請求到來並請求10個令牌,我們將從storedPermits剩余的7個令牌給這個請求,剩下還需要三個令牌,我們將從RateLimiter新產生的令牌中獲取.我們已經知道,RateLimiter每秒新產生1個令牌,就是說上面這個請求還需要的3個請求就要求其等待3秒.

想象一個RateLimiter每秒產生一個令牌,現在完全沒有使用(處於初始狀態),限制一個昂貴的請求acquire(100)過來.如果我們選擇讓這個請求等待100秒再允許其執行,這顯然很荒謬.我們為什么什么也不做而只是傻傻的等待100秒,一個更好的做法是允許這個請求立即執行(和acquire(1)沒有區別),然后將隨后到來的請求推遲到正確的時間點.這種策略,我們允許這個昂貴的任務立即執行,並將隨后到來的請求推遲100秒.這種策略就是讓任務的執行和等待同時進行.

一個重要的結論:RateLimiter不會記最后一個請求,而是即下一個請求允許執行的時間.這也可以很直白的告訴我們到達下一個調度時間點的時間間隔.然后定一個一段時間未使用的Ratelimiter也很簡單:下一個調度時間點已經過去,這個時間點和現在時間的差就是Ratelimiter多久沒有被使用,我們會將這一段時間翻譯成storedPermits.所有,如果每秒鍾產生一個令牌(rate==1),並且正好每秒來一個請求,那么storedPermits就不會增長.

二、源碼分析

看完了 RateLimiter的基本使用示例后,我們來學習一下它的實現原理。先了解一下幾個比較重要的成員變量的含義。

//SmoothRateLimiter.java
//當前存儲令牌數
double storedPermits;
//最大存儲令牌數
double maxPermits;
//添加令牌時間間隔
double stableIntervalMicros;
/**
 * 下一次請求可以獲取令牌的起始時間
 * 由於RateLimiter允許預消費,上次請求預消費令牌后
 * 下次請求需要等待相應的時間到nextFreeTicketMicros時刻才可以獲取令牌
 */
private long nextFreeTicketMicros = 0L;

平滑突發限流

RateLimiter的原理就是每次調用 acquire時用當前時間和 nextFreeTicketMicros進行比較,根據二者的間隔和添加單位令牌的時間間隔 stableIntervalMicros來刷新存儲令牌數 storedPermits。然后acquire會進行休眠,直到 nextFreeTicketMicros

acquire函數如下所示,它會調用 reserve函數計算獲取目標令牌數所需等待的時間,然后使用 SleepStopwatch進行休眠,最后返回等待時間。

public double acquire(int permits) {
 // 計算獲取令牌所需等待的時間
 long microsToWait = reserve(permits);
 // 進行線程sleep
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
 return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
    checkPermits(permits);
 // 由於涉及並發操作,所以使用synchronized進行並發操作
 synchronized (mutex()) {
 return reserveAndGetWaitLength(permits, stopwatch.readMicros());
 }
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
 // 計算從當前時間開始,能夠獲取到目標數量令牌時的時間
 long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
 // 兩個時間相減,獲得需要等待的時間
 return max(momentAvailable - nowMicros, 0);
}

reserveEarliestAvailable是刷新令牌數和下次獲取令牌時間 nextFreeTicketMicros的關鍵函數。它有三個步驟,一是調用 resync函數增加令牌數,二是計算預支付令牌所需額外等待的時間,三是更新下次獲取令牌時間 nextFreeTicketMicros和存儲令牌數 storedPermits

這里涉及 RateLimiter的一個特性,也就是可以預先支付令牌,並且所需等待的時間在下次獲取令牌時再實際執行。詳細的代碼邏輯的解釋請看注釋。

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
 // 刷新令牌數,相當於每次acquire時在根據時間進行令牌的刷新。補充令牌
    resync(nowMicros);
 long returnValue = nextFreeTicketMicros;
 // 獲取當前已有的令牌數和需要獲取的目標令牌數進行比較,計算出可以目前即可得到的令牌數。
 double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
 // freshPermits是需要預先支付的令牌,也就是目標令牌數減去目前即可得到的令牌數
 double freshPermits = requiredPermits - storedPermitsToSpend;
 // 因為會突然涌入大量請求,而現有令牌數又不夠用,因此會預先支付一定的令牌數
 // waitMicros即是產生預先支付令牌的數量時間,則將下次要添加令牌的時間應該計算時間加上watiMicros
 long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
 + (long) (freshPermits * stableIntervalMicros);
 // storedPermitsToWaitTime在SmoothWarmingUp和SmoothBuresty的實現不同,用於實現預熱緩沖期
 // SmoothBuresty的storedPermitsToWaitTime直接返回0,所以watiMicros就是預先支付的令牌所需等待的時間
 try {
 // 更新nextFreeTicketMicros,本次預先支付的令牌所需等待的時間讓下一次請求來實際等待。
 this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
 } catch (ArithmeticException e) {
 this.nextFreeTicketMicros = Long.MAX_VALUE;
 }
 // 更新令牌數,最低數量為0
 this.storedPermits -= storedPermitsToSpend;
 // 返回舊的nextFreeTicketMicros數值,無需為預支付的令牌多加等待時間。
 return returnValue;
}
// SmoothBurest
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
 return 0L;
}

對於storedPermits的使用,RateLimiter存在兩種策略,二者區別主要體現在使用storedPermits時候需要等待的時間。這個邏輯由storedPermitsToWaitTime函數實現:

/**
 * Translates a specified portion of our currently stored permits which we want to
 * spend/acquire, into a throttling time. Conceptually, this evaluates the integral
 * of the underlying function we use, for the range of
 * [(storedPermits - permitsToTake), storedPermits].
 *
 * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
 */
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

存在兩種策略就是為了應對我們上面講到的,存在資源使用不足大致分為兩種情況: (1).資源確實使用不足,這些剩余的資源我們私海可以使用的; (2).提供資源的服務過去還沒准備好,比如服務剛啟動等;

為此,RateLimiter實際上由兩種實現策略,其實現分別見SmoothBursty和SmoothWarmingUp。二者主要的區別就是storedPermitsToWaitTime實現以及maxPermits數量的計算。

resync函數用於增加存儲令牌,核心邏輯就是 (nowMicros-nextFreeTicketMicros)/stableIntervalMicros。當前時間大於 nextFreeTicketMicros時進行刷新,否則直接返回。

void resync(long nowMicros) {
 // 當前時間晚於nextFreeTicketMicros,所以刷新令牌和nextFreeTicketMicros
 if (nowMicros > nextFreeTicketMicros) {
 // coolDownIntervalMicros函數獲取每機秒生成一個令牌,SmoothWarmingUp和SmoothBuresty的實現不同
 // SmoothBuresty的coolDownIntervalMicros直接返回stableIntervalMicros
 // 當前時間減去要更新令牌的時間獲取時間間隔,再除以添加令牌時間間隔獲取這段時間內要添加的令牌數
      storedPermits = min(maxPermits,
          storedPermits
 + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
      nextFreeTicketMicros = nowMicros;
 }
 // 如果當前時間早於nextFreeTicketMicros,則獲取令牌的線程要一直等待到nextFreeTicketMicros,該線程獲取令牌所需
 // 額外等待的時間由下一次獲取的線程來代替等待。
}
double coolDownIntervalMicros() {
 return stableIntervalMicros;
}

根據令牌桶算法,桶中的令牌是持續生成存放的,有請求時需要先從桶中拿到令牌才能開始執行,誰來持續生成令牌存放呢?

  • 一種解法是,開啟一個定時任務,由定時任務持續生成令牌。這樣的問題在於會極大的消耗系統資源,如,某接口需要分別對每個用戶做訪問頻率限制,假設系統中存在6W用戶,則至多需要開啟6W個定時任務來維持每個桶中的令牌數,這樣的開銷是巨大的。
  • 另一種解法則是延遲計算,如上resync函數。該函數會在每次獲取令牌之前調用,其實現思路為,若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只需要在獲取令牌時計算一次即可。
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
  resync(nowMicros);
  long returnValue = nextFreeTicketMicros; // 返回的是上次計算的nextFreeTicketMicros
  double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消費的令牌數
  double freshPermits = requiredPermits - storedPermitsToSpend; // 還需要的令牌數
  long waitMicros =
      storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
          + (long) (freshPermits * stableIntervalMicros); // 根據freshPermits計算需要等待的時間

  this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次計算的nextFreeTicketMicros不返回
  this.storedPermits -= storedPermitsToSpend;
  return returnValue;
}

該函數用於獲取requiredPermits個令牌,並返回需要等待到的時間點
其中,storedPermitsToSpend為桶中可以消費的令牌數,freshPermits為還需要的(需要補充的)令牌數,根據該值計算需要等待的時間,追加並更新到nextFreeTicketMicros

需要注意的是,該函數的返回是更新前的(上次請求計算的)nextFreeTicketMicros,而不是本次更新的nextFreeTicketMicros,通俗來講,本次請求需要為上次請求的預消費行為埋單,這也是RateLimiter可以預消費(處理突發)的原理所在。若需要禁止預消費,則修改此處返回更新后的nextFreeTicketMicros值。

回頭來看SmoothBursty的構造函數

SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
  super(stopwatch);
  this.maxBurstSeconds = maxBurstSeconds; // 最大存儲maxBurstSeconds秒生成的令牌
}

@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
  double oldMaxPermits = this.maxPermits;
  maxPermits = maxBurstSeconds * permitsPerSecond; // 計算最大存儲令牌數
  if (oldMaxPermits == Double.POSITIVE_INFINITY) {
    // if we don't special-case this, we would get storedPermits == NaN, below
    storedPermits = maxPermits;
  } else {
    storedPermits =
        (oldMaxPermits == 0.0)
            ? 0.0 // initial state
            : storedPermits * maxPermits / oldMaxPermits;
  }
}

桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義為最大存儲maxBurstSeconds秒生成的令牌。
該參數的作用在於,可以更為靈活地控制流量。如,某些接口限制為300次/20秒,某些接口限制為50次/45秒等。

一個簡單的使用示意圖及解釋,下面示例一個QPS=4的SmoothBursty:

(1).t=0(時間開始點),這時候storedPermits=0,請求1個令牌,等待時間=0;
(2).t=1,這時候storedPermits=3,請求3個令牌,等待時間=0;
(3).t=2,這時候storedPermits=4,請求10個令牌,等待時間=0,超前使用了2個令牌;
(4).t=3,這時候storedPermits=0,請求1個令牌,等待時間=0.5;
代碼的輸出:
maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472
acquire(1), sleepSecond=0.0

maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345
acquire(3), sleepSecond=0.0

maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668
acquire(10), sleepSecond=0.0

maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668
acquire(1), sleepSecond=0.499591

 

 

t=3(時間段內)時,t=1和t=2中的4個空白 + t3中的4個 + t4中的2個(預支)

 


在了解以上概念后,就非常容易理解RateLimiter暴露出來的接口

@CanIgnoreReturnValue
public double acquire() {
  return acquire(1);
}

@CanIgnoreReturnValue
public double acquire(int permits) {
  long microsToWait = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return 1.0 * microsToWait / SECONDS.toMicros(1L);
}

final long reserve(int permits) {
  checkPermits(permits);
  synchronized (mutex()) {
    return reserveAndGetWaitLength(permits, stopwatch.readMicros());
  }
}

acquire函數主要用於獲取permits個令牌,並計算需要等待多長時間,進而掛起等待,並將該值返回

public boolean tryAcquire(int permits) {
  return tryAcquire(permits, 0, MICROSECONDS);
}

public boolean tryAcquire() {
  return tryAcquire(1, 0, MICROSECONDS);
}

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
  long timeoutMicros = max(unit.toMicros(timeout), 0);
  checkPermits(permits);
  long microsToWait;
  synchronized (mutex()) {
    long nowMicros = stopwatch.readMicros();
    if (!canAcquire(nowMicros, timeoutMicros)) {
      return false;
    } else {
      microsToWait = reserveAndGetWaitLength(permits, nowMicros);
    }
  }
  stopwatch.sleepMicrosUninterruptibly(microsToWait);
  return true;
}

private boolean canAcquire(long nowMicros, long timeoutMicros) {
  return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

@Override
final long queryEarliestAvailable(long nowMicros) {
  return nextFreeTicketMicros;
}

tryAcquire函數可以嘗試在timeout時間內獲取令牌,如果可以則掛起等待相應時間並返回true,否則立即返回false
canAcquire用於判斷timeout時間內是否可以獲取令牌

 

下面我們舉個例子,讓大家更好的理解 resync和 reserveEarliestAvailable函數的邏輯。

比如 RateLimiter的 stableIntervalMicros為500,也就是1秒發兩個令牌,storedPermits為0,nextFreeTicketMicros為155391849 5748。線程一acquire(2),當前時間為155391849 6248,首先 resync函數計算,(1553918496248 - 1553918495748)/500 = 1,所以當前可獲取令牌數為1,但是由於可以預支付,所以nextFreeTicketMicros= nextFreeTicketMicro + 1 * 500 = 155391849 6748。線程一無需等待。

緊接着,線程二也來acquire(2),首先 resync函數發現當前時間早於 nextFreeTicketMicros,所以無法增加令牌數,所以需要預支付2個令牌,nextFreeTicketMicros= nextFreeTicketMicro + 2 * 500 = 155391849 7748。線程二需要等待155391849 6748時刻,也就是線程一獲取時計算的nextFreeTicketMicros時刻。同樣的,線程三獲取令牌時也需要等待到線程二計算的nextFreeTicketMicros時刻。

平滑預熱限流

上述就是平滑突發限流RateLimiter的實現,下面我們來看一下加上預熱緩沖期的實現原理。

static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;
    private double thresholdPermits;
    private double coldFactor;

    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    // SmoothWarmingUp,等待時間就是計算上圖中梯形或者正方形的面積。
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
         /**
            * 當前permits超出閾值的部分
            */
         double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
         long micros = 0;
         /**
            * 如果當前存儲的令牌數超出thresholdPermits
            */
         if (availablePermitsAboveThreshold > 0.0) {
         /**
             * 在閾值右側並且需要被消耗的令牌數量
             */
         double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);

         /**
                * 梯形的面積
                *
                * 高 * (頂 * 底) / 2
                *
                * 高是 permitsAboveThresholdToTake 也就是右側需要消費的令牌數
                * 底 較長 permitsToTime(availablePermitsAboveThreshold)
                * 頂 較短 permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)
                */
            micros = (long) (permitsAboveThresholdToTake
         * (permitsToTime(availablePermitsAboveThreshold)
         + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);
         /**
                * 減去已經獲取的在閾值右側的令牌數
                */
            permitsToTake -= permitsAboveThresholdToTake;
         }
         /**
            * 平穩時期的面積,正好是長乘寬
            */
            micros += (stableIntervalMicros * permitsToTake);
         return micros;
    }

    private double permitsToTime(double permits) {
      return stableIntervalMicros + permits * slope;
    }

    double coolDownIntervalMicros() {
        /**
        * 每秒增加的令牌數為 warmup時間/maxPermits. 這樣的話,在warmuptime時間內,就就增張的令牌數量
        * 為 maxPermits
        */
        return warmupPeriodMicros / maxPermits;
    }
  }

 

 

maxPermits等於熱身(warmup)期間能產生的令牌數,比如QPS=4,warmup為2秒,則maxPermits=8.halfPermits為maxPermits的一半.

參考注釋中的神圖:

   * This implements the following function where coldInterval = coldFactor * stableInterval.
   *
   *          ^ throttling 獲取令牌的時間間隔
   *          |
   *    cold  +                  /
   * interval |                 /.
   *          |                / .
   *          |               /  .   <-- "warmup period" is the area of the trapezoid between
   *          |              /   .       thresholdPermits and maxPermits
   *          |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM .
   * interval |          .   UP  .
   *          |          . PERIOD.
   *          |          .       .
   *        0 +----------+-------+--------------> storedPermits 存儲令牌數
   *          0 thresholdPermits maxPermits
                令牌閥值         最大令牌數

 

下面是我們QPS=4,warmup為2秒時候對應的圖。

 

 maxPermits=8,halfPermits=4,和SmoothBursty相同的請求序列:

(1).t=0,這時候storedPermits=8,請求1個令牌,使用1個storedPermits消耗時間=1×(0.75+0.625)/2=0.6875秒;
(2).t=1,這時候storedPermits=8,請求3個令牌,使用3個storedPermits消耗時間=3×(0.75+0.375)/2=1.6875秒(注意已經超過1秒了,意味着下次產生新Permit時間為2.6875);
(3).t=2,這時候storedPermits=5,請求10個令牌,使用5個storedPermits消耗時間=1×(0.375+0.25)/2+4*0.25=1.3125秒,再加上額外請求的5個新產生的Permit需要消耗=5*0.25=1.25秒,即總共需要耗時2.5625秒,則下一次產生新的Permit時間為2.6875+2.5625=5.25,注意當前請求私海2.6875才返回的,之前一直阻塞;
(4).t=3,因為前一個請求阻塞到2.6875,實際這個請求3.6875才到達RateLimiter,請求1個令牌,storedPermits=0,下一次產生新Permit時間為5.25,因此總共需要等待5.25-3.6875=1.5625秒;

實際執行結果:

warmupPeriodMicros=2000000
stableIntervalMicros=250000.0, maxPermits=8.0, halfPermits=4.0, coldIntervalMicros=750000.0, slope=125000.0, storedPermits=8.0

maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1524
acquire(1), sleepSecond=0.0

maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1001946
acquire(3), sleepSecond=0.0

maxPermits=8.0, storedPermits=5.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2689446
acquire(10), sleepSecond=0.687186

maxPermits=8.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=5251946
acquire(1), sleepSecond=1.559174

 

 

SmoothWarmingUp實現預熱緩沖的關鍵在於其分發令牌的速率會隨時間和令牌數而改變,速率會先慢后快。表現形式如下圖所示,令牌刷新的時間間隔由長逐漸變短。等存儲令牌數從maxPermits到達thresholdPermits時,發放令牌的時間價格也由coldInterval降低到了正常的stableInterval。

 

后記

RateLimiter只能用於單機的限流,如果想要集群限流,則需要引入 redis或者阿里開源的 sentinel中間件,請大家繼續關注。

 

 

參考:https://ifeve.com/guava-ratelimiter/

https://zhuanlan.zhihu.com/p/60979444


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM