使用Guava-RateLimiter限流控制qps


转自:https://www.jianshu.com/p/8f548e469bbe

参考:https://www.jianshu.com/p/5d4fe4b2a726

常用的限流算法有漏桶算法和令牌桶算法,guava的RateLimiter使用的是令牌桶算法,也就是以固定的频率向桶中放入令牌,例如一秒钟10枚令牌,实际业务在每次响应请求之前都从桶中获取令牌,只有取到令牌的请求才会被成功响应,获取的方式有两种:阻塞等待令牌或者取不到立即返回失败,下图来自网上:


本次实战,我们用的是guava的RateLimiter,场景是spring mvc在处理请求时候,从桶中申请令牌,申请到了就成功响应,申请不到时直接返回失败。

实例

1、添加guava jar包

 <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>18.0</version>
</dependency>

2、AccessLimitService.java 限流服务封装到一个类中AccessLimitService,提供tryAcquire()方法,用来尝试获取令牌,返回true表示获取到

@Service
public class AccessLimitService {

    //每秒只发出5个令牌
    RateLimiter rateLimiter = RateLimiter.create(5.0);

    /**
     * 尝试获取令牌
     * @return
     */
    public boolean tryAcquire(){
        return rateLimiter.tryAcquire();
    }
}

3、Controller层每次收到请求的时候都尝试去获取令牌,获取成功和失败打印不同的信息

@Controller
public class HelloController {

    private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private AccessLimitService accessLimitService;

    @RequestMapping("/access")
    @ResponseBody
    public String access(){
        //尝试获取令牌
        if(accessLimitService.tryAcquire()){
            //模拟业务执行500毫秒
            try {
                Thread.sleep(500);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "aceess success [" + sdf.format(new Date()) + "]";
        }else{
            return "aceess limit [" + sdf.format(new Date()) + "]";
        }
    }
}

4、测试:十个线程并发访问接口

public class AccessClient {
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

    /**
     * get请求
     * @param realUrl
     * @return
     */
    public static String sendGet(URL realUrl) {
        String result = "";
        BufferedReader in = null;
        try {
            // 打开和URL之间的连接
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();

            // 定义 BufferedReader输入流来读取URL的响应
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        }
        // 使用finally块来关闭输入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }



    public void access() throws Exception{
        final URL url = new URL("http://localhost:8080/guavalimitdemo/access");

        for(int i=0;i<10;i++) {
            fixedThreadPool.submit(new Runnable() {
                public void run() {
                    System.out.println(sendGet(url));
                }
            });
        }

        fixedThreadPool.shutdown();
        fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception{
        AccessClient accessClient = new AccessClient();
        accessClient.access();
    }
}

部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过RateLimiter.create(5.0)配置的是每一秒5枚令牌,但是限流的时候发出的是6枚,改用其他值验证,也是实际的比配置的大1。

以上就是快速实现限流的实战过程,此处仅是单进程服务的限流,而实际的分布式服务中会考虑更多因素,会复杂很多。


RateLimiter方法摘要

修饰符和类型 方法和描述
double acquire() 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求
double acquire(int permits)从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求
static RateLimiter create(double permitsPerSecond)根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
double getRate()返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数
void setRate(double permitsPerSecond)更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
String toString()返回对象的字符表现形式
boolean tryAcquire()从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits)从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits, long timeout, TimeUnit unit)从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)
boolean tryAcquire(long timeout, TimeUnit unit)从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)
  • 举例来说明如何使用RateLimiter,想象下我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:
//速率是每秒两个许可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) {
    for (Runnable task : tasks) {
        rateLimiter.acquire(); // 也许需要等待
        executor.execute(task);
    }
}


官方文档:http://ifeve.com/guava-ratelimiter



附:

1、基本算法

1.1 漏桶算法

请求先进入到漏桶里,漏桶以一定的速度出水,当水请求过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。

缺点:除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

1.2 令牌桶

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务

 

2. Guava实现

2.1 简单的demo

public  class  RateLimiterDemo {
     public  static  void  main(String[] args) {
         testNoRateLimiter();
         testWithRateLimiter();
     }
  
     public  static  void  testNoRateLimiter() {
         Long start = System.currentTimeMillis();
         for  ( int  i =  0 ; i <  10 ; i++) {
             System.out.println( "call execute.."  + i);
             
         }
         Long end = System.currentTimeMillis();
         
         System.out.println(end - start);
         
     }
     
     public  static  void  testWithRateLimiter() {
         Long start = System.currentTimeMillis();
         final  RateLimiter limiter = RateLimiter.create( 10.0 );  // 每秒不超过10个任务被提交
         for  ( int  i =  0 ; i <  10 ; i++) {
             limiter.acquire();  // 请求RateLimiter, 超过permits会被阻塞
             System.out.println( "call execute.."  + i);
             
         }
         Long end = System.currentTimeMillis();
         
         System.out.println(end - start);
         
     }
     
}
 
 
testNoRateLimiter方法快速打印出来
testWithRateLimiter方法 1 秒打印 10

2.2 RateLimiter介绍

2.2.1 总体框架

RateLimiter类图结构

RateLimiter:抽象类,定义了主要的对外接口以及整个流程的模板

SmoothRateLimiter:RateLimiter的实现类,实现了RateLimiter的部分方法

SmoothWarmingUp:SmoothRateLimiter的实现类,渐进模式,令牌生成速度缓慢提升直到维持在一个稳定值

SmoothBursty:SmoothRateLimiter的实现类,稳定模式,令牌生成速度恒定

2.2.2 具体步骤

 2.2.2.1 RateLimiter的创建

通过调用RateLimiter的create接口来创建实例,实际是调用的SmoothBuisty稳定模式创建的实例。

public  static  RateLimiter create( double  permitsPerSecond) {
     return  create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
   }
 
   static  RateLimiter create( double  permitsPerSecond, SleepingStopwatch stopwatch) {
     RateLimiter rateLimiter =  new  SmoothBursty(stopwatch,  1.0  /* maxBurstSeconds */ );
     rateLimiter.setRate(permitsPerSecond);
     return  rateLimiter;
   }

SmoothBursty中的两个构造参数含义:

  • SleepingStopwatch:guava中的一个时钟类实例,会通过这个来计算时间及令牌
  • maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1

在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义

/**
  * The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
  * 在RateLimiter未使用时,最多存储几秒的令牌
  * */
  final  double  maxBurstSeconds;
  
 
/**
  * The currently stored permits.
  * 当前存储令牌数
  */
double  storedPermits;
 
/**
  * The maximum number of stored permits.
  * 最大存储令牌数 = maxBurstSeconds * stableIntervalMicros(见下文)
  */
double  maxPermits;
 
/**
  * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
  * per second has a stable interval of 200ms.
  * 添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
  */
double  stableIntervalMicros;
 
/**
  * The time when the next request (no matter its size) will be granted. After granting a request,
  * this is pushed further in the future. Large requests push this further than small requests.
  * 下一次请求可以获取令牌的起始时间
  * 由于RateLimiter允许预消费,上次请求预消费令牌后
  * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
  */
private  long  nextFreeTicketMicros = 0L;  // could be either in the past or future


2.2.2.2 关键方法

  • setRate
public  final  void  setRate( double  permitsPerSecond) {
   checkArgument(
       permitsPerSecond >  0.0  && !Double.isNaN(permitsPerSecond),  "rate must be positive" );
   synchronized  (mutex()) {
     doSetRate(permitsPerSecond, stopwatch.readMicros());
   }
}

通过这个接口设置令牌通每秒生成令牌的数量,内部时间通过调用SmoothRateLimiterdoSetRate来实现

  • doSetRate
@Override
   final  void  doSetRate( double  permitsPerSecond,  long  nowMicros) {
     resync(nowMicros);
     double  stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
     this .stableIntervalMicros = stableIntervalMicros;
     doSetRate(permitsPerSecond, stableIntervalMicros);
   }

这里先通过调用resync生成令牌以及更新下一期令牌生成时间,然后更新stableIntervalMicros,最后又调用了SmoothBurstydoSetRate

  • resync
/**
  * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
  * 基于当前时间,更新下一次请求令牌的时间,以及当前存储的令牌(可以理解为生成令牌)
  */
void  resync( long  nowMicros) {
     // if nextFreeTicket is in the past, resync to now
     if  (nowMicros > nextFreeTicketMicros) {
       double  newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
       storedPermits = min(maxPermits, storedPermits + newPermits);
       nextFreeTicketMicros = nowMicros;
     }
}

延迟计算,如上resync函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。

  • SmoothBursty的doSetRate
@Override
void  doSetRate( double  permitsPerSecond,  double  stableIntervalMicros) {
   double  oldMaxPermits =  this .maxPermits;
   maxPermits = maxBurstSeconds * permitsPerSecond;
   if  (oldMaxPermits == Double.POSITIVE_INFINITY) {
     // if we don't special-case this, we would get storedPermits == NaN, below
     // Double.POSITIVE_INFINITY 代表无穷啊
     storedPermits = maxPermits;
   else  {
     storedPermits =
         (oldMaxPermits ==  0.0 )
             0.0  // initial state
             : storedPermits * maxPermits / oldMaxPermits;
   }
}

桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。

该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。也就是流量不局限于qps

2.2.2.3 RateLimiter几个常用接口分析

理解RateLimiter暴露出来的接口

@CanIgnoreReturnValue
public  double  acquire() {
   return  acquire( 1 );
}
 
/**
* 获取令牌,返回阻塞的时间
**/
@CanIgnoreReturnValue
public  double  acquire( int  permits) {
   long  microsToWait = reserve(permits);
   stopwatch.sleepMicrosUninterruptibly(microsToWait);
   return  1.0  * microsToWait / SECONDS.toMicros(1L);
}
 
final  long  reserve( int  permits) {
   checkPermits(permits);
   synchronized  (mutex()) {
     return  reserveAndGetWaitLength(permits, stopwatch.readMicros());
   }
}

acquire函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回,主要通过reserve返回需要等待的时间,reserve中通过调用reserveAndGetWaitLength获取等待时间

/**
  * Reserves next ticket and returns the wait time that the caller must wait for.
  *
  * @return the required wait time, never negative
  */
final  long  reserveAndGetWaitLength( int  permits,  long  nowMicros) {
   long  momentAvailable = reserveEarliestAvailable(permits, nowMicros);
   return  max(momentAvailable - nowMicros,  0 );
}

最后调用了reserveEarliestAvailable

@Override
final  long  reserveEarliestAvailable( int  requiredPermits,  long  nowMicros) {
   resync(nowMicros);
   long  returnValue = nextFreeTicketMicros;
   double  storedPermitsToSpend = min(requiredPermits,  this .storedPermits);
   double  freshPermits = requiredPermits - storedPermitsToSpend;
   long  waitMicros =
       storedPermitsToWaitTime( this .storedPermits, storedPermitsToSpend)
           + ( long ) (freshPermits * stableIntervalMicros);
 
   this .nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
   this .storedPermits -= storedPermitsToSpend;
   return  returnValue;
}

 

首先通过resync生成令牌以及同步nextFreeTicketMicros时间戳,freshPermits从令牌桶中获取令牌后还需要的令牌数量,通过storedPermitsToWaitTime计算出获取freshPermits还需要等待的时间,在稳定模式中,这里就是(long) (freshPermits * stableIntervalMicros) ,然后更新nextFreeTicketMicros以及storedPermits,这次获取令牌需要的等待到的时间点, reserveAndGetWaitLength返回需要等待的时间间隔。

 

  • `reserveEarliestAvailable`可以看出RateLimiter的预消费原理,
  • 以及获取令牌的等待时间时间原理(可以解释示例结果),
  • 再获取令牌不足时,并没有等待到令牌全部生成,
  • 而是更新了下次获取令牌时的nextFreeTicketMicros,
  • 从而影响的是下次获取令牌的等待时间。

 

`reserve`这里返回等待时间后,`acquire`通过调用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`

进行sleep操作,这里不同于Thread.sleep(), 这个函数的sleep是uninterruptibly的,内部实现:

public  static  void  sleepUninterruptibly( long  sleepFor, TimeUnit unit) {
     //sleep 阻塞线程 内部通过Thread.sleep()
   boolean  interrupted =  false ;
   try  {
     long  remainingNanos = unit.toNanos(sleepFor);
     long  end = System.nanoTime() + remainingNanos;
     while  ( true ) {
       try  {
         // TimeUnit.sleep() treats negative timeouts just like zero.
         NANOSECONDS.sleep(remainingNanos);
         return ;
       catch  (InterruptedException e) {
         interrupted =  true ;
         remainingNanos = end - System.nanoTime();
         //如果被interrupt可以继续,更新sleep时间,循环继续sleep
       }
     }
   finally  {
     if  (interrupted) {
       Thread.currentThread().interrupt();
       //如果被打断过,sleep过后再真正中断线程
     }
   }
}

sleep之后,`acquire`返回sleep的时间,阻塞结束,获取到令牌。

public  boolean  tryAcquire( int  permits) {
   return  tryAcquire(permits,  0 , MICROSECONDS);
}
 
public  boolean  tryAcquire() {
   return  tryAcquire( 1 0 , MICROSECONDS);
}
 
public  boolean  tryAcquire( int  permits,  long  timeout, TimeUnit unit) {
   long  timeoutMicros = max(unit.toMicros(timeout),  0 );
   checkPermits(permits);
   long  microsToWait;
   synchronized  (mutex()) {
     long  nowMicros = stopwatch.readMicros();
     if  (!canAcquire(nowMicros, timeoutMicros)) {
       return  false ;
     else  {
       microsToWait = reserveAndGetWaitLength(permits, nowMicros);
     }
   }
   stopwatch.sleepMicrosUninterruptibly(microsToWait);
   return  true ;
}
 
private  boolean  canAcquire( long  nowMicros,  long  timeoutMicros) {
   return  queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
 
@Override
final  long  queryEarliestAvailable( long  nowMicros) {
   return  nextFreeTicketMicros;
}

tryAcquire函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回false
canAcquire用于判断timeout时间内是否可以获取令牌,通过判断当前时间+超时时间是否大于nextFreeTicketMicros 来决定是否能够拿到足够的令牌数,如果可以获取到,则过程同acquire,线程sleep等待,如果通过canAcquire在此超时时间内不能回去到令牌,则可以快速返回,不需要等待timeout后才知道能否获取到令牌。

参考文档:https://www.jianshu.com/p/5d4fe4b2a726

2.2.3 方法摘要

修饰符和类型
方法和描述
修饰符和类型
方法和描述
double acquire()
从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求
double acquire(int permits)
从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求
static RateLimiter create(double permitsPerSecond)
根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询)
static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和)
double getRate()
返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数
void setRate(double permitsPerSecond)
更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。
String toString()
返回对象的字符表现形式
boolean tryAcquire()
从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits)
从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待)
boolean tryAcquire(long timeout, TimeUnit unit)
从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待)

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM