1.限流
2.限流算法
2.1計數器
2.2 漏桶算法
2.3 令牌桶算法
2.4、滑動時間窗
2.5、三色速率標記法
三、限流實現
3.1 RateLimiter簡介(guava的令牌桶實現)
3.2 基於 redis 的分布式限流
3.3 Spring Cloud GateWay整合redis內置RequestRateLimiter限流應用
3.4 AOP + Semaphore限流應用
1.限流
每個API接口都是有訪問上限的,當訪問頻率或者並發量超過其承受范圍時候,我們就必須考慮限流來保證接口的可用性或者降級可用性.即接口也需要安裝上保險絲,以防止非預期的請求對系統壓力過大而引起的系統癱瘓.
通常的策略就是拒絕多余的訪問,或者讓多余的訪問排隊等待服務,或者引流.
如果要准確的控制QPS,簡單的做法是維護一個單位時間內的Counter,如判斷單位時間已經過去,則將Counter重置零.此做法被認為沒有很好的處理單位時間的邊界,比如在前一秒的最后一毫秒里和下一秒的第一毫秒都觸發了最大的請求數,將目光移動一下,就看到在兩毫秒內發生了兩倍的QPS.

2.限流算法
常用的更平滑的限流算法有兩種:漏桶算法和令牌桶算法.
兩種限流的祖師級算法確有其獨到之處,其他實現比如滑動時間窗或者三色速率標記法,其實是“漏桶”與“令牌桶”的變種。要么將“漏桶”容積換成了單位時間,要么是按規則將請求標記顏色進行處理,底層還是“令牌”的思想。
很多傳統的服務提供商如華為中興都有類似的專利,參考: http://www.google.com/patents/CN1536815A?cl=zh
2.1計數器
計數器法是限流算法里最簡單也是最容易實現的一種算法。比如我們規定,對於A接口來說,我們1分鍾的訪問次數不能超過100個。那么我們我們可以設置一個計數器counter,其有效時間為1分鍾(即每分鍾計數器會被重置為0),每當一個請求過來的時候,counter就加1,如果counter的值大於100,就說明請求數過多;
這個算法雖然簡單,但是有一個十分致命的問題,那就是臨界問題。

如上圖所示,在1:00前一刻到達100個請求,1:00計數器被重置,1:00后一刻又到達100個請求,顯然計數器不會超過100,所有請求都不會被攔截;然而這一時間段內請求數已經達到200,遠超100。違背定義的固定速率。
2.2 漏桶算法
漏桶(Leaky Bucket)算法思路很簡單,水(請求)先進入到漏桶里,漏桶以一定的速度出水(接口有響應速率),當水流入速度過大會直接溢出(訪問頻率超過接口響應速率),然后就拒絕請求,而當入小於出的情況下,漏桶不起任何作用。可以看出漏桶算法能強行限制數據的傳輸速率.示意圖如下:


注意:在我們的應用中,漏桶算法強制限定流量速率后,多出的(溢出的)流量可以被利用起來,並非完全丟棄,我們可以把它收集到一個隊列里面,做流量隊列,盡量做到合理利用所有資源。
漏桶算法:水(請求)先進入到漏桶里,漏桶以一定的速度出水,當水流入速度過大會直接溢出(拒絕服務),可以看出漏桶算法能強行限制數據的傳輸速率
流入:以任意速率往桶中放入水滴。
流出:以固定速率從桶中流出水滴。
用白話具體說明:假設漏斗總支持並發100個最大請求,如果超過100個請求,那么會提示系統繁忙,請稍后再試,數據輸出那可以設置1個線程池,處理線程數5個,每秒處理20個請求。
缺點:因為當流出速度固定,大規模持續突發量,無法多余處理,浪費網絡帶寬
優點:無法擊垮服務
示例:
可見這里有兩個變量,一個是桶的大小,支持流量突發增多時可以存多少的水(burst),另一個是水桶漏洞的大小(rate),偽代碼如下:
double rate; // leak rate in calls/s
double burst; // bucket size in calls
long refreshTime; // time for last water refresh
double water; // water count at refreshTime
refreshWater() {
long now = getTimeOfDay();
//水隨着時間流逝,不斷流走,最多就流干到0.
water = max(0, water- (now - refreshTime)*rate);
refreshTime = now;
}
bool permissionGranted() {
refreshWater();
if (water < burst) { // 水桶還沒滿,繼續加1
water ++;
return true;
} else {
return false;
}
}
因為漏桶的漏出速率是固定的參數,所以,即使網絡中不存在資源沖突(沒有發生擁塞),漏桶算法也不能使流突發(burst)到端口速率.因此,漏桶算法對於存在突發特性的流量來說缺乏效率.
2.3 令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨着時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了,令牌就溢出了。如果桶未滿,令牌可以積累。新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.



令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量.
令牌桶算法:一個存放固定容量令牌的桶,按照固定速率(每秒/或者可以自定義時間)往桶里添加令牌,然后每次獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務
令牌桶分為2個動作,動作1(固定速率往桶中存入令牌)、動作2(客戶端如果想訪問請求,先從桶中獲取token)
流入:以固定速率從桶中流入水滴
流出:按照任意速率從桶中流出水滴
技術上使用Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法來完成限流,非常易於使用。RateLimiter是guava提供的基於令牌桶算法的實現類,可以非常簡單的完成限流特技,並且根據系統的實際情況來調整生成token的速率。 RateLimiter 是單機(單進程)的限流,是JVM級別的的限流,所有的令牌生成都是在內存中,在分布式環境下不能直接這么用。
優點:支持大的並發,有效利用網絡帶寬
漏桶和令牌桶的區別:
並不能說明令牌桶一定比漏洞好,她們使用場景不一樣。
- 令牌桶算法,放在服務端,用來保護服務端(自己),主要用來對調用者頻率進行限流,為的是不讓自己被壓垮。所以如果自己本身有處理能力的時候,如果流量突發(實際消費能力強於配置的流量限制=桶大小),那么實際處理速率可以超過配置的限制(桶大小)。
- 而漏桶算法,放在調用方,這是用來保護他人,也就是保護他所調用的系統。主要場景是,當調用的第三方系統本身沒有保護機制,或者有流量限制的時候,我們的調用速度不能超過他的限制,由於我們不能更改第三方系統,所以只有在主調方控制。這個時候,即使流量突發,也必須舍棄。因為消費能力是第三方決定的。
2.4、滑動時間窗
滑動窗口,又稱rolling window。為了解決這個問題,我們引入了滑動窗口算法。如果學過TCP網絡協議的話,那么一定對滑動窗口這個名詞不會陌生。下面這張圖,很好地解釋了滑動窗口算法:

在上圖中,整個紅色的矩形框表示一個時間窗口,在我們的例子中,一個時間窗口就是一分鍾。然后我們將時間窗口進行划分,比如圖中,我們就將滑動窗口 划成了6格,所以每格代表的是10秒鍾。每過10秒鍾,我們的時間窗口就會往右滑動一格。每一個格子都有自己獨立的計數器counter,比如當一個請求 在0:35秒的時候到達,那么0:30~0:39對應的counter就會加1。
那么滑動窗口怎么解決剛才的臨界問題的呢?我們可以看上圖,0:59到達的100個請求會落在灰色的格子中,而1:00到達的請求會落在橘黃色的格 子中。當時間到達1:00時,我們的窗口會往右移動一格,那么此時時間窗口內的總請求數量一共是200個,超過了限定的100個,所以此時能夠檢測出來觸 發了限流。
我再來回顧一下剛才的計數器算法,我們可以發現,計數器算法其實就是滑動窗口算法。只是它沒有對時間窗口做進一步地划分,所以只有1格。
由此可見,當滑動窗口的格子划分的越多,那么滑動窗口的滾動就越平滑,限流的統計就會越精確。
2.5、三色速率標記法
三、限流實現
3.1 RateLimiter簡介(guava的令牌桶實現)
Google開源工具包Guava提供了限流工具類RateLimiter,該類基於令牌桶算法(Token Bucket)來完成限流,非常易於使用.RateLimiter經常用於限制對一些物理資源或者邏輯資源的訪問速率.它支持兩種獲取permits接口,一種是如果拿不到立刻返回false,一種會阻塞等待一段時間看能不能拿到。原理見《guava--RateLimiter源碼分析》
RateLimiter和Java中的信號量(java.util.concurrent.Semaphore)類似,Semaphore通常用於限制並發量.
源碼注釋中的一個例子,比如我們有很多任務需要執行,但是我們不希望每秒超過兩個任務執行,那么我們就可以使用RateLimiter:
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}
另外一個例子,假如我們會產生一個數據流,然后我們想以每秒5kb的速度發送出去.我們可以每獲取一個令牌(permit)就發送一個byte的數據,這樣我們就可以通過一個每秒5000個令牌的RateLimiter來實現:
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
rateLimiter.acquire(packet.length);
networkService.send(packet);
}
另外,我們也可以使用非阻塞的形式達到降級運行的目的,即使用非阻塞的tryAcquire()方法:
if(limiter.tryAcquire()) { //未請求到limiter則立即返回false
doSomething();
}else{
doSomethingElse();
}
3.2 基於 redis 的分布式限流
單機版中我們了解到 AtomicInteger、RateLimiter、Semaphore 這幾種解決方案,但它們也僅僅是單機的解決手段,在集群環境下就透心涼了,后面又講述了 Nginx 的限流手段,可它又屬於網關層面的策略之一,並不能解決所有問題。例如供短信接口,你無法保證消費方是否會做好限流控制,所以自己在應用層實現限流還是很有必要的。
導入依賴
在 pom.xml 中添加上 starter-web、starter-aop、starter-data-redis 的依賴即可,習慣了使用 commons-lang3 和 guava 中的一些工具包…
<dependencies> <!-- 默認就內嵌了Tomcat 容器,如需要更換容器也極其簡單--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
屬性配置
在 application.properites 資源文件中添加 redis 相關的配置項
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=battcn
Limit 注解
創建一個 Limit 注解,不多說注釋都給各位寫齊全了….
package com.johnfnash.learn.springboot.ratelimiter.annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; // 限流 @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Inherited @Documented public @interface Limit { /** * 資源的名稱 * @return */ String name() default ""; /** * 資源的key * * @return */ String key() default ""; /** * Key的prefix * * @return */ String prefix() default ""; /** * 給定的時間段 * 單位秒 * * @return */ int period(); /** * 最多的訪問限制次數 * * @return */ int count(); /** * 類型 * * @return */ LimitType limitType() default LimitType.CUSTOMER; }
package com.johnfnash.learn.springboot.ratelimiter.annotation; // 限制的類型 public enum LimitType { /** * 自定義key */ CUSTOMER, /** * 根據請求者IP */ IP; }
RedisTemplate
默認情況下 spring-boot-data-redis 為我們提供了StringRedisTemplate 但是滿足不了其它類型的轉換,所以還是得自己去定義其它類型的模板….
package com.johnfnash.learn.springboot.ratelimiter; import java.io.Serializable; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisLimiterHelper { @Bean public RedisTemplate<String, Serializable> limitRedisTemplate(LettuceConnectionFactory factory) { RedisTemplate<String, Serializable> template = new RedisTemplate<String, Serializable>(); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.setConnectionFactory(factory); return template; } }
Limit 攔截器(AOP)
熟悉 Redis 的朋友都知道它是線程安全的,我們利用它的特性可以實現分布式鎖、分布式限流等組件,限流相比稍微復雜一點,官方雖然沒有提供相應的API,但卻提供了支持 Lua 腳本的功能,我們可以通過編寫 Lua 腳本實現自己的API,同時它是滿足原子性的,下面核心就是調用 execute 方法傳入我們的 Lua 腳本內容,然后通過返回值判斷是否超出我們預期的范圍,超出則給出錯誤提示。
package com.johnfnash.learn.springboot.ratelimiter.aop;
import java.io.Serializable;
import java.lang.reflect.Method;
import javax.servlet.http.HttpServletRequest;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import com.google.common.collect.ImmutableList;
import com.johnfnash.learn.springboot.ratelimiter.annotation.Limit;
import com.johnfnash.learn.springboot.ratelimiter.annotation.LimitType;
@Aspect
@Configuration
public class LimitInterceptor {
private static final Logger logger = LoggerFactory.getLogger(LimitInterceptor.class);;
private final String REDIS_SCRIPT = buildLuaScript();
@Autowired
private RedisTemplate<String, Serializable> redisTemplate;
@Around("execution(public * *(..)) && @annotation(com.johnfnash.learn.springboot.ratelimiter.annotation.Limit)")
public Object interceptor(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
Limit limitAnno = method.getAnnotation(Limit.class);
LimitType limitType = limitAnno.limitType();
String name = limitAnno.name();
String key = null;
int limitPeriod = limitAnno.period();
int limitCount = limitAnno.count();
switch (limitType) {
case IP:
key = getIpAddress();
break;
case CUSTOMER:
// TODO 如果此處想根據表達式或者一些規則生成 請看 一起來學Spring Boot | 第二十三篇:輕松搞定重復提交(分布式鎖)
key = limitAnno.key();
break;
default:
break;
}
ImmutableList<String> keys = ImmutableList.of(StringUtils.join(limitAnno.prefix(), key));
try {
RedisScript<Number> redisScript = new DefaultRedisScript<Number>(REDIS_SCRIPT, Number.class);
Number count = redisTemplate.execute(redisScript, keys, limitCount, limitPeriod);
logger.info("Access try count is {} for name={} and key = {}", count, name, key);
if(count != null && count.intValue() <= limitCount) {
return pjp.proceed();
} else {
throw new RuntimeException("You have been dragged into the blacklist");
}
} catch (Throwable e) {
if (e instanceof RuntimeException) {
throw new RuntimeException(e.getLocalizedMessage());
}
throw new RuntimeException("server exception");
}
}
/**
* 限流 腳本
*
* @return lua腳本
*/
private String buildLuaScript() {
StringBuilder lua = new StringBuilder();
lua.append("local c")
.append("\nc = redis.call('get', KEYS[1])")
// 調用不超過最大值,則直接返回
.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then")
.append("\nreturn c;")
.append("\nend")
// 執行計算器自加
.append("\nc = redis.call('incr', KEYS[1])")
.append("\nif tonumber(c) == 1 then")
// 從第一次調用開始限流,設置對應鍵值的過期
.append("\nredis.call('expire', KEYS[1], ARGV[2])")
.append("\nend")
.append("\nreturn c;");
return lua.toString();
}
private static final String UNKNOWN = "unknown";
public String getIpAddress() {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String ip = request.getHeader("x-forwarded-for");
if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
ip = request.getHeader("Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
ip = request.getHeader("WL-Proxy-Client-IP");
}
if (ip == null || ip.length() == 0 || UNKNOWN.equalsIgnoreCase(ip)) {
ip = request.getRemoteAddr();
}
return ip;
}
}
控制層
在接口上添加 @Limit() 注解,如下代碼會在 Redis 中生成過期時間為 100s 的 key = test 的記錄,特意定義了一個 AtomicInteger 用作測試:
package com.johnfnash.learn.springboot.ratelimiter.controller;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.johnfnash.learn.springboot.ratelimiter.annotation.Limit;
@RestController
public class LimiterController {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
@Limit(key = "test", period = 100, count = 10)
// 意味著 100S 內最多允許訪問10次
@GetMapping("/test")
public int testLimiter() {
return ATOMIC_INTEGER.incrementAndGet();
}
}
測試
完成准備事項后,啟動 啟動類 自行測試即可,測試手段相信大伙都不陌生了,如 瀏覽器、postman、junit、swagger,此處基於 postman,未達設定的閥值時,正常響應

注:上面的方式是使用計數器的限流方式,無法處理臨界的時候,大量請求的的情況。要解決這個問題,可以使用redis中列表類型的鍵來記錄最近N次訪問的時間,一旦鍵中的元素超過N個,就判斷時間最早的元素距現在的時間是否小於M秒。如果是則表示用戶最近M秒的訪問次數超過了N次;如果不是就將現在的時間加入列表中同時把最早的元素刪除(可以通過腳本功能避免競態條件)。由於需要記錄每次訪問的時間,所以當要限制“M時間最多訪問N次”時,如果“N”的數值較大,此方法會占用較多的存儲空間,實際使用時還需要開發者自己去權衡。
下面的解決思路的實現如下:
/**
* 限流 腳本(處理臨界時間大量請求的情況)
*
* @return lua腳本
*/
private String buildLuaScript2() {
StringBuilder lua = new StringBuilder();
lua.append("local listLen, time")
.append("\nlistLen = redis.call('LLEN', KEYS[1])")
// 不超過最大值,則直接寫入時間
.append("\nif listLen and tonumber(listLen) < tonumber(ARGV[1]) then")
.append("\nlocal a = redis.call('TIME');")
.append("\nredis.call('LPUSH', KEYS[1], a[1]*1000000+a[2])")
.append("\nelse")
// 取出現存的最早的那個時間,和當前時間比較,看是小於時間間隔
.append("\ntime = redis.call('LINDEX', KEYS[1], -1)")
.append("\nlocal a = redis.call('TIME');")
.append("\nif a[1]*1000000+a[2] - time < tonumber(ARGV[2])*1000000 then")
// 訪問頻率超過了限制,返回0表示失敗
.append("\nreturn 0;")
.append("\nelse")
.append("\nredis.call('LPUSH', KEYS[1], a[1]*1000000+a[2])")
.append("\nredis.call('LTRIM', KEYS[1], 0, tonumber(ARGV[1])-1)")
.append("\nend")
.append("\nend")
.append("\nreturn 1;");
return lua.toString();
}
調用地方修改如下:
if(count != null && count.intValue() == 1) {
return pjp.proceed();
} else {
throw new RuntimeException("You have been dragged into the blacklist");
}
補充,最近執行 buildLuaScript2() 中的lua腳本,報錯
Write commands not allowed after non deterministic commands.
這個錯誤的原因大家可以參見這篇文章,大致原因跟redis集群的重放和備份策略有關,相當於我調用TIME操作,會在主從各執行一次,得到的結果肯定會存在差異,這個差異就給最終邏輯正確性帶來了不確定性。在redis 4.0之后引入了redis.replicate_commands()來放開限制。
於是,在 buildLuaScript2 的 lua 腳本最前面加上 “redis.replicate_commands();”,錯誤得以解決。
3.3 Spring Cloud GateWay整合redis內置RequestRateLimiter限流應用
3.4 AOP + Semaphore限流應用
見《AOP+Semaphore實現單節點的接口(方法)限流》
參考: https://github.com/springside/springside4/wiki/Rate-Limiter
https://en.wikipedia.org/wiki/Token_bucket
https://en.wikipedia.org/wiki/Leaky_bucket
https://blog.csdn.net/johnf_nash/article/details/89791808

