Dubbo源碼學習--集群負載均衡算法的實現


相關文章:

Dubbo源碼學習文章目錄

前言

Dubbo 的定位是分布式服務框架,為了避免單點壓力過大,服務的提供者通常部署多台,如何從服務提供者集群中選取一個進行調用,就依賴於Dubbo的負載均衡策略。

Dubbo 負載均衡策略

Dubbo 負載均衡策略提供下列四種方式:

  1. Random LoadBalance 隨機,按權重設置隨機概率。 Dubbo的默認負載均衡策略
    在一個截面上碰撞的概率高,但調用量越大分布越均勻,而且按概率使用權重后也比較均勻,有利於動態調整提供者權重。

  2. RoundRobin LoadBalance 輪循,按公約后的權重設置輪循比率。
    存在慢的提供者累積請求問題,比如:第二台機器很慢,但沒掛,當請求調到第二台時就卡在那,久而久之,所有請求都卡在調到第二台上。

  3. LeastActive LoadBalance 最少活躍調用數,相同活躍數的隨機,活躍數指調用前后計數差。
    使慢的提供者收到更少請求,因為越慢的提供者的調用前后計數差會越大。

  4. ConsistentHash LoadBalance 一致性Hash,相同參數的請求總是發到同一提供者。
    當某一台提供者掛時,原本發往該提供者的請求,基於虛擬節點,平攤到其它提供者,不會引起劇烈變動。

源碼

LoadBalance

首先查看 LoadBalance 接口

Invoker select(List<Invoker > invokers, URL url, Invocation invocation) throws RpcException;

LoadBalance 定義了一個方法就是從 invokers 列表中選取一個

AbstractLoadBalance

AbstractLoadBalance 抽象類是所有負載均衡策略實現類的父類,實現了LoadBalance接口 的方法,同時提供抽象方法交由子類實現,

 public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        if (invokers == null || invokers.size() == 0)
            return null;
        if (invokers.size() == 1)
            return invokers.get(0);
        return doSelect(invokers, url, invocation);
    }

    protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);

RandomLoadBalance

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size();
        int totalWeight = 0; 
        boolean sameWeight = true; 
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; 
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; 
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            int offset = random.nextInt(totalWeight);
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        return invokers.get(random.nextInt(length));
    }

RandomLoadBalance 實現很簡單,如果每個提供者的權重都相同,那么根據列表長度直接隨機選取一個,
如果權重不同,累加權重值。根據0~累加的權重值 選取一個隨機數,然后判斷該隨機數落在那個提供者上。

RoundRobinLoadBalance

  private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    private final ConcurrentMap<String, AtomicPositiveInteger> weightSequences = new ConcurrentHashMap<String, AtomicPositiveInteger>();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); 
        int maxWeight = 0; 
        int minWeight = Integer.MAX_VALUE; 
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); 
            minWeight = Math.min(minWeight, weight); 
        }
        if (maxWeight > 0 && minWeight < maxWeight) { 
            AtomicPositiveInteger weightSequence = weightSequences.get(key);
            if (weightSequence == null) {
                weightSequences.putIfAbsent(key, new AtomicPositiveInteger());
                weightSequence = weightSequences.get(key);
            }
            int currentWeight = weightSequence.getAndIncrement() % maxWeight;
            List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
            for (Invoker<T> invoker : invokers) { 
                if (getWeight(invoker, invocation) > currentWeight) {
                    weightInvokers.add(invoker);
                }
            }
            int weightLength = weightInvokers.size();
            if (weightLength == 1) {
                return weightInvokers.get(0);
            } else if (weightLength > 1) {
                invokers = weightInvokers;
                length = invokers.size();
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        return invokers.get(sequence.getAndIncrement() % length);
    }

首先也是判斷權重是否一致,如果一致,通過維護一個 AtomicInteger 的增長 進行取模亂來輪訓。
如果權重不一致,通過維護一個 AtomicInteger 的增長 與最大權重取模作為當前權重,然后獲取大於當前權重的列表作為調用者列表,然后進行取模輪訓

LeastActiveLoadBalance

LeastActiveLoadBalance 源碼比較簡單就不列出了,思路主要是,獲取最小的活躍數,把活躍數等於最小活躍數的調用者維護成一個數組
如果權重一致隨機取出,如果不同則跟 RandomLoadBalance 一致,累加權重,然后隨機取出。

ConsistentHashLoadBalance


    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int identityHashCode = System.identityHashCode(invokers);
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.getIdentityHashCode() != identityHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

      public ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = System.identityHashCode(invokers);
            URL url = invokers.get(0).getUrl();
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i ++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(invoker.getUrl().toFullString() + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

通過doselect方法可以看出 ConsistentHashLoadBalance 主要是通過內部類 ConsistentHashSelector 來實現的,首先看ConsistentHashSelector構造函數的源碼可以看出
首先根據invokers的url獲取分片個數,創建相同大小的虛擬節點。

        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            byte[] digest = md5(key);
            Invoker<T> invoker = sekectForKey(hash(digest, 0));
            return invoker;
        }

        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }

        private Invoker<T> sekectForKey(long hash) {
            Invoker<T> invoker;
            Long key = hash;
            if (!virtualInvokers.containsKey(key)) {
                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
                if (tailMap.isEmpty()) {
                    key = virtualInvokers.firstKey();
                } else {
                    key = tailMap.firstKey();
                }
            }
            invoker = virtualInvokers.get(key);
            return invoker;
        }

然后根據參數的MD5值 獲取對應的提供者


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM