1. 阿里sentinel源碼研究深入
1.1. 前言
- 昨天已經把sentinel成功部署到線上環境,可參考我上篇博文,該走的坑也都走了一遍,已經可以初步使用它的限流和降級功能,根據我目前的實踐,限流和降級規則似乎不能一同起效,還不知道原因,下面繼續探索
1.2. 源碼
1.2.1. 流控降級監控等的構建
- 首先客戶端而言,我關注的是我寫的代碼
SphU.entry
,這明顯是很關鍵的方法,下圖的內容就是這里構建的
-Sentinel工作主流程就包含在上面一個方法里,通過鏈式調用的方式,經過了建立樹狀結構,保存統計簇點,異常日志記錄,實時數據統計,負載保護,權限認證,流量控制,熔斷降級等Slot
- 進入鏈式方法的入口為CtSph類,try方法大括號內
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
1.2.2. 修改控制台規則是如何通知客戶端的?
- 看sentinel-transport-simple-http包中的
HttpEventTask
類,它開啟了一個線程,轉么用來做為socket連接,控制台通過socket請求通知客戶端,從而更新客戶端規則,更改規則核心代碼如下
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter, outputStream);
} else {
// No matching command handler.
badRequest(printWriter, "Unknown command `" + commandName + '`');
}
通過命令模式,commandName為setRules時,更新規則
1.2.3. 既然它建立連接用的socket,為什么不用netty呢?
- 帶着這個疑問,我本想在issues里找下,突然發現它的源碼中有個
sentinel-transport-netty-http
這個包和sentinel-transport-simple-http
處於同級,官方的例子用的simple-http,但明顯它也准備了netty-http,於是我替換成了netty-http,運行后效果和原先一樣,至於效率上有沒有提升,我就不清楚了_
1.2.4. 流量規則如何檢查?
- 該規則檢查類為
FlowRuleChecker
,在core核心包中,核心檢查方法如下
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
1.2.5. 熔斷降級如何判斷?
- 判斷類為
DegradeRuleManager
,在core核心包,核心內容如下,再深入就是它判斷的算法了,感興趣的自己去看如下的passCheck
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
throws BlockException {
Set<DegradeRule> rules = degradeRules.get(resource.getName());
if (rules == null) {
return;
}
for (DegradeRule rule : rules) {
if (!rule.passCheck(context, node, count)) {
throw new DegradeException(rule.getLimitApp(), rule);
}
}
}
1.2.6. 默認的鏈條構建在哪?
- 核心類為
DefaultSlotChainBuilder
,構建了如下的slot
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
}
1.2.7. 既然已經知道了它是如何構建鏈式的處理節點的,我們是否何可自己重新構建?
- 發現類
SlotChainProvider
中的構建方法如下
private static void resolveSlotChainBuilder() {
List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();
boolean hasOther = false;
for (SlotChainBuilder builder : LOADER) {
if (builder.getClass() != DefaultSlotChainBuilder.class) {
hasOther = true;
list.add(builder);
}
}
if (hasOther) {
builder = list.get(0);
} else {
// No custom builder, using default.
builder = new DefaultSlotChainBuilder();
}
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ builder.getClass().getCanonicalName());
}
- 也就是說,我們如果在
LOADER
中加入了其他的非默認實現就可以替代原來的DefaultSlotChainBuilder
,那LOADER
怎么來的?看代碼,如下的全局變量,也就是需要自定義實現SlotChainBuilder
接口的實現類
private static final ServiceLoader<SlotChainBuilder> LOADER = ServiceLoader.load(SlotChainBuilder.class);
1.2.8. 如何實現SlotChainBuilder接口呢?
- 這里要注意的是它使用了
ServiceLoader
,也就是SPI
,全稱Service Provider Interface
,加載它需要特定的配合,比如我自定義實現一個Slot
/**
* @author laoliangliang
* @date 2019/7/25 14:13
*/
public class MySlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
//自定義的
chain.addLast(new CarerSlot());
return chain;
}
}
/**
* @author laoliangliang
* @date 2019/7/25 14:15
*/
@Slf4j
public class CarerSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
log.info(JSON.toJSONString(resourceWrapper));
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
- 這里我自定義了
CarerSlot
,那是否能被加載到呢?事實上還不夠,需要在META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
建這樣一個文件,內容如下
- 好了,這樣配置過后,它就能讀到我們自定義的實現類代替它原先的類了
1.2.9. 該命令模式最初的初始化階段在哪?
- 用過sentinel的都會感受到,只有當有第一個sentinel監控的請求過來時,sentinel客戶端才會正式初始化,這樣看來,這個初始化步驟應該在哪呢?
- 我通過不斷反向跟蹤上述的命令模式最初的初始化,找到了最初初始化的地方如下
public class Env {
public static final Sph sph = new CtSph();
static {
// If init fails, the process will exit.
InitExecutor.doInit();
}
}
- 有沒有覺得很熟悉?doInit就是很多初始化的起點,當Env被調用時會運行static代碼塊,那么只有可能是sph被調用時
- 只要你debug過我上述第一條
SphU.entry
的源碼,就會發現,如下,該方法一進入不就是先獲取Env的sph,再調用的entry嗎,所以初始化的地方也就找到了,第一次調用SphU.entry
的地方,或者你不用這個,使用的注解,里面同樣有這個方法
public static Entry entry(String name) throws BlockException {
return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
}
1.2.10. 注解是如何實現熔斷降級的?
- 這個其實是比較容易理解的,既然通過
SphU.entry
包裹可以實現熔斷降級,通過注解的形式包裹代碼方法應該是比較容易的,那么在哪里實現和配置的呢 - 看過我前一篇文章的應該看到了,有存在如下配置
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
pushlish();
return new SentinelResourceAspect();
}
- 很明顯的注解切面,通過spring注解的形式注入,我覺得這還是比較優雅的注入方式了,點進入就可以看到如下
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
對@SentinelResource
注解進行了處理
1.2.11. 什么是直接失敗?
- 這個很好理解,qps超過設置的值,直接失敗
1.2.12. 什么是排隊等待?
-
這個似乎看字面意思很好理解,但是一旦你點了這個選項,下面還有個參數的
-
所以這個排隊等待是有超時時間的,達到峰值后勻速通過,采用的漏桶算法,流控圖
1.2.13. 什么是慢啟動模式?
- 以下是核心算法,
Warm Up
模式不看算法細節,看它的中文說明應該就能理解是怎么回事了吧;所謂慢啟動模式,要求系統的QPS請求增速不能超過一定的速率,否則會被壓制超過部分請求失敗,應該是為了避免一啟動就有大流量的請求進入導致系統一下子就宕機卡主或直接進入了熔斷
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 開始計算它的斜率
// 如果進入了警戒線,開始調整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
-
配置如下時,測試流控
-
流控圖
1.2.14. 模式總結
-
你會發現直接失敗和排隊等待的區別在流控圖上並不明顯,那差別在哪呢?我重慶給個請求參數,5秒內模擬100個人輪流請求10次
-
sentinel控制台設置
-
流控圖
- 總結:我設置了超時時間是5秒,而100個線程10次輪詢也就是1000個請求,可以看出,它並不是一定要在5秒內解決這些請求,有了延時后,代表只要響應時間在5秒以內,不管多少請求都不會拒絕;
- 幾個模式有利有弊,默認的快速失敗使我們可以最大程度的控制系統的QPS,避免造成系統壓力過大,但同時可能造成用於的體驗效果變差
- 慢啟動上面說過了
- 排隊等待在設置合理的超時時間后可以最大程度的避免求情的失敗,但同時可能造成線程壓力過大
- 綜上,在我看來排隊等待模式是比較適合線上運行的,只是需要設置合理的超時時間,大公司機器不愁那就設小點,業界一般標准是200ms用戶無感知,中小型可以設500ms甚至更大,看機器情況動態調整了
1.2.15. 提醒
- 像我是用apollo來持久化規則的,你也可以用nacos,redis,zookeeper等,當控制台未啟動時,你啟動客戶端規則也會生效,只是沒了控制台實時監控數據