概要
限流可以認為服務降級的一種,限流就是限制系統的輸入和輸出流量已達到保護系統的目的。一般來說系統的吞吐量是可以被測算的,為了保證系統的穩定運行,一旦達到的需要限制的閾值,就需要限制流量並采取一些措施以完成限制流量的目的。比如:延遲處理,拒絕處理,或者部分拒絕處理等等。
令牌桶算法
令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。 當桶滿時,新添加的令牌被丟棄或拒絕。
令牌桶算法是一個存放固定容量令牌(token)的桶,按照固定速率往桶里添加令牌。令牌桶算法基本可以用下面的幾個概念來描述:
- 令牌將按照固定的速率被放入令牌桶中。比如每秒放10個。
- 桶中最多存放b個令牌,當桶滿時,新添加的令牌被丟棄或拒絕。
- 當一個n個字節大小的數據包到達,將從桶中刪除n個令牌,接着數據包被發送到網絡上。
- 如果桶中的令牌不足n個,則不會刪除令牌,且該數據包將被限流(要么丟棄,要么緩沖區等待)。

令牌算法是根據放令牌的速率去控制輸出的速率,也就是上圖的to network的速率。to network我們可以理解為消息的處理程序,執行某段業務或者調用某個RPC。
通俗的理解,令牌桶是一個水桶,而令牌是通過一根水管流到水桶中的水
令牌桶的填滿時間,是由桶的自身容量、令牌漏出速率(桶下面的水管)、超過平均速率的突發流量持續的時間三個方面共同決定的。如果突發流量的時間比較短,令牌桶不會溢出,在通信流上不會受到影響,如果突發流量比較大,時間比較長,那令牌桶就會溢出,多余的通信流就會被限制。
令牌桶算法和漏桶算法的區別
主要區別在於“漏桶算法”能夠強行限制數據的傳輸速率,而“令牌桶算法”在能夠限制數據的平均傳輸速率外,還允許某種程度的突發傳輸。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允許突發地傳輸數據直到達到用戶配置的門限,因此它適合於具有突發特性的流量。
redis 實現
"local function addToQueue(x, time)\n"
+ " local count = 0\n"
+ " for i = 1, x, 1 do\n"
+ " redis.call('lpush', KEYS[1], time)\n"
+ " count = count + 1\n"
+ " end\n"
+ " return count\n"
+ "end\n"
+ "local result = 0\n"
+ "local timeBase = redis.call('lindex', KEYS[1], tonumber(ARGV[2]) - tonumber(ARGV[1]))\n"
+ "if (timeBase == false) or (tonumber(ARGV[4]) - tonumber(timeBase) > tonumber(ARGV[3])) then\n"
+ " result = result + addToQueue(tonumber(ARGV[1]), tonumber(ARGV[4]))\n"
+ "end\n"
+ "if (timeBase ~= false) then\n"
+ " redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[2]))\n"
+ "end\n"
+ "return result";
使用RateLimiter完成簡單的大流量限流
import com.google.common.util.concurrent.RateLimiter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * * 有很多個任務,但希望每秒不超過X個,可用此類 */ public class Demo { public static void main(String[] args) { //0.5代表一秒最多多少個 RateLimiter rateLimiter = RateLimiter.create(0.5); List<Runnable> tasks = new ArrayList<Runnable>(); for (int i = 0; i < 10; i++) { tasks.add(new UserRequest(i)); } ExecutorService threadPool = Executors.newCachedThreadPool(); for (Runnable runnable : tasks) { System.out.println("等待時間:" + rateLimiter.acquire()); threadPool.execute(runnable); } } private static class UserRequest implements Runnable { private int id; public UserRequest(int id) { this.id = id; } public void run() { System.out.println(id); } } }