Dubbo提供了四種負載均衡:RandomLoadBalance,RoundRobinLoadBalance,LeastActiveLoadBalance,ConsistentHashLoadBalance。

這里順便說下Dubbo的負載均衡是針對單個客戶端的,不是全局的。
以下代碼基於2.7.2-SNAPSHOT版本。
LoadBalance
LoadBalance接口只提供了一個對外暴露的方法:
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
AbstractLoadBalance
AbstractLoadBalance使用模板設計模式,具體負載均衡算法由子類的doSelect實現
public abstract class AbstractLoadBalance implements LoadBalance {
//預熱權重計算,provider剛啟動權重在預熱時間內隨啟動時間逐漸增加,最小為1
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
//模板方法,參數判斷
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
if (invokers.size() == 1) {
return invokers.get(0);
}
return doSelect(invokers, url, invocation);
}
//真正執行負載均衡的方法
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
//計算權重
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);//預熱時間,默認為10分鍾
if (uptime > 0 && uptime < warmup) {//預熱
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight >= 0 ? weight : 0;
}
}
RandomLoadBalance
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size();//invoker個數
boolean sameWeight = true;//每個invoker都有相同權重
int[] weights = new int[length];//權重數組
int firstWeight = getWeight(invokers.get(0), invocation);//第一個權重
weights[0] = firstWeight;
int totalWeight = firstWeight;//總權重
//計算總權重和判斷權重是否相同
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
weights[i] = weight;
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
//權重不相同
if (totalWeight > 0 && !sameWeight) {
//得到一個在[0,totalWeight)的偏移量,然后這個偏移量所在的invoker
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
//權重相同,直接隨機[0,length)
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
}
RoundRobinLoadBalance
public class RoundRobinLoadBalance extends AbstractLoadBalance {
public static final String NAME = "roundrobin";
//循環周期,如果在這個周期內invoker沒有被客戶端獲取,那么該invoker對應的輪詢記錄將被刪除。
private static final int RECYCLE_PERIOD = 60000;
//自己理解為輪詢記錄,一個invoker對應一個WeightedRoundRobin
protected static class WeightedRoundRobin {
private int weight;//權重
//目前權重,隨着輪詢次數改變,每個輪詢周期增加weight,如果invoker被調用減去總權重
private AtomicLong current = new AtomicLong(0);
//最后一次獲取到invoker的時間,判斷invoker是否失去連接
private long lastUpdate;
public int getWeight() {
return weight;
}
public void setWeight(int weight) {
this.weight = weight;
current.set(0);
}
public long increaseCurrent() {
return current.addAndGet(weight);
}
public void sel(int total) {
current.addAndGet(-1 * total);
}
public long getLastUpdate() {
return lastUpdate;
}
public void setLastUpdate(long lastUpdate) {
this.lastUpdate = lastUpdate;
}
}
//保存輪詢記錄,方法的全限定路徑--->Map(invoker的IdentityString---->invoker的WeightedRoundRobin)
private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();
//修改methodWeightMap的鎖
private AtomicBoolean updateLock = new AtomicBoolean();
//從methodWeightMap獲取所有invoker對應WeightedRoundRobin
protected <T> Collection<String> getInvokerAddrList(List<Invoker<T>> invokers, Invocation invocation) {
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
Map<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map != null) {
return map.keySet();
}
return null;
}
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//從methodWeightMap獲取所有invoker對應WeightedRoundRobin
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<String, WeightedRoundRobin>());
map = methodWeightMap.get(key);
}
int totalWeight = 0;//總權重
long maxCurrent = Long.MIN_VALUE;//最大目前權重
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;//選中invoker
WeightedRoundRobin selectedWRR = null;//選中invoker對應的WeightedRoundRobin
for (Invoker<T> invoker : invokers) {
//獲取invoker對應的WeightedRoundRobin
String identifyString = invoker.getUrl().toIdentityString();
WeightedRoundRobin weightedRoundRobin = map.get(identifyString);
int weight = getWeight(invoker, invocation);
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(identifyString, weightedRoundRobin);
}
//更新weight
if (weight != weightedRoundRobin.getWeight()) {
//weight changed
weightedRoundRobin.setWeight(weight);
}
//一次輪詢周期增加一次權重
long cur = weightedRoundRobin.increaseCurrent();
//更新invoker獲取時間
weightedRoundRobin.setLastUpdate(now);
//選中最大的目前權重
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
//如果invokers發生改變(增加或者失去連接),更新map
if (!updateLock.get() && invokers.size() != map.size()) {
if (updateLock.compareAndSet(false, true)) {
try {
// copy -> modify -> update reference
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<String, WeightedRoundRobin>();
newMap.putAll(map);
Iterator<Entry<String, WeightedRoundRobin>> it = newMap.entrySet().iterator();
while (it.hasNext()) {
//如果在60000ms內invoker沒有被客戶端獲取,則認為該invoker下線,那么該invoker對應的輪詢記錄將被刪除。
Entry<String, WeightedRoundRobin> item = it.next();
if (now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {
it.remove();
}
}
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
}
//選中的invoker減去總權重
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
}
輪詢負載均衡存在一個問題,不推薦使用,文檔說法如下:
存在慢的提供者累積請求的問題,比如:第二台機器很慢,但沒掛,當請求調到第二台時就卡在那,久而久之,所有請求都卡在調到第二台上。
LeastActiveLoadBalance
public class LeastActiveLoadBalance extends AbstractLoadBalance {
public static final String NAME = "leastactive";
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// invokers個數
int length = invokers.size();
// 最小活躍度
int leastActive = -1;
// 具有最小活躍度的invoker個數
int leastCount = 0;
// 每個具有最小活躍度的invoker的下標
int[] leastIndexes = new int[length];
// 權重數組
int[] weights = new int[length];
// 所有最小活躍度invoker的總權重
int totalWeight = 0;
// 第一個最小活躍度invoker的權重
int firstWeight = 0;
// 判斷每個最小活躍度invoker的權重是否相同
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// 獲取活躍度
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// 獲取權重
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
// 記錄最小活躍度
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexs
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
} else if (active == leastActive) {//有多個最小活躍度
// Record the index of the least active invoker in leastIndexs order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
totalWeight += afterWarmup;
// If every invoker has the same weight?
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 最小活躍度的只有一個直接返回
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
//多個最小活躍度,處理跟隨機負載均衡一樣
if (!sameWeight && totalWeight > 0) {
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
}
LeastActiveLoadBalance與ActiveLimitFilter一起使用,ActiveLimitFilter負責記錄每個invoker調用的活躍度。
ConsistentHashLoadBalance
一致性hash算法
一致性hash將整個hash空間組織成一個環,范圍在0-2^32-1

然后將各個節點使用唯一標志(ip或者hostname等)hash分布在環上

最后將數據key使用相同hash算法定位在環上位置,順時針遇到的第一台服務器就是該數據的存儲節點

算法的容錯性
此外如果某個節點宕機或增加節點,影響的僅僅是鄰近的下一個節點

虛擬節點
當節點個數太少時,容易出現節點分布不均勻導致數據傾斜問題

為了解決數據傾斜問題,引入虛擬節點機制,將節點多次hash,生成多個節點位置,具體做法可以在節點標志后面加編號區分。

Dubbo實現
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
public static final String NAME = "consistenthash";
private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();
@SuppressWarnings("unchecked")
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
String methodName = RpcUtils.getMethodName(invocation);
String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
int identityHashCode = System.identityHashCode(invokers);
ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
selector = (ConsistentHashSelector<T>) selectors.get(key);
}
return selector.select(invocation);
}
private static final class ConsistentHashSelector<T> {
private final TreeMap<Long, Invoker<T>> virtualInvokers;
private final int replicaNumber;
private final int identityHashCode;
private final int[] argumentIndex;
ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
this.identityHashCode = identityHashCode;
URL url = invokers.get(0).getUrl();
//虛擬節點個數
this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
//hash的參數下標
String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
argumentIndex = new int[index.length];
for (int i = 0; i < index.length; i++) {
argumentIndex[i] = Integer.parseInt(index[i]);
}
//創建虛擬節點
for (Invoker<T> invoker : invokers) {
String address = invoker.getUrl().getAddress();
for (int i = 0; i < replicaNumber / 4; i++) {
byte[] digest = md5(address + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
virtualInvokers.put(m, invoker);
}
}
}
}
public Invoker<T> select(Invocation invocation) {
String key = toKey(invocation.getArguments());
byte[] digest = md5(key);
return selectForKey(hash(digest, 0));
}
private String toKey(Object[] args) {
StringBuilder buf = new StringBuilder();
for (int i : argumentIndex) {
if (i >= 0 && i < args.length) {
buf.append(args[i]);
}
}
return buf.toString();
}
private Invoker<T> selectForKey(long hash) {
Map.Entry<Long, Invoker<T>> entry = virtualInvokers.ceilingEntry(hash);
if (entry == null) {
entry = virtualInvokers.firstEntry();
}
return entry.getValue();
}
private long hash(byte[] digest, int number) {
return (((long) (digest[3 + number * 4] & 0xFF) << 24)
| ((long) (digest[2 + number * 4] & 0xFF) << 16)
| ((long) (digest[1 + number * 4] & 0xFF) << 8)
| (digest[number * 4] & 0xFF))
& 0xFFFFFFFFL;
}
private byte[] md5(String value) {
MessageDigest md5;
try {
md5 = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new IllegalStateException(e.getMessage(), e);
}
md5.reset();
byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
md5.update(bytes);
return md5.digest();
}
}
}
參考資料
http://huangzehong.me/2018/09/06/20180906-%E3%80%90Dubbo%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E3%80%91%E5%9B%9B%E7%A7%8D%E8%B4%9F%E8%BD%BD%E5%9D%87%E8%A1%A1/
https://www.cnblogs.com/lpfuture/p/5796398.html
