3. Sentinel源碼分析— QPS流量控制是如何實現的?


終於在這周內寫了一篇源碼解析,每周一篇即使再忙也不能打破

Sentinel源碼解析系列:
1.Sentinel源碼分析—FlowRuleManager加載規則做了什么?
2. Sentinel源碼分析—Sentinel是如何進行流量統計的?


上回我們用基於並發數來講了一下Sentinel的整個流程,這篇文章我們來講一下Sentinel的QPS流量控制是如何實現的。

先上一個極簡的demo,我們的代碼就從這個demo入手:

public static void main(String[] args) {
    List<FlowRule> rules = new ArrayList<FlowRule>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource("abc"); 
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);

    Entry entry = null;

    try {
        entry = SphU.entry("abc");
        //dosomething 
    } catch (BlockException e1) {

    } catch (Exception e2) {
        // biz exception
    } finally {
        if (entry != null) {
            entry.exit();
        }
    }
}

在這個例子中我們首先新建了一個FlowRule實例,然后調用了loadRules方法加載規則,這部分的代碼都和基於並發數的流量控制的代碼是一樣的,想要了解的朋友可以去看看我的這一篇文章1.Sentinel源碼分析—FlowRuleManager加載規則做了什么?,下面我們說說不一樣的地方。

在調用FlowRuleManager的loadRules方法的時候會創建一個rater實例:

FlowRuleUtil#buildFlowRuleMap

//設置拒絕策略:直接拒絕、Warm Up、勻速排隊,默認是DefaultController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);

我們進入到generateRater看一下是怎么創建實例的:

FlowRuleUtil#generateRater

private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                //warmUpPeriodSec默認是10 
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                //rule.getMaxQueueingTimeMs()默認是500
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    return new DefaultController(rule.getCount(), rule.getGrade());
}

這個方法里面如果設置的是按QPS的方式來限流的話,可以設置一個ControlBehavior屬性,用來做流量控制分別是:直接拒絕、Warm Up、勻速排隊。

接下來的所有的限流操作全部在FlowSlot中進行,不熟悉Sentinel流程的朋友可以去看看我的這一篇文章:2. Sentinel源碼分析—Sentinel是如何進行流量統計的?,這篇文章介紹了Sentinel的全流程分析,本文的其他流程基本都在這篇文章上講了,只有FlowSlot部分代碼不同。

接下來我們來講一下FlowSlot里面是怎么實現QPS限流的

FlowSlot#entry

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

FlowSlot在實例化的時候會實例化一個FlowRuleChecker實例作為checker。在checkFlow方法里面會繼續調用FlowRuleChecker的checkFlow方法,其中ruleProvider實例是用來根據根據resource來從flowRules中獲取相應的FlowRule。

我們進入到FlowRuleChecker的checkFlow方法中

FlowRuleChecker#checkFlow

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    //返回FlowRuleManager里面注冊的所有規則
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            //如果當前的請求不能通過,那么就拋出FlowException異常
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

這里是調用ruleProvider來獲取所有FlowRule,然后遍歷rule集合通過canPassCheck方法來進行過濾,如果不符合條件則會拋出FlowException異常。

我們跟進去直接來到passLocalCheck方法:

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    //節點選擇
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    //根據設置的規則來攔截
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

這個方法里面會選擇好相應的節點后調用rater的canPass方法來判斷是否需要阻塞。

Rater有四個,分別是:DefaultController、RateLimiterController、WarmUpController、WarmUpRateLimiterController,我們挨個分析一下。

其中DefaultController是直接拒絕策略,我們在上一篇文章中已經分析過了,這次我們來看看其他三個。

RateLimiterController勻速排隊

它的中心思想是,以固定的間隔時間讓請求通過。當請求到來的時候,如果當前請求距離上個通過的請求通過的時間間隔不小於預設值,則讓當前請求通過;否則,計算當前請求的預期通過時間,如果該請求的預期通過時間小於規則預設的 timeout 時間,則該請求會等待直到預設時間到來通過(排隊等待處理);若預期的通過時間超出最大排隊時長,則直接拒接這個請求。

這種方式適合用於請求以突刺狀來到,這個時候我們不希望一下子把所有的請求都通過,這樣可能會把系統壓垮;同時我們也期待系統以穩定的速度,逐步處理這些請求,以起到“削峰填谷”的效果,而不是拒絕所有請求。

要想使用這個策略需要在實例化FlowRule的時候設置rule1.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)這樣的一句代碼。

在實例化Rater的時候會調用FlowRuleUtil#generateRateri創建一個實例:

new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());

MaxQueueingTimeMs默認是500 ,Count在我們這個例子中傳入的是20。

我們看一下具體的canPass方法是怎么實現限流的:

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
        return false;
    }

    long currentTime = TimeUtil.currentTimeMillis();
    //兩個請求預期通過的時間,也就是說把請求平均分配到1秒上
    // Calculate the interval between every two requests.
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    //latestPassedTime代表的是上一次調用請求的時間
    // Expected pass time of this request.
    long expectedTime = costTime + latestPassedTime.get();
    //如果預期通過的時間加上上次的請求時間小於當前時間,則通過
    if (expectedTime <= currentTime) {
        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        //默認是maxQueueingTimeMs
        // Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();

        //如果預提時間比當前時間大maxQueueingTimeMs那么多,那么就阻塞
        if (waitTime > maxQueueingTimeMs) {
            return false;
        } else {
            //將上次時間加上這次請求要耗費的時間
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                waitTime = oldTime - TimeUtil.currentTimeMillis();
                //再次判斷一下是否超過maxQueueingTimeMs設置的時間
                if (waitTime > maxQueueingTimeMs) {
                    //如果是的話就阻塞,並重置上次通過時間
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                //如果需要等待的時間大於零,那么就sleep
                // in race condition waitTime may <= 0
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                }
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    return false;
}

這個方法一開始會計算一下costTime這個值,將請求平均分配到一秒中。例如:當 count 設為 10 的時候,則代表一秒勻速的通過 10 個請求,也就是每個請求平均間隔恆定為 1000 / 10 = 100 ms。

但是這里有個小bug,如果count設置的比較大,比如設置成10000,那么costTime永遠都會等於0,整個QPS限流將會失效。

然后會將costTime和上次的請求時間相加,如果大於當前時間就表明請求的太頻繁了,會將latestPassedTime這個屬性加上這次請求的costTime,並調用sleep方法讓這個線程先睡眠一會再請求。

這里有個細節,如果多個請求同時一起過來,那么每個請求在設置oldTime的時候都會通過addAndGet這個原子性的方法將latestPassedTime依次相加,並賦值給oldTime,這樣每個線程的sleep的時間都不會相同,線程也不會同時醒來。

WarmUpController限流 冷啟動

當系統長期處於低水位的情況下,當流量突然增加時,直接把系統拉升到高水位可能瞬間把系統壓垮。通過"冷啟動",讓通過的流量緩慢增加,在一定時間內逐漸增加到閾值上限,給冷系統一個預熱的時間,避免冷系統被壓垮。

//默認為3
private int coldFactor;
//轉折點的令牌數
protected int warningToken = 0;
//最大的令牌數
private int maxToken;
//斜線斜率
protected double slope;
//累積的令牌數
protected AtomicLong storedTokens = new AtomicLong(0);
//最后更新令牌的時間
protected AtomicLong lastFilledTime = new AtomicLong(0);

public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
    construct(count, warmUpPeriodInSec, coldFactor);
}

private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

    if (coldFactor <= 1) {
        throw new IllegalArgumentException("Cold factor should be larger than 1");
    }

    this.count = count;
    //默認是3
    this.coldFactor = coldFactor;

    // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
    // 10*20/2 = 100
    // warningToken = 100;
    warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
    // / maxPermits = thresholdPermits + 2 * warmupPeriod /
    // (stableInterval + coldInterval)
    // maxToken = 200
    maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

    // slope
    // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits
    // - thresholdPermits);
    slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}

這里我拿一張圖來說明一下:

X 軸代表 storedPermits 的數量,Y 軸代表獲取一個 permits 需要的時間。

假設指定 permitsPerSecond 為 10,那么 stableInterval 為 100ms,而 coldInterval 是 3 倍,也就是 300ms(coldFactor,3 倍 )。也就是說,當達到 maxPermits 時,此時處於系統最冷的時候,獲取一個 permit 需要 300ms,而如果 storedPermits 小於 thresholdPermits 的時候,只需要 100ms。

利用 “獲取冷的 permits ” 需要等待更多時間,來限制突發請求通過,達到系統預熱的目的。

所以在我們的代碼中,maxToken代表的就是圖中的maxPermits,warningToken代表的就是thresholdPermits,slope就是代表每次獲取permit減少的程度。

我們接下來看看WarmUpController的canpass方法:

WarmUpController#canpass

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    //獲取當前時間窗口的流量大小
    long passQps = (long) node.passQps();
    //獲取上一個窗口的流量大小
    long previousQps = (long) node.previousPassQps();
    //設置 storedTokens 和 lastFilledTime 到正確的值
    syncToken(previousQps);

    // 開始計算它的斜率
    // 如果進入了警戒線,開始調整他的qps
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        //通過計算當前的restToken和警戒線的距離來計算當前的QPS
        //離警戒線越接近,代表這個程序越“熱”,從而逐步釋放QPS
        long aboveToken = restToken - warningToken;
        //當前狀態下能達到的最高 QPS
        // current interval = restToken*slope+1/count
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));

        // 如果不會超過,那么通過,否則不通過
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        // count 是最高能達到的 QPS
        if (passQps + acquireCount <= count) {
            return true;
        }
    }
    return false;
}

這個方法里通過syncToken(previousQps)設置storedTokens的值后,與警戒值做判斷,如果沒有達到警戒值,那么通過計算和警戒值的距離再加上slope計算出一個當前的QPS值,storedTokens越大當前的QPS越小。

如果當前的storedTokens已經小於警戒值了,說明已經預熱完畢了,直接用count判斷就好了。

WarmUpController#syncToken

protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    //去掉毫秒的時間
    currentTime = currentTime - currentTime % 1000;
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }

    // 令牌數量的舊值
    long oldValue = storedTokens.get();
    // 計算新的令牌數量,往下看
    long newValue = coolDownTokens(currentTime, passQps);

    if (storedTokens.compareAndSet(oldValue, newValue)) {
        // 令牌數量上,減去上一分鍾的 QPS,然后設置新值
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentTime);
    } 
}

這個方法通過coolDownTokens方法來獲取一個新的value,然后通過CAS設置到storedTokens中,然后將storedTokens減去上一個窗口的QPS值,並為lastFilledTime設置一個新的值。

其實我這里有個疑惑,在用storedTokens減去上一個窗口的QPS的時候並沒有做控制,假如處理的速度非常的快,在一個窗口內就減了很多次,直接把當前的storedTokens減到了小於warningToken,那么是不是就沒有在一定的時間范圍內啟動冷啟動的效果?

private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // 添加令牌的判斷前提條件:
    // 當令牌的消耗程度遠遠低於警戒線的時候
    if (oldValue < warningToken) {
        // 根據count數每秒加上令牌
        newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        //如果還在冷啟動階段
        // 如果當前通過的 QPS 大於 count/coldFactor,說明系統消耗令牌的速度,大於冷卻速度
        //    那么不需要添加令牌,否則需要添加令牌
        if (passQps < (int) count / coldFactor) {
            newValue = (long) (oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    return Math.min(newValue, maxToken);
}

這個方法主要是用來做添加令牌的操作,如果是流量比較小或者是已經預熱完畢了,那么就需要根據count數每秒加上令牌,如果是在預熱階段那么就不進行令牌添加。

WarmUpRateLimiterController就是結合了冷啟動和勻速排隊,代碼非常的簡單,有了上面的分析,相信大家也能看得懂,所以也就不講解了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM