IRule
IRule
AbstractLoadBalancerRule
負載均衡策略抽象類 負責獲得負載均衡器 保存在內部 通過負載均衡器維護的信息 作為分配的依據
/**
 * Base class for load-balancing rules. It keeps a reference to the {@code ILoadBalancer}
 * it is given so that concrete rules can consult the balancer when choosing a server.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    /** Load balancer this rule works against; stays null until {@code setLoadBalancer} is called. */
    private ILoadBalancer loadBalancer;

    /**
     * Stores the load balancer this rule should use.
     *
     * @param lb the load balancer to remember
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        loadBalancer = lb;
    }

    /**
     * @return the load balancer stored by {@code setLoadBalancer}, or null if none was set
     */
    @Override
    public ILoadBalancer getLoadBalancer() {
        return loadBalancer;
    }
}
RandomRule
隨機選擇一個服務的策略
/**
 * Rule that picks one of the reachable servers at random.
 *
 * <p>{@code chooseRandomInt(int)} is called below but is not defined in this excerpt.
 */
public class RandomRule extends AbstractLoadBalancerRule {

    /**
     * Randomly picks a live server from the load balancer.
     *
     * @param lb  load balancer to pick from; when null the method returns null
     * @param key load-balancer key (not used by this rule)
     * @return a server for which {@code isAlive()} is true, or null when {@code lb} is null,
     *         the current thread was interrupted, or there are no (reachable) servers
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // Thread.interrupted() also clears the interrupt flag; give up when interrupted.
            if (Thread.interrupted()) {
                return null;
            }
            // Servers the load balancer currently considers reachable.
            List<Server> upList = lb.getReachableServers();
            // Every server known to the load balancer.
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            // The index is used on upList, so it must be bounded by upList's size.
            // Bounding it by allList.size() throws IndexOutOfBoundsException as soon as
            // fewer servers are reachable than are registered.
            int upCount = upList.size();
            if (upCount == 0) {
                // Nothing reachable: return null, as RoundRobinRule does in the same situation.
                return null;
            }

            int index = chooseRandomInt(upCount);
            server = upList.get(index);

            if (server == null) {
                /*
                 * The slot was empty; hint to the scheduler that other threads may run,
                 * then try again.
                 */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;
    }
RoundRobinRule
/**
 * Rule that hands out servers in round-robin order over the load balancer's full server list.
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {

    /**
     * Holds the index most recently returned by {@link #incrementAndGetModulo(int)}.
     * Initialised here so the first call to {@code choose} cannot hit a null counter.
     */
    private AtomicInteger nextServerCyclicCounter = new AtomicInteger(0);

    /**
     * Picks the next server in round-robin order, trying at most 10 positions.
     *
     * @param lb  load balancer to pick from; when null a warning is logged and null is returned
     * @param key load-balancer key (not used by this rule)
     * @return a server that is alive and ready to serve, or null when {@code lb} is null, there
     *         are no servers / no reachable servers, or no suitable server was found in 10 tries
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            // Servers currently considered reachable.
            List<Server> reachableServers = lb.getReachableServers();
            // Every server known to the load balancer.
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Index of the server whose turn it is.
            int nextServerIndex = incrementAndGetModulo(serverCount);
            // Fetch the server at that index.
            server = allServers.get(nextServerIndex);

            if (server == null) {
                // Empty slot: let other threads run, then try the next position.
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: " + lb);
        }
        return server;
    }

    /**
     * Atomically advances the cyclic counter and returns the new value.
     *
     * @param modulo number of servers; the result is {@code (current + 1) % modulo}
     * @return the next index
     */
    private int incrementAndGetModulo(int modulo) {
        // Retry until the compare-and-set succeeds.
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            // compareAndSet fails if another thread changed the counter since it was read;
            // in that case start over with the fresh value.
            if (nextServerCyclicCounter.compareAndSet(current, next))
                return next;
        }
    }
RetryRule
/**
 * Rule with a retry mechanism: it delegates to a sub-rule and keeps asking it for a server
 * until one that is alive turns up or the retry window has elapsed.
 */
public class RetryRule extends AbstractLoadBalancerRule {
    // Delegate rule; round robin unless the caller supplies another one.
    IRule subRule = new RoundRobinRule();
    // Length of the retry window in milliseconds.
    long maxRetryMillis = 500;

    /**
     * @param subRule rule to delegate to; a new RoundRobinRule is used when null
     */
    public RetryRule(IRule subRule) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
    }

    /**
     * @param subRule        rule to delegate to; a new RoundRobinRule is used when null
     * @param maxRetryMillis retry window in milliseconds; 500 is used when not positive
     */
    public RetryRule(IRule subRule, long maxRetryMillis) {
        if (subRule != null) {
            this.subRule = subRule;
        } else {
            this.subRule = new RoundRobinRule();
        }
        if (maxRetryMillis > 0) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500;
        }
    }

    /** @return the rule this rule delegates to */
    public IRule getRule() {
        return subRule;
    }

    /**
     * Asks the sub-rule for a server; if the answer is missing or not alive, keeps retrying
     * until the deadline passes or the thread is interrupted.
     *
     * @param lb  load balancer (not read here; the sub-rule is called with the key only)
     * @param key load-balancer key passed to the sub-rule
     * @return a server that is alive, or null if none was obtained
     */
    public Server choose(ILoadBalancer lb, Object key) {
        // End of the retry window.
        final long deadline = System.currentTimeMillis() + maxRetryMillis;

        Server answer = subRule.choose(key);
        boolean usable = (answer != null) && answer.isAlive();

        if (!usable && (System.currentTimeMillis() < deadline)) {
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

            /*
             * Thread.interrupted() reports the current thread's interrupt flag and clears it,
             * so the loop ends once the thread has been interrupted.
             * NOTE(review): InterruptTask is not defined in this excerpt; presumably it
             * interrupts this thread after the given delay - confirm.
             */
            while (!Thread.interrupted()) {
                answer = subRule.choose(key);
                usable = (answer != null) && answer.isAlive();

                if (usable || (System.currentTimeMillis() >= deadline)) {
                    break;
                }
                /* pause and retry hoping it's transient */
                Thread.yield();
            }

            task.cancel();
        }

        // Liveness is re-checked on the way out, exactly once.
        return ((answer != null) && answer.isAlive()) ? answer : null;
    }
/** * RoundRobinRule的擴展 * 內部根據實例運行情況來進行權重 並根據權重挑選實例 */ public class WeightedResponseTimeRule extends RoundRobinRule { void initialize(ILoadBalancer lb) { if (this.serverWeightTimer != null) { this.serverWeightTimer.cancel(); } this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true); //開啟一個定時任務為實例進行統計 用於計算權重 默認30秒執行一次 this.serverWeightTimer.schedule(new WeightedResponseTimeRule.DynamicServerWeightTask(), 0L, (long)this.serverWeightTaskTimerInterval); WeightedResponseTimeRule.ServerWeight sw = new WeightedResponseTimeRule.ServerWeight(); sw.maintainWeights(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name); WeightedResponseTimeRule.this.serverWeightTimer.cancel(); } })); } @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"}) public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } else { Server server = null; while(server == null) { //獲得權重 List<Double> currentWeights = this.accumulatedWeights; if (Thread.interrupted()) { return null; } //獲得所有服務 List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; //獲得最后一個權重 double maxTotalWeight = currentWeights.size() == 0 ? 
0.0D : (Double)currentWeights.get(currentWeights.size() - 1); //如果權重大於0.01 if (maxTotalWeight >= 0.001D && serverCount == currentWeights.size()) { //通過隨機數計算一個權重 double randomWeight = this.random.nextDouble() * maxTotalWeight; int n = 0; for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) { Double d = (Double)var13.next(); //如果實例在那個權重區間 則定位此服務索引 if (d >= randomWeight) { serverIndex = n; break; } } //返回對應實例 server = (Server)allList.get(serverIndex); } else { //如果實例的權重小於0.0.1 則采用父類的線性輪訓算法 server = super.choose(this.getLoadBalancer(), key); if (server == null) { return server; } } if (server == null) { Thread.yield(); } else { if (server.isAlive()) { return server; } server = null; } } return server; } } void setWeights(List<Double> weights) { this.accumulatedWeights = weights; } public void initWithNiwsConfig(IClientConfig clientConfig) { super.initWithNiwsConfig(clientConfig); this.serverWeightTaskTimerInterval = (Integer)clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000); } class ServerWeight { ServerWeight() { } public void maintainWeights() { ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer(); if (lb != null) { if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) { try { WeightedResponseTimeRule.logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb; //獲得統計信息 LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats != null) { //保存所有實例的的平均響應時間總和 double totalResponseTime = 0.0D; ServerStats ss; for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) { Server server = (Server)var6.next(); ss = stats.getSingleServerStat(server); } Double weightSoFar = 0.0D; //用於保存權重 下標對應實例在負載均衡器中的位置 List<Double> finalWeights = new ArrayList(); Iterator var20 = nlb.getAllServers().iterator(); while(var20.hasNext()) { Server serverx = (Server)var20.next(); //如果服務的狀態不再快照匯總 則這里加載 ServerStats ssx = 
stats.getSingleServerStat(serverx); //計算權重 平均響應時間總和-實例的響應平均響應時間+weightSoFar double weight = totalResponseTime - ssx.getResponseTimeAvg(); //每次都會累加 weightSoFar = weightSoFar + weight; //保存權重 finalWeights.add(weightSoFar); } WeightedResponseTimeRule.this.setWeights(finalWeights); return; } } catch (Exception var16) { WeightedResponseTimeRule.logger.error("Error calculating server weights", var16); return; } finally { WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false); } } } } } //負責權重計算的定時任務 class DynamicServerWeightTask extends TimerTask { DynamicServerWeightTask() { } public void run() { WeightedResponseTimeRule.ServerWeight serverWeight = WeightedResponseTimeRule.this.new ServerWeight(); try { //計算權重 serverWeight.maintainWeights(); } catch (Exception var3) { WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, var3); } } } }
WeightedResponseTimeRule
ClientConfigEnabledRoundRobinRule
不怎么使用 也是線性輪詢 用於繼承擴展
/**
 * Round-robin rule that simply delegates to an internal RoundRobinRule. It is meant to be
 * extended: subclasses override {@code choose} and can fall back to this implementation.
 */
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

    /** Delegate that performs the actual round-robin selection. */
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    /**
     * Replaces the delegate with a fresh RoundRobinRule; {@code clientConfig} is not read.
     * NOTE(review): the new delegate is not given the load balancer here, so a load balancer
     * set before this call is not carried over - confirm the expected call order.
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.roundRobinRule = new RoundRobinRule();
    }

    /** Stores the load balancer on this rule and on the round-robin delegate. */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.roundRobinRule.setLoadBalancer(lb);
    }

    /**
     * @param key load-balancer key forwarded to the delegate
     * @return whatever the round-robin delegate chooses
     * @throws IllegalArgumentException if the delegate is null
     */
    @Override
    public Server choose(Object key) {
        if (roundRobinRule == null) {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
        return roundRobinRule.choose(key);
    }
}
BestAvailableRule
選出最空閑的服務實例
/** *該策略是選擇 最空閑的那一個 */ public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule { private LoadBalancerStats loadBalancerStats; @Override public Server choose(Object key) { if (loadBalancerStats == null) { return super.choose(key); } //取得所有服務實例 List<Server> serverList = getLoadBalancer().getAllServers(); int minimalConcurrentConnections = Integer.MAX_VALUE; long currentTime = System.currentTimeMillis(); Server chosen = null; //遍歷所有服務實例 for (Server server: serverList) { ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); if (!serverStats.isCircuitBreakerTripped(currentTime)) { int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); //取得最空閑的服務 if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } //如果沒有找到 繼續延用父類的線性輪訓 if (chosen == null) { return super.choose(key); } else { return chosen; } } }
PredicateBasedRule
先過濾清單再輪詢
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule { //內部使用PredicateBasedRule 實現服務的過濾 public abstract AbstractServerPredicate getPredicate(); @Override public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); /** * 基於Predicate實現服務的過濾 * Predicate是Google Guava Collection的集合工具 * 可以幫助我們讓集合操作代碼更為簡短精練並大大增強代碼的可讀 性 */ Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } }
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> { public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List<Server> results = Lists.newArrayList(); for (Server server : servers) { //過濾服務 if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } } public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) { List<Server> eligible = getEligibleServers(servers); if (eligible.size() == 0) { return Optional.absent(); } return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); } }
AvailabilityFilteringRule
public class AvailabilityFilteringRule extends PredicateBasedRule { private AbstractServerPredicate predicate; public AvailabilityFilteringRule() { super(); //初始化 下面predicate.apply的比較策略 predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null)) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { //初始化下面 predicate.apply的比較策略 predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig)) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); } @Override public Server choose(Object key) { int count = 0; Server server = roundRobinRule.choose(key); while (count++ <= 10) { /** * 優化父類 先過濾再遍歷的額外開銷 * 一邊遍歷 判斷是否故障或者超過最大並發閥值 是否故障, 即斷路器是否生效已斷開。 * 實例的並發請求數大於闕值,默認值為 232 -1, 該配置可通過參數<clientName>. <nameSpace>.ActiveConnectionsLimit 來修改。 */ if (predicate.apply(new PredicateKey(server))) { return server; } server = roundRobinRule.choose(key); } return super.choose(key); } @Override public AbstractServerPredicate getPredicate() { return predicate; } }