在分布式系統中,應對高並發訪問時,緩存、限流、降級是保護系統正常運行的常用方法。當請求量突發暴漲時,如果不加以限制訪問,則可能導致整個系統崩潰,服務不可用。同時有一些業務場景,比如短信驗證碼,或者其它第三方API調用,也需要提供必要的訪問限制支持。還有一些資源消耗過大的請求,比如數據導出等(參考 記一次線上Java服務CPU 100%處理過程 ),也有限制訪問頻率的需求。
常見的限流算法有令牌桶算法,漏桶算法,與計數器算法。本文主要對三個算法的基本原理及Google Guava包中令牌桶算法的實現RateLimiter進行介紹,下一篇文章介紹最近寫的一個以RateLimiter為參考的分布式限流實現及計數器限流實現。
令牌桶算法
令牌桶算法的原理就是以一個恆定的速度往桶里放入令牌,每一個請求的處理都需要從桶里先獲取一個令牌,當桶里沒有令牌時,則請求不會被處理,要么排隊等待,要么降級處理,要么直接拒絕服務。當桶里令牌滿時,新添加的令牌會被丟棄或拒絕。
令牌桶算法的處理示意圖如下(圖片來自網絡)
令牌桶算法主要是可以控制請求的平均處理速率,它允許預消費,即可以提前消費令牌,以應對突發請求,但是后面的請求需要為預消費買單(等待更長的時間),以滿足請求處理的平均速率是一定的。
漏桶算法
漏桶算法的原理是水(請求)先進入漏桶中,漏桶以一定的速度出水(處理請求),當水流入速度大於流出速度導致水在桶內逐漸堆積直到桶滿時,水會溢出(請求被拒絕)。
漏桶算法的處理示意圖如下(圖片來自網絡)
漏桶算法主要是控制請求的處理速率,平滑網絡上的突發流量,請求可以以任意速度進入漏桶中,但請求的處理則以恆定的速度進行。
計數器算法
計數器算法是限流算法中最簡單的一種算法,限制在一個時間窗口內,至多處理多少個請求。比如每分鍾最多處理10個請求,則從第一個請求進來的時間為起點,60s的時間窗口內只允許最多處理10個請求。下一個時間窗口又以前一時間窗口過后第一個請求進來的時間為起點。常見的比如一分鍾內只能獲取一次短信驗證碼的功能可以通過計數器算法來實現。
Guava RateLimiter解析
Guava是Google開源的一個工具包,其中的RateLimiter是實現了令牌桶算法的一個限流工具類。在pom.xml中添加guava依賴,即可使用RateLimiter
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
如下測試代碼示例了RateLimiter的用法,
public static void main(String[] args) {
RateLimiter rateLimiter = RateLimiter.create(1); //創建一個每秒產生一個令牌的令牌桶
for(int i=1;i<=5;i++) {
double waitTime = rateLimiter.acquire(i); //一次獲取i個令牌
System.out.println("acquire:" + i + " waitTime:" + waitTime);
}
}
運行后,輸出如下,
acquire:1 waitTime:0.0
acquire:2 waitTime:0.997729
acquire:3 waitTime:1.998076
acquire:4 waitTime:3.000303
acquire:5 waitTime:4.000223
第一次獲取一個令牌時,等待0s立即可獲取到(這里之所以不需要等待是因為令牌桶的預消費特性),第二次獲取兩個令牌,等待時間1s,這個1s就是前面獲取一個令牌時因為預消費沒有等待延到這次來等待的時間,這次獲取兩個又是預消費,所以下一次獲取(取3個時)就要等待這次預消費需要的2s了,依此類推。可見預消費不需要等待的時間都由下一次來買單,以保障一定的平均處理速率(上例為1s一次)。
RateLimiter有兩種實現:
- SmoothBursty: 令牌的生成速度恆定。使用
RateLimiter.create(double permitsPerSecond)
創建的是 SmoothBursty 實例。 - SmoothWarmingUp:令牌的生成速度持續提升,直到達到一個穩定的值。WarmingUp,顧名思義就是有一個熱身的過程。使用
RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
時創建就是 SmoothWarmingUp 實例,其中 warmupPeriod 就是熱身達到穩定速度的時間。
類結構如下
關鍵屬性及方法解析(以 SmoothBursty 為例)
1.關鍵屬性
/** 桶中當前擁有的令牌數. */
double storedPermits;
/** 桶中最多可以保存多少秒存入的令牌數 */
double maxBurstSeconds;
/** 桶中能存儲的最大令牌數,等於storedPermits*maxBurstSeconds. */
double maxPermits;
/** 放入令牌的時間間隔*/
double stableIntervalMicros;
/** 下次可獲取令牌的時間點,可以是過去也可以是將來的時間點*/
private long nextFreeTicketMicros = 0L;
2.關鍵方法
調用 RateLimiter.create(double permitsPerSecond)
方法時,創建的是 SmoothBursty 實例,默認設置 maxBurstSeconds 為1s。SleepingStopwatch 是guava中的一個時鍾類實現。
@VisibleForTesting
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
並通過調用 SmoothBursty.doSetRate(double, long)
方法進行初始化,該方法中:
- 調用
resync(nowMicros)
對 storedPermits 與 nextFreeTicketMicros 進行了調整——如果當前時間晚於 nextFreeTicketMicros,則計算這段時間內產生的令牌數,累加到 storedPermits 上,並更新下次可獲取令牌時間 nextFreeTicketMicros 為當前時間。 - 計算 stableIntervalMicros 的值,1/permitsPerSecond。
- 調用
doSetRate(double, double)
方法計算 maxPermits 值(maxBurstSeconds*permitsPerSecond),並根據舊的 maxPermits 值對 storedPermits 進行調整。
源碼如下所示
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
調用 acquire(int)
方法獲取指定數量的令牌時,
- 調用
reserve(int)
方法,該方法最終調用reserveEarliestAvailable(int, long)
來更新下次可取令牌時間點與當前存儲的令牌數,並返回本次可取令牌的時間點,根據該時間點計算需要等待的時間 - 阻塞等待1中返回的等待時間
- 返回等待的時間(秒)
源碼如下所示
/** 獲取指定數量(permits)的令牌,阻塞直到獲取到令牌,返回等待的時間*/
@CanIgnoreReturnValue
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
/** 返回需要等待的時間*/
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
/** 針對此次需要獲取的令牌數更新下次可取令牌時間點與存儲的令牌數,返回本次可取令牌的時間點*/
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); // 更新當前數據
long returnValue = nextFreeTicketMicros;
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可消費的令牌數
double freshPermits = requiredPermits - storedPermitsToSpend; // 需要新增的令牌數
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // 需要等待的時間
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 更新下次可取令牌的時間點
this.storedPermits -= storedPermitsToSpend; // 更新當前存儲的令牌數
return returnValue;
}
acquire(int)
方法是獲取不到令牌時一直阻塞,直到獲取到令牌,tryAcquire(int,long,TimeUnit)
方法則是在指定超時時間內嘗試獲取令牌,如果獲取到或超時時間到則返回是否獲取成功
- 先判斷是否能在指定超時時間內獲取到令牌,通過
nextFreeTicketMicros <= timeoutMicros + nowMicros
是否為true來判斷,即可取令牌時間早於當前時間加超時時間則可取(預消費的特性),否則不可獲取。 - 如果不可獲取,立即返回false。
- 如果可獲取,則調用
reserveAndGetWaitLength(permits, nowMicros)
來更新下次可取令牌時間點與當前存儲的令牌數,返回等待時間(邏輯與前面相同),並阻塞等待相應的時間,返回true。
源碼如下所示
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
long timeoutMicros = max(unit.toMicros(timeout), 0);
checkPermits(permits);
long microsToWait;
synchronized (mutex()) {
long nowMicros = stopwatch.readMicros();
if (!canAcquire(nowMicros, timeoutMicros)) { //判斷是否能在超時時間內獲取指定數量的令牌
return false;
} else {
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return true;
}
private boolean canAcquire(long nowMicros, long timeoutMicros) {
return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros; //只要可取時間小於當前時間+超時時間,則可獲取(可預消費的特性!)
}
@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
以上就是 SmoothBursty 實現的基本處理流程。注意兩點:
- RateLimiter 通過限制后面請求的等待時間,來支持一定程度的突發請求——預消費的特性。
- RateLimiter 令牌桶的實現並不是起一個線程不斷往桶里放令牌,而是以一種延遲計算的方式(參考
resync
函數),在每次獲取令牌之前計算該段時間內可以產生多少令牌,將產生的令牌加入令牌桶中並更新數據來實現,比起一個線程來不斷往桶里放令牌高效得多。(想想如果需要針對每個用戶限制某個接口的訪問,則針對每個用戶都得創建一個RateLimiter,並起一個線程來控制令牌存放的話,如果在線用戶數有幾十上百萬,起線程來控制是一件多么恐怖的事情)
總結
本文介紹了限流的三種基本算法,其中令牌桶算法與漏桶算法主要用來限制請求處理的速度,可將其歸為限速,計數器算法則是用來限制一個時間窗口內請求處理的數量,可將其歸為限量(對速度不限制)。Guava 的 RateLimiter 是令牌桶算法的一種實現,但 RateLimiter 只適用於單機應用,在分布式環境下就不適用了。雖然已有一些開源項目可用於分布式環境下的限流管理,如阿里的Sentinel,但對於小型項目來說,引入Sentinel可能顯得有點過重,但限流的需求在小型項目中也是存在的,下一篇文章就介紹下基於 RateLimiter 的分布式下的限流實現。
[轉載請注明出處]
作者:雨歌
歡迎關注作者公眾號:半路雨歌,查看更多技術干貨文章