使用Guava-RateLimiter限流控制qps


轉自:https://www.jianshu.com/p/8f548e469bbe

參考:https://www.jianshu.com/p/5d4fe4b2a726

常用的限流算法有漏桶算法和令牌桶算法,guava的RateLimiter使用的是令牌桶算法,也就是以固定的頻率向桶中放入令牌,例如一秒鍾10枚令牌,實際業務在每次響應請求之前都從桶中獲取令牌,只有取到令牌的請求才會被成功響應,獲取的方式有兩種:阻塞等待令牌或者取不到立即返回失敗,下圖來自網上:


本次實戰,我們用的是guava的RateLimiter,場景是spring mvc在處理請求時候,從桶中申請令牌,申請到了就成功響應,申請不到時直接返回失敗。

實例

1、添加guava jar包

 <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
</dependency>

2、AccessLimitService.java 限流服務封裝到一個類中AccessLimitService,提供tryAcquire()方法,用來嘗試獲取令牌,返回true表示獲取到

@Service
public class AccessLimitService {

    //每秒只發出5個令牌
    RateLimiter rateLimiter = RateLimiter.create(5.0);

    /**
     * 嘗試獲取令牌
     * @return
     */
    public boolean tryAcquire(){
        return rateLimiter.tryAcquire();
    }
}

3、Controller層每次收到請求的時候都嘗試去獲取令牌,獲取成功和失敗打印不同的信息

@Controller
public class HelloController {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private AccessLimitService accessLimitService;

    @RequestMapping("/access")
    @ResponseBody
    public String access(){
        //嘗試獲取令牌
        if(accessLimitService.tryAcquire()){
            //模擬業務執行500毫秒
            try {
                Thread.sleep(500);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "aceess success [" + sdf.format(new Date()) + "]";
        }else{
            return "aceess limit [" + sdf.format(new Date()) + "]";
        }
    }
}

4、測試:十個線程並發訪問接口

public class AccessClient {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

    /**
     * get請求
     * @param realUrl
     * @return
     */
    public static String sendGet(URL realUrl) {
        String result = "";
        BufferedReader in = null;
        try {
            // 打開和URL之間的連接
            URLConnection connection = realUrl.openConnection();
            // 設置通用的請求屬性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立實際的連接
            connection.connect();

            // 定義 BufferedReader輸入流來讀取URL的響應
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("發送GET請求出現異常!" + e);
            e.printStackTrace();
        }
        // 使用finally塊來關閉輸入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }



    public void access() throws Exception{
        final URL url = new URL("http://localhost:8080/guavalimitdemo/access");

        for(int i=0;i<10;i++) {
            fixedThreadPool.submit(new Runnable() {
                public void run() {
                    System.out.println(sendGet(url));
                }
            });
        }

        fixedThreadPool.shutdown();
        fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception{
        AccessClient accessClient = new AccessClient();
        accessClient.access();
    }
}

部分請求由於獲取的令牌可以成功執行,其余請求沒有拿到令牌,我們可以根據實際業務來做區分處理。還有一點要注意,我們通過RateLimiter.create(5.0)配置的是每一秒5枚令牌,但是限流的時候發出的是6枚,改用其他值驗證,也是實際的比配置的大1。

以上就是快速實現限流的實戰過程,此處僅是單進程服務的限流,而實際的分布式服務中會考慮更多因素,會復雜很多。


RateLimiter方法摘要

修飾符和類型 方法和描述
double acquire() 從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求
double acquire(int permits)從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求
static RateLimiter create(double permitsPerSecond)根據指定的穩定吞吐率創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)根據指定的穩定吞吐率和預熱期來創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)
double getRate()返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數
void setRate(double permitsPerSecond)更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。
String toString()返回對象的字符表現形式
boolean tryAcquire()從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話
boolean tryAcquire(int permits)從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話
boolean tryAcquire(int permits, long timeout, TimeUnit unit)從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那么立即返回false (無需等待)
boolean tryAcquire(long timeout, TimeUnit unit)從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那么立即返回false(無需等待)
  • 舉例來說明如何使用RateLimiter,想象下我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個:
//速率是每秒兩個許可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // 也許需要等待
        executor.execute(task);
    }
}


官方文檔:http://ifeve.com/guava-ratelimiter



附:

1、基本算法

1.1 漏桶算法

請求先進入到漏桶里,漏桶以一定的速度出水,當水請求過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。

缺點:除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。

1.2 令牌桶

令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務

 

2. Guava實現

2.1 簡單的demo

public  class  RateLimiterDemo {
     public  static  void  main(String[] args) {
         testNoRateLimiter();
         testWithRateLimiter();
     }
  
     public  static  void  testNoRateLimiter() {
         Long start = System.currentTimeMillis();
         for  ( int  i =  0 ; i <  10 ; i++) {
             System.out.println( "call execute.."  + i);
             
         }
         Long end = System.currentTimeMillis();
         
         System.out.println(end - start);
         
     }
     
     public  static  void  testWithRateLimiter() {
         Long start = System.currentTimeMillis();
         final  RateLimiter limiter = RateLimiter.create( 10.0 );  // 每秒不超過10個任務被提交
         for  ( int  i =  0 ; i <  10 ; i++) {
             limiter.acquire();  // 請求RateLimiter, 超過permits會被阻塞
             System.out.println( "call execute.."  + i);
             
         }
         Long end = System.currentTimeMillis();
         
         System.out.println(end - start);
         
     }
     
}
 
 
testNoRateLimiter方法快速打印出來
testWithRateLimiter方法 1 秒打印 10

2.2 RateLimiter介紹

2.2.1 總體框架

RateLimiter類圖結構

RateLimiter:抽象類,定義了主要的對外接口以及整個流程的模板

SmoothRateLimiter:RateLimiter的實現類,實現了RateLimiter的部分方法

SmoothWarmingUp:SmoothRateLimiter的實現類,漸進模式,令牌生成速度緩慢提升直到維持在一個穩定值

SmoothBursty:SmoothRateLimiter的實現類,穩定模式,令牌生成速度恆定

2.2.2 具體步驟

 2.2.2.1 RateLimiter的創建

通過調用RateLimiter的create接口來創建實例,實際是調用的SmoothBuisty穩定模式創建的實例。

public  static  RateLimiter create( double  permitsPerSecond) {
     return  create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
   }
 
   static  RateLimiter create( double  permitsPerSecond, SleepingStopwatch stopwatch) {
     RateLimiter rateLimiter =  new  SmoothBursty(stopwatch,  1.0  /* maxBurstSeconds */ );
     rateLimiter.setRate(permitsPerSecond);
     return  rateLimiter;
   }

SmoothBursty中的兩個構造參數含義:

  • SleepingStopwatch:guava中的一個時鍾類實例,會通過這個來計算時間及令牌
  • maxBurstSeconds:官方解釋,在ReteLimiter未使用時,最多保存幾秒的令牌,默認是1

在解析SmoothBursty原理前,重點解釋下SmoothBursty中幾個屬性的含義

/**
  * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
  * 在RateLimiter未使用時,最多存儲幾秒的令牌
  * */
  final  double  maxBurstSeconds;
  
 
/**
  * The currently stored permits.
  * 當前存儲令牌數
  */
double  storedPermits;
 
/**
  * The maximum number of stored permits.
  * 最大存儲令牌數 = maxBurstSeconds * stableIntervalMicros(見下文)
  */
double  maxPermits;
 
/**
  * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
  * per second has a stable interval of 200ms.
  * 添加令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數)
  */
double  stableIntervalMicros;
 
/**
  * The time when the next request (no matter its size) will be granted. After granting a request,
  * this is pushed further in the future. Large requests push this further than small requests.
  * 下一次請求可以獲取令牌的起始時間
  * 由於RateLimiter允許預消費,上次請求預消費令牌后
  * 下次請求需要等待相應的時間到nextFreeTicketMicros時刻才可以獲取令牌
  */
private  long  nextFreeTicketMicros = 0L;  // could be either in the past or future


2.2.2.2 關鍵方法

  • setRate
public  final  void  setRate( double  permitsPerSecond) {
   checkArgument(
       permitsPerSecond >  0.0  && !Double.isNaN(permitsPerSecond),  "rate must be positive" );
   synchronized  (mutex()) {
     doSetRate(permitsPerSecond, stopwatch.readMicros());
   }
}

通過這個接口設置令牌通每秒生成令牌的數量,內部時間通過調用SmoothRateLimiterdoSetRate來實現

  • doSetRate
@Override
   final  void  doSetRate( double  permitsPerSecond,  long  nowMicros) {
     resync(nowMicros);
     double  stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
     this .stableIntervalMicros = stableIntervalMicros;
     doSetRate(permitsPerSecond, stableIntervalMicros);
   }

這里先通過調用resync生成令牌以及更新下一期令牌生成時間,然后更新stableIntervalMicros,最后又調用了SmoothBurstydoSetRate

  • resync
/**
  * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
  * 基於當前時間,更新下一次請求令牌的時間,以及當前存儲的令牌(可以理解為生成令牌)
  */
void  resync( long  nowMicros) {
     // if nextFreeTicket is in the past, resync to now
     if  (nowMicros > nextFreeTicketMicros) {
       double  newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
       storedPermits = min(maxPermits, storedPermits + newPermits);
       nextFreeTicketMicros = nowMicros;
     }
}

延遲計算,如上resync函數。該函數會在每次獲取令牌之前調用,其實現思路為,若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只需要在獲取令牌時計算一次即可。

  • SmoothBursty的doSetRate
@Override
void  doSetRate( double  permitsPerSecond,  double  stableIntervalMicros) {
   double  oldMaxPermits =  this .maxPermits;
   maxPermits = maxBurstSeconds * permitsPerSecond;
   if  (oldMaxPermits == Double.POSITIVE_INFINITY) {
     // if we don't special-case this, we would get storedPermits == NaN, below
     // Double.POSITIVE_INFINITY 代表無窮啊
     storedPermits = maxPermits;
   else  {
     storedPermits =
         (oldMaxPermits ==  0.0 )
             0.0  // initial state
             : storedPermits * maxPermits / oldMaxPermits;
   }
}

桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義為最大存儲maxBurstSeconds秒生成的令牌。

該參數的作用在於,可以更為靈活地控制流量。如,某些接口限制為300次/20秒,某些接口限制為50次/45秒等。也就是流量不局限於qps

2.2.2.3 RateLimiter幾個常用接口分析

理解RateLimiter暴露出來的接口

@CanIgnoreReturnValue
public  double  acquire() {
   return  acquire( 1 );
}
 
/**
* 獲取令牌,返回阻塞的時間
**/
@CanIgnoreReturnValue
public  double  acquire( int  permits) {
   long  microsToWait = reserve(permits);
   stopwatch.sleepMicrosUninterruptibly(microsToWait);
   return  1.0  * microsToWait / SECONDS.toMicros(1L);
}
 
final  long  reserve( int  permits) {
   checkPermits(permits);
   synchronized  (mutex()) {
     return  reserveAndGetWaitLength(permits, stopwatch.readMicros());
   }
}

acquire函數主要用於獲取permits個令牌,並計算需要等待多長時間,進而掛起等待,並將該值返回,主要通過reserve返回需要等待的時間,reserve中通過調用reserveAndGetWaitLength獲取等待時間

/**
  * Reserves next ticket and returns the wait time that the caller must wait for.
  *
  * @return the required wait time, never negative
  */
final  long  reserveAndGetWaitLength( int  permits,  long  nowMicros) {
   long  momentAvailable = reserveEarliestAvailable(permits, nowMicros);
   return  max(momentAvailable - nowMicros,  0 );
}

最后調用了reserveEarliestAvailable

@Override
final  long  reserveEarliestAvailable( int  requiredPermits,  long  nowMicros) {
   resync(nowMicros);
   long  returnValue = nextFreeTicketMicros;
   double  storedPermitsToSpend = min(requiredPermits,  this .storedPermits);
   double  freshPermits = requiredPermits - storedPermitsToSpend;
   long  waitMicros =
       storedPermitsToWaitTime( this .storedPermits, storedPermitsToSpend)
           + ( long ) (freshPermits * stableIntervalMicros);
 
   this .nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
   this .storedPermits -= storedPermitsToSpend;
   return  returnValue;
}

 

首先通過resync生成令牌以及同步nextFreeTicketMicros時間戳,freshPermits從令牌桶中獲取令牌后還需要的令牌數量,通過storedPermitsToWaitTime計算出獲取freshPermits還需要等待的時間,在穩定模式中,這里就是(long) (freshPermits * stableIntervalMicros) ,然后更新nextFreeTicketMicros以及storedPermits,這次獲取令牌需要的等待到的時間點, reserveAndGetWaitLength返回需要等待的時間間隔。

 

  • `reserveEarliestAvailable`可以看出RateLimiter的預消費原理,
  • 以及獲取令牌的等待時間時間原理(可以解釋示例結果),
  • 再獲取令牌不足時,並沒有等待到令牌全部生成,
  • 而是更新了下次獲取令牌時的nextFreeTicketMicros,
  • 從而影響的是下次獲取令牌的等待時間。

 

`reserve`這里返回等待時間后,`acquire`通過調用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`

進行sleep操作,這里不同於Thread.sleep(), 這個函數的sleep是uninterruptibly的,內部實現:

public  static  void  sleepUninterruptibly( long  sleepFor, TimeUnit unit) {
     //sleep 阻塞線程 內部通過Thread.sleep()
   boolean  interrupted =  false ;
   try  {
     long  remainingNanos = unit.toNanos(sleepFor);
     long  end = System.nanoTime() + remainingNanos;
     while  ( true ) {
       try  {
         // TimeUnit.sleep() treats negative timeouts just like zero.
         NANOSECONDS.sleep(remainingNanos);
         return ;
       catch  (InterruptedException e) {
         interrupted =  true ;
         remainingNanos = end - System.nanoTime();
         //如果被interrupt可以繼續,更新sleep時間,循環繼續sleep
       }
     }
   finally  {
     if  (interrupted) {
       Thread.currentThread().interrupt();
       //如果被打斷過,sleep過后再真正中斷線程
     }
   }
}

sleep之后,`acquire`返回sleep的時間,阻塞結束,獲取到令牌。

public  boolean  tryAcquire( int  permits) {
   return  tryAcquire(permits,  0 , MICROSECONDS);
}
 
public  boolean  tryAcquire() {
   return  tryAcquire( 1 0 , MICROSECONDS);
}
 
public  boolean  tryAcquire( int  permits,  long  timeout, TimeUnit unit) {
   long  timeoutMicros = max(unit.toMicros(timeout),  0 );
   checkPermits(permits);
   long  microsToWait;
   synchronized  (mutex()) {
     long  nowMicros = stopwatch.readMicros();
     if  (!canAcquire(nowMicros, timeoutMicros)) {
       return  false ;
     else  {
       microsToWait = reserveAndGetWaitLength(permits, nowMicros);
     }
   }
   stopwatch.sleepMicrosUninterruptibly(microsToWait);
   return  true ;
}
 
private  boolean  canAcquire( long  nowMicros,  long  timeoutMicros) {
   return  queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
 
@Override
final  long  queryEarliestAvailable( long  nowMicros) {
   return  nextFreeTicketMicros;
}

tryAcquire函數可以嘗試在timeout時間內獲取令牌,如果可以則掛起等待相應時間並返回true,否則立即返回false
canAcquire用於判斷timeout時間內是否可以獲取令牌,通過判斷當前時間+超時時間是否大於nextFreeTicketMicros 來決定是否能夠拿到足夠的令牌數,如果可以獲取到,則過程同acquire,線程sleep等待,如果通過canAcquire在此超時時間內不能回去到令牌,則可以快速返回,不需要等待timeout后才知道能否獲取到令牌。

參考文檔:https://www.jianshu.com/p/5d4fe4b2a726

2.2.3 方法摘要

修飾符和類型
方法和描述
修飾符和類型
方法和描述
double acquire()
從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求
double acquire(int permits)
從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求
static RateLimiter create(double permitsPerSecond)
根據指定的穩定吞吐率創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根據指定的穩定吞吐率和預熱期來創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和)
double getRate()
返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數
void setRate(double permitsPerSecond)
更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。
String toString()
返回對象的字符表現形式
boolean tryAcquire()
從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話
boolean tryAcquire(int permits)
從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那么立即返回false (無需等待)
boolean tryAcquire(long timeout, TimeUnit unit)
從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那么立即返回false(無需等待)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM