前言:在上一篇博客中,介紹了zookeeper作為dubbo的注冊中心是如何工作的,有一個很重要的點,我們的程序是分布式應用,服務部署在幾個節點(服務器)上,當消費者調用服務時,zk返回給dubbo的是一個節點列表,但是dubbo只會選擇一台服務器,那么它究竟會選擇哪一台呢?這就是dubbo的負載均衡策略了,本篇博客就來聚焦dubbo的負載均衡策略。
本篇博客的目錄
一:負載均衡介紹
1.1:負載均衡簡介
以下是wikipedia對負載均衡的定義:
負載均衡改善了跨多個計算資源(例如計算機,計算機集群,網絡鏈接,中央處理單元或磁盤驅動的的工作負載分布。負載平衡旨在優化資源使用,最大化吞吐量,最小化響應時間,並避免任何單個資源的過載。使用具有負載平衡而不是單個組件的多個組件可以通過冗余提高可靠性和可用性。負載平衡通常涉及專用軟件或硬件
1.2:簡單解釋
這個概念如何理解呢?通俗點來說假如一個請求從客戶端發起,比如(查詢訂單列表),要選擇服務器進行處理,但是我們的集群環境提供了5個服務器A\B\C\D\E,每個服務器都有處理這個請求的能力,此時客戶端就必須選擇一個服務器來進行處理(不存在先選擇A,處理一會又選擇C,又跳到D).說白了就是一個選擇的問題。當請求多了的話,就要考慮各服務器的負載,一共5個服務器,不可能每次都讓一個服務器都來處理吧,比如把讓其他服務器來分壓。這就是負載均衡的優點:避免單個服務器響應同一請求,容易造成服務器宕機、崩潰等問題。
二:dubbo的loadBalance接口
1.1:loadBalance
dubbo的負載均衡策略,主體向外暴露出來是一個接口,名字叫做loadBlace,位於com.alibaba.dubbo.rpc.cluster包下,很明顯根據包名就可以看出它是用來管理集群的:
這個接口就一個方法,select方法的作用就是從眾多的調用的List選擇出一個調用者,Invoker可以理解為客戶端的調用者,dubbo專門封裝一個類來表示,URL就是調用者發起的URL請求鏈接,從這個URL中可以獲取很多請求的具體信息,Invocation表示的是調用的具體過程,dubbo用這個類模擬調用具體細節過程:
1.2:AbstractLoadBlance
該接口在下面的子類都會對其進行實現。接口下是一個抽象類AbstractLoadBlance
package com.alibaba.dubbo.rpc.cluster;
public interface LoadBalance {@Adaptive("loadbalance") <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException; }
AbstractLoadBlance抽象類繼承自LoadBalance,其中有個static方法表明它在類加載的時候就會運行,它表示的含義是計算預熱加載權重,參數是uptime,這里可以理解為服務啟動的時間,warmup就是預熱時間,weight是權重的值,下面會對比進行詳細解釋:
public abstract class AbstractLoadBalance implements LoadBalance{ static int calculateWarmupWeight(int uptime, int warmup, int weight){ // } @Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation){ // } protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker<?> invoker, Invocation invocation) { } }
1.2.1:select方法
抽象類方法中有個有方法體的方法select,先判斷調用者組成的List是否為null,如果是null就返回null。再判斷調用者的大小,如果只有一個就返回那個唯一的調用者(試想,如果服務調用另一個服務時,當服務的提供者機器只有一個,那么就可以返回那一個,因為沒有選擇了!)如果這些都不成立,就繼續往下走,走doSelect方法:
@Override public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); }
1.2.2:doSelect方法
該方法是抽象的,交給具體的子類去實現,由此也可以思考出一個問題就是:dubbo為什么要將一個接口首先做出一個實現抽象類,再由不同的子類去實現?原因是抽象類中的非抽象方法,再子類中都是必須要實現的,而他們子類的不同點就是具體做出選擇的策略不同,將公共的邏輯提取出來放在抽象類里,子類不用寫多余的代碼,只用維護和實現最終要的自己的邏輯
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
1.2.3:getWeight方法
顧名思義,這個方法的含義就是獲取權重,首先通過URL(URL為dubbo封裝的一個實體)獲取基本的權重,如果權重大於0,會獲取服務啟動時間,再用當前的時間-啟動時間就是服務到目前為止運行了多久,因此這個upTime就可以理解為服務啟動時間,再獲取配置的預熱時間,如果啟動時間小於預熱時間,就會再次調用獲取權重。這個預熱的方法其實dubbo針對JVM做出的一個很契合的優化,因為JVM從啟動到起來都運行到最佳狀態是需要一點時間的,這個時間叫做warmup,而dubbo就會對這個時間進行設定,然后等到服務運行時間和warmup相等時再計算權重,這樣就可以保障服務的最佳運行狀態!
protected int getWeight(Invoker<?> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; }
三:dubbo的幾種負載均衡策略
3.1:整體架構圖
可以看出抽象的負載均衡下的類分為4個,這4個類表示了4種負載均衡策略,分別是一致性Hash均衡算法、隨機調用法、輪詢法、最少活動調用法
3.2:RandomLoadBalance
隨機調用負載均衡,該類實現了抽象的AbstractLoadBalance接口,重寫了doSelect方法,看方法的細節就是首先遍歷每個提供服務的機器,獲取每個服務的權重,然后累加權重值,判斷每個服務的提供者權重是否相同,如果每個調用者的權重不相同,並且每個權重大於0,那么就會根據權重的總值生成一個隨機數,再用這個隨機數,根據調用者的數量每次減去調用者的權重,直到計算出當前的服務提供者隨機數小於0,就選擇那個提供者!另外,如果每個機器的權重的都相同,那么權重就不會參與計算,直接選擇隨機算法生成的某一個選擇,完全隨機。可以看出,隨機調用法,
public class RandomLoadBalance extends AbstractLoadBalance { public static final String NAME = "random"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int totalWeight = 0; // The sum of weights boolean sameWeight = true; // Every invoker has the same weight? for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); totalWeight += weight; // Sum if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= getWeight(invokers.get(i), invocation); if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(random.nextInt(length)); }
3.3:RoundRobinLoadBlance
輪詢調用,輪詢調用的過程主要是維護了局部變量的一個LinkdesHashMap(有順序的Map)去存儲調用者和權重值的對應關系,然后遍歷每個調用者,把調用者和當前大於0的權重值放進去,再累加權重值。還有一個全局變量的map,找到第一個服務調用者,首先是找到每個服務的key值和method,這里可以理解為標識第一個調用者的唯一key,然后再給它對應的值保證原子性的+1(AtomicPositiveInteger是原子的),再對這個值取模總權重,再每次對其權重值-1,知道它取模與總權重值等於0就選擇該調用者,可以稱之為"降權取模"(只是一種的計算層面,而不是真正降權)。總結:輪詢調用並不是簡單的一個接着一個依次調用,它是根據權重的值進行循環的。
public class RoundRobinLoadBalance extends AbstractLoadBalance { public static final String NAME = "roundrobin"; private final ConcurrentMap<String, AtomicPositiveInteger> sequences = new ConcurrentHashMap<String, AtomicPositiveInteger>(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int length = invokers.size(); // Number of invokers int maxWeight = 0; // The maximum weight int minWeight = Integer.MAX_VALUE; // The minimum weight final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>(); int weightSum = 0; for (int i = 0; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight minWeight = Math.min(minWeight, weight); // Choose the minimum weight if (weight > 0) { invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight)); weightSum += weight; } } AtomicPositiveInteger sequence = sequences.get(key); if (sequence == null) { sequences.putIfAbsent(key, new AtomicPositiveInteger()); sequence = sequences.get(key); } int currentSequence = sequence.getAndIncrement(); if (maxWeight > 0 && minWeight < maxWeight) { int mod = currentSequence % weightSum; for (int i = 0; i < maxWeight; i++) { for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) { final Invoker<T> k = each.getKey(); final IntegerWrapper v = each.getValue(); if (mod == 0 && v.getValue() > 0) { return k; } if (v.getValue() > 0) { v.decrement(); mod--; } } } } // Round robin return invokers.get(currentSequence % length); }
2.4:LeastActiveLoadBlance
最少活躍數調用法:這個方法的主要作用根據服務的提供者的運行狀態去選擇服務器,主要的思路就是遍歷每個調用者,然后獲取每個服務器的運行狀態,如果當前運行的運行狀態小於最小的狀態-1,把它保存在leastIndexs中的第一個位置,並且認定所有的調用者權重都相同,然后直接返回那個調用者(這里的邏輯是:找到最少活躍數(在代碼層反應就是:active的值))。如果計算出的權重值和最少的權重值相同,那么把它保存在leastIndexs數組里面,累加權重值,如果當前的權重值不等於初始值firstWeight,那么就認定不是所有的調用者的權重不同。然后再遍歷lestIndexs,取權重累加值的隨機數生成權重偏移量,在累減它,到它小於0的時候返回那個調用者。如果這些都不符合,就從leastIndexs隨機選一個index,返回那個調用者!
public class LeastActiveLoadBalance extends AbstractLoadBalance { public static final String NAME = "leastactive"; private final Random random = new Random(); @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { int length = invokers.size(); // Number of invokers int leastActive = -1; // The least active value of all invokers int leastCount = 0; // The number of invokers having the same least active value (leastActive) int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive) int totalWeight = 0; // The sum of weights int firstWeight = 0; // Initial value, used for comparision boolean sameWeight = true; // Every invoker has the same weight value? for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value. leastActive = active; // Record the current least active value leastCount = 1; // Reset leastCount, count again based on current leastCount leastIndexs[0] = i; // Reset totalWeight = weight; // Reset firstWeight = weight; // Record the weight the first invoker sameWeight = true; // Reset, every invoker has the same weight value? } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating. leastIndexs[leastCount++] = i; // Record index number of this invoker totalWeight += weight; // Add this invoker's weight to totalWeight. // If every invoker has the same weight? if (sameWeight && i > 0 && weight != firstWeight) { sameWeight = false; } } } // assert(leastCount > 0) if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexs[0]); } if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offsetWeight = random.nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexs[i]; offsetWeight -= getWeight(invokers.get(leastIndex), invocation); if (offsetWeight <= 0) return invokers.get(leastIndex); } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexs[random.nextInt(leastCount)]); } }
2.2.5:ConsistentHashLoadBalance
一致性Hash算法,doSelect方法進行選擇。一致性Hash負載均衡涉及到兩個主要的配置參數為hash.arguments與hash.nodes
public class ConsistentHashLoadBalance extends AbstractLoadBalance { private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>(); @SuppressWarnings("unchecked") @Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) { String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName(); int identityHashCode = System.identityHashCode(invokers); ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
//若選擇器不存在去創建 if (selector == null || selector.identityHashCode != identityHashCode) { selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode)); selector = (ConsistentHashSelector<T>) selectors.get(key); } return selector.select(invocation); }
//私有內部類 private static final class ConsistentHashSelector<T> { private final TreeMap<Long, Invoker<T>> virtualInvokers; private final int replicaNumber; private final int identityHashCode; private final int[] argumentIndex; ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) { this.virtualInvokers = new TreeMap<Long, Invoker<T>>(); this.identityHashCode = identityHashCode; URL url = invokers.get(0).getUrl(); this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160); String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0")); argumentIndex = new int[index.length]; for (int i = 0; i < index.length; i++) { argumentIndex[i] = Integer.parseInt(index[i]); } for (Invoker<T> invoker : invokers) { String address = invoker.getUrl().getAddress(); for (int i = 0; i < replicaNumber / 4; i++) { byte[] digest = md5(address + i); for (int h = 0; h < 4; h++) { long m = hash(digest, h);
//虛擬調用者 virtualInvokers.put(m, invoker); } } } } //選擇調用 public Invoker<T> select(Invocation invocation) { String key = toKey(invocation.getArguments()); byte[] digest = md5(key); return selectForKey(hash(digest, 0)); } //轉化為服務的key值 private String toKey(Object[] args) { StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); } // private Invoker<T> selectForKey(long hash) {
//從TreeMap中去尋找 Map.Entry<Long, Invoker<T>> entry = virtualInvokers.tailMap(hash, true).firstEntry(); if (entry == null) { entry = virtualInvokers.firstEntry(); } return entry.getValue(); } //計算Hash值 private long hash(byte[] digest, int number) { return (((long) (digest[3 + number * 4] & 0xFF) << 24) | ((long) (digest[2 + number * 4] & 0xFF) << 16) | ((long) (digest[1 + number * 4] & 0xFF) << 8) | (digest[number * 4] & 0xFF)) & 0xFFFFFFFFL; } //md5加密 private byte[] md5(String value) { MessageDigest md5; try { md5 = MessageDigest.getInstance("MD5"); } catch (NoSuchAlgorithmException e) { throw new IllegalStateException(e.getMessage(), e); } md5.reset(); byte[] bytes; try { bytes = value.getBytes("UTF-8"); } catch (UnsupportedEncodingException e) { throw new IllegalStateException(e.getMessage(), e); } md5.update(bytes); return md5.digest(); } } }
三:dubbo的默認負載均衡策略
3.1:由@SPI注解可以看到,dubbo默認的負載均衡策略是隨機調用法
3.2:如何改變dubbo的負載均衡策略?
3.2.1:如果是springboot項目,直接注解在@Reference中引用,然后注明loadblance="xx".其中xx為每個實現類中的name的值
3.2.2:xml配置的方式
<dubbo:serviceinterface="..."loadbalance="roundrobin"/>
四:總結
本篇博客講述了dubbo的負載均衡機制,其中可以看到除了一致性Hash算法,其它都是根據權重進行計算的,在實際的分布式應用中,理解dubbo如何與zookeeper進行通信選擇,如何實現負載均衡,如何維護服務的高可用性,理解負載均衡對於微服務的重要意義,將對於我們學習分布式的開發起着推波助瀾的作用。