限流是指在系統面臨高並發、大流量請求的情況下,限制新的流量對系統的訪問,從而保證系統服務的安全性。常用的限流算法有計數器固定窗口算法、滑動窗口算法、漏斗算法和令牌桶算法,下面將對這幾種算法進行分別介紹,並給出具體的實現。本文目錄如下,略長,讀者可以全文閱讀,同樣也可以只看感興趣的部分。
計數器固定窗口算法
原理
計數器固定窗口算法是最基礎也是最簡單的一種限流算法。原理就是對一段固定時間窗口內的請求進行計數,如果請求數超過了閾值,則舍棄該請求;如果沒有達到設定的閾值,則接受該請求,且計數加1。當時間窗口結束時,重置計數器為0。

代碼實現及測試
實現起來也比較簡單,如下:
package project.limiter;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Project: AllForJava
 * Title:
 * Description:
 * Date: 2020-09-07 15:56
 * Copyright: Copyright (c) 2020
 *
* @公眾號: 超悅編程
* @微信號:exzlco
* @author: 超悅人生
* @email: exzlc@139.com
* @version 1.0
 **/
public class CounterLimiter {
    private int windowSize; //窗口大小,毫秒為單位
    private int limit;//窗口內限流大小
    private AtomicInteger count;//當前窗口的計數器
    private CounterLimiter(){}
    public CounterLimiter(int windowSize,int limit){
        this.limit = limit;
        this.windowSize = windowSize;
        count = new AtomicInteger(0);
        //開啟一個線程,達到窗口結束時清空count
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    count.set(0);
                    try {
                        Thread.sleep(windowSize);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
    //請求到達后先調用本方法,若返回true,則請求通過,否則限流
    public boolean tryAcquire(){
        int newCount = count.addAndGet(1);
        if(newCount > limit){
            return false;
        }else{
            return true;
        }
    }
    //測試
    public static void main(String[] args) throws InterruptedException {
        //每秒20個請求
        CounterLimiter counterLimiter = new CounterLimiter(1000,20);
        int count = 0;
        //模擬50次請求,看多少能通過
        for(int i = 0;i < 50;i ++){
            if(counterLimiter.tryAcquire()){
                count ++;
            }
        }
        System.out.println("第一撥50次請求中通過:" + count + ",限流:" + (50 - count));
        //過一秒再請求
        Thread.sleep(1000);
        //模擬50次請求,看多少能通過
        count = 0;
        for(int i = 0;i < 50;i ++){
            if(counterLimiter.tryAcquire()){
                count ++;
            }
        }
        System.out.println("第二撥50次請求中通過:" + count + ",限流:" + (50 - count));
    }
}
 
         
         
        測試結果如下:

可以看到50個請求只有20個通過了,30個被限流,達到了預期的限流效果。
特點分析
優點:實現簡單,容易理解。
缺點:流量曲線可能不夠平滑,有“突刺現象”,如下圖所示。這樣會有兩個問題:

-  
一段時間內(不超過時間窗口)系統服務不可用。比如窗口大小為1s,限流大小為100,然后恰好在某個窗口的第1ms來了100個請求,然后第2ms-999ms的請求就都會被拒絕,這段時間用戶會感覺系統服務不可用。
 -  
窗口切換時可能會產生兩倍於閾值流量的請求。比如窗口大小為1s,限流大小為100,然后恰好在某個窗口的第999ms來了100個請求,窗口前期沒有請求,所以這100個請求都會通過。再恰好,下一個窗口的第1ms有來了100個請求,也全部通過了,那也就是在2ms之內通過了200個請求,而我們設定的閾值是100,通過的請求達到了閾值的兩倍。

 
計數器滑動窗口算法
原理
計數器滑動窗口算法是計數器固定窗口算法的改進,解決了固定窗口切換時可能會產生兩倍於閾值流量請求的缺點。
滑動窗口算法在固定窗口的基礎上,將一個計時窗口分成了若干個小窗口,然后每個小窗口維護一個獨立的計數器。當請求的時間大於當前窗口的最大時間時,則將計時窗口向前平移一個小窗口。平移時,將第一個小窗口的數據丟棄,然后將第二個小窗口設置為第一個小窗口,同時在最后面新增一個小窗口,將新的請求放在新增的小窗口中。同時要保證整個窗口中所有小窗口的請求數目之后不能超過設定的閾值。

從圖中不難看出,滑動窗口算法就是固定窗口的升級版。將計時窗口划分成一個小窗口,滑動窗口算法就退化成了固定窗口算法。而滑動窗口算法其實就是對請求數進行了更細粒度的限流,窗口划分的越多,則限流越精准。
代碼實現及測試
package project.limiter;
/**
 * Project: AllForJava
 * Title:
 * Description:
 * Date: 2020-09-07 18:38
 * Copyright: Copyright (c) 2020
 *
* @公眾號: 超悅編程
* @微信號:exzlco
* @author: 超悅人生
* @email: exzlc@139.com
* @version 1.0
 **/
public class CounterSildeWindowLimiter {
    private int windowSize; //窗口大小,毫秒為單位
    private int limit;//窗口內限流大小
    private int splitNum;//切分小窗口的數目大小
    private int[] counters;//每個小窗口的計數數組
    private int index;//當前小窗口計數器的索引
    private long startTime;//窗口開始時間
    private CounterSildeWindowLimiter(){}
    public CounterSildeWindowLimiter(int windowSize, int limit, int splitNum){
        this.limit = limit;
        this.windowSize = windowSize;
        this.splitNum = splitNum;
        counters = new int[splitNum];
        index = 0;
        startTime = System.currentTimeMillis();
    }
    //請求到達后先調用本方法,若返回true,則請求通過,否則限流
    public synchronized boolean tryAcquire(){
        long curTime = System.currentTimeMillis();
        long windowsNum = Math.max(curTime - windowSize - startTime,0) / (windowSize / splitNum);//計算滑動小窗口的數量
        slideWindow(windowsNum);//滑動窗口
        int count = 0;
        for(int i = 0;i < splitNum;i ++){
            count += counters[i];
        }
        if(count >= limit){
            return false;
        }else{
            counters[index] ++;
            return true;
        }
    }
    private synchronized void slideWindow(long windowsNum){
        if(windowsNum == 0)
            return;
        long slideNum = Math.min(windowsNum,splitNum);
        for(int i = 0;i < slideNum;i ++){
            index = (index + 1) % splitNum;
            counters[index] = 0;
        }
        startTime = startTime + windowsNum * (windowSize / splitNum);//更新滑動窗口時間
    }
    //測試
    public static void main(String[] args) throws InterruptedException {
        //每秒20個請求
        int limit = 20;
        CounterSildeWindowLimiter counterSildeWindowLimiter = new CounterSildeWindowLimiter(1000,limit,10);
        int count = 0;
        Thread.sleep(3000);
        //計數器滑動窗口算法模擬100組間隔30ms的50次請求
        System.out.println("計數器滑動窗口算法測試開始");
        System.out.println("開始模擬100組間隔150ms的50次請求");
        int faliCount = 0;
        for(int j = 0;j < 100;j ++){
            count = 0;
            for(int i = 0;i < 50;i ++){
                if(counterSildeWindowLimiter.tryAcquire()){
                    count ++;
                }
            }
            Thread.sleep(150);
            //模擬50次請求,看多少能通過
            for(int i = 0;i < 50;i ++){
                if(counterSildeWindowLimiter.tryAcquire()){
                    count ++;
                }
            }
            if(count > limit){
                System.out.println("時間窗口內放過的請求超過閾值,放過的請求數" + count + ",限流:" + limit);
                faliCount ++;
            }
            Thread.sleep((int)(Math.random() * 100));
        }
        System.out.println("計數器滑動窗口算法測試結束,100組間隔150ms的50次請求模擬完成,限流失敗組數:" + faliCount);
        System.out.println("===========================================================================================");
        //計數器固定窗口算法模擬100組間隔30ms的50次請求
        System.out.println("計數器固定窗口算法測試開始");
        //模擬100組間隔30ms的50次請求
        CounterLimiter counterLimiter = new CounterLimiter(1000,limit);
        System.out.println("開始模擬100組間隔150ms的50次請求");
        faliCount = 0;
        for(int j = 0;j < 100;j ++){
            count = 0;
            for(int i = 0;i < 50;i ++){
                if(counterLimiter.tryAcquire()){
                    count ++;
                }
            }
            Thread.sleep(150);
            //模擬50次請求,看多少能通過
            for(int i = 0;i < 50;i ++){
                if(counterLimiter.tryAcquire()){
                    count ++;
                }
            }
            if(count > limit){
                System.out.println("時間窗口內放過的請求超過閾值,放過的請求數" + count + ",限流:" + limit);
                faliCount ++;
            }
            Thread.sleep((int)(Math.random() * 100));
        }
        System.out.println("計數器滑動窗口算法測試結束,100組間隔150ms的50次請求模擬完成,限流失敗組數:" + faliCount);
    }
}
 
        測試時,取滑動窗口大小為1000/10=100ms,然后模擬100組間隔150ms的50次請求,計數器滑動窗口算法與計數器固定窗口算法進行對別,可以看到如下結果:

固定窗口算法在窗口切換時產生了兩倍於閾值流量請求的問題,而滑動窗口算法避免了這個問題。
特點分析
- 避免了計數器固定窗口算法固定窗口切換時可能會產生兩倍於閾值流量請求的問題;
 - 和漏斗算法相比,新來的請求也能夠被處理到,避免了漏斗算法的飢餓問題。
 
漏斗算法
原理
漏斗算法的原理也很容易理解。請求來了之后會首先進到漏斗里,然后漏斗以恆定的速率將請求流出進行處理,從而起到平滑流量的作用。當請求的流量過大時,漏斗達到最大容量時會溢出,此時請求被丟棄。從系統的角度來看,我們不知道什么時候會有請求來,也不知道請求會以多大的速率來,這就給系統的安全性埋下了隱患。但是如果加了一層漏斗算法限流之后,就能夠保證請求以恆定的速率流出。在系統看來,請求永遠是以平滑的傳輸速率過來,從而起到了保護系統的作用。

代碼實現及測試
package project.limiter;
import java.util.Date;
import java.util.LinkedList;
/**
* Project: AllForJava
* Title: 
* Description:
* Date: 2020-09-08 16:45
* Copyright: Copyright (c) 2020
*
* @公眾號: 超悅編程
* @微信號:exzlco
* @author: 超悅人生
* @email: exzlc@139.com
* @version 1.0
**/
public class LeakyBucketLimiter {
    private int capaticy;//漏斗容量
    private int rate;//漏斗速率
    private int left;//剩余容量
    private LinkedList<Request> requestList;
    private LeakyBucketLimiter() {}
    public LeakyBucketLimiter(int capaticy, int rate) {
        this.capaticy = capaticy;
        this.rate = rate;
        this.left = capaticy;
        requestList = new LinkedList<>();
        //開啟一個定時線程,以固定的速率將漏斗中的請求流出,進行處理
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    if(!requestList.isEmpty()){
                        Request request = requestList.removeFirst();
                        handleRequest(request);
                    }
                    try {
                        Thread.sleep(1000 / rate); //睡眠
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
    /**
     * 處理請求
     * @param request
     */
    private void handleRequest(Request request){
        request.setHandleTime(new Date());
        System.out.println(request.getCode() + "號請求被處理,請求發起時間:"
                + request.getLaunchTime() + ",請求處理時間:" + request.getHandleTime() + ",處理耗時:"
                + (request.getHandleTime().getTime()  - request.getLaunchTime().getTime()) + "ms");
    }
    public synchronized boolean tryAcquire(Request request){
        if(left <= 0){
            return false;
        }else{
            left --;
            requestList.addLast(request);
            return true;
        }
    }
    /**
     * 請求類,屬性包含編號字符串、請求達到時間和請求處理時間
     */
    static class Request{
        private int code;
        private Date launchTime;
        private Date handleTime;
        private Request() { }
        public Request(int code,Date launchTime) {
            this.launchTime = launchTime;
            this.code = code;
        }
        public int getCode() {
            return code;
        }
        public void setCode(int code) {
            this.code = code;
        }
        public Date getLaunchTime() {
            return launchTime;
        }
        public void setLaunchTime(Date launchTime) {
            this.launchTime = launchTime;
        }
        public Date getHandleTime() {
            return handleTime;
        }
        public void setHandleTime(Date handleTime) {
            this.handleTime = handleTime;
        }
    }
    public static void main(String[] args) {
        LeakyBucketLimiter leakyBucketLimiter = new LeakyBucketLimiter(5,2);
        for(int i = 1;i <= 10;i ++){
            Request request = new Request(i,new Date());
            if(leakyBucketLimiter.tryAcquire(request)){
                System.out.println(i + "號請求被接受");
            }else{
                System.out.println(i + "號請求被拒絕");
            }
        }
    }
}
 
        測試時,取漏斗限流算法的容量是5,漏斗速率為2個/秒,然后模擬了連續的10個請求,編號從1-10,結果如下:

可以看到1-5號請求被接受,而6-10號請求被拒絕,說明此時漏斗已經溢出了,符合我們的預期。
我們再關注下被接受的這5個請求的處理情況,可以看到這5個請求雖然被接受了,但是處理是一個一個被處理的(不一定是順序的,取決於具體實現),大約每500ms處理一個。這就體現了漏斗算法的特點了,即雖然請求流量是瞬時產生的,但是請求以固定速率流出被處理。因為我們設定的漏斗速率為2個/秒,所以每500ms漏斗會漏出一個請求然后進行處理。
特點分析
- 漏桶的漏出速率是固定的,可以起到整流的作用。即雖然請求的流量可能具有隨機性,忽大忽小,但是經過漏斗算法之后,變成了有固定速率的穩定流量,從而對下游的系統起到保護作用。
 - 不能解決流量突發的問題。還是拿剛剛測試的例子,我們設定的漏斗速率是2個/秒,然后突然來了10個請求,受限於漏斗的容量,只有5個請求被接受,另外5個被拒絕。你可能會說,漏斗速率是2個/秒,然后瞬間接受了5個請求,這不就解決了流量突發的問題嗎?不,這5個請求只是被接受了,但是沒有馬上被處理,處理的速度仍然是我們設定的2個/秒,所以沒有解決流量突發的問題。而接下來我們要談的令牌桶算法能夠在一定程度上解決流量突發的問題,讀者可以對比一下。
 
令牌桶算法
原理
令牌桶算法是對漏斗算法的一種改進,除了能夠起到限流的作用外,還允許一定程度的流量突發。在令牌桶算法中,存在一個令牌桶,算法中存在一種機制以恆定的速率向令牌桶中放入令牌。令牌桶也有一定的容量,如果滿了令牌就無法放進去了。當請求來時,會首先到令牌桶中去拿令牌,如果拿到了令牌,則該請求會被處理,並消耗掉拿到的令牌;如果令牌桶為空,則該請求會被丟棄。

代碼實現及測試
package project.limiter;
import java.util.Date;
/**
* Project: AllForJava
* Title: 
* Description:
* Date: 2020-09-08 19:22
* Copyright: Copyright (c) 2020
* 
* @公眾號: 超悅編程
* @微信號:exzlco
* @author: 超悅人生
* @email: exzlc@139.com
* @version 1.0
**/
public class TokenBucketLimiter {
    private int capaticy;//令牌桶容量
    private int rate;//令牌產生速率
    private int tokenAmount;//令牌數量
    public TokenBucketLimiter(int capaticy, int rate) {
        this.capaticy = capaticy;
        this.rate = rate;
        tokenAmount = capaticy;
        new Thread(new Runnable() {
            @Override
            public void run() {
                //以恆定速率放令牌
                while (true){
                    synchronized (this){
                        tokenAmount ++;
                        if(tokenAmount > capaticy){
                            tokenAmount = capaticy;
                        }
                    }
                    try {
                        Thread.sleep(1000 / rate);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
    public synchronized boolean tryAcquire(Request request){
        if(tokenAmount > 0){
            tokenAmount --;
            handleRequest(request);
            return true;
        }else{
            return false;
        }
    }
    /**
     * 處理請求
     * @param request
     */
    private void handleRequest(Request request){
        request.setHandleTime(new Date());
        System.out.println(request.getCode() + "號請求被處理,請求發起時間:"
                + request.getLaunchTime() + ",請求處理時間:" + request.getHandleTime() + ",處理耗時:"
                + (request.getHandleTime().getTime()  - request.getLaunchTime().getTime()) + "ms");
    }
    /**
     * 請求類,屬性只包含一個名字字符串
     */
    static class Request{
        private int code;
        private Date launchTime;
        private Date handleTime;
        private Request() { }
        public Request(int code,Date launchTime) {
            this.launchTime = launchTime;
            this.code = code;
        }
        public int getCode() {
            return code;
        }
        public void setCode(int code) {
            this.code = code;
        }
        public Date getLaunchTime() {
            return launchTime;
        }
        public void setLaunchTime(Date launchTime) {
            this.launchTime = launchTime;
        }
        public Date getHandleTime() {
            return handleTime;
        }
        public void setHandleTime(Date handleTime) {
            this.handleTime = handleTime;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        TokenBucketLimiter tokenBucketLimiter = new TokenBucketLimiter(5,2);
        for(int i = 1;i <= 10;i ++){
            Request request = new Request(i,new Date());
            if(tokenBucketLimiter.tryAcquire(request)){
                System.out.println(i + "號請求被接受");
            }else{
                System.out.println(i + "號請求被拒絕");
            }
        }
    }
}
 
        測試時,為了與漏斗限流算法進行對別,同樣取令牌桶算法的容量是5,產生令牌的速度為2個/秒,然后模擬了連續的10個請求,編號從1-10,結果如下:

可以看到,對於10個請求,令牌桶算法和漏斗算法一樣,都是接受了5個請求,拒絕了5個請求。與漏斗算法不同的是,令牌桶算法馬上處理了這5個請求,處理速度可以認為是5個/秒,超過了我們設定的2個/秒的速率,即允許一定程度的流量突發。這一點也是和漏斗算法的主要區別,可以認真體會一下。
特點分析
令牌桶算法是對漏桶算法的一種改進,除了能夠在限制調用的平均速率的同時還允許一定程度的流量突發。
小結
我們對上述四種限流算法進行一下簡單的總結。
計數器固定窗口算法實現簡單,容易理解。和漏斗算法相比,新來的請求也能夠被馬上處理到。但是流量曲線可能不夠平滑,有“突刺現象”,在窗口切換時可能會產生兩倍於閾值流量的請求。而計數器滑動窗口算法作為計數器固定窗口算法的一種改進,有效解決了窗口切換時可能會產生兩倍於閾值流量請求的問題。
漏斗算法能夠對流量起到整流的作用,讓隨機不穩定的流量以固定的速率流出,但是不能解決流量突發的問題。令牌桶算法作為漏斗算法的一種改進,除了能夠起到平滑流量的作用,還允許一定程度的流量突發。
以上四種限流算法都有自身的特點,具體使用時還是要結合自身的場景進行選取,沒有最好的算法,只有最合適的算法。比如令牌桶算法一般用於保護自身的系統,對調用者進行限流,保護自身的系統不被突發的流量打垮。如果自身的系統實際的處理能力強於配置的流量限制時,可以允許一定程度的流量突發,使得實際的處理速率高於配置的速率,充分利用系統資源。而漏斗算法一般用於保護第三方的系統,比如自身的系統需要調用第三方的接口,為了保護第三方的系統不被自身的調用打垮,便可以通過漏斗算法進行限流,保證自身的流量平穩的打到第三方的接口上。
算法是死的,而算法中的思想精髓才是值得我們學習的。實際的場景中完全可以靈活運用,還是那句話,沒有最好的算法,只有最合適的算法。
覺得文章有用的話,點贊+關注唄,好讓更多的人看到這篇文章,也激勵博主寫出更多的好文章。
更多關於校招面試、算法、數據結構和計算機基礎知識的內容,歡迎掃碼關注我的原創公眾號「超悅編程」。

更多推薦閱讀
 為什么有紅黑樹?什么是紅黑樹?看完這篇你就明白了
 《深入淺出話數據結構》系列之什么是B樹、B+樹?為什么二叉查找樹不行?
 都2020年了,聽說你還不會歸並排序?手把手教你手寫歸並排序算法
 為什么會有多線程?什么是線程安全?如何保證線程安全?
 《一文說透數據結構》系列之什么是堆?看這一篇就夠了
