令牌桶(Token Bucket)


概要

  限流可以認為服務降級的一種,限流就是限制系統的輸入和輸出流量已達到保護系統的目的。一般來說系統的吞吐量是可以被測算的,為了保證系統的穩定運行,一旦達到的需要限制的閾值,就需要限制流量並采取一些措施以完成限制流量的目的。比如:延遲處理,拒絕處理,或者部分拒絕處理等等。

 

令牌桶算法

  令牌桶算法的原理是系統會以一個恆定的速度往桶里放入令牌,而如果請求需要被處理,則需要先從桶里獲取一個令牌,當桶里沒有令牌可取時,則拒絕服務。 當桶滿時,新添加的令牌被丟棄或拒絕。

令牌桶算法是一個存放固定容量令牌(token)的桶,按照固定速率往桶里添加令牌。令牌桶算法基本可以用下面的幾個概念來描述:

  • 令牌將按照固定的速率被放入令牌桶中。比如每秒放10個。
  • 桶中最多存放b個令牌,當桶滿時,新添加的令牌被丟棄或拒絕。
  • 當一個n個字節大小的數據包到達,將從桶中刪除n個令牌,接着數據包被發送到網絡上。
  • 如果桶中的令牌不足n個,則不會刪除令牌,且該數據包將被限流(要么丟棄,要么緩沖區等待)。

 

令牌算法是根據放令牌的速率去控制輸出的速率,也就是上圖的to network的速率。to network我們可以理解為消息的處理程序,執行某段業務或者調用某個RPC。

​通俗的理解,令牌桶是一個水桶,而令牌是通過一根水管流到水桶中的水

    ​    ​令牌桶的填滿時間,是由桶的自身容量、令牌漏出速率(桶下面的水管)、超過平均速率的突發流量持續的時間三個方面共同決定的。如果突發流量的時間比較短,令牌桶不會溢出,在通信流上不會受到影響,如果突發流量比較大,時間比較長,那令牌桶就會溢出,多余的通信流就會被限制。

 

​令牌桶算法和漏桶算法的區別

  主要區別在於“漏桶算法”能夠強行限制數據的傳輸速率,而“令牌桶算法”在能夠限制數據的平均傳輸速率外,還允許某種程度的突發傳輸。在“令牌桶算法”中,只要令牌桶中存在令牌,那么就允許突發地傳輸數據直到達到用戶配置的門限,因此它適合於具有突發特性的流量。

 

redis 實現

"local function addToQueue(x, time)\n"
+ " local count = 0\n"
+ " for i = 1, x, 1 do\n"
+ " redis.call('lpush', KEYS[1], time)\n"
+ " count = count + 1\n"
+ " end\n"
+ " return count\n"
+ "end\n"
+ "local result = 0\n"
+ "local timeBase = redis.call('lindex', KEYS[1], tonumber(ARGV[2]) - tonumber(ARGV[1]))\n"
+ "if (timeBase == false) or (tonumber(ARGV[4]) - tonumber(timeBase) > tonumber(ARGV[3])) then\n"
+ " result = result + addToQueue(tonumber(ARGV[1]), tonumber(ARGV[4]))\n"
+ "end\n"
+ "if (timeBase ~= false) then\n"
+ " redis.call('ltrim', KEYS[1], 0, tonumber(ARGV[2]))\n"
+ "end\n"
+ "return result"
;

 

使用RateLimiter完成簡單的大流量限流

    import com.google.common.util.concurrent.RateLimiter;  
      
    import java.util.ArrayList;  
    import java.util.List;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors;  
      
    /** 
     *
     * 有很多個任務,但希望每秒不超過X個,可用此類 
     */  
    public class Demo {  
      
        public static void main(String[] args) {  
            //0.5代表一秒最多多少個  
            RateLimiter rateLimiter = RateLimiter.create(0.5);  
            List<Runnable> tasks = new ArrayList<Runnable>();  
            for (int i = 0; i < 10; i++) {  
                tasks.add(new UserRequest(i));  
            }  
            ExecutorService threadPool = Executors.newCachedThreadPool();  
            for (Runnable runnable : tasks) {  
                System.out.println("等待時間:" + rateLimiter.acquire());  
                threadPool.execute(runnable);  
            }  
        }  
      
        private static class UserRequest implements Runnable {  
            private int id;  
      
            public UserRequest(int id) {  
                this.id = id;  
            }  
      
            public void run() {  
                System.out.println(id);  
            }  
        }  
      
    }  

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM