1. 引子
在高並發系統開發時有時候需要進行接口保護,防止高並發的情況把系統搞崩,因此需要對一個查詢接口進行限流,主要的目的就是限制單位時間內請求此查詢的次數,例如 1000 次,來保護接口。
2. 計數器 AtomicLong
可以使用Java中的AtomicLong進行限流:
try { if(atomic.incrementAndGet() > 限流數) { //拒絕請求 } //處理請求 } finally { atomic.decrementAndGet(); }
適合對業務無損的服務或者需要過載保護的服務進行限流,如搶購業務,超出了大小要么讓用戶排隊,要么告訴用戶沒貨了,對用戶來說是可以接受的。而一些開放平台也會限制用戶調用某個接口的試用請求量,也可以用這種計數器方式實現。這種方式也是簡單粗暴的限流,沒有平滑處理,需要根據實際情況選擇使用.
3. Semaphore
private static Semaphore apiSemaphore = new Semaphore(100); // 並發訪問控制 boolean concurrentPermission = apiSemaphore.tryAcquire(50, TimeUnit.MILLISECONDS); if (concurrentPermission) { // 允許訪問 } else { // 並發訪問超過系統允許上限,請稍后再試! }
4. RateLimiter限制資源的並發訪問線程數
RateLimiter類似於JDK的信號量Semphore,他用來限制對資源並發訪問的線程數。
RateLimiter limiter = RateLimiter.create(4.0); //每秒不超過4個任務被提交 limiter.acquire(); //請求RateLimiter, 超過permits會被阻塞 executor.submit(runnable); //提交任務
也可以以非阻塞的形式來使用:
if (limiter.tryAcquire()) { //未請求到limiter則立即返回false doSomething(); } else { doSomethingElse(); }
5. 利用緩存,存儲一個計數器,然后用這個計數器來實現限流
限流某個接口的時間窗請求數:即一個時間窗口內的請求數,如想限制某個接口/服務每秒/每分鍾/每天的請求數/調用量。如一些基礎服務會被很多其他系統調用,比如商品詳情頁服務會調用基礎商品服務調用,但是怕因為更新量比較大將基礎服務打掛,這時我們要對每秒/每分鍾的調用量進行限速;一種實現方式如下所示:
static LoadingCache<Long, AtomicLong> count = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(new CacheLoader<Long, AtomicLong>() { @Override public AtomicLong load(Long o) throws Exception { //System.out.println("Load call!"); return new AtomicLong(0L); } });
我們通過 CacheBuilder 來新建一個 LoadingCache 緩存對象 count,然后設置其有效時間為 1 秒,即每 1 秒鍾刷新一次;緩存中,key 為一個 long 型的時間戳類型,value 是一個計數器,使用原子性的 AtomicLong 保證自增和自減操作的原子性, 每次查詢緩存時如果不能命中,即查詢的時間戳不在緩存中,則重新加載緩存,執行 load 將當前的時間戳的計數值初始化為 0。這樣對於每一秒的時間戳,能計算這一秒內執行的次數,從而達到限流的目的;
測試代碼如下:
public class Counter { static int counter = 0; public static int getCounter() throws Exception{ return counter++; } }
現在我們創建多個線程來執行這個方法:
public class Test { public static void main(String args[]) throws Exception { for (int i = 0; i < 100; i++) { new Thread() { @Override public void run() { try { System.out.println(Counter.getCounter()); } catch (Exception e) { e.printStackTrace(); } } }.start(); } } }
這里的 for 循環執行 100 個進程時間是很快的,那么現在我們要限制每秒只能有 10 個線程來執行 getCounter() 方法,該怎么辦呢,上面講的限流方法就派上用場了:
public class Counter { static LoadingCache<Long, AtomicLong> count = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.SECONDS).build(new CacheLoader<Long, AtomicLong>() { @Override public AtomicLong load(Long o) throws Exception { System.out.println("Load call!"); return new AtomicLong(0L); } }); static long limits = 10; static int counter = 0; public static synchronized int getCounter() throws Exception { while (true) { //獲取當前的時間戳作為key Long currentSeconds = System.currentTimeMillis() / 1000; if (count.get(currentSeconds).getAndIncrement() > limits) { continue; } return counter++; } } }
這樣一來,就可以限制每秒的執行數了。對於每個線程,獲取當前時間戳,如果當前時間 (當前這 1 秒) 內有超過 10 個線程正在執行,那么這個進程一直在這里循環,直到下一秒,或者更靠后的時間,重新加載,執行 load,將新的時間戳的計數值重新為 0。
執行結果可以看出每秒執行 11 個(因為從 0 開始),每一秒之后,load 方法會執行一次;為了更加直觀,我們可以讓每個for循環sleep一段時間:
public class Test { public static void main(String args[]) throws Exception { for (int i = 0; i < 100; i++) { new Thread() { @Override public void run() { try { System.out.println(Counter.getCounter()); } catch (Exception e) { e.printStackTrace(); } } }.start(); Thread.sleep(100); } } }
在上述這樣的情況下,一個線程如果遇到當前時間正在執行的線程超過 limit 值就會一直在 while 循環,這樣會浪費大量的資源,我們在做限流的時候,如果出現這種情況,可以不進行 while 循環,而是直接拋出異常或者返回,來拒絕這次執行(查詢),這樣便可以節省資源。