1、最近在寫一個分布式服務的框架,對於分布式服務的框架來說,除了遠程調用,還要進行服務的治理
當進行促銷的時候,所有的資源都用來完成重要的業務,就比如雙11的時候,主要的業務就是讓用戶查詢商品,以及購買支付,
此時,金幣查詢、積分查詢等業務就是次要的,因此要對這些服務進行服務的降級,典型的服務降級算法是采用令牌桶算法,
因此在寫框架的時候去研究了一下令牌桶算法
2、在實施QOS策略時,可以將用戶的數據限制在特定的帶寬,當用戶的流量超過額定帶寬時,超過的帶寬將采取其它方式來處理。
要衡量流量是否超過額定的帶寬,網絡設備並不是采用單純的數字加減法來決定的,也就是說,比如帶寬為100K,而用戶發來
的流量為110K,網絡設備並不是靠110K減去100K等於10K,就認為用戶超過流量10K。網絡設備衡量流量是否超過額定帶寬,
需要使用令牌桶算法來計算。下面詳細介紹令牌桶算法機制:
當網絡設備衡量流量是否超過額定帶寬時,需要查看令牌桶,而令牌桶中會放置一定數量的令牌,一個令牌允許接口發送
或接收1bit數據(有時是1 Byte數據),當接口通過1bit數據后,同時也要從桶中移除一個令牌。當桶里沒有令牌的時候,任何流
量都被視為超過額定帶寬,只有當桶中有令牌時,數據才可以通過接口。令牌桶中的令牌不僅僅可以被移除,同樣也可以往里添加,
所以為了保證接口隨時有數據通過,就必須不停地往桶里加令牌,由此可見,往桶里加令牌的速度,就決定了數據通過接口的速度。
因此,我們通過控制往令牌桶里加令牌的速度從而控制用戶流量的帶寬。而設置的這個用戶傳輸數據的速率被稱為承諾信息速率(CIR),
通常以秒為單位。比如我們設置用戶的帶寬為1000 bit每秒,只要保證每秒鍾往桶里添加1000個令牌即可。
3、舉例:
將CIR設置為8000 bit/s,那么就必須每秒將8000個令牌放入桶中,當接口有數據通過時,就從桶中移除相應的令牌,每通過1 bit,
就從桶中移除1個令牌。當桶里沒有令牌的時候,任何流量都被視為超出額定帶寬,而超出的流量就要采取額外動作。每秒鍾往桶里加的令牌
就決定了用戶流量的速率,這個速率就是CIR,但是每秒鍾需要往桶里加的令牌總數,並不是一次性加完的,一次性加進的令牌數量被稱為Burst size(Bc),
如果Bc只是CIR的一半,那么很明顯每秒鍾就需要往桶里加兩次令牌,每次加的數量總是Bc的數量。還有就是加令牌的時間,Time interval(Tc),
Tc表示多久該往桶里加一次令牌,而這個時間並不能手工設置,因為這個時間可以靠CIR和Bc的關系計算得到, Bc/ CIR= Tc。
4、令牌桶算法圖例
a. 按特定的速率向令牌桶投放令牌
b. 根據預設的匹配規則先對報文進行分類,不符合匹配規則的報文不需要經過令牌桶的處理,直接發送;
c. 符合匹配規則的報文,則需要令牌桶進行處理。當桶中有足夠的令牌則報文可以被繼續發送下去,同時令牌桶中的令牌 量按報文的長度做相應的減少;
d. 當令牌桶中的令牌不足時,報文將不能被發送,只有等到桶中生成了新的令牌,報文才可以發送。這就可以限制報文的流量只能是小於等於令牌生成的速度,達到限制流量的目的。
5、Java參考代碼:
package com.netease.datastream.util.flowcontrol; import java.io.BufferedWriter; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; /** * <pre> * Created by inter12 on 15-3-18. * </pre> */
public class TokenBucket { // 默認桶大小個數 即最大瞬間流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64; // 一個桶的單位是1字節
private int everyTokenSize = 1; // 瞬間最大流量
private int maxFlowRate; // 平均流量
private int avgFlowRate; // 隊列來緩存桶數量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * // 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>( DEFAULT_BUCKET_SIZE); private ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(); private volatile boolean isStart = false; private ReentrantLock lock = new ReentrantLock(true); private static final byte A_CHAR = 'a'; public TokenBucket() { } public TokenBucket(int maxFlowRate, int avgFlowRate) { this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) { this.everyTokenSize = everyTokenSize; this.maxFlowRate = maxFlowRate; this.avgFlowRate = avgFlowRate; } public void addTokens(Integer tokenNum) { // 若是桶已經滿了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) { tokenQueue.offer(Byte.valueOf(A_CHAR)); } } public TokenBucket build() { start(); return this; } /** * 獲取足夠的令牌個數 * * @return
*/
public boolean getTokens(byte[] dataSize) { // Preconditions.checkNotNull(dataSize); // Preconditions.checkArgument(isStart, // "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 傳輸內容大小對應的桶個數
final ReentrantLock lock = this.lock; lock.lock(); try { boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足夠的桶數量
if (!result) { return false; } int tokenCount = 0; for (int i = 0; i < needTokenNum; i++) { Byte poll = tokenQueue.poll(); if (poll != null) { tokenCount++; } } return tokenCount == needTokenNum; } finally { lock.unlock(); } } public void start() { // 初始化桶隊列大小
if (maxFlowRate != 0) { tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate); } // 初始化令牌生產者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this); scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS); isStart = true; } public void stop() { isStart = false; scheduledExecutorService.shutdown(); } public boolean isStarted() { return isStart; } class TokenProducer implements Runnable { private int avgFlowRate; private TokenBucket tokenBucket; public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) { this.avgFlowRate = avgFlowRate; this.tokenBucket = tokenBucket; } @Override public void run() { tokenBucket.addTokens(avgFlowRate); } } public static TokenBucket newBuilder() { return new TokenBucket(); } public TokenBucket everyTokenSize(int everyTokenSize) { this.everyTokenSize = everyTokenSize; return this; } public TokenBucket maxFlowRate(int maxFlowRate) { this.maxFlowRate = maxFlowRate; return this; } public TokenBucket avgFlowRate(int avgFlowRate) { this.avgFlowRate = avgFlowRate; return this; } private String stringCopy(String data, int copyNum) { StringBuilder sbuilder = new StringBuilder(data.length() * copyNum); for (int i = 0; i < copyNum; i++) { sbuilder.append(data); } return sbuilder.toString(); } public static void main(String[] args) throws IOException, InterruptedException { tokenTest(); } private static void arrayTest() { ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>( 10); tokenQueue.offer(1); tokenQueue.offer(1); tokenQueue.offer(1); System.out.println(tokenQueue.size()); System.out.println(tokenQueue.remainingCapacity()); } private static void tokenTest() throws InterruptedException, IOException { TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512) .maxFlowRate(1024).build(); BufferedWriter bufferedWriter = new BufferedWriter( new OutputStreamWriter(new FileOutputStream("D:/ds_test"))); String data = "xxxx";// 四個字節
for (int i = 1; i <= 1000; i++) { Random random = new Random(); int i1 = random.nextInt(100); boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes()); TimeUnit.MILLISECONDS.sleep(100); if (tokens) { bufferedWriter.write("token pass --- index:" + i1); System.out.println("token pass --- index:" + i1); } else { bufferedWriter.write("token rejuect --- index" + i1); System.out.println("token rejuect --- index" + i1); } bufferedWriter.newLine(); bufferedWriter.flush(); } bufferedWriter.close(); } }