作者javadoop,資深Java工程師。本文已獲作者授權發布。
原文鏈接https://www.javadoop.com/post/rate-limiter
本文主要介紹關於流控的兩部分內容。
第一部分介紹 Guava 中 RateLimiter 的源碼,包括它的兩種模式,目前網上大部分文章只分析簡單的 SmoothBursty 模式,而沒有分析帶有預熱的 SmoothWarmingUp。
第二部分介紹 Sentinel 中流控的實現,本文不要求讀者了解 Sentinel,這部分內容和 Sentinel 耦合很低,所以讀者不需要有閱讀壓力。
Sentinel 中流控設計是參考 Guava RateLimiter 的,所以閱讀第二部分內容,需要有第一部分內容的背景。
Guava RateLimiter
RateLimiter 基於漏桶算法,但它參考了令牌桶算法,這里不討論流控算法,請自行查找資料。
RateLimiter 使用介紹
RateLimiter 的接口非常簡單,它有兩個靜態方法用來實例化,實例化以后,我們只需要關心 acquire 就行了,甚至都沒有 release 操作。
// RateLimiter 接口列表:
// 實例化的兩種方式:
public static RateLimiter create(double permitsPerSecond){}
public static RateLimiter create(double permitsPerSecond,long warmupPeriod,TimeUnit unit) {}
public double acquire() {}
public double acquire(int permits) {}
public boolean tryAcquire() {}
public boolean tryAcquire(int permits) {}
public boolean tryAcquire(long timeout, TimeUnit unit) {}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {}
public final double getRate() {}
public final void setRate(double permitsPerSecond) {}
RateLimiter 的作用是用來限流的,我們知道 java 並發包中提供了 Semaphore,它也能夠提供對資源使用進行控制,我們看一下下面的代碼:
// Semaphore
Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 100; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
semaphore.acquireUninterruptibly(1);
try {
doSomething();
} finally {
semaphore.release();
}
}
});
}
Semaphore 用來控制同時訪問某個資源的並發數量,如上面的代碼,我們設置 100 個線程工作,但是我們能做到最多只有 10 個線程能同時到 doSomething() 方法中。它控制的是並發數量。
而 RateLimiter 是用來控制訪問資源的速率(rate)的,它強調的是控制速率。比如控制每秒只能有 100 個請求通過,比如允許每秒發送 1MB 的數據。
它的構造方法指定一個 permitsPerSecond 參數,代表每秒鍾產生多少個 permits,這就是我們的速率。
RateLimiter 允許預占未來的令牌,比如,每秒產生 5 個 permits,我們可以單次請求 100 個,這樣,緊接着的下一個請求需要等待大概 20 秒才能獲取到 permits。
SmoothRateLimiter 介紹
RateLimiter 目前只有一個子類,那就是抽象類 SmoothRateLimiter,SmoothRateLimiter 有兩個實現類,也就是我們這邊要介紹的兩種模式,我們先簡單介紹下 SmoothRateLimiter,然后后面分兩個小節分別介紹它的兩個實現類。
RateLimiter 作為抽象類,只有兩個屬性:
private final SleepingStopwatch stopwatch;
private volatile Object mutexDoNotUseDirectly;
stopwatch 非常重要,它用來“計時”,RateLimiter 把實例化的時間設置為 0 值,后續都是取相對時間,用微秒表示。
mutexDoNotUseDirectly 用來做鎖,RateLimiter 依賴於 synchronized 來控制並發,所以我們之后可以看到,各個屬性甚至都沒有用 volatile 修飾。
然后我們來看 SmoothRateLimiter 的屬性,分別代表什么意思。
// 當前還有多少 permits 沒有被使用,被存下來的 permits 數量
double storedPermits;
// 最大允許緩存的 permits 數量,也就是 storedPermits 能達到的最大值
double maxPermits;
// 每隔多少時間產生一個 permit,
// 比如我們構造方法中設置每秒 5 個,也就是每隔 200ms 一個,這里單位是微秒,也就是 200,000
double stableIntervalMicros;
// 下一次可以獲取 permits 的時間,這個時間是相對 RateLimiter 的構造時間的,是一個相對時間,理解為時間戳吧
private long nextFreeTicketMicros = 0L;
其實,看到這幾個屬性,我們就可以大致猜一下它的內部實現了:
nextFreeTicketMicros 是一個很關鍵的屬性。我們每次獲取 permits 的時候,先拿 storedPermits 的值,如果夠,storedPermits 減去相應的值就可以了,如果不夠,那么還需要將 nextFreeTicketMicros 往前推,表示我預占了接下來多少時間的量了。那么下一個請求來的時候,如果還沒到 nextFreeTicketMicros 這個時間點,需要 sleep 到這個點再返回,當然也要將這個值再往前推。
大家在這里可能會有疑惑,因為時間是一直往前走的,所以 storedPermits 的信息可能是不准確的,不過,只需要在關鍵的操作中同步一下,重新計算就好了。
SmoothBursty 分析
我們先從比較簡單的 SmoothBursty 出發,來分析 RateLimiter 的源碼,之后我們再分析 SmoothWarmingUp。
Bursty 是突發的意思,它說的不是下面這個意思:我們設置了 1k 每秒,而我們可以一次性獲取 5k 的 permits,這個場景表達的不是突發,而是在說預先占有了接下來幾秒產生的 permits。
突發說的是,RateLimiter 會緩存一定數量的 permits 在池中,這樣對於突發請求,能及時得到滿足。想象一下我們的某個接口,很久沒有請求過來,突然同時來了好幾個請求,如果我們沒有緩存一些 permits 的話,很多線程就需要等待了。
SmoothBursty 默認緩存最多 1 秒鍾的 permits,不可以修改。
RateLimiter 的靜態構造方法:
public static RateLimiter create(double permitsPerSecond) {
return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
構造參數 permitsPerSecond 指定每秒鍾可以產生多少個 permits。
static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
rateLimiter.setRate(permitsPerSecond);
return rateLimiter;
}
我們看到,這里實例化的是 SmoothBursty 的實例,它的構造方法很簡單,而且它只有一個屬性 maxBurstSeconds,這里就不貼代碼了。
構造函數指定了 maxBurstSeconds 為 1.0,也就是說,最多會緩存 1 秒鍾,也就是 (1.0 * permitsPerSecond) 這么多個 permits 到池中。
這個 1.0 秒,關系到 storedPermits 和 maxPermits:
0 <= storedPermits <= maxPermits = permitsPerSecond
我們繼續往后看 setRate 方法:
public final void setRate(double permitsPerSecond) {
checkArgument(
permitsPerSecond > 0.0 && !Double.isNaN(permitsPerSecond), "rate must be positive");
synchronized (mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
setRate 這個方法是一個 public 方法,它可以用來調整速率。我們這邊繼續跟的是初始化過程,但是大家提前知道這個方法是用來調整速率用的,對理解源碼有很大的幫助。注意看,這里用了 synchronized 控制並發。
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
// 同步
resync(nowMicros);
// 計算屬性 stableIntervalMicros
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
resync 方法很簡單,它用來調整 storedPermits 和 nextFreeTicketMicros。這就是我們說的,在關鍵的節點,需要先更新一下 storedPermits 到正確的值。
void resync(long nowMicros) {
// 如果 nextFreeTicket 已經過掉了,想象一下很長時間都沒有再次調用 limiter.acquire() 的場景
// 需要將 nextFreeTicket 設置為當前時間,重新計算 storedPermits
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
coolDownIntervalMicros() 這個方法大家先不用關注,可以看到,在 SmoothBursty 類中的實現是直接返回了 stableIntervalMicros 的值,也就是我們說的,每產生一個 permit 的時間長度。
當然了,細心的讀者,可能會發現,此時的 stableIntervalMicros 其實沒有設置,也就是說,上面發生了一次除以 0 值的操作,得到的 newPermits 其實是一個無窮大。而 maxPermits 此時還是 0 值,不過這里其實沒有關系。
我們回到前面一個方法,resync 同步以后,會設置 stableIntervalMicros 為一個正確的值,然后進入下面的方法:
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
// 這里計算了,maxPermits 為 1 秒產生的 permits
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
// 因為 storedPermits 的值域變化了,需要等比例縮放
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
上面這個方法,我們要這么看,原來的 RateLimiter 是用某個 permitsPerSecond 值初始化的,現在我們要調整這個頻率。對於 maxPermits 來說,是重新計算,而對於 storedPermits 來說,是做等比例的縮放。
到此,構造方法就完成了,我們得到了一個 RateLimiter 的實現類 SmoothBursty 的實例,可能上面的源碼你還是會有一些疑惑,不過也沒關系,繼續往下看,可能你的很多疑惑就解開了。
接下來,我們來分析 acquire 方法:
@CanIgnoreReturnValue
public double acquire() {
return acquire(1);
}
@CanIgnoreReturnValue
public double acquire(int permits) {
// 預約,如果當前不能直接獲取到 permits,需要等待
// 返回值代表需要 sleep 多久
long microsToWait = reserve(permits);
// sleep
stopwatch.sleepMicrosUninterruptibly(microsToWait);
// 返回 sleep 的時長
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
我們來看 reserve 方法:
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) {
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
// 返回 nextFreeTicketMicros
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
// 計算時長
return max(momentAvailable - nowMicros, 0);
}
繼續往里看:
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// 這里做一次同步,更新 storedPermits 和 nextFreeTicketMicros (如果需要)
resync(nowMicros);
// 返回值就是 nextFreeTicketMicros,注意剛剛已經做了 resync 了,此時它是最新的正確的值
long returnValue = nextFreeTicketMicros;
// storedPermits 中可以使用多少個 permits
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// storedPermits 中不夠的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 為了這個不夠的部分,需要等待多久時間
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) // 這部分固定返回 0
+ (long) (freshPermits * stableIntervalMicros);
// 將 nextFreeTicketMicros 往前推
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// storedPermits 減去被拿走的部分
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
我們可以看到,獲取 permits 的時候,其實是獲取了兩部分,一部分來自於存量 storedPermits,存量不夠的話,另一部分來自於預占未來的 freshPermits。
這里提一個關鍵點吧,我們看到,返回值是 nextFreeTicketMicros 的舊值,因為只要到這個時間點,就說明當次 acquire 可以成功返回了,而不管 storedPermits 夠不夠。如果 storedPermits 不夠,會將 nextFreeTicketMicros 往前推一定的時間,預占了一定的量。
到這里,acquire 方法就分析完了,大家看到這里,逆着往前看就是了。應該說,SmoothBursty 的源碼還是非常簡單的。
SmoothWarmingUp 分析
分析完了 SmoothBursty,我們再來分析 SmoothWarmingUp 會簡單一些。我們說過,SmoothBursty 可以處理突發請求,因為它會緩存最多 1 秒的 permits,而待會我們會看到 SmoothWarmingUp 完全不同的設計。
SmoothWarmingUp 適用於資源需要預熱的場景,比如我們的某個接口業務,需要使用到數據庫連接,由於連接需要預熱才能進入到最佳狀態,如果我們的系統長時間處於低負載或零負載狀態(當然,應用剛啟動也是一樣的),連接池中的連接慢慢釋放掉了,此時我們認為連接池是冷的。
假設我們的業務在穩定狀態下,正常可以提供最大 1000 QPS 的訪問,但是如果連接池是冷的,我們就不能讓 1000 個請求同時進來,因為這會拖垮我們的系統,我們應該有個預熱升溫的過程。
對應到 SmoothWarmingUp 中,如果系統處於低負載狀態,storedPermits 會一直增加,當請求來的時候,我們要從 storedPermits 中取 permits,最關鍵的點在於,從 storedPermits 中取 permits 的操作是比較耗時的,因為沒有預熱。
回顧一下前面介紹的 SmoothBursty,它從 storedPermits 中獲取 permits 是不需要等待時間的,而這邊洽洽相反,從 storedPermits 獲取需要更多的時間,這是最大的不同,先理解這一點,能幫助你更好地理解源碼。
大家先有一些粗的概念,然后我們來看下面這個圖:
這個圖不容易看懂,X 軸代表 storedPermits 的數量,Y 軸代表獲取一個 permits 需要的時間。
假設指定 permitsPerSecond 為 10,那么 stableInterval 為 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍是寫死的,用戶不能修改)。也就是說,當達到 maxPermits 時,此時處於系統最冷的時候,獲取一個 permit 需要 300ms,而如果 storedPermits 小於 thresholdPermits 的時候,只需要 100ms。
想象有一條垂直線 x=k,它與 X 軸的交點 k 代表當前 storedPermits 的數量:
- 當系統在非常繁忙的時候,這條線停留在 x=0 處,此時 storedPermits 為 0
- 當 limiter 沒有被使用的時候,這條線慢慢往右移動,直到 x=maxPermits 處;
- 如果 limiter 被重新使用,那么這條線又慢慢往左移動,直到 x=0 處;
當 storedPermits 處於 maxPermits 狀態時,我們認為 limiter 中的 permits 是冷的,此時獲取一個 permit 需要較多的時間,因為需要預熱,有一個關鍵的分界點是 thresholdPermits。
預熱時間是我們在構造的時候指定的,圖中梯形的面積就是預熱時間,因為預熱完成后,我們能進入到一個穩定的速率中(stableInterval),下面我們來計算出 thresholdPermits 和 maxPermits 的值。
有一個關鍵點,從 thresholdPermits 到 0 的時間,是從 maxPermits 到 thresholdPermits 時間的一半,也就是梯形的面積是長方形面積的 2 倍,梯形的面積是 warmupPeriod。
之所以長方形的面積是 warmupPeriod/2,是因為 coldFactor 是硬編碼的 3。
梯形面積為 warmupPeriod,即:
warmupPeriod = 2 * stableInterval * thresholdPermits
由此,我們得出 thresholdPermits 的值:
thresholdPermits = 0.5 * warmupPeriod / stableInterval
然后我們根據梯形面積的計算公式:
warmupPeriod = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)
得出 maxPermits 為:
maxPermits = thresholdPermits + 2.0 * warmupPeriod / (stableInterval + coldInterval)
這樣,我們就得到了 thresholdPermits 和 maxPermits 的值。
接下來,我們來看一下冷卻時間間隔,它指的是 storedPermits 中每個 permit 的增長速度,也就是我們前面說的 x=k 這條垂直線往右的移動速度,為了達到從 0 到 maxPermits 花費 warmupPeriodMicros 的時間,我們將其定義為:
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
貼一下代碼,大家就知道了,在 resync 中用到的這個:
void resync(long nowMicros) {
if (nowMicros > nextFreeTicketMicros) {
// coolDownIntervalMicros 在這里使用
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
基於上面的分析,我們來看 SmoothWarmingUp 的其他源碼。
首先,我們來看它的 doSetRate 方法,有了前面的介紹,這個方法的源碼非常簡單:
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
// coldFactor 是固定的 3
double coldIntervalMicros = stableIntervalMicros * coldFactor;
// 這個公式我們上面已經說了
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
// 這個公式我們上面也已經說了
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 計算那條斜線的斜率。數學知識,對邊 / 臨邊
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
setRate 方法非常簡單,接下來,我們要分析的是 storedPermitsToWaitTime 方法,我們回顧一下下面的代碼:
這段代碼是 acquire 方法的核心,waitMicros 由兩部分組成,一部分是從 storedPermits 中獲取花費的時間,一部分是等待 freshPermits 產生花費的時間。在 SmoothBursty 的實現中,從 storedPermits 中獲取 permits 直接返回 0,不需要等待。
而在 SmoothWarmingUp 的實現中,由於需要預熱,所以從 storedPermits 中取 permits 需要花費一定的時間,其實就是要計算下圖中,陰影部分的面積。
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// 如果右邊梯形部分有 permits,那么先從右邊部分獲取permits,計算梯形部分的陰影部分的面積
if (availablePermitsAboveThreshold > 0.0) {
// 從右邊部分獲取的 permits 數量
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// 梯形面積公式:(上底+下底)*高/2
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上 長方形部分的陰影面積
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
// 對於給定的 x 值,計算 y 值
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
到這里,SmoothWarmingUp 也已經說完了。
如果大家對於 Guava RateLimiter 還有什么疑惑,歡迎在留言區留言,對於 Sentinel 中的流控不感興趣的讀者,看到這里就可以結束了。
Sentinel 中的流控
Sentinel 是阿里開源的流控、熔斷工具,這里不做過多的介紹,感興趣的讀者請自行了解。
在 Sentinel 的流控中,我們可以配置流控規則,主要是控制 QPS 和線程數,這里我們不討論控制線程數,控制線程數的代碼不在我們這里的討論范圍內,下面的介紹都是指控制 QPS。
RateLimiterController
RateLimiterController 非常簡單,它通過使用 latestPassedTime 屬性來記錄最后一次通過的時間,然后根據規則中 QPS 的限制,計算當前請求是否可以通過。
舉個非常簡單的例子:設置 QPS 為 10,那么每 100 毫秒允許通過一個,通過計算當前時間是否已經過了上一個請求的通過時間 latestPassedTime 之后的 100 毫秒,來判斷是否可以通過。假設才過了 50ms,那么需要當前線程再 sleep 50ms,然后才可以通過。如果同時有另一個請求呢?那需要 sleep 150ms 才行。
public class RateLimiterController implements TrafficShapingController {
// 排隊最大時長,默認 500ms
private final int maxQueueingTimeMs;
// QPS 設置的值
private final double count;
// 上一次請求通過的時間
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
// 通常 acquireCount 為 1,這里不用關心參數 prioritized
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
//
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 計算每 2 個請求之間的間隔,比如 QPS 限制為 10,那么間隔就是 100ms
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
// 可以通過,設置 latestPassedTime 然后就返回 true 了
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// 不可以通過,需要等待
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待時長大於最大值,返回 false
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 將 latestPassedTime 往前推
long oldTime = latestPassedTime.addAndGet(costTime);
try {
// 需要 sleep 的時間
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
這個策略還是非常好理解的,簡單粗暴,快速失敗。
WarmUpController
WarmUpController 用來防止突發流量迅速上升,導致系統負載嚴重過高,本來系統在穩定狀態下能處理的,但是由於許多資源沒有預熱,導致這個時候處理不了了。比如,數據庫需要建立連接、需要連接到遠程服務等,這就是為什么我們需要預熱。
啰嗦一句,這里不僅僅指系統剛剛啟動需要預熱,對於長時間處於低負載的系統,突發流量也需要重新預熱。
Guava 的 SmoothWarmingUp 是用來控制獲取令牌的速率的,和這里的控制 QPS 還是有一點區別,但是中心思想是一樣的。我們在看完源碼以后再討論它們的區別。
為了幫助大家理解源碼,我們這邊先設定一個場景:QPS 設置為 100,預熱時間設置為 10 秒。代碼中使用 “【】” 代表根據這個場景計算出來的值。
public class WarmUpController implements TrafficShapingController {
// 閾值
protected double count;
// 3
private int coldFactor;
// 轉折點的令牌數,和 Guava 的 thresholdPermits 一個意思
// [500]
protected int warningToken = 0;
// 最大的令牌數,和 Guava 的 maxPermits 一個意思
// [1000]
private int maxToken;
// 斜線斜率
// [1/25000]
protected double slope;
// 累積的令牌數,和 Guava 的 storedPermits 一個意思
protected AtomicLong storedTokens = new AtomicLong(0);
// 最后更新令牌的時間
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
// 下面的構造方法,和 Guava 中是差不多的,只不過 thresholdPermits 和 maxPermits 都換了個名字
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// warningToken 和 thresholdPermits 是一樣的意思,計算結果其實是一樣的
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// 【warningToken = (10*100)/(3-1) = 500】
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// maxToken 和 maxPermits 是一樣的意思,計算結果其實是一樣的
// maxPermits = thresholdPermits + 2*warmupPeriod/(stableInterval+coldInterval)
// 【maxToken = 500 + (2*10*100)/(1.0+3) = 1000】
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 斜率計算
// slope
// slope = (coldIntervalMicros-stableIntervalMicros)/(maxPermits-thresholdPermits);
// 【slope = (3-1.0) / 100 / (1000-500) = 1/25000】
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Sentinel 的 QPS 統計使用的是滑動窗口
// 當前時間窗口的 QPS
long passQps = (long) node.passQps();
// 這里是上一個時間窗口的 QPS,這里的一個窗口跨度是1分鍾
long previousQps = (long) node.previousPassQps();
// 同步。設置 storedTokens 和 lastFilledTime 到正確的值
syncToken(previousQps);
long restToken = storedTokens.get();
// 令牌數超過 warningToken,進入梯形區域
if (restToken >= warningToken) {
// 這里簡單說一句,因為當前的令牌數超過了 warningToken 這個閾值,系統處於需要預熱的階段
// 通過計算當前獲取一個令牌所需時間,計算其倒數即是當前系統的最大 QPS 容量
long aboveToken = restToken - warningToken;
// 這里計算警戒 QPS 值,就是當前狀態下能達到的最高 QPS。
// (aboveToken * slope + 1.0 / count) 其實就是在當前狀態下獲取一個令牌所需要的時間
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
// 如果不會超過,那么通過,否則不通過
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// count 是最高能達到的 QPS
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
// 下面幾行代碼,說明在第一次進入新的 1 秒鍾的時候,做同步
// 題外話:Sentinel 默認地,1 秒鍾分為 2 個時間窗口,分別 500ms
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
// 令牌數量的舊值
long oldValue = storedTokens.get();
// 計算新的令牌數量,往下看
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 令牌數量上,減去上一分鍾的 QPS,然后設置新值
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
// 更新令牌數
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 當前令牌數小於 warningToken,添加令牌
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
// 當前令牌數量處於梯形階段,
// 如果當前通過的 QPS 大於 count/coldFactor,說明系統消耗令牌的速度,大於冷卻速度
// 那么不需要添加令牌,否則需要添加令牌
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
}
coolDownTokens 這個方法用來計算新的 token 數量,其實我也沒有完全理解作者的設計:
-
第一、對於令牌的增加,在 Guava 中,使用 warmupPeriodMicros / maxPermits 作為增長率,因為它實現的是 storedPermits 從 0 到 maxPermits 花費的時間為 warmupPeriod。而這里是以每秒 count 個作為增長率,為什么?
-
第二、else if 分支中的決定我沒有理解,為什么用 passQps 和 count / coldFactor 進行對比來決定是否繼續添加令牌?
-
我自己的理解是,count/coldFactor 就是指冷卻速度,那么就是說得通的。歡迎大家一起探討。
最后,我們再簡單說說 Guava 的 SmoothWarmingUp 和 Sentinel 的 WarmupController 的區別。
Guava 在於控制獲取令牌的速率,它關心的是,獲取 permits 需要多少時間,包括從 storedPermits 中獲取,以及獲取 freshPermits,以此推進 nextFreeTicketMicros 到未來的某個時間點。
而 Sentinel 在於控制 QPS,它用令牌數來標識當前系統處於什么狀態,根據時間推進一直增加令牌,根據通過的 QPS 一直減少令牌。如果 QPS 持續下降,根據推演,可以發現 storedTokens 越來越多,然后越過 warningTokens 這個閾值,之后只有當 QPS 下降到 count/3 以后,令牌才會繼續往上增長,一直到 maxTokens。
storedTokens 是以 “count 每秒”的增長率增長的,減少是以 前一分鍾的 QPS 來減少的。其實這里我也有個疑問,為什么增加令牌的時候考慮了時間,而減少的時候卻不考慮時間因素,提了 issue,似乎沒人搭理。
WarmUpRateLimiterController
注意,這個類繼承自剛剛介紹的 WarmUpController,它的流控效果定義為排隊等待。它的代碼其實就是前面介紹的 RateLimiterController 加上 WarmUpController。
public class WarmUpRateLimiterController extends WarmUpController {
private final int timeoutInMs;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
super(count, warmUpPeriodSec, coldFactor);
this.timeoutInMs = timeOutMs;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
// 和 RateLimiterController 比較,區別主要就是這塊代碼
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
這個代碼很簡單,就是 RateLimiter 中的代碼,然后加入了預熱的內容。
在 RateLimiter 中,單個請求的 costTime 是固定的,就是 1/count,比如設置 100 qps,那么 costTime 就是 10ms。
但是這邊,加入了 WarmUp 的內容,就是說,通過令牌數量,來判斷當前系統的 QPS 應該是多少,如果當前令牌數超過 warningTokens,那么系統的 QPS 容量已經低於我們預設的 QPS,相應的,costTime 就會延長。
小結
有段時間沒寫文章了,寫得不好之處,歡迎指正。
關注作者公眾號: