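In the two previous posts, 《擼一擼Spring Cloud Ribbon的原理》 and 《擼一擼Spring Cloud Ribbon的原理-負載均衡器》, I walked through how Ribbon hooks into RestTemplate via the load-balancing interceptor, and how the load balancer obtains, filters, and refreshes the server list.
Because the load balancer ultimately selects a server by calling the choose method of a load-balancing rule, this post goes through Ribbon's load-balancing rules.
Rule classes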
- RandomRule
- RoundRobinRule
- RetryRule
- WeightedResponseTimeRule
- ClientConfigEnabledRoundRobinRule
- BestAvailableRule
- PredicateBasedRule
- AvailabilityFilteringRule
- ZoneAvoidanceRule
They are located in:
ribbon-loadbalancer-2.2.2.jar
com.netflix.loadbalancer
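Class hierarchy
- AbstractLoadBalancerRule
  - RandomRule
  - RoundRobinRule
    - WeightedResponseTimeRule
  - RetryRule
  - ClientConfigEnabledRoundRobinRule
    - BestAvailableRule
    - PredicateBasedRule
      - AvailabilityFilteringRule
      - ZoneAvoidanceRule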
RandomRule
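A rule that picks a server at random.
In choose, a Random object draws an index within the total number of server instances, and the server at that index is then fetched from the list of reachable (up) servers.
Note that the index is drawn from the size of the full server list but applied to the reachable list, so when some servers are offline the lookup may not produce a usable server. In that case the method yields and tries again, until a random pick lands on a server that is up.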
    public class RandomRule extends AbstractLoadBalancerRule {
        Random rand;

        public RandomRule() {
            rand = new Random();
        }

        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;

            while (server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();

                int serverCount = allList.size();
                if (serverCount == 0) {
                    /*
                     * No servers. End regardless of pass, because subsequent passes
                     * only get more restrictive.
                     */
                    return null;
                }

                int index = rand.nextInt(serverCount);
                server = upList.get(index);

                if (server == null) {
                    /*
                     * The only time this should happen is if the server list were
                     * somehow trimmed. This is a transient condition. Retry after
                     * yielding.
                     */
                    Thread.yield();
                    continue;
                }

                if (server.isAlive()) {
                    return (server);
                }

                // Shouldn't actually happen.. but must be transient or a bug.
                server = null;
                Thread.yield();
            }
            return server;
        }
        // omitted
    }
RoundRobinRule
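A rule that cycles through the servers in order (round robin).
In choose, the next server is obtained through the incrementAndGetModulo method.
incrementAndGetModulo relies on an atomic member variable, nextServerCyclicCounter, which holds the index of the current server. Each call advances the index by 1, bounded by the total number of servers; once the bound is reached, it wraps around and starts again from the beginning.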
    public class RoundRobinRule extends AbstractLoadBalancerRule {
        private AtomicInteger nextServerCyclicCounter;
        // omitted

        public RoundRobinRule() {
            nextServerCyclicCounter = new AtomicInteger(0);
        }

        public RoundRobinRule(ILoadBalancer lb) {
            this();
            setLoadBalancer(lb);
        }

        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }

            Server server = null;
            int count = 0;
            while (server == null && count++ < 10) {
                List<Server> reachableServers = lb.getReachableServers();
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();

                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }

                int nextServerIndex = incrementAndGetModulo(serverCount);
                server = allServers.get(nextServerIndex);

                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }

                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }

                // Next.
                server = null;
            }

            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: " + lb);
            }
            return server;
        }

        private int incrementAndGetModulo(int modulo) {
            for (;;) {
                int current = nextServerCyclicCounter.get();
                int next = (current + 1) % modulo;
                if (nextServerCyclicCounter.compareAndSet(current, next))
                    return next;
            }
        }
        // omitted
    }
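To see the wrap-around behaviour in isolation, here is a minimal sketch of the same compare-and-set counter. The class name CyclicCounterSketch is made up for illustration and is not part of Ribbon; only the loop body follows the incrementAndGetModulo method quoted above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone class; only the counter logic mirrors the quoted method.
final class CyclicCounterSketch {
    private final AtomicInteger next = new AtomicInteger(0);

    // Advance the counter by one, wrap at 'modulo', and return the new value.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = next.get();
            int candidate = (current + 1) % modulo;
            // compareAndSet succeeds only if no other thread changed the counter
            // in the meantime; otherwise the loop simply retries.
            if (next.compareAndSet(current, candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        CyclicCounterSketch counter = new CyclicCounterSketch();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            sb.append(counter.incrementAndGetModulo(3)).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints "1 2 0 1 2"
    }
}
```

Because the counter starts at 0 and is incremented before use, the first index handed out with three servers is 1, not 0.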
WeightedResponseTimeRule
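A rule that uses response time as the selection weight.
In other words, the shorter a server's response time, the more likely it is to be chosen.
It extends RoundRobinRule.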
    public class WeightedResponseTimeRule extends RoundRobinRule {
        // omitted

        // holds the accumulated weight from index 0 to current index
        // for example, element at index 2 holds the sum of weight of servers from 0 to 2
        private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
        // omitted

        void initialize(ILoadBalancer lb) {
            if (serverWeightTimer != null) {
                serverWeightTimer.cancel();
            }
            serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true);
            serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval);
            // do a initial run
            ServerWeight sw = new ServerWeight();
            sw.maintainWeights();

            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
                    serverWeightTimer.cancel();
                }
            }));
        }
        // omitted

        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
        @Override
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;

            while (server == null) {
                // get hold of the current reference in case it is changed from the other thread
                List<Double> currentWeights = accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> allList = lb.getAllServers();

                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }

                int serverIndex = 0;

                // last one in the list is the sum of all weights
                double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
                // No server has been hit yet and total weight is not initialized
                // fallback to use round robin
                if (maxTotalWeight < 0.001d) {
                    server = super.choose(getLoadBalancer(), key);
                    if (server == null) {
                        return server;
                    }
                } else {
                    // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                    double randomWeight = random.nextDouble() * maxTotalWeight;
                    // pick the server index based on the randomIndex
                    int n = 0;
                    for (Double d : currentWeights) {
                        if (d >= randomWeight) {
                            serverIndex = n;
                            break;
                        } else {
                            n++;
                        }
                    }
                    server = allList.get(serverIndex);
                }

                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }

                if (server.isAlive()) {
                    return (server);
                }

                // Next.
                server = null;
            }
            return server;
        }

        class DynamicServerWeightTask extends TimerTask {
            public void run() {
                ServerWeight serverWeight = new ServerWeight();
                try {
                    serverWeight.maintainWeights();
                } catch (Exception e) {
                    logger.error("Error running DynamicServerWeightTask for {}", name, e);
                }
            }
        }

        class ServerWeight {
            public void maintainWeights() {
                ILoadBalancer lb = getLoadBalancer();
                if (lb == null) {
                    return;
                }
                if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    return;
                }
                try {
                    logger.info("Weight adjusting job started");
                    AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                    LoadBalancerStats stats = nlb.getLoadBalancerStats();
                    if (stats == null) {
                        // no statistics, nothing to do
                        return;
                    }
                    double totalResponseTime = 0;
                    // find maximal 95% response time
                    for (Server server : nlb.getAllServers()) {
                        // this will automatically load the stats if not in cache
                        ServerStats ss = stats.getSingleServerStat(server);
                        totalResponseTime += ss.getResponseTimeAvg();
                    }
                    // weight for each server is (sum of responseTime of all servers - responseTime)
                    // so that the longer the response time, the less the weight and the less likely to be chosen
                    Double weightSoFar = 0.0;

                    // create new list and hot swap the reference
                    List<Double> finalWeights = new ArrayList<Double>();
                    for (Server server : nlb.getAllServers()) {
                        ServerStats ss = stats.getSingleServerStat(server);
                        double weight = totalResponseTime - ss.getResponseTimeAvg();
                        weightSoFar += weight;
                        finalWeights.add(weightSoFar);
                    }
                    setWeights(finalWeights);
                } catch (Exception e) {
                    logger.error("Error calculating server weights", e);
                } finally {
                    serverWeightAssignmentInProgress.set(false);
                }
            }
        }

        void setWeights(List<Double> weights) {
            this.accumulatedWeights = weights;
        }
        // omitted
    }
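Since servers are chosen by response-time weight, let's first look at how the weights are computed.
The initialize method starts a timer that periodically runs DynamicServerWeightTask.run, which recomputes the server weights. The computation itself is done by the maintainWeights method of the inner class ServerWeight.
maintainWeights contains two for loops: the first sums the average response times of all servers, and the second computes each server's weight along with the running total of the weights.
The first for loop.
Suppose there are 4 servers with the following average response times (ms):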
A: 200
B: 500
C: 30
D: 1200
Total response time:
200+500+30+1200=1930ms
Next, the second for loop computes the weight of each server.
Server weight = total response time - the server's own response time:
A: 1930-200=1730
B: 1930-500=1430
C: 1930-30=1900
D: 1930-1200=730
Total weight:
1730+1430+1900+730=5790
Figure: response times and the computed weights.
The result is that the shorter a server's response time, the larger its weight.
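The arithmetic above can be reproduced with a short sketch. The class and method names (WeightSketch, accumulatedWeights) are made up for illustration, and the response times are the example values A to D rather than real statistics. Like the quoted maintainWeights, the sketch stores running totals, so the last element equals the total weight.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the two loops of maintainWeights.
final class WeightSketch {
    // Returns cumulative weights, where each weight = total - own response time.
    static List<Double> accumulatedWeights(double[] avgResponseTimes) {
        double total = 0;
        for (double t : avgResponseTimes) {   // first loop: total response time
            total += t;
        }
        List<Double> accumulated = new ArrayList<>();
        double weightSoFar = 0;
        for (double t : avgResponseTimes) {   // second loop: running sum of weights
            weightSoFar += total - t;
            accumulated.add(weightSoFar);
        }
        return accumulated;
    }

    public static void main(String[] args) {
        // Servers A, B, C, D from the example above.
        System.out.println(accumulatedWeights(new double[] {200, 500, 30, 1200}));
        // prints [1730.0, 3160.0, 5060.0, 5790.0]
    }
}
```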
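Now look at the choose method.
The key part is the third if inside the while loop.
If the total weight is still effectively zero (below 0.001), meaning the weights have not been computed yet, the rule falls back to the parent class RoundRobinRule and picks a server by round robin.
Once there are servers and computed weights, a random number is drawn with the total weight as its upper bound, and the server whose interval the number falls into is selected.
So the conclusion for server selection is:
The shorter a server's response time, the larger its weight and the more likely it is to be chosen.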
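The interval lookup can be sketched as follows, using the cumulative weights from the A to D example (1730, 3160, 5060, 5790). The names WeightedPickSketch and pickIndex are assumptions for illustration; the comparison d >= randomWeight is the one used in the quoted choose method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical helper showing how a random weight maps to a server index.
final class WeightedPickSketch {
    // Returns the index of the first cumulative weight that is >= randomWeight.
    static int pickIndex(List<Double> accumulatedWeights, double randomWeight) {
        int n = 0;
        for (double d : accumulatedWeights) {
            if (d >= randomWeight) {
                return n;
            }
            n++;
        }
        return 0; // same default as the quoted code, where serverIndex starts at 0
    }

    public static void main(String[] args) {
        List<Double> acc = Arrays.asList(1730.0, 3160.0, 5060.0, 5790.0);
        double total = acc.get(acc.size() - 1);
        double randomWeight = new Random().nextDouble() * total;
        System.out.println(pickIndex(acc, randomWeight)); // varies from run to run
        System.out.println(pickIndex(acc, 4000.0));       // prints 2 (server C)
    }
}
```

With these example weights, the chance of each server being picked is its weight divided by 5790: roughly 30% for A, 25% for B, 33% for C and 13% for D.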
RetryRule
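By default this rule uses RoundRobinRule to pick a server; a different rule can be supplied by passing it in when constructing RetryRule.
To cope with situations where no server can be selected, such as a momentary disconnection, it provides a retry mechanism: it keeps trying to select a server within a maximum retry time, until it either gets one or times out.
The maximum retry time, maxRetryMillis, is configurable.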
    public class RetryRule extends AbstractLoadBalancerRule {
        IRule subRule = new RoundRobinRule();
        long maxRetryMillis = 500;

        public RetryRule() {
        }

        public RetryRule(IRule subRule) {
            this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
        }

        public RetryRule(IRule subRule, long maxRetryMillis) {
            this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
            this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500;
        }
        // omitted

        public void setMaxRetryMillis(long maxRetryMillis) {
            if (maxRetryMillis > 0) {
                this.maxRetryMillis = maxRetryMillis;
            } else {
                this.maxRetryMillis = 500;
            }
        }
        // omitted

        /*
         * Loop if necessary. Note that the time CAN be exceeded depending on the
         * subRule, because we're not spawning additional threads and returning
         * early.
         */
        public Server choose(ILoadBalancer lb, Object key) {
            long requestTime = System.currentTimeMillis();
            long deadline = requestTime + maxRetryMillis;

            Server answer = null;

            answer = subRule.choose(key);

            if (((answer == null) || (!answer.isAlive()))
                    && (System.currentTimeMillis() < deadline)) {

                InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

                while (!Thread.interrupted()) {
                    answer = subRule.choose(key);

                    if (((answer == null) || (!answer.isAlive()))
                            && (System.currentTimeMillis() < deadline)) {
                        /* pause and retry hoping it's transient */
                        Thread.yield();
                    } else {
                        break;
                    }
                }

                task.cancel();
            }

            if ((answer == null) || (!answer.isAlive())) {
                return null;
            } else {
                return answer;
            }
        }
        // omitted
    }
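The retry-until-deadline idea can be shown without any Ribbon types. The sketch below is a simplification under stated assumptions: the sub-rule is modelled as a java.util.function.Supplier, a null result stands for "no usable server", and the InterruptTask and isAlive checks of the real class are left out. RetrySketch and chooseWithRetry are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical, simplified model of RetryRule's deadline loop.
final class RetrySketch {
    // Keep asking the sub-rule until it returns non-null or the deadline passes.
    static <T> T chooseWithRetry(Supplier<T> subChoose, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = subChoose.get();
        while (answer == null && System.currentTimeMillis() < deadline) {
            Thread.yield(); // pause briefly, hoping the failure is transient
            answer = subChoose.get();
        }
        return answer; // may still be null if the deadline was reached
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The stand-in sub-rule fails twice, then succeeds on the third call.
        String server = chooseWithRetry(() -> ++calls[0] < 3 ? null : "server-1", 500);
        System.out.println(server); // prints "server-1"
    }
}
```

As the comment in the quoted source notes, the deadline is only checked between attempts, so a slow sub-rule can make the call run longer than maxRetryMillis.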
ClientConfigEnabledRoundRobinRule
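This rule is very simple on its own: it picks servers through RoundRobinRule by default, and the inner rule cannot be customized.
Its purpose is to serve as a base class: subclasses override choose to add more functionality, and they always have RoundRobinRule to fall back on.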
    public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {
        RoundRobinRule roundRobinRule = new RoundRobinRule();

        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            roundRobinRule = new RoundRobinRule();
        }

        @Override
        public void setLoadBalancer(ILoadBalancer lb) {
            super.setLoadBalancer(lb);
            roundRobinRule.setLoadBalancer(lb);
        }

        @Override
        public Server choose(Object key) {
            if (roundRobinRule != null) {
                return roundRobinRule.choose(key);
            } else {
                throw new IllegalArgumentException(
                        "This class has not been initialized with the RoundRobinRule class");
            }
        }
    }
BestAvailableRule
Extends ClientConfigEnabledRoundRobinRule above and overrides choose.
Among all servers whose circuit breaker has not tripped, it picks the one with the fewest active requests at the moment.
    public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
        private LoadBalancerStats loadBalancerStats;

        @Override
        public Server choose(Object key) {
            if (loadBalancerStats == null) {
                return super.choose(key);
            }
            List<Server> serverList = getLoadBalancer().getAllServers();
            int minimalConcurrentConnections = Integer.MAX_VALUE;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            for (Server server : serverList) {
                ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }
            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }

        @Override
        public void setLoadBalancer(ILoadBalancer lb) {
            super.setLoadBalancer(lb);
            if (lb instanceof AbstractLoadBalancer) {
                loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
            }
        }
    }
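The selection loop amounts to a minimum search that skips tripped servers. Below is a minimal sketch with a made-up Stat value class standing in for Ribbon's ServerStats; the names LeastActiveSketch, Stat and chooseLeastActive are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model: pick the non-tripped server with the fewest active requests.
final class LeastActiveSketch {
    static final class Stat {
        final String server;
        final boolean circuitTripped;
        final int activeRequests;

        Stat(String server, boolean circuitTripped, int activeRequests) {
            this.server = server;
            this.circuitTripped = circuitTripped;
            this.activeRequests = activeRequests;
        }
    }

    // Returns null when every server is tripped; the real rule then falls back
    // to its parent's round-robin choose.
    static String chooseLeastActive(List<Stat> stats) {
        int minimal = Integer.MAX_VALUE;
        String chosen = null;
        for (Stat s : stats) {
            if (!s.circuitTripped && s.activeRequests < minimal) {
                minimal = s.activeRequests;
                chosen = s.server;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<Stat> stats = Arrays.asList(
                new Stat("a", false, 5),
                new Stat("b", true, 0),   // fewest requests, but tripped, so skipped
                new Stat("c", false, 2));
        System.out.println(chooseLeastActive(stats)); // prints "c"
    }
}
```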
PredicateBasedRule
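This is an abstract class that provides a template for choose: it calls the filtering method of an AbstractServerPredicate implementation to filter out the candidate servers, then picks one of them by round robin.
Its subclass ZoneAvoidanceRule uses this template to select a server.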
    public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

        public abstract AbstractServerPredicate getPredicate();

        @Override
        public Server choose(Object key) {
            ILoadBalancer lb = getLoadBalancer();
            Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
            if (server.isPresent()) {
                return server.get();
            } else {
                return null;
            }
        }
    }
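The body of chooseRoundRobinAfterFiltering is not quoted in this post, so the following is only a sketch of the "filter first, then round robin" idea under my own assumptions. It uses java.util.Optional and java.util.function.Predicate instead of the Guava types Ribbon uses, and FilterThenRoundRobinSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch: filter the candidates, then rotate over what is left.
final class FilterThenRoundRobinSketch {
    private final AtomicInteger nextIndex = new AtomicInteger();

    Optional<String> choose(List<String> servers, Predicate<String> predicate) {
        List<String> eligible = new ArrayList<>();
        for (String s : servers) {
            if (predicate.test(s)) {
                eligible.add(s);
            }
        }
        if (eligible.isEmpty()) {
            return Optional.empty(); // nothing passed the filter
        }
        // floorMod keeps the index non-negative even if the counter overflows.
        int index = Math.floorMod(nextIndex.getAndIncrement(), eligible.size());
        return Optional.of(eligible.get(index));
    }

    public static void main(String[] args) {
        FilterThenRoundRobinSketch rule = new FilterThenRoundRobinSketch();
        List<String> servers = Arrays.asList("a", "b", "c");
        Predicate<String> notB = s -> !s.equals("b");
        for (int i = 0; i < 3; i++) {
            System.out.print(rule.choose(servers, notB).get() + " ");
        }
        System.out.println(); // prints "a c a "
    }
}
```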
AvailabilityFilteringRule
A rule that filters servers by availability.
It extends PredicateBasedRule, but overrides choose.
    public class AvailabilityFilteringRule extends PredicateBasedRule {
        private AbstractServerPredicate predicate;
        // omitted

        @Override
        public Server choose(Object key) {
            int count = 0;
            Server server = roundRobinRule.choose(key);
            while (count++ <= 10) {
                if (predicate.apply(new PredicateKey(server))) {
                    return server;
                }
                server = roundRobinRule.choose(key);
            }
            return super.choose(key);
        }
        // omitted
    }
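It first picks a server with the round-robin rule, then uses the predicate object to check whether the server meets the availability requirements; if it does, that server is the result. The approach here is select first, filter second.
If none of the attempts yields an available server (the loop condition count++ <= 10 allows up to 11 attempts), it calls choose of the parent class PredicateBasedRule.
The constructor shows that AvailabilityFilteringRule uses an AvailabilityPredicate to evaluate servers.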
    public class AvailabilityFilteringRule extends PredicateBasedRule {
        private AbstractServerPredicate predicate;

        public AvailabilityFilteringRule() {
            super();
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }

        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }
        // omitted
    }
In AvailabilityPredicate, the shouldSkipServer function decides whether a server should be skipped.
    public class AvailabilityPredicate extends AbstractServerPredicate {
        // omitted

        @Override
        public boolean apply(@Nullable PredicateKey input) {
            LoadBalancerStats stats = getLBStats();
            if (stats == null) {
                return true;
            }
            return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
        }

        private boolean shouldSkipServer(ServerStats stats) {
            if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())
                    || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
                return true;
            }
            return false;
        }
    }
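Evaluation rules:
If either of the following holds, the server is considered unavailable and is not used.
1. The property niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped is true (the default when not set), and the server's circuit breaker has tripped.
2. The server's active request count >= the property niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit (defaults to Integer.MAX_VALUE when not set).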
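The two conditions above reduce to a single boolean expression. The sketch below restates it with plain parameters in place of Ribbon's dynamic properties and ServerStats; the class and parameter names are made up for illustration.

```java
// Hypothetical restatement of the shouldSkipServer condition.
final class AvailabilityCheckSketch {
    static boolean shouldSkip(boolean filterCircuitTripped,
                              boolean circuitBreakerTripped,
                              int activeRequests,
                              int activeConnectionsLimit) {
        // Rule 1: filtering is enabled and the breaker has tripped.
        // Rule 2: active requests have reached the limit (note the >=).
        return (filterCircuitTripped && circuitBreakerTripped)
                || activeRequests >= activeConnectionsLimit;
    }

    public static void main(String[] args) {
        System.out.println(shouldSkip(true, true, 0, Integer.MAX_VALUE));  // prints true
        System.out.println(shouldSkip(false, true, 0, Integer.MAX_VALUE)); // prints false
        System.out.println(shouldSkip(true, false, 10, 10));               // prints true
        System.out.println(shouldSkip(true, false, 9, 10));                // prints false
    }
}
```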
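When AvailabilityFilteringRule's choose cannot pick a server, it calls choose of the parent class PredicateBasedRule, which filters first and then selects by round robin. The predicate used for the check is still the AvailabilityPredicate, so the filtering criteria are the same as above.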
ZoneAvoidanceRule
ZoneAvoidanceRule itself does not override choose; it uses choose from its abstract parent PredicateBasedRule.
As noted above, PredicateBasedRule's choose filters with a predicate.
Here is the predicate that ZoneAvoidanceRule defines.
    public class ZoneAvoidanceRule extends PredicateBasedRule {
        // omitted
        private CompositePredicate compositePredicate;

        public ZoneAvoidanceRule() {
            super();
            ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
            AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
            compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
        }

        private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
            return CompositePredicate.withPredicates(p1, p2)
                    .addFallbackPredicate(p2)
                    .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                    .build();
        }

        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
            AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
            compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
        }
        // omitted

        @Override
        public AbstractServerPredicate getPredicate() {
            return compositePredicate;
        }
    }
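Two predicates do the main work here, ZoneAvoidancePredicate and AvailabilityPredicate, and both are combined into a CompositePredicate.
PredicateBasedRule's choose calls getEligibleServers on the CompositePredicate to obtain the servers that qualify.
Inside getEligibleServers, the combined predicates (ZoneAvoidancePredicate, AvailabilityPredicate) are applied to every server in turn, and the getEligibleServers of the fallback predicates is tried when too few servers pass.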
    public class CompositePredicate extends AbstractServerPredicate {
        private AbstractServerPredicate delegate;
        private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
        private int minimalFilteredServers = 1;
        private float minimalFilteredPercentage = 0;

        @Override
        public boolean apply(@Nullable PredicateKey input) {
            return delegate.apply(input);
        }
        // omitted

        /**
         * Get the filtered servers from primary predicate, and if the number of the filtered servers
         * are not enough, trying the fallback predicates
         */
        @Override
        public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
            List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
            Iterator<AbstractServerPredicate> i = fallbacks.iterator();
            while (!(result.size() >= minimalFilteredServers
                    && result.size() > (int) (servers.size() * minimalFilteredPercentage))
                    && i.hasNext()) {
                AbstractServerPredicate predicate = i.next();
                result = predicate.getEligibleServers(servers, loadBalancerKey);
            }
            return result;
        }
    }
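The fallback chain in getEligibleServers can be tried out with plain strings and java.util.function.Predicate. This is a sketch under assumptions: FallbackFilterSketch and its method names are hypothetical, and only the while condition is taken from the quoted source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of CompositePredicate's "primary, then fallbacks" filtering.
final class FallbackFilterSketch {
    static List<String> filter(List<String> servers, Predicate<String> p) {
        List<String> out = new ArrayList<>();
        for (String s : servers) {
            if (p.test(s)) {
                out.add(s);
            }
        }
        return out;
    }

    static List<String> eligible(List<String> servers,
                                 Predicate<String> primary,
                                 List<Predicate<String>> fallbacks,
                                 int minServers,
                                 float minPercentage) {
        List<String> result = filter(servers, primary);
        Iterator<Predicate<String>> it = fallbacks.iterator();
        // Try the next fallback while the result is too small and fallbacks remain.
        while (!(result.size() >= minServers
                && result.size() > (int) (servers.size() * minPercentage))
                && it.hasNext()) {
            result = filter(servers, it.next());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a", "b");
        List<Predicate<String>> fallbacks = new ArrayList<>();
        fallbacks.add(s -> s.equals("b")); // first fallback: less strict
        fallbacks.add(s -> true);          // last resort: accept everything
        // The primary predicate rejects every server, so the first fallback is used.
        System.out.println(eligible(servers, s -> false, fallbacks, 1, 0f)); // prints [b]
    }
}
```

Each fallback filters the original server list again rather than the previous result, which is why a final always-true predicate guarantees a non-empty result whenever any servers exist.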
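The getEligibleServers method invoked on these predicates is defined in the abstract class AbstractServerPredicate.
As the code shows, filtering ultimately comes down to calling each predicate's apply method.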
    public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
        protected IRule rule;
        private volatile LoadBalancerStats lbStats;
        private final Random random = new Random();
        private final AtomicInteger nextIndex = new AtomicInteger();
        // omitted

        /**
         * Get servers filtered by this predicate from list of servers. Load balancer key
         * is presumed to be null.
         *
         * @see #getEligibleServers(List, Object)
         *
         */
        public List<Server> getEligibleServers(List<Server> servers) {
            return getEligibleServers(servers, null);
        }

        /**
         * Get servers filtered by this predicate from list of servers.
         */
        public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
            if (loadBalancerKey == null) {
                return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
            } else {
                List<Server> results = Lists.newArrayList();
                for (Server server : servers) {
                    if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                        results.add(server);
                    }
                }
                return results;
            }
        }
        // omitted
    }
The criteria AvailabilityPredicate filters by were covered above under AvailabilityFilteringRule.
Now look mainly at the apply method of ZoneAvoidancePredicate.
    public class ZoneAvoidancePredicate extends AbstractServerPredicate {
        // omitted

        @Override
        public boolean apply(@Nullable PredicateKey input) {
            if (!ENABLED.get()) {
                return true;
            }
            String serverZone = input.getServer().getZone();
            if (serverZone == null) {
                // there is no zone information from the server, we do not want to filter
                // out this server
                return true;
            }
            LoadBalancerStats lbStats = getLBStats();
            if (lbStats == null) {
                // no stats available, do not filter
                return true;
            }
            if (lbStats.getAvailableZones().size() <= 1) {
                // only one zone is available, do not filter
                return true;
            }
            Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
            if (!zoneSnapshot.keySet().contains(serverZone)) {
                // The server zone is unknown to the load balancer, do not filter it out
                return true;
            }
            logger.debug("Zone snapshots: {}", zoneSnapshot);
            Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,
                    triggeringLoad.get(), triggeringBlackoutPercentage.get());
            logger.debug("Available zones: {}", availableZones);
            if (availableZones != null) {
                return availableZones.contains(input.getServer().getZone());
            } else {
                return false;
            }
        }
    }
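What is fairly clear is that this obtains the available zones and checks whether each server is in one of them; servers that are meet ZoneAvoidancePredicate's requirements.
For how the available zones are determined, see the getAvailableZones method of ZoneAvoidanceRule.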
End