轉自:https://www.jianshu.com/p/8f548e469bbe
參考:https://www.jianshu.com/p/5d4fe4b2a726

本次實戰,我們用的是guava的RateLimiter,場景是spring mvc在處理請求時候,從桶中申請令牌,申請到了就成功響應,申請不到時直接返回失敗。
實例
1、添加guava jar包
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
2、AccessLimitService.java 限流服務封裝到一個類中AccessLimitService,提供tryAcquire()方法,用來嘗試獲取令牌,返回true表示獲取到
@Service public class AccessLimitService { //每秒只發出5個令牌 RateLimiter rateLimiter = RateLimiter.create(5.0); /** * 嘗試獲取令牌 * @return */ public boolean tryAcquire(){ return rateLimiter.tryAcquire(); } }
3、Controller層每次收到請求的時候都嘗試去獲取令牌,獲取成功和失敗打印不同的信息
@Controller public class HelloController { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Autowired private AccessLimitService accessLimitService; @RequestMapping("/access") @ResponseBody public String access(){ //嘗試獲取令牌 if(accessLimitService.tryAcquire()){ //模擬業務執行500毫秒 try { Thread.sleep(500); }catch (InterruptedException e){ e.printStackTrace(); } return "aceess success [" + sdf.format(new Date()) + "]"; }else{ return "aceess limit [" + sdf.format(new Date()) + "]"; } } }
4、測試:十個線程並發訪問接口
public class AccessClient { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10); /** * get請求 * @param realUrl * @return */ public static String sendGet(URL realUrl) { String result = ""; BufferedReader in = null; try { // 打開和URL之間的連接 URLConnection connection = realUrl.openConnection(); // 設置通用的請求屬性 connection.setRequestProperty("accept", "*/*"); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("user-agent", "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)"); // 建立實際的連接 connection.connect(); // 定義 BufferedReader輸入流來讀取URL的響應 in = new BufferedReader(new InputStreamReader( connection.getInputStream())); String line; while ((line = in.readLine()) != null) { result += line; } } catch (Exception e) { System.out.println("發送GET請求出現異常!" + e); e.printStackTrace(); } // 使用finally塊來關閉輸入流 finally { try { if (in != null) { in.close(); } } catch (Exception e2) { e2.printStackTrace(); } } return result; } public void access() throws Exception{ final URL url = new URL("http://localhost:8080/guavalimitdemo/access"); for(int i=0;i<10;i++) { fixedThreadPool.submit(new Runnable() { public void run() { System.out.println(sendGet(url)); } }); } fixedThreadPool.shutdown(); fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); } public static void main(String[] args) throws Exception{ AccessClient accessClient = new AccessClient(); accessClient.access(); } }
部分請求由於獲取的令牌可以成功執行,其余請求沒有拿到令牌,我們可以根據實際業務來做區分處理。還有一點要注意,我們通過RateLimiter.create(5.0)配置的是每一秒5枚令牌,但是限流的時候發出的是6枚,改用其他值驗證,也是實際的比配置的大1。
以上就是快速實現限流的實戰過程,此處僅是單進程服務的限流,而實際的分布式服務中會考慮更多因素,會復雜很多。
RateLimiter方法摘要
修飾符和類型 | 方法和描述 |
---|---|
double | acquire() 從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求 |
double | acquire(int permits)從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求 |
static RateLimiter | create(double permitsPerSecond)根據指定的穩定吞吐率創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)根據指定的穩定吞吐率和預熱期來創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和) |
double | getRate()返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數 |
void | setRate(double permitsPerSecond)更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。 |
String | toString()返回對象的字符表現形式 |
boolean | tryAcquire()從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話 |
boolean | tryAcquire(int permits)從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit)從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那么立即返回false (無需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit)從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那么立即返回false(無需等待) |
- 舉例來說明如何使用RateLimiter,想象下我們需要處理一個任務列表,但我們不希望每秒的任務提交超過兩個:
//速率是每秒兩個許可 final RateLimiter rateLimiter = RateLimiter.create(2.0); void submitTasks(List tasks, Executor executor) { for (Runnable task : tasks) { rateLimiter.acquire(); // 也許需要等待 executor.execute(task); } }
1、基本算法
1.1 漏桶算法
請求先進入到漏桶里,漏桶以一定的速度出水,當水請求過大會直接溢出,可以看出漏桶算法能強行限制數據的傳輸速率。
缺點:除了要求能夠限制數據的平均傳輸速率外,還要求允許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更為適合。
1.2 令牌桶
令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。
2. Guava實現
2.1 簡單的demo
public
class
RateLimiterDemo {
public
static
void
main(String[] args) {
testNoRateLimiter();
testWithRateLimiter();
}
public
static
void
testNoRateLimiter() {
Long start = System.currentTimeMillis();
for
(
int
i =
0
; i <
10
; i++) {
System.out.println(
"call execute.."
+ i);
}
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
public
static
void
testWithRateLimiter() {
Long start = System.currentTimeMillis();
final
RateLimiter limiter = RateLimiter.create(
10.0
);
// 每秒不超過10個任務被提交
for
(
int
i =
0
; i <
10
; i++) {
limiter.acquire();
// 請求RateLimiter, 超過permits會被阻塞
System.out.println(
"call execute.."
+ i);
}
Long end = System.currentTimeMillis();
System.out.println(end - start);
}
}
testNoRateLimiter方法快速打印出來
testWithRateLimiter方法
1
秒打印
10
個
|
2.2 RateLimiter介紹
2.2.1 總體框架
RateLimiter類圖結構
RateLimiter:抽象類,定義了主要的對外接口以及整個流程的模板
SmoothRateLimiter:RateLimiter的實現類,實現了RateLimiter的部分方法
SmoothWarmingUp:SmoothRateLimiter的實現類,漸進模式,令牌生成速度緩慢提升直到維持在一個穩定值
SmoothBursty:SmoothRateLimiter的實現類,穩定模式,令牌生成速度恆定
2.2.2 具體步驟
2.2.2.1 RateLimiter的創建
通過調用RateLimiter的create
接口來創建實例,實際是調用的SmoothBuisty
穩定模式創建的實例。
public
static
RateLimiter create(
double
permitsPerSecond) {
return
create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
}
static
RateLimiter create(
double
permitsPerSecond, SleepingStopwatch stopwatch) {
RateLimiter rateLimiter =
new
SmoothBursty(stopwatch,
1.0
/* maxBurstSeconds */
);
rateLimiter.setRate(permitsPerSecond);
return
rateLimiter;
}
|
SmoothBursty
中的兩個構造參數含義:
- SleepingStopwatch:guava中的一個時鍾類實例,會通過這個來計算時間及令牌
- maxBurstSeconds:官方解釋,在ReteLimiter未使用時,最多保存幾秒的令牌,默認是1
在解析SmoothBursty原理前,重點解釋下SmoothBursty中幾個屬性的含義
/**
* The work (permits) of how many seconds can be saved up if this RateLimiter is unused?
* 在RateLimiter未使用時,最多存儲幾秒的令牌
* */
final
double
maxBurstSeconds;
/**
* The currently stored permits.
* 當前存儲令牌數
*/
double
storedPermits;
/**
* The maximum number of stored permits.
* 最大存儲令牌數 = maxBurstSeconds * stableIntervalMicros(見下文)
*/
double
maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
* 添加令牌時間間隔 = SECONDS.toMicros(1L) / permitsPerSecond;(1秒/每秒的令牌數)
*/
double
stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
* 下一次請求可以獲取令牌的起始時間
* 由於RateLimiter允許預消費,上次請求預消費令牌后
* 下次請求需要等待相應的時間到nextFreeTicketMicros時刻才可以獲取令牌
*/
private
long
nextFreeTicketMicros = 0L;
// could be either in the past or future
|
2.2.2.2 關鍵方法
- setRate
public
final
void
setRate(
double
permitsPerSecond) {
checkArgument(
permitsPerSecond >
0.0
&& !Double.isNaN(permitsPerSecond),
"rate must be positive"
);
synchronized
(mutex()) {
doSetRate(permitsPerSecond, stopwatch.readMicros());
}
}
|
通過這個接口設置令牌通每秒生成令牌的數量,內部時間通過調用SmoothRateLimiter
的doSetRate
來實現
- doSetRate
@Override
final
void
doSetRate(
double
permitsPerSecond,
long
nowMicros) {
resync(nowMicros);
double
stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this
.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
|
這里先通過調用resync
生成令牌以及更新下一期令牌生成時間,然后更新stableIntervalMicros,最后又調用了SmoothBursty
的doSetRate
- resync
/**
* Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
* 基於當前時間,更新下一次請求令牌的時間,以及當前存儲的令牌(可以理解為生成令牌)
*/
void
resync(
long
nowMicros) {
// if nextFreeTicket is in the past, resync to now
if
(nowMicros > nextFreeTicketMicros) {
double
newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
|
延遲計算,如上resync
函數。該函數會在每次獲取令牌之前調用,其實現思路為,若當前時間晚於nextFreeTicketMicros,則計算該段時間內可以生成多少令牌,將生成的令牌加入令牌桶中並更新數據。這樣一來,只需要在獲取令牌時計算一次即可。
- SmoothBursty的doSetRate
@Override
void
doSetRate(
double
permitsPerSecond,
double
stableIntervalMicros) {
double
oldMaxPermits =
this
.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if
(oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
// Double.POSITIVE_INFINITY 代表無窮啊
storedPermits = maxPermits;
}
else
{
storedPermits =
(oldMaxPermits ==
0.0
)
?
0.0
// initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
|
桶中可存放的最大令牌數由maxBurstSeconds計算而來,其含義為最大存儲maxBurstSeconds秒生成的令牌。
該參數的作用在於,可以更為靈活地控制流量。如,某些接口限制為300次/20秒,某些接口限制為50次/45秒等。也就是流量不局限於qps
2.2.2.3 RateLimiter幾個常用接口分析
理解RateLimiter暴露出來的接口
@CanIgnoreReturnValue
public
double
acquire() {
return
acquire(
1
);
}
/**
* 獲取令牌,返回阻塞的時間
**/
@CanIgnoreReturnValue
public
double
acquire(
int
permits) {
long
microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return
1.0
* microsToWait / SECONDS.toMicros(1L);
}
final
long
reserve(
int
permits) {
checkPermits(permits);
synchronized
(mutex()) {
return
reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
|
acquire
函數主要用於獲取permits個令牌,並計算需要等待多長時間,進而掛起等待,並將該值返回,主要通過reserve
返回需要等待的時間,reserve
中通過調用reserveAndGetWaitLength
獲取等待時間
/**
* Reserves next ticket and returns the wait time that the caller must wait for.
*
* @return the required wait time, never negative
*/
final
long
reserveAndGetWaitLength(
int
permits,
long
nowMicros) {
long
momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return
max(momentAvailable - nowMicros,
0
);
}
|
最后調用了reserveEarliestAvailable
@Override
final
long
reserveEarliestAvailable(
int
requiredPermits,
long
nowMicros) {
resync(nowMicros);
long
returnValue = nextFreeTicketMicros;
double
storedPermitsToSpend = min(requiredPermits,
this
.storedPermits);
double
freshPermits = requiredPermits - storedPermitsToSpend;
long
waitMicros =
storedPermitsToWaitTime(
this
.storedPermits, storedPermitsToSpend)
+ (
long
) (freshPermits * stableIntervalMicros);
this
.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
this
.storedPermits -= storedPermitsToSpend;
return
returnValue;
}
|
首先通過resync生成令牌以及同步nextFreeTicketMicros時間戳,
freshPermits從令牌桶中獲取令牌后還需要的令牌數量,
通過storedPermitsToWaitTime計算出獲取freshPermits還需要等待的時間,
在穩定模式中,這里就是(long) (freshPermits * stableIntervalMicros) ,
然后更新nextFreeTicketMicros以及storedPermits,這次獲取令牌需要的等待到的時間點,
reserveAndGetWaitLength返回需要等待的時間間隔。
從`reserveEarliestAvailable`可以看出RateLimiter的預消費原理,
以及獲取令牌的等待時間時間原理(可以解釋示例結果),
再獲取令牌不足時,並沒有等待到令牌全部生成,
而是更新了下次獲取令牌時的nextFreeTicketMicros,
從而影響的是下次獲取令牌的等待時間。
`reserve`這里返回等待時間后,
`acquire`通過調用`stopwatch.sleepMicrosUninterruptibly(microsToWait);`
進行sleep操作,這里不同於Thread.sleep(), 這個函數的sleep是uninterruptibly的,內部實現:
public
static
void
sleepUninterruptibly(
long
sleepFor, TimeUnit unit) {
//sleep 阻塞線程 內部通過Thread.sleep()
boolean
interrupted =
false
;
try
{
long
remainingNanos = unit.toNanos(sleepFor);
long
end = System.nanoTime() + remainingNanos;
while
(
true
) {
try
{
// TimeUnit.sleep() treats negative timeouts just like zero.
NANOSECONDS.sleep(remainingNanos);
return
;
}
catch
(InterruptedException e) {
interrupted =
true
;
remainingNanos = end - System.nanoTime();
//如果被interrupt可以繼續,更新sleep時間,循環繼續sleep
}
}
}
finally
{
if
(interrupted) {
Thread.currentThread().interrupt();
//如果被打斷過,sleep過后再真正中斷線程
}
}
}
|
sleep之后,`acquire`返回sleep的時間,阻塞結束,獲取到令牌。
public
boolean
tryAcquire(
int
permits) {
return
tryAcquire(permits,
0
, MICROSECONDS);
}
public
boolean
tryAcquire() {
return
tryAcquire(
1
,
0
, MICROSECONDS);
}
public
boolean
tryAcquire(
int
permits,
long
timeout, TimeUnit unit) {
long
timeoutMicros = max(unit.toMicros(timeout),
0
);
checkPermits(permits);
long
microsToWait;
synchronized
(mutex()) {
long
nowMicros = stopwatch.readMicros();
if
(!canAcquire(nowMicros, timeoutMicros)) {
return
false
;
}
else
{
microsToWait = reserveAndGetWaitLength(permits, nowMicros);
}
}
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return
true
;
}
private
boolean
canAcquire(
long
nowMicros,
long
timeoutMicros) {
return
queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}
@Override
final
long
queryEarliestAvailable(
long
nowMicros) {
return
nextFreeTicketMicros;
}
|
tryAcquire
函數可以嘗試在timeout時間內獲取令牌,如果可以則掛起等待相應時間並返回true,否則立即返回falsecanAcquire
用於判斷timeout時間內是否可以獲取令牌,通過判斷當前時間+超時時間是否大於nextFreeTicketMicros 來決定是否能夠拿到足夠的令牌數,如果可以獲取到,則過程同acquire,線程sleep等待,如果通過canAcquire
在此超時時間內不能回去到令牌,則可以快速返回,不需要等待timeout后才知道能否獲取到令牌。
2.2.3 方法摘要
修飾符和類型
|
方法和描述
|
---|---|
修飾符和類型
|
方法和描述
|
double | acquire() 從RateLimiter獲取一個許可,該方法會被阻塞直到獲取到請求 |
double | acquire(int permits) 從RateLimiter獲取指定許可數,該方法會被阻塞直到獲取到請求 |
static RateLimiter | create(double permitsPerSecond) 根據指定的穩定吞吐率創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少查詢) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) 根據指定的穩定吞吐率和預熱期來創建RateLimiter,這里的吞吐率是指每秒多少許可數(通常是指QPS,每秒多少個請求量),在這段預熱時間內,RateLimiter每秒分配的許可數會平穩地增長直到預熱期結束時達到其最大速率。(只要存在足夠請求數來使其飽和) |
double | getRate() 返回RateLimiter 配置中的穩定速率,該速率單位是每秒多少許可數 |
void | setRate(double permitsPerSecond) 更新RateLimite的穩定速率,參數permitsPerSecond 由構造RateLimiter的工廠方法提供。 |
String | toString() 返回對象的字符表現形式 |
boolean | tryAcquire() 從RateLimiter 獲取許可,如果該許可可以在無延遲下的情況下立即獲取得到的話 |
boolean | tryAcquire(int permits) 從RateLimiter 獲取許可數,如果該許可數可以在無延遲下的情況下立即獲取得到的話 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit) 從RateLimiter 獲取指定許可數如果該許可數可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可數的話,那么立即返回false (無需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit) 從RateLimiter 獲取許可如果該許可可以在不超過timeout的時間內獲取得到的話,或者如果無法在timeout 過期之前獲取得到許可的話,那么立即返回false(無需等待) |