引言
在分析Sentinel的上一篇文章中,我們知道了它是基於滑動窗口做的流量統計,那么在當我們能夠根據流量統計算法拿到流量的實時數據后,下一步要做的事情自然就是基於這些數據做流控。在介紹Sentinel
的流控模型之前,我們先來簡單看下 Sentinel 后台是如何去定義一個流控規則的
對於上圖的配置Sentinel
把它抽象成一個FlowRule
類,與其屬性一一對應
- resource 資源名
- limitApp 限流來源,默認為default不區分來源
- grade 限流類型,有QPS和並發線程數兩種類型
- count 限流閾值
- strategy 流控策略 1. 直接 2. 關聯 3.鏈路
- controlBehavior 流控效果 1.快速失敗 2.預熱啟動 3.排隊等待 4. 預熱啟動排隊等待
- warmUpPeriodSec 流控效果為預熱啟動時的預熱時長(秒)
- maxQueueingTimeMs 流控效果為排隊等待時的等待時長 (毫秒)
下面我們來看下選擇流控策略和流控效果的核心代碼
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {
// 根據流控策略選擇需要流控的Node維度節點
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 獲取配置的流控效果 控制器 (1. 直接拒絕 2. 預熱啟動 3. 排隊 4. 預熱啟動排隊等待)
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
上面的代碼比較簡單流程也很清晰,首先根據我們配置的流控策略獲取到合適維度的 Node 節點(Node節點是Sentinel做流量統計的基本單位),然后再獲取到規則中配置的流控效果控制器(1. 直接拒絕 2. 預熱啟動 3. 排隊等待 4.預熱啟動排隊等待)。
流控策略
下面我們來看下選擇流控策略的源碼分析
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// 獲取限流來源 limitApp
String limitApp = rule.getLimitApp();
// 獲取限流策略
int strategy = rule.getStrategy();
// 獲取當前 上下文的 來源
String origin = context.getOrigin();
// 如果規則配置的限流來源 limitApp 等於 當前上下文來源
if (limitApp.equals(origin) && filterOrigin(origin)) {
// 且配置的流控策略是 直接關聯策略
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接返回當前來源 origin 節點
return context.getOriginNode();
}
// 配置的策略為關聯或則鏈路
return selectReferenceNode(rule, context, node);
// 如果規則配置的限流來源 limitApp 等於 default
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
// 且配置的流控策略是 直接關聯策略
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// 直接返回當前資源的 clusterNode
return node.getClusterNode();
}
// 配置的策略為關聯或則鏈路
return selectReferenceNode(rule, context, node);
// 如果規則配置的限流來源 limitApp 等於 other,且當前上下文origin不在流控規則策略中
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
// 且配置的流控策略是 直接關聯策略
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
// 配置的策略為關聯或則鏈路
return selectReferenceNode(rule, context, node);
}
return null;
}
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
// 關聯資源名稱 (如果策略是關聯 則是關聯的資源名稱,如果策略是鏈路 則是上下文名稱)
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
// 策略是關聯
if (strategy == RuleConstant.STRATEGY_RELATE) {
// 返回關聯的資源ClusterNode
return ClusterBuilderSlot.getClusterNode(refResource);
}
// 策略是鏈路
if (strategy == RuleConstant.STRATEGY_CHAIN) {
// 當前上下文名稱不是規則配置的name 直接返回null
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
// No node.
return null;
}
這段代碼的邏輯判斷比較多,我們稍微理一下整個過程
LimitApp
的作用域只在配置的流控策略為RuleConstant.STRATEGY_DIRECT
(直接關聯)時起作用。其有三種配置,分別為default
,origin_name
,other
- default 如果配置為default,表示統計不區分來源,當前資源的任何來源流量都會被統計(其實就是選擇 Node 為 clusterNode 維度)
- origin_name 如果配置為指定名稱的 origin_name,則只會對當前配置的來源流量做統計
- other 如果配置為other 則會對其他全部來源生效但不包括第二條配置的來源
- 當策略配置為 RuleConstant.STRATEGY_RELATE 或 RuleConstant.STRATEGY_CHAIN 時
- STRATEGY_RELATE 關聯其他的指定資源,如資源A想以資源B的流量狀況來決定是否需要限流,這時資源A規則配置可以使用 STRATEGY_RELATE 策略
- STRATEGY_CHAIN 對指定入口的流量限流,因為流量可以有多個不同的入口(EntranceNode)
- 對於上面幾個節點之間的關系不清楚的可以去看我這篇文章開頭的總覽圖 https://www.cnblogs.com/taromilk/p/11750962.html
流控效果
關於流控效果的配置有四種,我們來看下它們的初始化代碼
/**
* class com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil
*/
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
// 只有Grade為統計 QPS時 才可以選擇除默認流控效果外的 其他流控效果控制器
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
// 預熱啟動
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
// 超過 閾值 排隊等待 控制器
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
// 上面兩個的結合體
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 默認控制器 超過 閾值 直接拒絕
return new DefaultController(rule.getCount(), rule.getGrade());
}
可以比較清晰的看到總共對應有四種流控器的初始化
直接拒絕
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 獲取當前qps
int curCount = avgUsedTokens(node);
// 判斷是否已經大於閾值
if (curCount + acquireCount > count) {
// 如果當前流量具有優先級,則會提前去獲取未來的通過資格
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
此種策略比較簡單粗暴,超過流量閾值的會直接拒絕。不過這里有一個小細節,如果入口流量prioritized為true,也就是優先級比較高,則會通過占用未來時間窗口的名額來實現。這個在上一篇文章有介紹到
預熱啟動
WarmUpController
主要是用來防止流量的突然上升,使系統本在穩定狀態下能處理的,但是由於許多資源沒有預熱,導致處理不了了。注意這里的預熱並不是指系統啟動之后的一次性預熱,而是指系統在運行的任何時候流量從低峰到突增的預熱階段。
下面我們來看下WarmUpController
的具體實現類
/**
* WarmUpController 構造方法
* @param count 當前qps閾值
* @param warmUpPeriodInSec 預熱時長 秒
* @param coldFactor 冷啟動系數 默認為3
*/
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// 剩余Token的警戒值,小於警戒值系統就進入正常運行期
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// 系統最冷時候的剩余Token數
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 系統預熱的速率(斜率)
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
// 計算當前的 剩余 token 數
syncToken(previousQps);
// 如果進入了警戒線,開始調整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
// 計算剩余token超出警戒值的值
long aboveToken = restToken - warningToken;
// 計算當前允許通過的最大 qps
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
// 不在預熱階段,則直接判斷當前qps是否大於閾值
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
首先是構造方法,主要關注2個重要參數
- warningToken 剩余token的警戒值
- maxToken 剩余的最大token數,如果剩余token數等於maxToken,則說明系統處於最冷階段
要理解這兩個參數的含義,可以參考令牌桶算法,每通過一個請求,就會從令牌桶中取走一個令牌。那么試想一下,當令牌桶中的令牌達到最大值是,是不是意味着系統目前處於最冷階段,因為桶里的令牌始終處於一個非常飽和的狀態。這里的令牌最大值對應的就是maxToken
,而warningToken
,則是對應了一個警戒值,當桶中的令牌數減少到一個指定的值時,說明系統已經度過了預熱階段
當一個請求進來時,首先需要計算當前桶中剩余的token數,具體邏輯在syncToken
方法中
當系統剩余Token大於warningToken時,說明系統仍處於預熱階段,故需要調整當前所能通過的最大qps閾值
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
// 獲取秒級別時間(去除毫秒)
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
// 判斷是否需要往桶中添加令牌
long newValue = coolDownTokens(currentTime, passQps);
// 設置新的token數
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 如果設置成功的話則減去上次通過的qps數量,就得到當前的實際token數
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
- 獲取當前時間
- coolDownTokens 方法會判斷是否需要往桶中放 token,並返回最新的token數
- 如果返回了最新的token數,則將當前剩余的token數減去已經通過的qps,得到最新的剩余token數
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的幾種情況
// 1. 系統初始啟動階段,oldvalue = 0,lastFilledTime也等於0,此時得到一個非常大的newValue,會取maxToken為當前token數量值
// 2. 系統處於預熱階段 且 當前qps小於 count / coldFactor
// 3. 系統處於完成預熱階段
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
這里看一下會添加令牌的幾種情況
- 系統初始啟動階段,oldvalue = 0,lastFilledTime也等於0,此時得到一個非常大的newValue,會取maxToken為當前token數量值
- 系統處於完成預熱階段,需要補充 token 使其穩定在一個范圍內
- 系統處於預熱階段 且 當前qps小於 count / coldFactor
前2種情況比較好理解,這里主要解釋一下第三種情況,為何 當前qps
小於count / coldFactor
時,需要往桶中添加Token?試想一下如果沒有這一步會怎么樣,如果沒有這一步在比較低的qps情況下補充Token,系統最終也會慢慢度過預熱階段,但實際上這么低的qps(小於 count / coldFactor時
)不應該完成預熱。所以這里才會在 qps低於count / coldFactor
時補充剩余token數,來讓系統在低qps情況下始終處於預熱狀態下
排隊等待
排隊等待的實現相對預熱啟動實現比較簡單
首先會通過我們的配置,計算出相鄰兩個請求允許通過的最小時間,然后會記錄最近一個通過的時間。兩者相加即是下一次請求允許通過的最小時間。
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
if (acquireCount <= 0) {
return true;
}
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// 計算相隔兩個請求 需要相隔多長時間
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 本次期望通過的最小時間
long expectedTime = costTime + latestPassedTime.get();
// 如果當前時間大於期望時間,說明qps還未超過閾值,直接通過
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
// 當前時間小於於期望時間,請求過快了,需要排隊等待指定時間
// 計算等待時間
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
// 等待時長大於我們設置的最大時長,則不通過
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
// 否則則排隊等待,占用下通過時間
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
// 判斷等待時間是否已經大於最大值
if (waitTime > maxQueueingTimeMs) {
// 大於則將上一步加的值重新減去
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
// 占用等待時間成功,直接sleep costTime
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
排隊等待控制器的核心策略其實就是圍繞了latestPassedTime
進行的,latestPassedTime
指的是上一次請求通過的時間,通過latestPassedTime
+ costTime
來與當前時間做比較,來判斷當前請求是否可以通過,無法通過的請求則會優先占用latestPassedTime
時間,直到sleep到可以通過的時間。當然我們也可以配置排隊等待的最大時間,來限制目前排隊等待通過的請求數量。
預熱啟動排隊等待
預熱排隊等待,WarmUpRateLimiterController
實現類我們發現其繼承了WarmUpController
,這是Sentinel在1.4版本后新加的一種控制器,其實就是預熱啟動和排隊等待的結合體,具體源碼我們就不做分析。
尾言
Sentinel
的流控策略和流控效果的相結合使用還是非常巧妙的,當中的一些設計思想還是非常有借鑒意義的