當Dubbo應用出現多個服務提供者時,服務消費者如何選擇哪一個來調用呢?這就涉及到負載均衡算法。
LoadBalance 中文意思為負載均衡,它的職責是將網絡請求,或者其他形式的負載“均攤”到不同的機器上。避免集群中部分服務器壓力過大,而另一些服務器比較空閑的情況。通過負載均衡,可以讓每台服務器獲取到適合自己處理能力的負載。在為高負載服務器分流的同時,還可以避免資源浪費,一舉兩得。
Dubbo中提供了4種負載均衡實現:
- 基於權重隨機算法的 RandomLoadBalance
- 基於最少活躍調用數算法的 LeastActiveLoadBalance
- 基於 hash 一致性的 ConsistentHashLoadBalance
- 基於加權輪詢算法的 RoundRobinLoadBalance
一、LoadBalance
在Dubbo中,所有的負載均衡實現類都繼承自抽象類AbstractLoadBalance,該類實現LoadBalance接口。
@SPI(RandomLoadBalance.NAME) public interface LoadBalance { /** * select one invoker in list. * * @param invokers invokers. * @param url refer url * @param invocation invocation. * @return selected invoker. */ @Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
可以看到,該接口的SPI注解指定了默認的實現RandomLoadBalance,先看看抽象類的邏輯。
1、選擇服務
先來看負載均衡的入口方法 select,它邏輯比較簡單。校驗服務提供者是否為空;如果 invokers 列表中僅有一個 Invoker,直接返回即可,無需進行負載均衡;有多個Invoker就調用子類實現進行負載均衡。
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.isEmpty()) return null; //如果只有一個服務提供者,直接返回,無需負載均衡 if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); }
2、獲取權重
這里包含兩個邏輯,一個是獲取配置的權重值,默認為100;另一個是根據服務運行時長重新計算權重。
protected int getWeight(Invoker<?> invoker, Invocation invocation) { //獲取權重值,默認為100 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight",100); if (weight > 0) { //服務提供者啟動時間戳 long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L); if (timestamp > 0L) { //當前時間-啟動時間=運行時長 int uptime = (int) (System.currentTimeMillis() - timestamp); //獲取服務預熱時間 默認10分鍾 int warmup = invoker.getUrl().getParameter("warmup", 600000 ); //如果服務運行時間小於預熱時間,即服務啟動未到達10分鍾 if (uptime > 0 && uptime < warmup) { //重新計算服務權重 weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; }
如上代碼,獲取服務權重值。然后判斷服務啟動時長是否小於服務預熱時間,然后重新計算權重。服務預熱時間默認是10分鍾。大致流程如下:
- 獲取配置的權重值,默認為100
- 獲取服務啟動的時間戳
- 當前時間 - 服務啟動時間 = 服務運行時長
- 獲取服務預熱時間,默認為10分鍾
- 判斷服務運行時長是否小於預熱時間,條件成立則重新計算權重
重新計算權重其實就是降權的過程。
static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); }
代碼看起來很簡單,但卻不大好理解。可以把上面的代碼換成下面的公式來看:
(uptime / warmup) * weight // 即進度百分比*權重
假設我們把權重設置為100,預熱時間為10分鍾。那么:
|
運行時長
|
公式
|
計算后權重
|
|
1分鍾
|
1/10 * 100
|
10
|
|
2分鍾
|
2/10 * 100
|
20
|
|
5分鍾
|
5/10 * 100
|
50
|
|
10分鍾
|
10/10 * 100
|
100
|
由此可見,在未達到服務預熱時間之前,權重都被降級了。Dubbo為什么要這樣做呢?主要用於保證當服務運行時長小於服務預熱時間時,對服務進行降權,避免讓服務在啟動之初就處於高負載狀態。服務預熱是一個優化手段,與此類似的還有 JVM 預熱。主要目的是讓服務啟動后“低功率”運行一段時間,使其效率慢慢提升至最佳狀態。
二、權重隨機算法
RandomLoadBalance 是加權隨機算法的具體實現,也是Dubbo中負載均衡算法默認的實現。這里我們需要先把服務器按照權重進行分區,比如:假設有三台服務器:【A、B、C】 它們對應的權重為:【1、3、6】,總權重為10。那么,可以得出:
|
區間
|
所屬服務器
|
|
0-1
|
A
|
|
1-4
|
B
|
|
4-10
|
C
|
剩下的就簡單了,獲取總權重totalWeight,然后生成[0-totalWeight]之間的隨機數,計算隨機數會落在哪個區間就好了。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //服務提供者列表數量 int length = invokers.size(); //總權重 int totalWeight = 0; //是否具有相同的權重 boolean sameWeight = true; //循環服務列表,計算總權重和檢測每個服務權重是否相同 for (int i = 0; i < length; i++) { //獲取單個服務的權重值 int weight = getWeight(invokers.get(i), invocation); //累加 計算總權重 totalWeight += weight; //校驗服務權重是否相同 if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { //獲取[0-totalWeight]之間的隨機數 int offset = random.nextInt(totalWeight); //計算隨機數處於哪個區間,返回對應invoker for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } //如果權重相同,隨機返回 return invokers.get(random.nextInt(length)); }
以上面的例子,總結一下上面代碼的流程:
- 獲取服務提供者數量 = 3
- 累加,計算總權重 = 10
- 校驗服務權重是否相等,不相等。依次為1、3、6
- 獲取0 - 10直接的隨機數,假設 offset = 6
- 第1次循環,6-=1>0,條件不成立,offset = 5
- 第2次循環,5-=3>0,條件不成立,offset = 2
- 第3次循環,2-=6<0,條件成立,返回第3組服務器
最后,如果權重都相同,直接隨機返回一個服務Invoker。
三、最小活躍數算法
最小活躍數負載均衡算法對應LeastActiveLoadBalance。活躍調用數越小,表明該服務提供者效率越高,單位時間內可處理更多的請求,此時應優先將請求分配給該服務提供者。Dubbo會為每個服務提供者Invoker分配一個active,代表活躍數大小。調用之前做自增操作,調用完成后做自減操作。這樣有的服務處理的快,有的處理的慢。越快的,active數量越小,就優先分配。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //服務提供者列表數量 int length = invokers.size(); //默認的最小活躍數值 int leastActive = -1; //最小活躍數invoker數量 int leastCount = 0; //最小活躍數invoker索引 int[] leastIndexs = new int[length]; //總權重 int totalWeight = 0; //第一個Invoker權重值 用於比較invoker直接的權重是否相同 int firstWeight = 0; boolean sameWeight = true; //循環比對Invoker的活躍數大小 for (int i = 0; i < length; i++) { //獲取當前Invoker對象 Invoker<T> invoker = invokers.get(i); //獲取活躍數大小 int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); //獲取權重值 int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100); //對比發現更小的活躍數,重置 if (leastActive == -1 || active < leastActive) { //更新最小活躍數 leastActive = active; //更新最小活躍數 數量為1 leastCount = 1; //記錄坐標 leastIndexs[0] = i; totalWeight = weight; firstWeight = weight; sameWeight = true; //如果當前Invoker的活躍數 與 最小活躍數相等 } else if (active == leastActive) { leastIndexs[leastCount++] = i; totalWeight += weight; if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } //如果只有一個Invoker具有最小活躍數,直接返回即可 if (leastCount == 1) { return invokers.get(leastIndexs[0]); } //多個Invoker具體相同的最小活躍數,但權重不同,就走權重的邏輯 if (!sameWeight && totalWeight > 0) { int offsetWeight = random.nextInt(totalWeight); for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } //從leastIndexs中隨機獲取一個返回 return invokers.get(leastIndexs[random.nextInt(leastCount)]); }
以上代碼分為兩部分。第一是通過比較,確定最小活躍數的Invoker;第二是根據權重確定Invoker。再分步驟總結一下:
- 定義變量-最小活躍數大小、數量、數組、權重值
- 循環invokers數組,獲取當前Invoker活躍數大小和權重
- 比對當前Invoker的活躍數,是否比上一個小;條件成立則重置最小活躍數;如果相等,則累加權重值,並且判斷權重是否相同
- 比對完成,如果只有一個最小活躍數,就直接返回Invoker
- 如果多個Invoker,具有相同的活躍數,但權重不同;就走權重的邏輯
- 如果以上兩個條件都不成立,就在最小活躍數 數量范圍內取得隨機數,返回Invoker
看到這里,有沒有想到另外一個問題,那就是針對活躍數在哪里自增、自減的呢?這就要說到Dubbo的過濾器,涉及到ActiveLimitFilter這個類。在這個類中,有這樣一段代碼:
//觸發active自增操作 RpcStatus.beginCount(url, methodName); Result result = invoker.invoke(invocation); //觸發active自減操作 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result;
最后,這個Filter需要手動添加一下,在配置文件我們這樣定義:
<dubbo:consumer filter="activelimit">
四、hash 一致性算法
一致性 hash 算法由麻省理工學院的 Karger 及其合作者於1997年提供出的,算法提出之初是用於大規模緩存系統的負載均衡。它的原理大致如下:
先構造一個長度為232的整數環(一致性Hash環),然后根據節點名稱的Hash值(分布在0 - 232-1)將服務器節點放置在這個Hash環上。最后,根據數據的Key值計算得到其Hash值,在Hash環上順時針查找距離這個Key值的Hash值最近的服務器節點,完成Key到服務器的映射查找。在Dubbo中,引入了虛擬節點用於解決數據傾斜問題。圖示如下:
這里相同顏色的節點均屬於同一個服務提供者,比如 Invoker1-1,Invoker1-2,…,Invoker1-160。即每個Invoker會共創建160個虛擬節點,Hash環總長度為160*節點數量。先來看ConsistentHashLoadBalance.doSelect實現。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //請求類名+方法名 //比如:com.viewscenes.netsupervisor.service.InfoUserService.sayHello String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); //對當前的invokers進行hash取值 int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key); //如果ConsistentHashSelector為空 或者 新的invokers hashCode取值不同 //說明服務提供者列表可能發生變化,需要獲取創建ConsistentHashSelector if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } //選擇Invoker return selector.select(invocation); }
以上代碼,主要是為了獲取ConsistentHashSelector,然后調用它的方法選擇Invoker返回。還有一點需注意,如果服務提供者列表發生變化,那么它們兩次的HashCode取值會不同,此時會重新創建ConsistentHashSelector對象。 此時的問題的關鍵就變成了,ConsistentHashSelector是如何被創建的?
1、創建ConsistentHashSelector
這個類有幾個屬性,先來看一下。
private static final class ConsistentHashSelector<T> { //使用 TreeMap 存儲 Invoker 虛擬節點 private final TreeMap<Long, Invoker<T>> virtualInvokers; //虛擬節點數量,默認160 private final int replicaNumber; //服務提供者列表的Hash值 private final int identityHashCode; //參數下標 private final int[] argumentIndex; }
再看它的構造方法,主要是創建虛擬節點Invoker,放入virtualInvokers中。
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { //初始化TreeMap this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); //當前invokers列表的Hash值 this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); //獲取虛擬節點數,默認為160 this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); //默認對第一個參數進行hash取值 String[] index = Constants.COMMA_SPLIT_PATTERN.split( url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } //循環創建虛擬節點Invoker for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h); virtualInvokers.put(m, invoker); } } } }
以上代碼的重點就是創建虛擬節點Invoker。首先,先獲取通信服務器的地址,比如192.168.1.1:20880; 然后,先對address + i進行MD5運算,得到一個數組,接着對這個數組的部分字節進行4次 hash 運算,得到四個不同的 long 型正整數; 最后將hash和invoker的映射關系存儲到TreeMap中。它們的映射關系如下:
2、選擇
創建完了ConsistentHashSelector,就該調用它的方法來選擇一個Invoker了。
public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); }
以上代碼很簡單,分為兩部分來看。
2.1、轉換參數
獲取到參數列表,然后通過toKey方法,轉換為字符串。這里看似簡單,卻隱含着另外一層邏輯。它只會取第一個參數,我們看toKey方法。
private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); }
獲取到參數值key后,對字符串key進行MD5運算,接着通過hash獲取 long 型正整數。這一步總的來說,就是把參數列表中的第一個參數值轉換為一個long型正整數。 那么,相同的參數值就會得到同一個hash值,所以,這里的負載均衡邏輯就會只受參數值影響,具有相同參數值的請求將會被分配給同一個服務提供者。
2.2、確定
計算出Hash值之后,事情就變得簡單了。按照一致性Hash算法中的原理來說就是在Hash環上順時針查找距離這個Key值的Hash值最近的服務器節點 。落實到Dubbo上來說,就是在virtualInvokers這個TreeMap中,返回其鍵大於或等於Hash值的部分數據,然后取第一個。
private Invoker<T> selectForKey(long hash) { Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); }
五、加權輪詢算法
如果采購的服務器性能大致相同,那采用輪詢再合適不過了,簡單高效。那什么是加權輪詢呢?如果我們的服務器性能是有差異的,就不好用簡單的輪詢來做。小身板服務器表示扛不住那么大的壓力,請求降權。假設,我們有服務器【A、B、C】,權重分別是【1、2、3】。面對6次請求,它們負載均衡的結果如下:【A、B、C、B、C、C】。該算法對應的類是RoundRobinLoadBalance,在開始之前先看它的兩個屬性。
sequences
它是一個編號,記錄的是服務的調用編號,它是一個AtomicPositiveInteger實例。根據全限定類名 + 方法名來獲取,如果為空則創建。
AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); }
然后在每次調用服務前,做自增操作來獲取當前的編號。
int currentSequence = sequence.getAndIncrement();
IntegerWrapper
這個也很簡單,就是一個int類型的包裝類,主要是一個自減方法。
private static final class IntegerWrapper { private int value; public IntegerWrapper(int value) { this.value = value; } public int getValue() { return value; } public void setValue(int value) { this.value = value; } public void decrement() { this.value--; } }
然后我們來看doSelect方法,為方便解析,我們拆開來看。
1、獲取權重
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //全限定類型+方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); //服務提供者數量 int length = invokers.size(); //最大權重 int maxWeight = 0; //最小權重 int minWeight = Integer.MAX_VALUE; final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; //循環主要用於查找最大和最小權重,計算權重總和等 for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { //將Invoker對象和對應的權重大小IntegerWrapper放入Map中 invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } }
如上代碼,主要就是獲取Invoker的權重大小、計算總權重。其中重點在於向invokerToWeightMap中放入Invoker對象和其對應的權重大小IntegerWrapper。
2、獲取服務調用編號
每次調用前都會對sequence進行自增來獲取服務調用編號,需要注意它的獲取key為全限定類名+方法名。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //全限定類型+方法名 String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); //..... AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); }
3、權重
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //...... //調用編號 int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { //使用調用編號對權重總和進行取余操作 int mod = currentSequence % weightSum; //遍歷 最大權重大小 次數 for (int i = 0; i < maxWeight; i++) { //遍歷invokerToWeightMap for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { //當前Invoker final Invoker<T> k = each.getKey(); //當前Invoker對應的權重大小 final IntegerWrapper v = each.getValue(); //取余等於0 且 當前權重大於0 返回Invoker if (mod == 0 && v.getValue() > 0) { return k; } //如果取余不等於0 且 當前權重大於0 對權重和取余數-- if (v.getValue() > 0) { v.decrement(); mod--; } } } } }
以上代碼就是根據權重輪詢來獲取Invoker的過程,只看代碼的話其實有點晦澀難懂。但如果我們Debug來看,就能更好的理解它。 我們以上面的例子模擬一下運行過程,此時有服務器【A、B、C】,權重分別是【1、2、3】,總權重為6,最大權重為3。
- mod = 0:滿足條件,此時直接返回服務器 A
- mod = 1:自減1次后才能滿足條件,此時返回服務器 B
- mod = 2:自減2次后才能滿足條件,此時返回服務器 C
- mod = 3:自減3次后才能滿足條件,經過遞減后,服務器權重為 [0, 1, 2],此時返回服務器 B
- mod = 4:自減4次后才能滿足條件,經過遞減后,服務器權重為 [0, 0, 1],此時返回服務器 C
- mod = 5:只剩服務器C還有權重,返回C。
這樣6次調用,得到的結果就是【A、B、C、B、C、C】。當第7次調用時,此時調用編號為6,總權重大小也為6;mod則為0,重新開始。
4、輪詢
最后,如果大家的權重都一樣,那就沒什么好說的了,輪詢即可。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { //..... //輪詢 return invokers.get(currentSequence % length); }
