常見的限流算法大致有三種:
- 令牌桶算法
- 漏桶算法
- 計數器算法
網上對令牌桶又細分為固定窗口計數器限流和滑動窗口計數器限流,下面將對這幾種限流方式進行簡單的介紹及代碼實現。
注意:代碼中會考慮並發線程安全問題,非分布式限流
Github地址:重構后的代碼
固定窗口計數器限流
固定窗口計數器限流就是在固定時間內(如10s),只允許固定的請求數訪問(如10個),超過的請求將受到限制。
實現邏輯圖
實現代碼
package com.dfy.ratelimiter.core; import java.util.concurrent.TimeUnit; /** * @description: 計數器限流 * @author: DFY * @time: 2020/4/8 17:02 */ public abstract class CounterLimit { /** 單位時間限制數 */ protected int limitCount; /** 限制時間 */ protected long limitTime; /** 時間單位,默認為秒 */ protected TimeUnit timeUnit; /** 當前是否為受限狀態 */ protected volatile boolean limited; /** * 嘗試將計數器加1,返回為true表示能夠正常訪問接口,false表示訪問受限 * @return */ protected abstract boolean tryCount(); }
package com.dfy.ratelimiter.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 固定窗口計數器限流 * @author: DFY * @time: 2020/4/8 15:50 */ public class FixedWindowCounterLimit extends CounterLimit { private static Logger logger = LoggerFactory.getLogger(FixedWindowCounterLimit.class); /** 計數器 */ private AtomicInteger counter = new AtomicInteger(0); public FixedWindowCounterLimit(int limitCount, long limitTime) { this(limitCount, limitTime, TimeUnit.SECONDS); } public FixedWindowCounterLimit(int limitCount, long limitTime, TimeUnit timeUnit) { this.limitCount = limitCount; this.limitTime = limitTime; this.timeUnit = timeUnit; new Thread(new CounterResetThread()).start(); // 開啟計數器清零線程 } public boolean tryCount() { while (true) { if (limited) { return false; } else { int currentCount = counter.get(); if (currentCount == limitCount) { logger.info("限流:{}", LocalDateTime.now().toString()); limited = true; return false; } else { if (counter.compareAndSet(currentCount, currentCount + 1)) return true; } } } } class CounterResetThread implements Runnable { @Override public void run() { while (true) { try { timeUnit.sleep(limitTime); counter.compareAndSet(limitCount, 0); // 計數器清零 limited = false; // 修改當前狀態為不受限 } catch (InterruptedException e) { e.printStackTrace(); } } } } }
使用及測試
啟動項目,連續訪問接口,當在訪問第11次時接口受限,受限時間到后又能正常訪問。
private FixedWindowCounterLimit fixedWindowCounterLimit = new FixedWindowCounterLimit(10, 10); @GetMapping("/hello") public String hello() { if (!fixedWindowCounterLimit.tryCount()) { return "限流!"; } return "hello world!"; }
存在的問題
限流不均勻,如下所示我們規定10S內至多10個訪問量,但2S內實際上有20個訪問量。
滑動窗口計數器限流
固定窗口計數器限流是在固定時間內訪問量受限,滑動窗口計數器限流是在滑動窗口內訪問量受限。
例子
如下是規定5S內不能超過10個訪問量,當已經達到10個訪問量,則訪問受限。使用該方式可以使受限均勻,任意連續的5S內都只能有10個訪問量。
實現代碼
package com.dfy.ratelimiter.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 滑動窗口計數器限流 * @author: DFY * @time: 2020/4/8 17:01 */ public class SlidingWindowCounterLimit extends CounterLimit { private static Logger logger = LoggerFactory.getLogger(SlidingWindowCounterLimit.class); /** 格子分布 */ private AtomicInteger[] gridDistribution; /** 當前時間在計數分布的索引 */ private volatile int currentIndex; /** 當前時間之前的滑動窗口計數 */ private int preTotalCount; /** 格子數 */ private int gridNumber; /** 是否正在執行狀態重置 */ private volatile boolean resetting; public SlidingWindowCounterLimit(int gridNumber, int limitCount, long limitTime) { this(gridNumber, limitCount, limitTime, TimeUnit.SECONDS); } public SlidingWindowCounterLimit(int gridNumber, int limitCount, long limitTime, TimeUnit timeUnit) { if (gridNumber <= limitTime) throw new RuntimeException("無法完成限流,gridNumber必須大於limitTime,gridNumber = " + gridNumber + ",limitTime = " + limitTime); this.gridNumber = gridNumber; this.limitCount = limitCount; this.limitTime = limitTime; this.timeUnit = timeUnit; gridDistribution = new AtomicInteger[gridNumber]; for (int i = 0; i < gridNumber; i++) { gridDistribution[i] = new AtomicInteger(0); } new Thread(new CounterResetThread()).start(); } public boolean tryCount() { while (true) { if (limited) { return false; } else { int currentGridCount = gridDistribution[currentIndex].get(); if (preTotalCount + currentGridCount == limitCount) { logger.info("限流:{}", LocalDateTime.now().toString()); limited = true; return false; } if (!resetting && gridDistribution[currentIndex].compareAndSet(currentGridCount, currentGridCount + 1)) return true; } } } class CounterResetThread implements Runnable { @Override public void run() { while (true) { try { timeUnit.sleep(1); // 停止1個時間單位 int indexToReset = currentIndex - limitCount - 1; // 要重置計數的格子索引 if (indexToReset < 0) indexToReset += gridNumber; resetting = true; // 防止在更新狀態時,用戶訪問接口將當前格子的訪問量 + 1 preTotalCount = preTotalCount - gridDistribution[indexToReset].get() + gridDistribution[currentIndex++].get(); // 重置當前時間之前的滑動窗口計數 if (currentIndex == gridNumber) currentIndex = 0; if (preTotalCount + gridDistribution[currentIndex].get() < limitCount) limited = false; // 修改當前狀態為不受限 resetting = false; logger.info("當前格子:{},重置格子:{},重置格子訪問量:{},前窗口格子總數:{}", currentIndex, indexToReset, gridDistribution[indexToReset].get(), preTotalCount); gridDistribution[indexToReset].set(0); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
使用及測試
private SlidingWindowCounterLimit slidingWindowCounterLimit = new SlidingWindowCounterLimit(20, 10, 10); @GetMapping("/hello") public String hello() { if (!slidingWindowCounterLimit.tryCount()) { return "限流!"; } return "hello world!"; }
令牌桶限流
Google guava的RateLimiter提供了基於令牌桶算法的兩種實現,下面代碼只是簡單實現。
package com.dfy.ratelimiter.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 令牌桶限流 * @author: DFY * @time: 2020/4/10 15:35 */ public class TokenBucketLimit { private static Logger logger = LoggerFactory.getLogger(TokenBucketLimit.class); /** 給定時間生成令牌數 */ private int genNumber; /** 生成令牌所花費的時間 */ private int genTime; /** 時間單位,默認為秒 */ private TimeUnit timeUnit; /** 最大令牌數 */ private int maxNumber; /** 已存儲的令牌數 */ private AtomicInteger storedNumber; public TokenBucketLimit(int genNumber, int genTime, int maxNumber) { this(genNumber, genTime, TimeUnit.SECONDS, maxNumber); } public TokenBucketLimit(int genNumber, int genTime, TimeUnit timeUnit, int maxNumber) { this.genNumber = genNumber; this.genTime = genTime; this.timeUnit = timeUnit; this.maxNumber = maxNumber; this.storedNumber = new AtomicInteger(0); new Thread(new TokenGenerateThread()).start(); } public boolean tryAcquire() { while (true) { int currentStoredNumber = storedNumber.get(); if (currentStoredNumber == 0) { logger.info("限流:{}", LocalDateTime.now().toString()); return false; } if (storedNumber.compareAndSet(currentStoredNumber, currentStoredNumber - 1)) { return true; } } } class TokenGenerateThread implements Runnable { @Override public void run() { while (true) { if (storedNumber.get() == maxNumber) { logger.info("當前令牌數已滿"); try { timeUnit.sleep(genTime); } catch (InterruptedException e) { e.printStackTrace(); } } else { int old = storedNumber.get(); int newValue = old + genNumber; if (newValue > maxNumber) newValue = maxNumber; storedNumber.compareAndSet(old, newValue); logger.info("生成令牌數:{},當前令牌數:{}", genNumber, newValue); try { timeUnit.sleep(genTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }
漏桶算法
漏桶限流的實現與令牌桶限流類似,只是一個是按固定速率增加,一個按固定速率減少。
package com.dfy.ratelimiter.core; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @description: 漏桶限流 * @author: DFY * @time: 2020/4/13 14:47 */ public class LeakyBucketLimit { private static Logger logger = LoggerFactory.getLogger(LeakyBucketLimit.class); /** 桶最大容量 */ private int maxNumber; /** 時間單位,默認為秒 */ private TimeUnit timeUnit; /** 泄露的數量 */ private int leakNumber; /** 泄露的時間 */ private int leakTime; /** 桶中剩余數量 */ private AtomicInteger remainingNumber; public LeakyBucketLimit(int leakNumber, int leakTime, int maxNumber) { this(leakNumber, leakTime, TimeUnit.SECONDS, maxNumber); } public LeakyBucketLimit(int leakNumber, int leakTime, TimeUnit timeUnit, int maxNumber) { this.leakNumber = leakNumber; this.leakTime = leakTime; this.timeUnit = timeUnit; this.maxNumber = maxNumber; this.remainingNumber = new AtomicInteger(0); } public boolean tryAcquire() { while (true) { int currentStoredNumber = remainingNumber.get(); if (currentStoredNumber == maxNumber) { logger.info("限流:{}", LocalDateTime.now().toString()); return false; } if (remainingNumber.compareAndSet(currentStoredNumber, currentStoredNumber + 1)) { return true; } } } class LeakThread implements Runnable { @Override public void run() { while (true) { if (remainingNumber.get() == 0) { logger.info("當前桶已空"); try { timeUnit.sleep(leakTime); } catch (InterruptedException e) { e.printStackTrace(); } } else { int old = remainingNumber.get(); int newValue = old - leakNumber; if (newValue < 0) newValue = 0; remainingNumber.compareAndSet(old, newValue); logger.info("泄露:{},當前:{}", leakNumber, newValue); try { timeUnit.sleep(leakTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }