時間窗限流算法
每個時間窗口長度為 10t ,當 單位時間 10t 時長范圍內,超過 100 時,將會被限流;
存在的問題:相鄰的時間窗之間截取新的時間窗,如: 16t ---- 26t ,同樣為 10t 時間窗長度,但其請求數為 110 ,但系統認為是通過的

該算法存在這樣的問題:連續兩個時間窗口中的統計數據都沒有超出閾值,但在跨窗口的時間窗長度范圍內的統計數據卻超出了閾值
滑動時間窗限流算法
滑動時間窗限流算法解決了固定時間窗限流算法的問題。其沒有划分固定的時間窗起點與終點,而是將每一次請求的到來時間點作為統計時間窗的終點,起點則是終點向前推時間窗長度的時間點。這種時間窗稱為“滑動時間窗”
時間點1:

時間點2:

存在的問題:
如下圖:分析點1 與分析點2 存在大量的重疊區域,所以統計過程中存在大量測重復工作,浪費了大量的系統資源

算法改進:
將時間窗口進行細分為多個子 時間窗口:樣本窗口


針對以上問題,系統采用了一種“折中”的改進措施:將整個時間軸拆分為若干“樣本窗口”,樣本窗口的長度是小於滑動時間窗口長度的。當等於滑動時間窗口長度時,就變為了“固定時間窗口算法”。 一般時間窗口長度會是樣本窗口長度的整數倍。那么是如何判斷一個請求是否能夠通過呢?當到達樣本窗口終點時間時,每個樣本窗口會統計一次本樣本窗口中的流量數據並記錄下來。當一個請求到達時,會統計出當前請求時間點所在樣本窗口中的流量數據,然后再獲取到當前請求時間點所在時間窗中其它樣本窗口的統計數據,求和后,如果沒有超出閾值,則通過,否則被限流。
數據統計源碼解析
Sentinel滑動時間窗算法源碼解析—數據統計如下圖:

Sentinel滑動時間窗算法源碼解析—使用統計數據如下圖:

StatisticNode 滑動時間窗計數器
// 以秒為單位的計量器,定義了一個使用數組保存數據的計量器 private transient volatile Metric rollingCounterInSecond = // SAMPLE_COUNT,樣本窗口數量,默認值為2 // INTERVAL,時間窗長度,默認值1000毫秒,1秒 new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); // 一分為單位的計量器 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
@Override public void addPassRequest(int count) { // 為滑動計數器增加本次訪問的數據 rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
// 這是一個使用數組保存數據的計量器類 public class ArrayMetric implements Metric { // 數據就保存在這個data中 private final LeapArray<MetricBucket> data; 。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
源碼:( 請使用 IDEA 結合查看源碼 )
// 環形數組 public abstract class LeapArray<T> { // 樣本窗口長度 protected int windowLengthInMs; // 一個時間窗中包含的時間窗數量 protected int sampleCount; // 時間窗長度 protected int intervalInMs; private double intervalInSecond; // 這個一個數組,元素為WindowWrap樣本窗口 // 注意,這里的泛型 T 實際為 MetricBucket 類型 protected final AtomicReferenceArray<WindowWrap<T>> array; 。。。。。。。。。。。。。。。。。。。。。。。。。。 public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 計算當前時間所在的樣本窗口id,即在計算數組LeapArray中的索引 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. // 計算當前樣本窗口的開始時間點 long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ while (true) { // 獲取到當前時間所在的樣本窗口 WindowWrap<T> old = array.get(idx); // 若當前時間所在樣本窗口為null,說明該樣本窗口還不存在,則創建一個 if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ // 創建一個時間窗 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); // 通過CAS方式將新建窗口放入到array if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 若當前樣本窗口的起始時間點與計算出的樣本窗口起始時間點相同, // 則說明這兩個是同一個樣本窗口 } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; // 若當前樣本窗口的起始時間點 大於 計算出的樣本窗口起始時間點, // 說明計算出的樣本窗口已經過時了,需要將原來的樣本窗口替換 } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. // 替換掉老的樣本窗口 return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } // 當前樣本窗口的起始時間點 小於 計算出的樣本窗口起始時間點, // 這種情況一般不會出現,因為時間不會倒流。除非人為修改了系統時鍾 } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
。
// 樣本窗口實例,泛型T為MetricBucket public class WindowWrap<T> { /** * Time length of a single window bucket in milliseconds. */ // 樣本窗口長度 private final long windowLengthInMs; /** * Start timestamp of the window in milliseconds. */ // 樣本窗口的起始時間戳 private long windowStart; /** * Statistic data. */ // 當前樣本窗口中的統計數據,其類型為 MetricBucket private T value; 。。。。。。。。。。。。。。。。。。
。
// 統計數據的封裝類 public class MetricBucket { // 統計的數據存放在這里 // 這里要統計的數據是多維度的,這些維度類型在 MetricEvent 枚舉中 private final LongAdder[] counters; private volatile long minRt; 。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
。
// 數據統計的維度 public enum MetricEvent { /** * Normal pass. */ PASS, /** * Normal block. */ BLOCK, EXCEPTION, SUCCESS, RT, /** * Passed in future quota (pre-occupied, since 1.5.0). */ OCCUPIED_PASS }
.....
