转自:https://www.jianshu.com/p/8f548e469bbe
参考:https://www.jianshu.com/p/5d4fe4b2a726

本次实战,我们用的是guava的RateLimiter,场景是spring mvc在处理请求时候,从桶中申请令牌,申请到了就成功响应,申请不到时直接返回失败。
实例
1、添加guava jar包
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
2、AccessLimitService.java 限流服务封装到一个类中AccessLimitService,提供tryAcquire()方法,用来尝试获取令牌,返回true表示获取到
@Service public class AccessLimitService { //每秒只发出5个令牌 RateLimiter rateLimiter = RateLimiter.create(5.0); /** * 尝试获取令牌 * @return */ public boolean tryAcquire(){ return rateLimiter.tryAcquire(); } }
3、Controller层每次收到请求的时候都尝试去获取令牌,获取成功和失败打印不同的信息
@Controller public class HelloController { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowired private AccessLimitService accessLimitService; @RequestMapping("/access") @ResponseBody public String access(){ //尝试获取令牌 if(accessLimitService.tryAcquire()){ //模拟业务执行500毫秒 try { Thread.sleep(500); }catch (InterruptedException e){ e.printStackTrace(); } return "aceess success [" + sdf.format(new Date()) + "]"; }else{ return "aceess limit [" + sdf.format(new Date()) + "]"; } } }
4、测试:十个线程并发访问接口
public class AccessClient { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); /** * get请求 * @param realUrl * @return */ public static String sendGet(URL realUrl) { String result = ""; BufferedReader in = null; try { // 打开和URL之间的连接 URLConnection connection = realUrl.openConnection(); // 设置通用的请求属性 connection.setRequestProperty("accept", "*/*"); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); // 建立实际的连接 connection.connect(); // 定义 BufferedReader输入流来读取URL的响应 in = new BufferedReader(new InputStreamReader( connection.getInputStream())); String line; while ((line = in.readLine()) != null) { result += line; } } catch (Exception e) { System.out.println("发送GET请求出现异常!" + e); e.printStackTrace(); } // 使用finally块来关闭输入流 finally { try { if (in != null) { in.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return result; } public void access() throws Exception{ final URL url = new URL("http://localhost:8080/guavalimitdemo/access"); for(int i=0;i<10;i++) { fixedThreadPool.submit(new Runnable() { public void run() { System.out.println(sendGet(url)); } }); } fixedThreadPool.shutdown(); fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } public static void main(String[] args) throws Exception{ AccessClient accessClient = new AccessClient(); accessClient.access(); } }
部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过RateLimiter.create(5.0)配置的是每一秒5枚令牌,但是限流的时候发出的是6枚,改用其他值验证,也是实际的比配置的大1。
以上就是快速实现限流的实战过程,此处仅是单进程服务的限流,而实际的分布式服务中会考虑更多因素,会复杂很多。
RateLimiter方法摘要
修饰符和类型 | 方法和描述 |
---|---|
double | acquire() 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求 |
double | acquire(int permits)从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求 |
static RateLimiter | create(double permitsPerSecond)根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和) |
double | getRate()返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数 |
void | setRate(double permitsPerSecond)更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。 |
String | toString()返回对象的字符表现形式 |
boolean | tryAcquire()从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits)从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit)从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit)从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) |
- 举例来说明如何使用RateLimiter,想象下我们需要处理一个任务列表,但我们不希望每秒的任务提交超过两个:
//速率是每秒两个许可 final RateLimiter rateLimiter = RateLimiter.create(2.0); void submitTasks(List tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // 也许需要等待 executor.execute(task); } }
1、基本算法
1.1 漏桶算法
请求先进入到漏桶里,漏桶以一定的速度出水,当水请求过大会直接溢出,可以看出漏桶算法能强行限制数据的传输速率。
缺点:除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
1.2 令牌桶
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
2. Guava实现
2.1 简单的demo
public
class
RateLimiterDemo {
public
static
void
main(String[] args) {
testNoRateLimiter();
testWithRateLimiter();
}
public
static
void
testNoRateLimiter() {
Long start = System.currentTimeMillis();
for
(
int
i =
0
; i <
10
; i++) {
System.out.println(
"call execute.."
+ i);
}
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
public
static
void
testWithRateLimiter() {
Long start = System.currentTimeMillis();
final
RateLimiter limiter = RateLimiter.create(
10.0
);
// 每秒不超过10个任务被提交
for
(
int
i =
0
; i <
10
; i++) {
limiter.acquire();
// 请求RateLimiter, 超过permits会被阻塞
System.out.println(
"call execute.."
+ i);
}
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
testNoRateLimiter方法快速打印出来
testWithRateLimiter方法
1
秒打印
10
个
|
2.2 RateLimiter介绍
2.2.1 总体框架
RateLimiter类图结构
RateLimiter:抽象类,定义了主要的对外接口以及整个流程的模板
SmoothRateLimiter:RateLimiter的实现类,实现了RateLimiter的部分方法
SmoothWarmingUp:SmoothRateLimiter的实现类,渐进模式,令牌生成速度缓慢提升直到维持在一个稳定值
SmoothBursty:SmoothRateLimiter的实现类,稳定模式,令牌生成速度恒定
2.2.2 具体步骤
2.2.2.1 RateLimiter的创建
通过调用RateLimiter的create
接口来创建实例,实际是调用的SmoothBuisty
稳定模式创建的实例。
public
static
RateLimiter create(
double
permitsPerSecond) {
return
create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static
RateLimiter create(
double
permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter =
new
SmoothBursty(stopwatch,
1.0
/* maxBurstSeconds */
);
rateLimiter.setRate(permitsPerSecond);
return
rateLimiter;
}
|
SmoothBursty
中的两个构造参数含义:
- SleepingStopwatch:guava中的一个时钟类实例,会通过这个来计算时间及令牌
- maxBurstSeconds:官方解释,在ReteLimiter未使用时,最多保存几秒的令牌,默认是1
在解析SmoothBursty原理前,重点解释下SmoothBursty中几个属性的含义
/**
* The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
* 在RateLimiter未使用时,最多存储几秒的令牌
* */
final
double
maxBurstSeconds;
/**
* The currently stored permits.
* 当前存储令牌数
*/
double
storedPermits;
/**
* The maximum number of stored permits.
* 最大存储令牌数 = maxBurstSeconds * stableIntervalMicros(见下文)
*/
double
maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* 添加令牌时间间隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌数)
*/
double
stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
* 下一次请求可以获取令牌的起始时间
* 由于RateLimiter允许预消费,上次请求预消费令牌后
* 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
*/
private
long
nextFreeTicketMicros = 0L;
// could be either in the past or future
|
2.2.2.2 关键方法
- setRate
public
final
void
setRate(
double
permitsPerSecond) {
checkArgument(
permitsPerSecond >
0.0
&& !Double.isNaN(permitsPerSecond),
"rate must be positive"
);
synchronized
(mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
|
通过这个接口设置令牌通每秒生成令牌的数量,内部时间通过调用SmoothRateLimiter
的doSetRate
来实现
- doSetRate
@Override
final
void
doSetRate(
double
permitsPerSecond,
long
nowMicros) {
resync(nowMicros);
double
stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this
.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
|
这里先通过调用resync
生成令牌以及更新下一期令牌生成时间,然后更新stableIntervalMicros,最后又调用了SmoothBursty
的doSetRate
- resync
/**
* Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
* 基于当前时间,更新下一次请求令牌的时间,以及当前存储的令牌(可以理解为生成令牌)
*/
void
resync(
long
nowMicros) {
// if nextFreeTicket is in the past, resync to now
if
(nowMicros > nextFreeTicketMicros) {
double
newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
|
延迟计算,如上resync
函数。该函数会在每次获取令牌之前调用,其实现思路为,若当前时间晚于nextFreeTicketMicros,则计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据。这样一来,只需要在获取令牌时计算一次即可。
- SmoothBursty的doSetRate
@Override
void
doSetRate(
double
permitsPerSecond,
double
stableIntervalMicros) {
double
oldMaxPermits =
this
.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if
(oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
// Double.POSITIVE_INFINITY 代表无穷啊
storedPermits = maxPermits;
}
else
{
storedPermits =
(oldMaxPermits ==
0.0
)
?
0.0
// initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
|
桶中可存放的最大令牌数由maxBurstSeconds计算而来,其含义为最大存储maxBurstSeconds秒生成的令牌。
该参数的作用在于,可以更为灵活地控制流量。如,某些接口限制为300次/20秒,某些接口限制为50次/45秒等。也就是流量不局限于qps
2.2.2.3 RateLimiter几个常用接口分析
理解RateLimiter暴露出来的接口
@CanIgnoreReturnValue
public
double
acquire() {
return
acquire(
1
);
}
/**
* 获取令牌,返回阻塞的时间
**/
@CanIgnoreReturnValue
public
double
acquire(
int
permits) {
long
microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return
1.0
* microsToWait / SECONDS.toMicros(1L);
}
final
long
reserve(
int
permits) {
checkPermits(permits);
synchronized
(mutex()) {
return
reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
|
acquire
函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回,主要通过reserve
返回需要等待的时间,reserve
中通过调用reserveAndGetWaitLength
获取等待时间
/**
* Reserves next ticket and returns the wait time that the caller must wait for.
*
* @return the required wait time, never negative
*/
final
long
reserveAndGetWaitLength(
int
permits,
long
nowMicros) {
long
momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return
max(momentAvailable - nowMicros,
0
);
}
|
最后调用了reserveEarliestAvailable
@Override
final
long
reserveEarliestAvailable(
int
requiredPermits,
long
nowMicros) {
resync(nowMicros);
long
returnValue = nextFreeTicketMicros;
double
storedPermitsToSpend = min(requiredPermits,
this
.storedPermits);
double
freshPermits = requiredPermits - storedPermitsToSpend;
long
waitMicros =
storedPermitsToWaitTime(
this
.storedPermits, storedPermitsToSpend)
+ (
long
) (freshPermits * stableIntervalMicros);
this
.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this
.storedPermits -= storedPermitsToSpend;
return
returnValue;
}
|
首先通过resync生成令牌以及同步nextFreeTicketMicros时间戳,
freshPermits从令牌桶中获取令牌后还需要的令牌数量,
通过storedPermitsToWaitTime计算出获取freshPermits还需要等待的时间,
在稳定模式中,这里就是(long) (freshPermits * stableIntervalMicros) ,
然后更新nextFreeTicketMicros以及storedPermits,这次获取令牌需要的等待到的时间点,
reserveAndGetWaitLength返回需要等待的时间间隔。
从`reserveEarliestAvailable`可以看出RateLimiter的预消费原理,
以及获取令牌的等待时间时间原理(可以解释示例结果),
再获取令牌不足时,并没有等待到令牌全部生成,
而是更新了下次获取令牌时的nextFreeTicketMicros,
从而影响的是下次获取令牌的等待时间。
`reserve`这里返回等待时间后,
`acquire`通过调用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`
进行sleep操作,这里不同于Thread.sleep(), 这个函数的sleep是uninterruptibly的,内部实现:
public
static
void
sleepUninterruptibly(
long
sleepFor, TimeUnit unit) {
//sleep 阻塞线程 内部通过Thread.sleep()
boolean
interrupted =
false
;
try
{
long
remainingNanos = unit.toNanos(sleepFor);
long
end = System.nanoTime() + remainingNanos;
while
(
true
) {
try
{
// TimeUnit.sleep() treats negative timeouts just like zero.
NANOSECONDS.sleep(remainingNanos);
return
;
}
catch
(InterruptedException e) {
interrupted =
true
;
remainingNanos = end - System.nanoTime();
//如果被interrupt可以继续,更新sleep时间,循环继续sleep
}
}
}
finally
{
if
(interrupted) {
Thread.currentThread().interrupt();
//如果被打断过,sleep过后再真正中断线程
}
}
}
|
sleep之后,`acquire`返回sleep的时间,阻塞结束,获取到令牌。
public
boolean
tryAcquire(
int
permits) {
return
tryAcquire(permits,
0
, MICROSECONDS);
}
public
boolean
tryAcquire() {
return
tryAcquire(
1
,
0
, MICROSECONDS);
}
public
boolean
tryAcquire(
int
permits,
long
timeout, TimeUnit unit) {
long
timeoutMicros = max(unit.toMicros(timeout),
0
);
checkPermits(permits);
long
microsToWait;
synchronized
(mutex()) {
long
nowMicros = stopwatch.readMicros();
if
(!canAcquire(nowMicros, timeoutMicros)) {
return
false
;
}
else
{
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return
true
;
}
private
boolean
canAcquire(
long
nowMicros,
long
timeoutMicros) {
return
queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
@Override
final
long
queryEarliestAvailable(
long
nowMicros) {
return
nextFreeTicketMicros;
}
|
tryAcquire
函数可以尝试在timeout时间内获取令牌,如果可以则挂起等待相应时间并返回true,否则立即返回falsecanAcquire
用于判断timeout时间内是否可以获取令牌,通过判断当前时间+超时时间是否大于nextFreeTicketMicros 来决定是否能够拿到足够的令牌数,如果可以获取到,则过程同acquire,线程sleep等待,如果通过canAcquire
在此超时时间内不能回去到令牌,则可以快速返回,不需要等待timeout后才知道能否获取到令牌。
2.2.3 方法摘要
修饰符和类型
|
方法和描述
|
---|---|
修饰符和类型
|
方法和描述
|
double | acquire() 从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求 |
double | acquire(int permits) 从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求 |
static RateLimiter | create(double permitsPerSecond) 根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) 根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内,RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和) |
double | getRate() 返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数 |
void | setRate(double permitsPerSecond) 更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。 |
String | toString() 返回对象的字符表现形式 |
boolean | tryAcquire() 从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits) 从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit) 从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit) 从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) |