sentinel 滑動窗口統計機制


sentinel的滑動窗口統計機制就是根據當前時間,獲取對應的時間窗口,並更新該時間窗口中的各項統計指標(pass/block/rt等),這些指標被用來進行后續判斷,比如限流、降級等;隨着時間的推移,當前時間點對應的時間窗口是變化的,這時會涉及到時間窗口的初始化、復用等。可以說,sentinel上的功能所用到的數據幾乎都是滑動窗口統計機制來維護和更新的。

 

sentinel 處理流程是基於slot鏈(ProcessorSlotChain)來完成的,比如限流、熔斷等,其中重要的一個slot就是StatisticSlot,它是做各種數據統計的,而限流/熔斷的數據判斷來源就是StatisticSlot,StatisticSlot的各種數據統計都是基於滑動窗口來完成的,因此本文就重點分析StatisticSlot的滑動窗口統計機制。

 

sentinel 的slot鏈(ProcessorSlotChain)是責任鏈模式的體現,那SlotChain是在哪創建的呢?是在 CtSph.lookProcessChain()方法中創建的,並且該方法會根據當前請求的資源先去一個靜態的HashMap中獲取,如果獲取不到才會創建,創建后會保存到HashMap中。這就意味着,同一個資源會全局共享一個SlotChain。默認生成ProcessorSlotChain邏輯為:

 1 // DefaultSlotChainBuilder
 2 public ProcessorSlotChain build() {
 3    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
 4    chain.addLast(new NodeSelectorSlot());
 5    chain.addLast(new ClusterBuilderSlot());
 6    chain.addLast(new LogSlot());
 7    chain.addLast(new StatisticSlot());
 8    chain.addLast(new SystemSlot());
 9    chain.addLast(new AuthoritySlot());
10    chain.addLast(new FlowSlot());
11    chain.addLast(new DegradeSlot());
12 
13    return chain;
14 }

 

整個處理過程從第一個slot往后一直傳遞到最后一個的,當到達StatisticSlot時,開始統計各項指標,統計的結果又會被后續的Slot所采用,作為各種規則校驗的依據。各種指標如下:

public enum MetricEvent {
   PASS, // Normal pass.
   BLOCK, // Normal block.
   EXCEPTION, // 異常統計
   SUCCESS,
   RT, // rt統計
   OCCUPIED_PASS
}

 

StatisticSlot.entry流程

處理流程走到StatisticSlot時,首先觸發后續slot.entry方法,然后統計各項指標,后續slot中數據判斷來源就是這里統計的各項指標。StatisticSlot.entry 邏輯如下:

 1 @Override
 2 public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNodenode, int count, Object... args) throws Throwable {
 3    try {
 4        // 觸發下一個Slot的entry方法
 5        fireEntry(context, resourceWrapper, node, count, args);
 6        // 如果能通過SlotChain中后面的Slot的entry方法,說明沒有被限流或降級
 7        // 統計信息
 8        node.increaseThreadNum();
 9        node.addPassRequest();
10        // 省略部分代碼
11   } catch (BlockException e) {
12        context.getCurEntry().setError(e);
13        // Add block count.
14        node.increaseBlockedQps();
15        // 省略部分代碼
16        throw e;
17   } catch (Throwable e) {
18        context.getCurEntry().setError(e);
19        // Should not happen
20        node.increaseExceptionQps();
21        // 省略部分代碼
22        throw e;
23   }
24 }

由以上代碼可知,StatisticSlot主要就做了3件事:

  1. 觸發后續slot的entry方法,進行規則校驗

  2. 校驗通過則更新node實時指標數據

  3. 校驗不通過則更新node異常指標數據

注意:由於后續的fireEntry操作和更新本次統計信息是兩個操作,不是原子的,會造成限流不准的小問題,比如設置的FlowRule count為20,並發情況下可能稍大於20,不過針對大部分場景來說,這點偏差是可以容忍的,畢竟我們要的是限流效果,而不是必須精確的限流操作。

 

更新node實時指標數據

我們可以看到 node.addPassRequest() 這段代碼是在fireEntry執行之后執行的,這意味着,當前請求通過了sentinel的流控等規則,此時需要將當次請求記錄下來,也就是執行 node.addPassRequest() 這行代碼,具體的代碼如下所示:

1 // DefaultNode
2 public void addPassRequest() {
3    super.addPassRequest();
4    this.clusterNode.addPassRequest();
5 }

這里的node是一個 DefaultNode 實例,這里特別補充一個 DefaultNode 和 ClusterNode 的區別:

  • DefaultNode:保存着某個resource在某個context中的實時指標,每個DefaultNode都指向一個ClusterNode。

  • ClusterNode:保存着某個resource在所有的context中實時指標的總和,同樣的resource會共享同一個ClusterNode,不管他在哪個context中。

 

上面代碼不管是 DefaultNode 還是 ClusterNode ,走的都是StatisticNode 對象的 addPassRequest 方法:

1 private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000);
2 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);
3 
4 public void addPassRequest(int count) {
5    rollingCounterInSecond.addPass(count); // 對每秒指標統計
6    rollingCounterInMinute.addPass(count); // 每分鍾指標統計
7 }

 

每一個通過的指標(pass)都是調用Metric 的接口進行操作的,並且是通過 ArrayMetric 這種實現類,代碼如下:

public ArrayMetric(int windowLength, int interval) {
   this.data = new WindowLeapArray(windowLength, interval);
}

public void addPass(int count) {
   // 獲取當前時間窗口
   WindowWrap<MetricBucket> wrap = data.currentWindow();
   wrap.value().addPass(count);
}

 

首先通過 currentWindow() 獲取當前時間窗口,然后更新當前時間窗口對應的統計指標,以下代碼重點關注幾個判斷邏輯:

 1 // LeapArray
 2 public WindowWrap<T> currentWindow() {
 3    return currentWindow(TimeUtil.currentTimeMillis());
 4 }
 5 // TimeUtil
 6 public static long currentTimeMillis() {
 7    // currentTimeMillis是由一個tick線程每個1ms更新一次,具體邏輯在TimeUtil類中
 8    return currentTimeMillis;
 9 }
10 // LeapArray
11 public WindowWrap<T> currentWindow(long timeMillis) {
12    // 計算當前時間點落在滑動窗口的下標
13    int idx = calculateTimeIdx(timeMillis);
14    // Calculate current bucket start time.
15    long windowStart = calculateWindowStart(timeMillis);
16 
17    // 獲取當前時間點對應的windowWrap,array為AtomicReferenceArray
18    while (true) {
19        WindowWrap<T> old = array.get(idx);
20        if (old == null) {
21            // 1.為空表示當前時間窗口為初始化過,創建WindowWrap並cas設置到array中
22            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs,windowStart, newEmptyBucket());
23            if (array.compareAndSet(idx, null, window)) {
24                return window;
25           } else {
26                Thread.yield();
27           }
28       } else if (windowStart == old.windowStart()) {
29            // 2.獲取的時間窗口正好對應當前時間,直接返回
30            return old;
31       } else if (windowStart > old.windowStart()) {
32            // 3.獲取的時間窗口為老的,進行reset操作復用
33            if (updateLock.tryLock()) {
34                try {
35                    return resetWindowTo(old, windowStart);
36               } finally {
37                    updateLock.unlock();
38               }
39           } else {
40                Thread.yield();
41           }
42       } else if (windowStart < old.windowStart()) {
43            // 4.時間回撥了,正常情況下不會走到這里
44            return new WindowWrap<T>(windowLengthInMs, windowStart,newEmptyBucket());
45       }
46   }
47 }

 

獲取當前時間窗口對應的WindowWrap之后,就可以進行更新操作了。

// wrap.value().addPass(count);
public void addPass(int n) {
   add(MetricEvent.PASS, n);
}
// MetricBucket
public MetricBucket add(MetricEvent event, long n) {
   // 對應MetricEvent枚舉中值
   counters[event.ordinal()].add(n);
   return this;
}

到這里為止,整個指標統計流程就完成了,下面重點看下滑動窗口機制。

 

滑動窗口機制

時間窗口是用WindowWrap對象表示的,其屬性如下:

private final long windowLengthInMs;  // 時間窗口的長度
private long windowStart; // 時間窗口開始時間
private T value; // MetricBucket對象,保存各個指標數據

 

sentinel時間基准由tick線程來做,每1ms更新一次時間基准,邏輯如下:

currentTimeMillis = System.currentTimeMillis();
Thread daemon = new Thread(new Runnable() {
   @Override
   public void run() {
       while (true) {
           currentTimeMillis = System.currentTimeMillis();
           try {
               TimeUnit.MILLISECONDS.sleep(1);
          } catch (Throwable e) {
          }
      }
  }
});
daemon.setDaemon(true);
daemon.setName("sentinel-time-tick-thread");
daemon.start();

 

sentinel默認有每秒和每分鍾的滑動窗口,對應的LeapArray類型,它們的初始化邏輯是:

protected int windowLengthInMs; // 單個滑動窗口時間值
protected int sampleCount; // 滑動窗口個數
protected int intervalInMs; // 周期值(相當於所有滑動窗口時間值之和)

public LeapArray(int sampleCount, int intervalInMs) {
   this.windowLengthInMs = intervalInMs / sampleCount;
   this.intervalInMs = intervalInMs;
   this.sampleCount = sampleCount;

   this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
}

針對每秒滑動窗口,windowLengthInMs=500,sampleCount=2,intervalInMs=1000,針對每分鍾滑動窗口,windowLengthInMs=1000,sampleCount=60,intervalInMs=60000,對應代碼:

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2, 1000);
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000);

currentTimeMillis時間基准(tick線程)每1ms更新一次,通過currentWindow(timeMillis)方法獲取當前時間點對應的WindowWrap對象,然后更新對應的各種指標,用於做限流、降級時使用。注意,當前時間基准對應的事件窗口初始化時lazy模式,並且會復用的。

 

Sentinel 底層采用高性能的滑動窗口數據結構 LeapArray 來統計實時的秒級指標數據,可以很好地支撐寫多於讀的高並發場景。最后以一張圖結束吧:

 

 

 往期精選 

覺得文章不錯,對你有所啟發和幫助,希望能轉發給更多的小伙伴。如果有問題,請關注下面公眾號,發送問題給我,多謝。
歡迎小伙伴 關注【TopCoder】閱讀更多精彩好文。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM