先來個事例實現:
RateLimiter是Guava的一個限流組件,我這邊的系統就有用到這個限流組件,使用起來十分方便。
引入pom依賴:
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> </dependency>
RateLimiter它是基於令牌桶算法的,API非常簡單,看以下的Demo:
public static void main(String[] args) { //線程池 ExecutorService exec = Executors.newCachedThreadPool(); //速率是每秒只有3個許可 final RateLimiter rateLimiter = RateLimiter.create(3.0); for (int i = 0; i < 100; i++) { final int no = i; Runnable runnable = new Runnable() { @Override public void run() { try { //獲取許可 rateLimiter.acquire(); System.out.println("Accessing: " + no + ",time:" + new SimpleDateFormat("yy-MM-dd HH:mm:ss").format(new Date())); } catch (Exception e) { e.printStackTrace(); } } }; //執行線程 exec.execute(runnable); } //退出線程池 exec.shutdown(); }
我們可以從結果看出,每秒只能執行三個:
RateLimiter使用的是一種叫令牌桶的流控算法,RateLimiter會按照一定的頻率往桶里扔令牌,線程拿到令牌才能執行,比如你希望自己的應用程序QPS不要超過1000,那么RateLimiter設置1000的速率后,就會每秒往桶里扔1000個令牌。
RateLimiter經常用於限制對一些物理資源或者邏輯資源的訪問速率。與Semaphore 相比,Semaphore 限制了並發訪問的數量而不是使用速率。
通過設置許可證的速率來定義RateLimiter。在默認配置下,許可證會在固定的速率下被分配,速率單位是每秒多少個許可證。為了確保維護配置的速率,許可會被平穩地分配,許可之間的延遲會做調整。
可能存在配置一個擁有預熱期的RateLimiter 的情況,在這段時間內,每秒分配的許可數會穩定地增長直到達到穩定的速率。
舉例來說明如何使用RateLimiter,想象下我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個:
//速率是每秒兩個許可
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // 也許需要等待 executor.execute(task); } }
再舉另外一個例子,想象下我們制造了一個數據流,並希望以每秒5kb的速率處理它。可以通過要求每個字節代表一個許可,然后指定每秒5000個許可來完成:
// 每秒5000個許可
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) { rateLimiter.acquire(packet.length); networkService.send(packet); }
有一點很重要,那就是請求的許可數從來不會影響到請求本身的限制(調用acquire(1) 和調用acquire(1000) 將得到相同的限制效果,如果存在這樣的調用的話),但會影響下一次請求的限制,也就是說,如果一個高開銷的任務抵達一個空閑的RateLimiter,它會被馬上許可,但是下一個請求會經歷額外的限制,從而來償付高開銷任務。注意:RateLimiter 並不提供公平性的保證。
方法摘要
修飾符和類型 | 方法和描述 |
---|---|
double | acquire() 從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求 |
double | acquire(int permits) 從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求 |
static RateLimiter | create(double permitsPerSecond) 根據指定的穩定吞吐率創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) 根據指定的穩定吞吐率和預熱期來創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和) |
double | getRate() 返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數 |
void | setRate(double permitsPerSecond) 更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。 |
String | toString() 返回對象的字符表現形式 |
boolean | tryAcquire() 從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話 |
boolean | tryAcquire(int permits) 從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit) 從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那么立即返回false (無需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit) 從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那么立即返回false(無需等待) |
令牌桶算法
令牌桶算法是網絡流量整形(Traffic Shaping)和限流(Rate Limiting)中最常使用的一種算法,它可用於控制發送到網絡上數據的數量並允許突發數據的發送。
從某種意義上來說,令牌桶算法是對漏桶算法的一種改進,主要在於令牌桶算法能夠在限制調用的平均速率的同時還允許一定程度的突發調用,來看下令牌桶算法的實現原理:
整個的過程是這樣的:
- 系統以恆定的速率產生令牌,然后將令牌放入令牌桶中
- 令牌桶有一個容量,當令牌桶滿了的時候,再向其中放入的令牌就會被丟棄
- 每次一個請求過來,需要從令牌桶中獲取一個令牌,假設有令牌,那么提供服務;假設沒有令牌,那么拒絕服務
那么,我們再看一下,為什么令牌桶算法可以防止一定程度的突發流量呢?可以這么理解,假設我們想要的速率是1000QPS,那么往桶中放令牌的速度就是1000個/s,假設第1秒只有800個請求,那意味着第2秒可以容許1200個請求,這就是一定程度突發流量的意思,反之我們看漏桶算法,第一秒只有800個請求,那么全部放過,第二秒這1200個請求將會被打回200個。
注意上面多次提到一定程度這四個字,這也是我認為令牌桶算法最需要注意的一個點。假設還是1000QPS的速率,那么5秒鍾放1000個令牌,第1秒鍾800個請求過來,第2~4秒沒有請求,那么按照令牌桶算法,第5秒鍾可以接受4200個請求,但是實際上這已經遠遠超出了系統的承載能力,因此使用令牌桶算法特別注意設置桶中令牌的上限即可。
總而言之,作為對漏桶算法的改進,令牌桶算法在限流場景下被使用更加廣泛。
RateLimiter使用
上面說了令牌桶算法在限流場景下被使用更加廣泛,接下來我們看一下代碼示例,模擬一下每秒最多過五個請求:
public class RateLimiterTest { private static final SimpleDateFormat FORMATTER = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private static final int THREAD_COUNT = 25; @Test public void testRateLimiter1() { RateLimiter rateLimiter = RateLimiter.create(5); Thread[] ts = new Thread[THREAD_COUNT]; for (int i = 0; i < THREAD_COUNT; i++) { ts[i] = new Thread(new RateLimiterThread(rateLimiter), "RateLimiterThread-" + i); } for (int i = 0; i < THREAD_COUNT; i++) { ts[i].start(); } for (;;); } public class RateLimiterThread implements Runnable { private RateLimiter rateLimiter; public RateLimiterThread(RateLimiter rateLimiter) { this.rateLimiter = rateLimiter; } @Override public void run() { rateLimiter.acquire(1); System.out.println(Thread.currentThread().getName() + "獲取到了令牌,時間 = " + FORMATTER.format(new Date())); } } }
利用RateLimiter.create這個構造方法可以指定每秒向桶中放幾個令牌,比方說上面的代碼create(5),那么每秒放置5個令牌,即200ms會向令牌桶中放置一個令牌。這邊代碼寫了一條線程模擬實際場景,拿到令牌那么就能執行下面邏輯,看一下代碼執行結果:
RateLimiterThread-0獲取到了令牌,時間 = 2019-08-25 20:58:53 RateLimiterThread-23獲取到了令牌,時間 = 2019-08-25 20:58:54 RateLimiterThread-21獲取到了令牌,時間 = 2019-08-25 20:58:54 RateLimiterThread-19獲取到了令牌,時間 = 2019-08-25 20:58:54 RateLimiterThread-17獲取到了令牌,時間 = 2019-08-25 20:58:54 RateLimiterThread-13獲取到了令牌,時間 = 2019-08-25 20:58:54 RateLimiterThread-9獲取到了令牌,時間 = 2019-08-25 20:58:55 RateLimiterThread-15獲取到了令牌,時間 = 2019-08-25 20:58:55 RateLimiterThread-5獲取到了令牌,時間 = 2019-08-25 20:58:55 RateLimiterThread-1獲取到了令牌,時間 = 2019-08-25 20:58:55 RateLimiterThread-11獲取到了令牌,時間 = 2019-08-25 20:58:55 RateLimiterThread-7獲取到了令牌,時間 = 2019-08-25 20:58:56 RateLimiterThread-3獲取到了令牌,時間 = 2019-08-25 20:58:56 RateLimiterThread-4獲取到了令牌,時間 = 2019-08-25 20:58:56 RateLimiterThread-8獲取到了令牌,時間 = 2019-08-25 20:58:56 RateLimiterThread-12獲取到了令牌,時間 = 2019-08-25 20:58:56 RateLimiterThread-16獲取到了令牌,時間 = 2019-08-25 20:58:57 RateLimiterThread-20獲取到了令牌,時間 = 2019-08-25 20:58:57 RateLimiterThread-24獲取到了令牌,時間 = 2019-08-25 20:58:57 RateLimiterThread-2獲取到了令牌,時間 = 2019-08-25 20:58:57 RateLimiterThread-6獲取到了令牌,時間 = 2019-08-25 20:58:57 RateLimiterThread-10獲取到了令牌,時間 = 2019-08-25 20:58:58 RateLimiterThread-14獲取到了令牌,時間 = 2019-08-25 20:58:58 RateLimiterThread-18獲取到了令牌,時間 = 2019-08-25 20:58:58 RateLimiterThread-22獲取到了令牌,時間 = 2019-08-25 20:58:58
看到,非常標准,在每次消耗一個令牌的情況下,RateLimiter可以保證每一秒內最多只有5個線程獲取到令牌,使用這種方式可以很好的做單機對請求的QPS數控制。
至於為什么2019-08-25 20:58:53這個時間點只有1條線程獲取到了令牌而不是有5條線程獲取到令牌,因為RateLimiter是按照秒計數的,可能第一個線程是2019-08-25 20:58:53.999秒來的,算在2019-08-25 20:58:53這一秒內;下一個線程2019-08-25 20:58:54.001秒來,自然就算到2019-08-25 20:58:54這一秒去了。
上面的寫法是RateLimiter最常用的寫法,注意:
- acquire是阻塞的且會一直等待到獲取令牌為止,它有一個返回值為double型,意思是從阻塞開始到獲取到令牌的等待時間,單位為秒
- tryAcquire是另外一個方法,它可以指定超時時間,返回值為boolean型,即假設線程等待了指定時間后仍然沒有獲取到令牌,那么就會返回給客戶端false,客戶端根據自身情況是打回給前台錯誤還是定時重試
RateLimiter預消費
處理請求,每次來一個請求就acquire一把是RateLimiter最常見的用法,但是我們看acquire還有個acquire(int permits)的重載方法,即允許每次獲取多個令牌數。這也是有可能的,請求數是一個大維度每次扣減1,有可能服務器按照字節數來進行限流,例如每秒最多處理10000字節的數據,那每次扣減的就不止1了。
接着我們再看一段代碼示例:
@Test public void testRateLimiter2() { RateLimiter rateLimiter = RateLimiter.create(1); System.out.println("獲取1個令牌開始,時間為" + FORMATTER.format(new Date())); double cost = rateLimiter.acquire(1); System.out.println("獲取1個令牌結束,時間為" + FORMATTER.format(new Date()) + ", 耗時" + cost + "ms"); System.out.println("獲取5個令牌開始,時間為" + FORMATTER.format(new Date())); cost = rateLimiter.acquire(5); System.out.println("獲取5個令牌結束,時間為" + FORMATTER.format(new Date()) + ", 耗時" + cost + "ms"); System.out.println("獲取3個令牌開始,時間為" + FORMATTER.format(new Date())); cost = rateLimiter.acquire(3); System.out.println("獲取3個令牌結束,時間為" + FORMATTER.format(new Date()) + ", 耗時" + cost + "ms"); }
代碼運行結果為:
獲取1個令牌開始,時間為2019-08-25 21:21:09.973 獲取1個令牌結束,時間為2019-08-25 21:21:09.976, 耗時0.0ms 獲取5個令牌開始,時間為2019-08-25 21:21:09.976 獲取5個令牌結束,時間為2019-08-25 21:21:10.974, 耗時0.997237ms 獲取3個令牌開始,時間為2019-08-25 21:21:10.976 獲取3個令牌結束,時間為2019-08-25 21:21:15.974, 耗時4.996529ms
看到這就是標題所說的預消費能力,也是RateLimiter中允許一定程度突發流量的實現方式。第二次需要獲取5個令牌,指定的是每秒放1個令牌到桶中,我們發現實際上並沒有等5秒鍾等桶中積累了5個令牌才能讓第二次acquire成功,而是直接等了1秒鍾就成功了。我們可以捋一捋這個邏輯:
- 第一次請求過來需要獲取1個令牌,直接拿到
- RateLimiter在1秒鍾后放一個令牌,第一次請求預支的1個令牌還上了
- 1秒鍾之后第二次請求過來需要獲得5個令牌,直接拿到
- RateLimiter在花了5秒鍾放了5個令牌,還上了第二次請求預支的5個令牌
- 第三個請求在5秒鍾之后拿到3個令牌
也就是說,前面的請求如果流量大於每秒放置令牌的數量,那么允許處理,但是帶來的結果就是后面的請求延后處理,從而在整體上達到一個平衡整體處理速率的效果。
突發流量的處理,在令牌桶算法中有兩種方式,一種是有足夠的令牌才能消費,一種是先消費后還令牌。后者就像我們0首付買車似的,30萬的車很少有等攢到30萬才全款買的,先簽了相關合同把車子給你,然后貸款慢慢還,這樣就爽了。RateLimiter也是同樣的道理,先讓請求得到處理,再慢慢還上預支的令牌,客戶端同樣也爽了,否則我假設預支60個令牌,1分鍾之后才能處理我的請求,不合理也不人性化。
RateLimiter的限制
特別注意RateLimiter是單機的,也就是說它無法跨JVM使用,設置的1000QPS,那也在單機中保證平均1000QPS的流量。
假設集群中部署了10台服務器,想要保證集群1000QPS的接口調用量,那么RateLimiter就不適用了,集群流控最常見的方法是使用強大的Redis:
- 一種是固定窗口的計數,例如當前是2019/8/26 20:05:00,就往這個"2019/8/26 20:05:00"這個key進行incr,當前是2019/8/26 20:05:01,就往"2019/8/26 20:05:01"這個key進行incr,incr后的結果只要大於我們設定的值,那么就打回去,小於就相當於獲取到了執行權限
- 一種是結合lua腳本,實現分布式的令牌桶算法,網上實現還是比較多的,可以參考https://blog.csdn.net/sunlihuo/article/details/79700225這篇文章
總得來說,集群限流的實現也比較簡單。
總結
本文主要寫了常見的兩種限流算法漏桶算法與令牌桶算法,並且演示了Guava中RateLimiter的實現,相信看到這里的朋友一定都懂了,恭喜你們!
令牌桶算法是最常用的限流算法,它最大的特點就是容許一定程度的突發流量。
漏桶算法同樣也有自己的應用之處,例如Nginx的限流模塊就是基於漏桶算法的,它最大的特點就是強行限制流量按照指定的比例下發,適合那種對流量有絕對要求的場景,就是流量可以容許在我指定的值之下,可以被多次打回,但是無論如何決不能超過指定的。
雖然令牌桶算法相對更好,但是還是我經常說的,使用哪種完全就看大家各自的場景,適合的才是最好的。
Guava RateLimiter + AOP注解實現單機限流
1、基於springboot項目pom.xml添加如下依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> </dependency>
2、創建自定義運行時注解:
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface LxRateLimit { /** * * @return */ String value() default ""; /** * 每秒向桶中放入令牌的數量 默認最大即不做限流 * @return */ double perSecond() default Double.MAX_VALUE; /** * 獲取令牌的等待時間 默認0 * @return */ int timeOut() default 0; /** * 超時時間單位 * @return */ TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS; }
3、創建aop切面進行環繞通知:
@Aspect @Component public class LxRateLimitAspect { private final static Logger logger = LoggerFactory.getLogger(LxRateLimitAspect.class); private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE); /** * 定義切點 * 1、通過掃包切入 * 2、帶有指定注解切入 */ // @Pointcut("execution(public * com.ycn.springcloud.*.*(..))") @Pointcut("@annotation(com.ycn.springcloud.annotation.LxRateLimit)") public void checkPointcut() { } @ResponseBody @Around(value = "checkPointcut()") public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable { logger.info("攔截到了{}方法...", pjp.getSignature().getName()); Signature signature = pjp.getSignature(); MethodSignature methodSignature = (MethodSignature)signature; //獲取目標方法 Method targetMethod = methodSignature.getMethod(); if (targetMethod.isAnnotationPresent(LxRateLimit.class)) { //獲取目標方法的@LxRateLimit注解 LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class); rateLimiter.setRate(lxRateLimit.perSecond()); if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit())) return "服務器繁忙,請稍后再試!"; } return pjp.proceed(); } }
在ctroller中使用自定義注解
@RequestMapping("/testAnnotation") @LxRateLimit(perSecond = 1.0, timeOut = 500) public String testAnnotation() { return "get token success"; }
當接口QPS大於1的時候就會返回 “服務器繁忙,請稍后再試!”
Redis+Lua實現限流
分布式限流最關鍵的是要將限流服務做成原子化,而解決方案可以使使用redis+lua或者nginx+lua技術進行實現,通過這兩種技術可以實現的高並發和高性能。
首先我們來使用redis+lua實現時間窗內某個接口的請求數限流(存在突刺效應),實現了該功能后可以改造為限流總並發/請求數和限制總資源數。Lua本身就是一種編程語言,也可以使用它實現復雜的令牌桶或漏桶算法。
如下操作因是在一個lua腳本中(相當於原子操作),又因Redis是單線程模型,因此是線程安全的。
相比Redis事務來說,Lua腳本有以下優點
減少網絡開銷: 不使用 Lua 的代碼需要向 Redis 發送多次請求, 而腳本只需一次即可, 減少網絡傳輸;
原子操作: Redis 將整個腳本作為一個原子執行, 無需擔心並發, 也就無需事務;
復用: 腳本會永久保存 Redis 中, 其他客戶端可繼續使用.
Lua腳本
local key = KEYS[1] --限流KEY(一秒一個) local limit = tonumber(ARGV[1]) --限流大小 local current = tonumber(redis.call('get', key) or "0") if current + 1 > limit then --如果超出限流大小 return 0 else --請求數+1,並設置2秒過期 redis.call("INCRBY", key,"1") redis.call("expire", key,"2") end return 1
java代碼
import org.apache.commons.io.FileUtils; import redis.clients.jedis.Jedis; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class RedisLimitRateWithLUA { public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < 7; i++) { new Thread(new Runnable() { public void run() { try { latch.await(); System.out.println("請求是否被執行:"+accquire()); } catch (Exception e) { e.printStackTrace(); } } }).start(); } latch.countDown(); } public static boolean accquire() throws IOException, URISyntaxException { Jedis jedis = new Jedis("127.0.0.1"); File luaFile = new File(RedisLimitRateWithLUA.class.getResource("/").toURI().getPath() + "limit.lua"); String luaScript = FileUtils.readFileToString(luaFile); String key = "ip:" + System.currentTimeMillis()/1000; // 當前秒 String limit = "5"; // 最大限制 List<String> keys = new ArrayList<String>(); keys.add(key); List<String> args = new ArrayList<String>(); args.add(limit); Long result = (Long)(jedis.eval(luaScript, keys, args)); // 執行lua腳本,傳入參數 return result == 1; } }
運行結果
請求是否被執行:true 請求是否被執行:true 請求是否被執行:false 請求是否被執行:true 請求是否被執行:true 請求是否被執行:true 請求是否被執行:false
從結果可看出只有5個請求成功執行
IP限流Lua腳本
local key = "rate.limit:" .. KEYS[1] local limit = tonumber(ARGV[1]) local expire_time = ARGV[2] local is_exists = redis.call("EXISTS", key) if is_exists == 1 then if redis.call("INCR", key) > limit then return 0 else return 1 end else redis.call("SET", key, 1) redis.call("EXPIRE", key, expire_time) return 1 end