1 簡介
Dubbo提供了4種負載均衡機制:
- 權重隨機算法:
RandomLoadBalance
- 最少活躍調用數算法:
LeastActiveLoadBalance
- 一致性哈希算法:
ConsistentHashLoadBalance
- 加權輪詢算法:
RoundRobinLoadBalance
Dubbo的負載均衡算法均實現自LoadBalance
接口,其類圖結構如下:
1.1 自適應默認算法
Dubbo的負載均衡算法利用Dubbo的自適應機制,默認的實現為RandomLoadBalance
(即:權重隨機算法),接口如下:
// 默認為RandomLoadBalance算法
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
// 該方法為自適應方法
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
1.2 抽象基類
1.2.1 選擇Invoker
抽象基類針對select()
方法判斷invokers集合的數量,如果集合中只有一個Invoker
,則無需使用算法,直接返回即可:
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.isEmpty())
return null;
// 如果只有一個元素,無需負載均衡算法
if (invokers.size() == 1)
return invokers.get(0);
// 負載均衡算法:該方法為抽象方法,由子類實現
return doSelect(invokers, url, invocation);
}
1.2.2 計算權重
抽象基類除了對select()
方法進行了重寫,還封裝了服務提供者的權重
計算邏輯。Dubbo的權重計算,考慮到服務預熱(服務器啟動后,以低功率
方式運行一段時間,當服務器運行效率逐漸達到最佳狀態):
若當前的服務器運行時長
小於
服務預熱時間,對服務器降權,讓服務器的權重降低。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 從URL中獲取服務器Invoker的權重信息
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 獲取服務器Invoker的啟動時間戳
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 獲取服務器Invoker的預熱時長(默認10分鍾)
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
// (uptime / warmup) * weight,即:(啟動時長 / 預熱時長) * 原權重值
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
2 負載均衡算法實現
2.1 加權隨機算法
該算法思想簡單:假如有服務器:
- A,權重weight=2
- B,權重weight=3
- C,權重weight=5
這些服務器totalWeight = AWeight + BWeight + CWeight = 10。這樣生成10以內的隨機數
- [0, 2):屬於服務器A
- [2, 5):屬於服務器B
- [5, 10):屬於服務器C
加權隨機算法由RandomLoadBalance
,其核心源碼實現如下:
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();
// 總權重值
int totalWeight = 0;
// 是否所有的Invoker服務器的權重值都相等,若都相等,則在Invoker列表中隨機選中一個即可
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
// 生成totalWeight以內的隨機數offset,然后該offset挨個減Invoker的權重,一旦小於0,就選中當前的Invoker
if (totalWeight > 0 && !sameWeight) {
int offset = random.nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
return invokers.get(random.nextInt(length));
}
2.2 最小活躍數算法
活躍調用數越小,表明該Provider
的效率越高,單位時間內可以處理更多的請求。此時如果有新的請求,應該將請求優先分配給該Provider
。Dubbo
中,每個Provider
都會保持一個active
,表示該Provider
的活躍數:
沒收到一個請求,active的值加1,請求處理完成后,active的值減1。
Dubbo
使用LeastActiveLoadBalance
實現最小活躍數算法,LeastActiveLoadBalance
引入了權重,若多個Provider
的活躍數相同
則權重較高的
Provider
被選中的幾率更大
若權重相同,則隨機選取一個Provider
該算法的核心思想:
- 循環遍歷所有Invoker,找出活躍數最小的所有Invoker
- 如果有多個Invoker具有多個相同的最小活躍數,此時需要記錄這些Invoker。Dubbo使用額外的int數組,來記錄這些Invoker在原invokers列表中的索引位置,這種情況下,則需要依據隨機權重算法,最終決定該選擇的Invoker
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
// “最小活躍數”的值
int leastActive = -1;
// 具有相同“最小活躍數”的Invoker(即Provider)的數量
int leastCount = 0;
// “最小活躍數”的Invoker可能不止一個,因此需要記錄所有具有“最小活躍數”的Invoker在invokers列表中的索引位置
int[] leastIndexs = new int[length];
// 記錄所有“最小活躍數”的Invoker總的權重值
int totalWeight = 0;
// 記錄第一個“最小活躍數”的Invoker的權重值
int firstWeight = 0;
// 所有的“最小活躍數”的Invoker的權重值是否都相等
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 活躍數
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 權重
int afterWarmup = getWeight(invoker, invocation);
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexs[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (active == leastActive) {
leastIndexs[leastCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0 && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
if (leastCount == 1) {
return invokers.get(leastIndexs[0]);
}
if (!sameWeight && totalWeight > 0) {
int offsetWeight = random.nextInt(totalWeight) + 1;
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexs[i];
offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
if (offsetWeight <= 0)
return invokers.get(leastIndex);
}
}
return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}
2.3 一致性哈希
該算法起初用於緩存系統的負載均衡,算法過程如下:
① 根據IP或其他信息為緩存節點生成一個hash值,將該hash值映射到[0, 2^32-1]的圓環中。
② 當查詢或寫入緩存時,為緩存的key生成一個hash值,查找第一個大於等於該hash值的緩存節點,以應用該緩存
③ 若當前節點掛了,則依然可以計算得到一個新的緩存節點
Dubbo
通過引入虛擬節點,讓所有的Invoker
在圓環上分散開,避免數據傾斜(由於節點不夠分散,導致大量請求落在同一個節點,而其他節點只會接受到少量請求)。Dubbo
對一個Invoker
默認使用160個虛擬節點:
如:Invoker1-1, invoker1-2, invoker1-3... invoker1-160
Dubbo
使用ConsistentHashLoadBalance
來實現一致性哈希算法,其內部定義ConsistentHashSelector
內部類完成節點選擇。Dubbo
中使用方法的入參數值進行哈希,來選擇落點到哪一個Invoker上。
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
// key:group+version+methodName,即調用方法的完整名稱
// value:ConsistentHashSelector,利用該類完成節點選擇
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<>();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// 獲取調用方法名
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
// 獲取invokers的hashcode
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
// 如果 invokers 是一個新的 List 對象,意味着服務提供者數量發生了變化
// 此時 selector.identityHashCode != identityHashCode 條件成立
if (selector == null || selector.identityHashCode != identityHashCode) {
// 創建新的 ConsistentHashSelector
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
// 利用選擇器,選擇節點
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {...}
}
ConsistentHashLoadBalance#doSelect()
方法主要是做前置工作,每個方法的一致性哈希選擇具體由ConsistentHashSelector
來實現:
private static final class ConsistentHashSelector<T> {
// 使用TreeMap存儲Invoker虛擬節點,保證查詢效率
private final TreeMap<Long, Invoker<T>> virtualInvokers;
// 虛擬節點數量
private final int replicaNumber;
// invokers的hashCode值
private final int identityHashCode;
// 參數索引數組
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
// 獲取參與 hash 計算的參數下標值,默認對方法的第一個參數進行 hash 運算
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
// 如下方法創建虛擬節點,每個invoker生成replicaNumber個虛擬結點,
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i); // digest是一個長度為16的數組
// 對 digest 部分字節進行4次 hash 運算,得到四個不同的 long 型正整數
for (int h = 0; h < 4; h++) {
// 根據h的值,對digest進行位運算。h=0,則0~3字節位運算;h=1,則4~7字節位運算。
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
}
選擇節點,使用TreeMap#tailMap
方法,可以獲取到TreeMap中大於Key的子Map,子Map中第一個Entry即為選擇的Invoker。因為TreeMap不是環形結構,因此如果沒有取到任何值,則認為是第一個Key的值。如下:
public Invoker<T> select(Invocation invocation) {
// 根據argumentIndex,決定方法的哪幾個參數值作為Key
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
/*
TreeMap<Integer, String> tree_map = new TreeMap<Integer, String>();
tree_map.put(10, "Geeks");
tree_map.put(15, "4");
tree_map.put(20, "Geeks");
tree_map.put(25, "Welcomes");
tree_map.put(30, "You");
System.out.println("The tailMap is " + tree_map.tailMap(15));
// The tailMap is {15=4, 20=Geeks, 25=Welcomes, 30=You}
*/
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
2.4 加權輪詢算法
待補充