限流:計數器、漏桶、令牌桶 三大算法的原理與實戰(史上最全)


文章很長,建議收藏起來,慢慢讀! Java 高並發 發燒友社群:瘋狂創客圈 奉上以下珍貴的學習資源:


推薦:入大廠 、做架構、大力提升Java 內功 的 精彩博文

入大廠 、做架構、大力提升Java 內功 必備的精彩博文 2021 秋招漲薪1W + 必備的精彩博文
1:Redis 分布式鎖 (圖解-秒懂-史上最全) 2:Zookeeper 分布式鎖 (圖解-秒懂-史上最全)
3: Redis與MySQL雙寫一致性如何保證? (面試必備) 4: 面試必備:秒殺超賣 解決方案 (史上最全)
5:面試必備之:Reactor模式 6: 10分鍾看懂, Java NIO 底層原理
7:TCP/IP(圖解+秒懂+史上最全) 8:Feign原理 (圖解)
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) 10:CDN圖解(秒懂 + 史上最全 + 高薪必備)
11: 分布式事務( 圖解 + 史上最全 + 吐血推薦 )

Java 面試題 30個專題 , 史上最全 , 面試必刷 阿里、京東、美團... 隨意挑、橫着走!!!
1: JVM面試題(史上最強、持續更新、吐血推薦) 2:Java基礎面試題(史上最全、持續更新、吐血推薦
3:架構設計面試題 (史上最全、持續更新、吐血推薦) 4:設計模式面試題 (史上最全、持續更新、吐血推薦)
17、分布式事務面試題 (史上最全、持續更新、吐血推薦) 一致性協議 (史上最全)
29、多線程面試題(史上最全) 30、HR面經,過五關斬六將后,小心陰溝翻船!
9.網絡協議面試題(史上最全、持續更新、吐血推薦) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

SpringCloud 精彩博文
nacos 實戰(史上最全) sentinel (史上最全+入門教程)
SpringCloud gateway (史上最全) 更多專題, 請參見【 瘋狂創客圈 高並發 總目錄

限流

限流是面試中的常見的面試題(尤其是大廠面試、高P面試)

在這里插入圖片描述

為什么要限流

簡單來說:

限流在很多場景中用來限制並發和請求量,比如說秒殺搶購,保護自身系統和下游系統不被巨型流量沖垮等。

以微博為例,例如某某明星公布了戀情,訪問從平時的50萬增加到了500萬,系統的規划能力,最多可以支撐200萬訪問,那么就要執行限流規則,保證是一個可用的狀態,不至於服務器崩潰,所有請求不可用。

參考鏈接

系統架構知識圖譜(一張價值10w的系統架構知識圖譜)

https://www.processon.com/view/link/60fb9421637689719d246739

秒殺系統的架構

https://www.processon.com/view/link/61148c2b1e08536191d8f92f

限流的思想

在保證可用的情況下盡可能多增加進入的人數,其余的人在排隊等待,或者返回友好提示,保證里面的進行系統的用戶可以正常使用,防止系統雪崩。

日常生活中,有哪些需要限流的地方?

像我旁邊有一個國家景區,平時可能根本沒什么人前往,但是一到五一或者春節就人滿為患,這時候景區管理人員就會實行一系列的政策來限制進入人流量,
為什么要限流呢?

假如景區能容納一萬人,現在進去了三萬人,勢必摩肩接踵,整不好還會有事故發生,這樣的結果就是所有人的體驗都不好,如果發生了事故景區可能還要關閉,導致對外不可用,這樣的后果就是所有人都覺得體驗糟糕透了。

限流的算法

限流算法很多,常見的有三類,分別是計數器算法、漏桶算法、令牌桶算法,下面逐一講解。

限流的手段通常有計數器、漏桶、令牌桶。注意限流和限速(所有請求都會處理)的差別,視
業務場景而定。

(1)計數器:

在一段時間間隔內(時間窗/時間區間),處理請求的最大數量固定,超過部分不做處理。

(2)漏桶:

漏桶大小固定,處理速度固定,但請求進入速度不固定(在突發情況請求過多時,會丟棄過多的請求)。

(3)令牌桶:

令牌桶的大小固定,令牌的產生速度固定,但是消耗令牌(即請求)速度不固定(可以應對一些某些時間請求過多的情況);每個請求都會從令牌桶中取出令牌,如果沒有令牌則丟棄該次請求。

計數器算法

計數器限流定義:

在一段時間間隔內(時間窗/時間區間),處理請求的最大數量固定,超過部分不做處理。

簡單粗暴,比如指定線程池大小,指定數據庫連接池大小、nginx連接數等,這都屬於計數器算法。

計數器算法是限流算法里最簡單也是最容易實現的一種算法。

舉個例子,比如我們規定對於A接口,我們1分鍾的訪問次數不能超過100個。

那么我們可以這么做:

  • 在一開 始的時候,我們可以設置一個計數器counter,每當一個請求過來的時候,counter就加1,如果counter的值大於100並且該請求與第一個請求的間隔時間還在1分鍾之內,那么說明請求數過多,拒絕訪問;

  • 如果該請求與第一個請求的間隔時間大於1分鍾,且counter的值還在限流范圍內,那么就重置 counter,就是這么簡單粗暴。

img

計算器限流的實現

package com.crazymaker.springcloud.ratelimit;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// 計速器 限速
@Slf4j
public class CounterLimiter
{

    // 起始時間
    private static long startTime = System.currentTimeMillis();
    // 時間區間的時間間隔 ms
    private static long interval = 1000;
    // 每秒限制數量
    private static long maxCount = 2;
    //累加器
    private static AtomicLong accumulator = new AtomicLong();

    // 計數判斷, 是否超出限制
    private static long tryAcquire(long taskId, int turn)
    {
        long nowTime = System.currentTimeMillis();
        //在時間區間之內
        if (nowTime < startTime + interval)
        {
            long count = accumulator.incrementAndGet();

            if (count <= maxCount)
            {
                return count;
            } else
            {
                return -count;
            }
        } else
        {
            //在時間區間之外
            synchronized (CounterLimiter.class)
            {
                log.info("新時間區到了,taskId{}, turn {}..", taskId, turn);
                // 再一次判斷,防止重復初始化
                if (nowTime > startTime + interval)
                {
                    accumulator.set(0);
                    startTime = nowTime;
                }
            }
            return 0;
        }
    }

    //線程池,用於多線程模擬測試
    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLimit()
    {

        // 被限制的次數
        AtomicInteger limited = new AtomicInteger(0);
        // 線程數
        final int threads = 2;
        // 每條線程的執行輪數
        final int turns = 20;
        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {

                    for (int j = 0; j < turns; j++)
                    {

                        long taskId = Thread.currentThread().getId();
                        long index = tryAcquire(taskId, j);
                        if (index <= 0)
                        {
                            // 被限制的次數累積
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }


                } catch (Exception e)
                {
                    e.printStackTrace();
                }
                //等待所有線程結束
                countDownLatch.countDown();

            });
        }
        try
        {
            countDownLatch.await();
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //輸出統計結果

        log.info("限制的次數為:" + limited.get() +
                ",通過的次數為:" + (threads * turns - limited.get()));
        log.info("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
        log.info("運行的時長為:" + time);
    }


}

計數器限流的嚴重問題

這個算法雖然簡單,但是有一個十分致命的問題,那就是臨界問題,我們看下圖:
img

從上圖中我們可以看到,假設有一個惡意用戶,他在0:59時,瞬間發送了100個請求,並且1:00又瞬間發送了100個請求,那么其實這個用戶在 1秒里面,瞬間發送了200個請求。

我們剛才規定的是1分鍾最多100個請求(規划的吞吐量),也就是每秒鍾最多1.7個請求,用戶通過在時間窗口的重置節點處突發請求, 可以瞬間超過我們的速率限制。

用戶有可能通過算法的這個漏洞,瞬間壓垮我們的應用。

漏桶算法

漏桶算法限流的基本原理為:水(對應請求)從進水口進入到漏桶里,漏桶以一定的速度出水(請求放行),當水流入速度過大,桶內的總水量大於桶容量會直接溢出,請求被拒絕,如圖所示。
大致的漏桶限流規則如下:
(1)進水口(對應客戶端請求)以任意速率流入進入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不變的,如果處理速度太慢,桶內水量會超出了桶的容量,則后面流入的水滴會溢出,表示請求拒絕。

漏桶算法原理

漏桶算法思路很簡單:

水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會超過桶可接納的容量時直接溢出。

可以看出漏桶算法能強行限制數據的傳輸速率。

2002319-20210220223842536-838208163

漏桶算法其實很簡單,可以粗略的認為就是注水漏水過程,往桶中以任意速率流入水,以一定速率流出水,當水超過桶容量(capacity)則丟棄,因為桶容量是不變的,保證了整體的速率。

以一定速率流出水,

在這里插入圖片描述

削峰:有大量流量進入時,會發生溢出,從而限流保護服務可用

緩沖:不至於直接請求到服務器, 緩沖壓力

消費速度固定 因為計算性能固定

漏桶算法實現

package com.crazymaker.springcloud.ratelimit;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {

    // 計算的起始時間
    private static long lastOutTime = System.currentTimeMillis();
    // 流出速率 每秒 2 次
    private static int leakRate = 2;

    // 桶的容量
    private static int capacity = 2;

    //剩余的水量
    private static AtomicInteger water = new AtomicInteger(0);

    //返回值說明:
    // false 沒有被限制到
    // true 被限流
    public static synchronized boolean isLimit(long taskId, int turn) {
        // 如果是空桶,就當前時間作為漏出的時間
        if (water.get() == 0) {
            lastOutTime = System.currentTimeMillis();
            water.addAndGet(1);
            return false;
        }
        // 執行漏水
        int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
        // 計算剩余水量
        int waterLeft = water.get() - waterLeaked;
        water.set(Math.max(0, waterLeft));
        // 重新更新leakTimeStamp
        lastOutTime = System.currentTimeMillis();
        // 嘗試加水,並且水還未滿 ,放行
        if ((water.get()) < capacity) {
            water.addAndGet(1);
            return false;
        } else {
            // 水滿,拒絕加水, 限流
            return true;
        }

    }


    //線程池,用於多線程模擬測試
    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLimit() {

        // 被限制的次數
        AtomicInteger limited = new AtomicInteger(0);
        // 線程數
        final int threads = 2;
        // 每條線程的執行輪數
        final int turns = 20;
        // 線程同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimit(taskId, j);
                        if (intercepted) {
                            // 被限制的次數累積
                            limited.getAndIncrement();
                        }
                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有線程結束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //輸出統計結果

        log.info("限制的次數為:" + limited.get() +
                ",通過的次數為:" + (threads * turns - limited.get()));
        log.info("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
        log.info("運行的時長為:" + time);
    }
}

漏桶的問題

漏桶的出水速度固定,也就是請求放行速度是固定的。

網上抄來抄去的說法:

漏桶不能有效應對突發流量,但是能起到平滑突發流量(整流)的作用。

實際上的問題:

漏桶出口的速度固定,不能靈活的應對后端能力提升。比如,通過動態擴容,后端流量從1000QPS提升到1WQPS,漏桶沒有辦法。

令牌桶限流

令牌桶算法以一個設定的速率產生令牌並放入令牌桶,每次用戶請求都得申請令牌,如果令牌不足,則拒絕請求。
令牌桶算法中新請求到來時會從桶里拿走一個令牌,如果桶內沒有令牌可拿,就拒絕服務。當然,令牌的數量也是有上限的。令牌的數量與時間和發放速率強相關,時間流逝的時間越長,會不斷往桶里加入越多的令牌,如果令牌發放的速度比申請速度快,令牌桶會放滿令牌,直到令牌占滿整個令牌桶,如圖所示。

令牌桶限流大致的規則如下:
(1)進水口按照某個速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中還有剩余令牌,一旦請求過來就能申請成功,然后放行。
(3)如果令牌的發放速度,慢於請求到來速度,桶內就無牌可領,請求就會被拒絕。

總之,令牌的發送速率可以設置,從而可以對突發的出口流量進行有效的應對。

令牌桶算法

令牌桶與漏桶相似,不同的是令牌桶桶中放了一些令牌,服務請求到達后,要獲取令牌之后才會得到服務,舉個例子,我們平時去食堂吃飯,都是在食堂內窗口前排隊的,這就好比是漏桶算法,大量的人員聚集在食堂內窗口外,以一定的速度享受服務,如果涌進來的人太多,食堂裝不下了,可能就有一部分人站到食堂外了,這就沒有享受到食堂的服務,稱之為溢出,溢出可以繼續請求,也就是繼續排隊,那么這樣有什么問題呢?

如果這時候有特殊情況,比如有些趕時間的志願者啦、或者高三要高考啦,這種情況就是突發情況,如果也用漏桶算法那也得慢慢排隊,這也就沒有解決我們的需求,對於很多應用場景來說,除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。如圖所示,令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。

2002319-20210220223928172-1995912492

在這里插入圖片描述

令牌桶算法實現

package com.crazymaker.springcloud.ratelimit;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// 令牌桶 限速
@Slf4j
public class TokenBucketLimiter {
    // 上一次令牌發放時間
    public long lastTime = System.currentTimeMillis();
    // 桶的容量
    public int capacity = 2;
    // 令牌生成速度 /s
    public int rate = 2;
    // 當前令牌數量
    public AtomicInteger tokens = new AtomicInteger(0);
    ;

    //返回值說明:
    // false 沒有被限制到
    // true 被限流
    public synchronized boolean isLimited(long taskId, int applyCount) {
        long now = System.currentTimeMillis();
        //時間間隔,單位為 ms
        long gap = now - lastTime;

        //計算時間段內的令牌數
        int reverse_permits = (int) (gap * rate / 1000);
        int all_permits = tokens.get() + reverse_permits;
        // 當前令牌數
        tokens.set(Math.min(capacity, all_permits));
        log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);

        if (tokens.get() < applyCount) {
            // 若拿不到令牌,則拒絕
            // log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
            return true;
        } else {
            // 還有令牌,領取令牌
            tokens.getAndAdd( - applyCount);
            lastTime = now;

            // log.info("剩余令牌.." + tokens);
            return false;
        }

    }

    //線程池,用於多線程模擬測試
    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLimit() {

        // 被限制的次數
        AtomicInteger limited = new AtomicInteger(0);
        // 線程數
        final int threads = 2;
        // 每條線程的執行輪數
        final int turns = 20;


        // 同步器
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                try {

                    for (int j = 0; j < turns; j++) {

                        long taskId = Thread.currentThread().getId();
                        boolean intercepted = isLimited(taskId, 1);
                        if (intercepted) {
                            // 被限制的次數累積
                            limited.getAndIncrement();
                        }

                        Thread.sleep(200);
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                //等待所有線程結束
                countDownLatch.countDown();

            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        float time = (System.currentTimeMillis() - start) / 1000F;
        //輸出統計結果

        log.info("限制的次數為:" + limited.get() +
                ",通過的次數為:" + (threads * turns - limited.get()));
        log.info("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
        log.info("運行的時長為:" + time);
    }


}

令牌桶的好處

令牌桶的好處之一就是可以方便地應對 突發出口流量(后端能力的提升)。

比如,可以改變令牌的發放速度,算法能按照新的發送速率調大令牌的發放數量,使得出口突發流量能被處理。

Guava RateLimiter

Guava是Java領域優秀的開源項目,它包含了Google在Java項目中使用一些核心庫,包含集合(Collections),緩存(Caching),並發編程庫(Concurrency),常用注解(Common annotations),String操作,I/O操作方面的眾多非常實用的函數。 Guava的 RateLimiter提供了令牌桶算法實現:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實現。

img

RateLimiter的類圖如上所示,

Nginx漏桶限流

Nginx限流的簡單演示

每六秒才處理一次請求,如下

  limit_req_zone  $arg_sku_id  zone=skuzone:10m      rate=6r/m;
  limit_req_zone  $http_user_id  zone=userzone:10m      rate=6r/m;
  limit_req_zone  $binary_remote_addr  zone=perip:10m      rate=6r/m;
  limit_req_zone  $server_name        zone=perserver:1m   rate=6r/m;

這是從請求參數里邊,提前參數做限流

這是從請求參數里邊,提前參數,進行限流的次數統計key。

在http塊里邊定義限流的內存區域 zone。

  limit_req_zone  $arg_sku_id  zone=skuzone:10m      rate=6r/m;
  limit_req_zone  $http_user_id  zone=userzone:10m      rate=6r/m;
  limit_req_zone  $binary_remote_addr  zone=perip:10m      rate=6r/m;
  limit_req_zone  $server_name        zone=perserver:1m   rate=10r/s;

在location塊中使用 限流zone,參考如下:


    #  ratelimit by sku id
    location  = /ratelimit/sku {
      limit_req  zone=skuzone;
      echo "正常的響應";
    }

測試

[root@cdh1 ~]# /vagrant/LuaDemoProject/sh/linux/openresty-restart.sh
shell dir is: /vagrant/LuaDemoProject/sh/linux
Shutting down openrestry/nginx:  pid is 13479 13485
Shutting down  succeeded!
OPENRESTRY_PATH:/usr/local/openresty
PROJECT_PATH:/vagrant/LuaDemoProject/src
nginx: [alert] lua_code_cache is off; this will hurt performance in /vagrant/LuaDemoProject/src/conf/nginx-seckill.conf:90
openrestry/nginx starting succeeded!
pid is 14197


[root@cdh1 ~]# curl  http://cdh1/ratelimit/sku?sku_id=1
正常的響應
root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
正常的響應
[root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
限流后的降級內容
[root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
限流后的降級內容
[root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
限流后的降級內容
[root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
限流后的降級內容
[root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
限流后的降級內容
[root@cdh1 ~]#  curl  http://cdh1/ratelimit/sku?sku_id=1
正常的響應

從Header頭部提前參數

1、nginx是支持讀取非nginx標准的用戶自定義header的,但是需要在http或者server下開啟header的下划線支持:

underscores_in_headers on;

2、比如我們自定義header為X-Real-IP,通過第二個nginx獲取該header時需要這樣:

$http_x_real_ip; (一律采用小寫,而且前面多了個http_)

 underscores_in_headers on;

  limit_req_zone  $http_user_id  zone=userzone:10m      rate=6r/m;
  server {
    listen       80 default;
    server_name  nginx.server *.nginx.server;
    default_type 'text/html';
    charset utf-8;


#  ratelimit by user id
    location  = /ratelimit/demo {
      limit_req  zone=userzone;
      echo "正常的響應";
    }


  
    location = /50x.html{
      echo "限流后的降級內容";
    }

    error_page 502 503 =200 /50x.html;

  }

測試

[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
正常的響應
[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER-ID:1" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER_ID:2" http://cdh1/ratelimit/demo
正常的響應
[root@cdh1 ~]# curl -H "USER_ID:2" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]#
[root@cdh1 ~]# curl -H "USER_ID:2" http://cdh1/ratelimit/demo
限流后的降級內容
[root@cdh1 ~]# curl -H "USER-ID:3" http://cdh1/ratelimit/demo
正常的響應
[root@cdh1 ~]# curl -H "USER-ID:3" http://cdh1/ratelimit/demo
限流后的降級內容

Nginx漏桶限流的三個細分類型,即burst、nodelay參數詳解

每六秒才處理一次請求,如下

limit_req_zone  $arg_user_id  zone=limti_req_zone:10m      rate=10r/m;

不帶緩沖隊列的漏桶限流

limit_req zone=limti_req_zone;

  • 嚴格依照在limti_req_zone中配置的rate來處理請求
  • 超過rate處理能力范圍的,直接drop
  • 表現為對收到的請求無延時

假設1秒內提交10個請求,可以看到一共10個請求,9個請求都失敗了,直接返回503,

接着再查看 /var/log/nginx/access.log,印證了只有一個請求成功了,其它就是都直接返回了503,即服務器拒絕了請求。
img

帶緩沖隊列的漏桶限流

limit_req zone=limti_req_zone burst=5;

  • 依照在limti_req_zone中配置的rate來處理請求
  • 同時設置了一個大小為5的緩沖隊列,在緩沖隊列中的請求會等待慢慢處理
  • 超過了burst緩沖隊列長度和rate處理能力的請求被直接丟棄
  • 表現為對收到的請求有延時

假設1秒內提交10個請求,則可以發現在1s內,在服務器接收到10個並發請求后,先處理1個請求,同時將5個請求放入burst緩沖隊列中,等待處理。而超過(burst+1)數量的請求就被直接拋棄了,即直接拋棄了4個請求。burst緩存的5個請求每隔6s處理一次。

接着查看 /var/log/nginx/access.log日志

img

帶瞬時處理能力的漏桶限流

limit_req zone=req_zone burst=5 nodelay;

如果設置nodelay,會在瞬時提供處理(burst + rate)個請求的能力,請求數量超過(burst + rate)的時候就會直接返回503,峰值范圍內的請求,不存在請求需要等待的情況

假設1秒內提交10個請求,則可以發現在1s內,服務器端處理了6個請求(峰值速度:burst+10s內一個請求)。對於剩下的4個請求,直接返回503,在下一秒如果繼續向服務端發送10個請求,服務端會直接拒絕這10個請求並返回503。

接着查看 /var/log/nginx/access.log日志

img

可以發現在1s內,服務器端處理了6個請求(峰值速度:burst+原來的處理速度)。對於剩下的4個請求,直接返回503。

但是,總數額度和速度*時間保持一致, 就是額度用完了,需要等到一個有額度的時間段,才開始接收新的請求。如果一次處理了5個請求,相當於占了30s的額度,6*5=30。因為設定了6s處理1個請求,所以直到30
s 之后,才可以再處理一個請求,即如果此時向服務端發送10個請求,會返回9個503,一個200

分布式限流組件

why

但是Nginx的限流指令只能在同一塊內存區域有效,而在生產場景中秒殺的外部網關往往是多節點部署,所以這就需要用到分布式限流組件。

高性能的分布式限流組件可以使用Redis+Lua來開發,京東的搶購就是使用Redis+Lua完成的限流。並且無論是Nginx外部網關還是Zuul內部網關,都可以使用Redis+Lua限流組件。

理論上,接入層的限流有多個維度:

(1)用戶維度限流:在某一時間段內只允許用戶提交一次請求,比如可以采取客戶端IP或者用戶ID作為限流的key。

(2)商品維度的限流:對於同一個搶購商品,在某個時間段內只允許一定數量的請求進入,可以采取秒殺商品ID作為限流的key。

什么時候用nginx限流:

用戶維度的限流,可以在ngix 上進行,因為使用nginx限流內存來存儲用戶id,比用redis 的key,來存儲用戶id,效率高。

什么時候用redis+lua分布式限流:

商品維度的限流,可以在redis上進行,不需要大量的計算訪問次數的key,另外,可以控制所有的接入層節點的訪問秒殺請求的總量。

redis+lua分布式限流組件

--- 此腳本的環境: redis 內部,不是運行在 nginx 內部

---方法:申請令牌
--- -1 failed
--- 1 success
--- @param key key 限流關鍵字
--- @param apply  申請的令牌數量
local function acquire(key, apply)
    local times = redis.call('TIME');
    -- times[1] 秒數   -- times[2] 微秒數
    local curr_mill_second = times[1] * 1000000 + times[2];
    curr_mill_second = curr_mill_second / 1000;

    local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
    --- 局部變量:上次申請的時間
    local last_mill_second = cacheInfo[1];
    --- 局部變量:之前的令牌數
    local curr_permits = tonumber(cacheInfo[2]);
    --- 局部變量:桶的容量
    local max_permits = tonumber(cacheInfo[3]);
    --- 局部變量:令牌的發放速率
    local rate = cacheInfo[4];
    --- 局部變量:本次的令牌數
    local local_curr_permits = 0;

    if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
        -- 計算時間段內的令牌數
        local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate);
        -- 令牌總數
        local expect_curr_permits = reverse_permits + curr_permits;
        -- 可以申請的令牌總數
        local_curr_permits = math.min(expect_curr_permits, max_permits);
    else
        -- 第一次獲取令牌
        redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
        local_curr_permits = max_permits;
    end

    local result = -1;
    -- 有足夠的令牌可以申請
    if (local_curr_permits - apply >= 0) then
        -- 保存剩余的令牌
        redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply);
        -- 為下次的令牌獲取,保存時間
        redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
        -- 返回令牌獲取成功
        result = 1;
    else
        -- 返回令牌獲取失敗
        result = -1;
    end
    return result
end
--eg
-- /usr/local/redis/bin/redis-cli  -a 123456  --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua key , acquire 1  1

-- 獲取 sha編碼的命令
-- /usr/local/redis/bin/redis-cli  -a 123456  script load "$(cat  /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua)"
-- /usr/local/redis/bin/redis-cli  -a 123456  script exists  "cf43613f172388c34a1130a760fc699a5ee6f2a9"

-- /usr/local/redis/bin/redis-cli -a 123456  evalsha   "cf43613f172388c34a1130a760fc699a5ee6f2a9" 1 "rate_limiter:seckill:1"  init 1  1
-- /usr/local/redis/bin/redis-cli -a 123456  evalsha   "cf43613f172388c34a1130a760fc699a5ee6f2a9" 1 "rate_limiter:seckill:1"  acquire 1

--local rateLimiterSha = "e4e49e4c7b23f0bf7a2bfee73e8a01629e33324b";

---方法:初始化限流 Key
--- 1 success
--- @param key key
--- @param max_permits  桶的容量
--- @param rate  令牌的發放速率
local function init(key, max_permits, rate)
    local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate")
    local org_max_permits = tonumber(rate_limit_info[3])
    local org_rate = rate_limit_info[4]

    if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) then
        redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits)
    end
    return 1;
end
--eg
-- /usr/local/redis/bin/redis-cli -a 123456 --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua key , init 1  1
-- /usr/local/redis/bin/redis-cli -a 123456 --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua  "rate_limiter:seckill:1"  , init 1  1


---方法:刪除限流 Key
local function delete(key)
    redis.pcall("DEL", key)
    return 1;
end
--eg
-- /usr/local/redis/bin/redis-cli  --eval   /vagrant/LuaDemoProject/src/luaScript/redis/rate_limiter.lua key , delete


local key = KEYS[1]
local method = ARGV[1]
if method == 'acquire' then
    return acquire(key, ARGV[2], ARGV[3])
elseif method == 'init' then
    return init(key, ARGV[2], ARGV[3])
elseif method == 'delete' then
    return delete(key)
else
    --ignore
end

在redis中,為了避免重復發送腳本數據浪費網絡資源,可以使用script load命令進行腳本數據緩存,並且返回一個哈希碼作為腳本的調用句柄,

每次調用腳本只需要發送哈希碼來調用即可。

分布式令牌限流實戰

可以使用redis+lua,實戰一票下邊的簡單案例:

令牌按照1個每秒的速率放入令牌桶,桶中最多存放2個令牌,那系統就只會允許持續的每秒處理2個請求,

或者每隔2 秒,等桶中2 個令牌攢滿后,一次處理2個請求的突發情況,保證系統穩定性。

商品維度的限流

當秒殺商品維度的限流,當商品的流量,遠遠大於涉及的流量時,開始隨機丟棄請求。

Nginx的令牌桶限流腳本getToken_access_limit.lua執行在請求的access階段,但是,該腳本並沒有實現限流的核心邏輯,僅僅調用緩存在Redis內部的rate_limiter.lua腳本進行限流。

getToken_access_limit.lua腳本和rate_limiter.lua腳本的關系,具體如圖10-17所示。

img

圖10-17 getToken_access_limit.lua腳本和rate_limiter.lua腳本關系

什么時候在Redis中加載rate_limiter.lua腳本呢?

和秒殺腳本一樣,該腳本是在Java程序啟動商品秒殺時,完成其在Redis的加載和緩存的。

還有一點非常重要,Java程序會將腳本加載完成之后的sha1編碼,去通過自定義的key(具體為"lua:sha1:rate_limiter")緩存在Redis中,以方便Nginx的getToken_access_limit.lua腳本去獲取,並且在調用evalsha方法時使用。

注意:使用redis集群,因此每個節點都需要各自緩存一份腳本數據

/**
* 由於使用redis集群,因此每個節點都需要各自緩存一份腳本數據
* @param slotKey 用來定位對應的slot的slotKey
*/
public void storeScript(String slotKey){
if (StringUtils.isEmpty(unlockSha1) || !jedisCluster.scriptExists(unlockSha1, slotKey)){
   //redis支持腳本緩存,返回哈希碼,后續可以繼續用來調用腳本
    unlockSha1 = jedisCluster.scriptLoad(DISTRIBUTE_LOCK_SCRIPT_UNLOCK_VAL, slotKey);
   }
}

常見的限流組件

redission分布式限流采用令牌桶思想和固定時間窗口,trySetRate方法設置桶的大小,利用redis key過期機制達到時間窗口目的,控制固定時間窗口內允許通過的請求量。

spring cloud gateway集成redis限流,但屬於網關層限流


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM