Dubbo的負載均衡算法


1 簡介

Dubbo提供了4種負載均衡機制:

  • 權重隨機算法:RandomLoadBalance
  • 最少活躍調用數算法:LeastActiveLoadBalance
  • 一致性哈希算法:ConsistentHashLoadBalance
  • 加權輪詢算法:RoundRobinLoadBalance

Dubbo的負載均衡算法均實現自LoadBalance接口,其類圖結構如下:
image

1.1 自適應默認算法

Dubbo的負載均衡算法利用Dubbo的自適應機制,默認的實現為RandomLoadBalance(即:權重隨機算法),接口如下:

// 默認為RandomLoadBalance算法
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
    // 該方法為自適應方法
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}

1.2 抽象基類

1.2.1 選擇Invoker

抽象基類針對select()方法判斷invokers集合的數量,如果集合中只有一個Invoker,則無需使用算法,直接返回即可:

@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers == null || invokers.isEmpty())
        return null;
    // 如果只有一個元素,無需負載均衡算法
    if (invokers.size() == 1)
        return invokers.get(0);
    // 負載均衡算法:該方法為抽象方法,由子類實現
    return doSelect(invokers, url, invocation);
}

1.2.2 計算權重

抽象基類除了對select()方法進行了重寫,還封裝了服務提供者的權重計算邏輯。Dubbo的權重計算,考慮到服務預熱(服務器啟動后,以低功率方式運行一段時間,當服務器運行效率逐漸達到最佳狀態):

若當前的服務器運行時長小於服務預熱時間,對服務器降權,讓服務器的權重降低。

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    // 從URL中獲取服務器Invoker的權重信息
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
        // 獲取服務器Invoker的啟動時間戳
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
        if (timestamp > 0L) {
            int uptime = (int) (System.currentTimeMillis() - timestamp);
            // 獲取服務器Invoker的預熱時長(默認10分鍾)
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
            if (uptime > 0 && uptime < warmup) {
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}
// (uptime / warmup) * weight,即:(啟動時長 / 預熱時長) * 原權重值
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}

2 負載均衡算法實現

2.1 加權隨機算法

該算法思想簡單:假如有服務器:

  • A,權重weight=2
  • B,權重weight=3
  • C,權重weight=5

這些服務器totalWeight = AWeight + BWeight + CWeight = 10。這樣生成10以內的隨機數

  • [0, 2):屬於服務器A
  • [2, 5):屬於服務器B
  • [5, 10):屬於服務器C

加權隨機算法由RandomLoadBalance,其核心源碼實現如下:

private final Random random = new Random();

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size();
    // 總權重值
    int totalWeight = 0;
    // 是否所有的Invoker服務器的權重值都相等,若都相等,則在Invoker列表中隨機選中一個即可
    boolean sameWeight = true;
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        totalWeight += weight; // Sum
        if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false;
        }
    }
    // 生成totalWeight以內的隨機數offset,然后該offset挨個減Invoker的權重,一旦小於0,就選中當前的Invoker
    if (totalWeight > 0 && !sameWeight) {
        int offset = random.nextInt(totalWeight);
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    return invokers.get(random.nextInt(length));
}

2.2 最小活躍數算法

活躍調用數越小,表明該Provider的效率越高,單位時間內可以處理更多的請求。此時如果有新的請求,應該將請求優先分配給該ProviderDubbo中,每個Provider都會保持一個active,表示該Provider的活躍數:

沒收到一個請求,active的值加1,請求處理完成后,active的值減1。

Dubbo使用LeastActiveLoadBalance實現最小活躍數算法,LeastActiveLoadBalance引入了權重,若多個Provider的活躍數相同

則權重較高的Provider被選中的幾率更大
若權重相同,則隨機選取一個Provider

該算法的核心思想:

  • 循環遍歷所有Invoker,找出活躍數最小的所有Invoker
  • 如果有多個Invoker具有多個相同的最小活躍數,此時需要記錄這些Invoker。Dubbo使用額外的int數組,來記錄這些Invoker在原invokers列表中的索引位置,這種情況下,則需要依據隨機權重算法,最終決定該選擇的Invoker
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // Number of invokers
    // “最小活躍數”的值
    int leastActive = -1;
    // 具有相同“最小活躍數”的Invoker(即Provider)的數量
    int leastCount = 0;
    // “最小活躍數”的Invoker可能不止一個,因此需要記錄所有具有“最小活躍數”的Invoker在invokers列表中的索引位置
    int[] leastIndexs = new int[length];
    // 記錄所有“最小活躍數”的Invoker總的權重值
    int totalWeight = 0;
    // 記錄第一個“最小活躍數”的Invoker的權重值
    int firstWeight = 0;
    // 所有的“最小活躍數”的Invoker的權重值是否都相等
    boolean sameWeight = true;
    for (int i = 0; i < length; i++) {
        Invoker<T> invoker = invokers.get(i);
        // 活躍數
        int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
        // 權重
        int afterWarmup = getWeight(invoker, invocation); 
        if (leastActive == -1 || active < leastActive) {
            leastActive = active; 
            leastCount = 1; 
            leastIndexs[0] = i; 
            totalWeight = afterWarmup; 
            firstWeight = afterWarmup; 
            sameWeight = true;
        } else if (active == leastActive) { 
            leastIndexs[leastCount++] = i; 
            totalWeight += afterWarmup;
            if (sameWeight && i > 0 && afterWarmup != firstWeight) {
                sameWeight = false;
            }
        }
    }
    if (leastCount == 1) {
        return invokers.get(leastIndexs[0]);
    }
    if (!sameWeight && totalWeight > 0) {
        int offsetWeight = random.nextInt(totalWeight) + 1;
        for (int i = 0; i < leastCount; i++) {
            int leastIndex = leastIndexs[i];
            offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
            if (offsetWeight <= 0)
                return invokers.get(leastIndex);
        }
    }
    return invokers.get(leastIndexs[random.nextInt(leastCount)]);
}

2.3 一致性哈希

該算法起初用於緩存系統的負載均衡,算法過程如下:

① 根據IP或其他信息為緩存節點生成一個hash值,將該hash值映射到[0, 2^32-1]的圓環中。
② 當查詢或寫入緩存時,為緩存的key生成一個hash值,查找第一個大於等於該hash值的緩存節點,以應用該緩存
③ 若當前節點掛了,則依然可以計算得到一個新的緩存節點

image

Dubbo通過引入虛擬節點,讓所有的Invoker在圓環上分散開,避免數據傾斜(由於節點不夠分散,導致大量請求落在同一個節點,而其他節點只會接受到少量請求)。Dubbo對一個Invoker默認使用160個虛擬節點:

如:Invoker1-1, invoker1-2, invoker1-3... invoker1-160

image

Dubbo使用ConsistentHashLoadBalance來實現一致性哈希算法,其內部定義ConsistentHashSelector內部類完成節點選擇。Dubbo中使用方法的入參數值進行哈希,來選擇落點到哪一個Invoker上。

public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    // key:group+version+methodName,即調用方法的完整名稱
    // value:ConsistentHashSelector,利用該類完成節點選擇
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors =  new ConcurrentHashMap<>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // 獲取調用方法名
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;

        // 獲取invokers的hashcode
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 如果 invokers 是一個新的 List 對象,意味着服務提供者數量發生了變化
        // 此時 selector.identityHashCode != identityHashCode 條件成立
        if (selector == null || selector.identityHashCode != identityHashCode) {
            // 創建新的 ConsistentHashSelector
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }

        // 利用選擇器,選擇節點
        return selector.select(invocation);
    }
    private static final class ConsistentHashSelector<T> {...}
}

ConsistentHashLoadBalance#doSelect()方法主要是做前置工作,每個方法的一致性哈希選擇具體由ConsistentHashSelector來實現:

private static final class ConsistentHashSelector<T> {
    // 使用TreeMap存儲Invoker虛擬節點,保證查詢效率
    private final TreeMap<Long, Invoker<T>> virtualInvokers;
    // 虛擬節點數量
    private final int replicaNumber;
    // invokers的hashCode值
    private final int identityHashCode;
    // 參數索引數組
    private final int[] argumentIndex;

    ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
        this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
        this.identityHashCode = identityHashCode;
        URL url = invokers.get(0).getUrl();
        this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
        // 獲取參與 hash 計算的參數下標值,默認對方法的第一個參數進行 hash 運算
        String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
        argumentIndex = new int[index.length];
        for (int i = 0; i < index.length; i++) {
            argumentIndex[i] = Integer.parseInt(index[i]);
        }
        // 如下方法創建虛擬節點,每個invoker生成replicaNumber個虛擬結點,
        for (Invoker<T> invoker : invokers) {
            String address = invoker.getUrl().getAddress();
            for (int i = 0; i < replicaNumber / 4; i++) {
                byte[] digest = md5(address + i);  // digest是一個長度為16的數組
                // 對 digest 部分字節進行4次 hash 運算,得到四個不同的 long 型正整數
                for (int h = 0; h < 4; h++) {
                    // 根據h的值,對digest進行位運算。h=0,則0~3字節位運算;h=1,則4~7字節位運算。
                    long m = hash(digest, h);  
                    virtualInvokers.put(m, invoker);
                }
            }
        }
    }
}    

選擇節點,使用TreeMap#tailMap方法,可以獲取到TreeMap中大於Key的子Map,子Map中第一個Entry即為選擇的Invoker。因為TreeMap不是環形結構,因此如果沒有取到任何值,則認為是第一個Key的值。如下:
image

public Invoker<T> select(Invocation invocation) {
    // 根據argumentIndex,決定方法的哪幾個參數值作為Key
    String key = toKey(invocation.getArguments());
    byte[] digest = md5(key);
    return selectForKey(hash(digest, 0));
}
private Invoker<T> selectForKey(long hash) {
    /*
        TreeMap<Integer, String> tree_map = new TreeMap<Integer, String>();
        tree_map.put(10, "Geeks");
        tree_map.put(15, "4");
        tree_map.put(20, "Geeks");
        tree_map.put(25, "Welcomes");
        tree_map.put(30, "You");
        System.out.println("The tailMap is " + tree_map.tailMap(15));
        // The tailMap is {15=4, 20=Geeks, 25=Welcomes, 30=You}
    */
    Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry();
    if (entry == null) {
        entry = virtualInvokers.firstEntry();
    }
    return entry.getValue();
}

private String toKey(Object[] args) {
    StringBuilder buf = new StringBuilder();
    for (int i : argumentIndex) {
        if (i >= 0 && i < args.length) {
            buf.append(args[i]);
        }
    }
    return buf.toString();
}

2.4 加權輪詢算法

待補充


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM