Spring Cloud-Ribbon負載均衡策略類IRule(五)


IRule

IRule

 

AbstractLoadBalancerRule

負載均衡策略抽象類:負責獲得負載均衡器並保存在內部,以負載均衡器維護的信息作為分配的依據。

/**
 * Base class for load-balancing rules.
 *
 * <p>It does nothing but hold on to the {@link ILoadBalancer} it is given so
 * that concrete rules can query it when selecting a server.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    /** Load balancer consulted by the rule; stays {@code null} until one is set. */
    private ILoadBalancer lb;

    /**
     * @return the load balancer previously stored by {@link #setLoadBalancer},
     *         or {@code null} if none has been stored
     */
    @Override
    public ILoadBalancer getLoadBalancer() {
        return this.lb;
    }

    /**
     * Stores the load balancer this rule should use.
     *
     * @param lb the load balancer to keep (stored as-is, no validation)
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        this.lb = lb;
    }
}

 

RandomRule

隨機選擇一個服務的策略

public class RandomRule extends AbstractLoadBalancerRule {

    /**
     * Picks a random live server from the load balancer.
     *
     * <p>The random index is drawn from the size of the reachable-server list,
     * because that is the list the index is applied to. Drawing it from the
     * size of the full server list would throw
     * {@link IndexOutOfBoundsException} as soon as some servers are down.
     *
     * @param lb  load balancer to pick from; {@code null} yields {@code null}
     * @param key selection key; not used by this rule
     * @return a server that reported itself alive, or {@code null} when the
     *         load balancer is {@code null}, the thread was interrupted, or
     *         there are no servers / no reachable servers
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // Thread.interrupted() also clears the flag; an interrupt is how
            // a caller can break out of this retry loop.
            if (Thread.interrupted()) {
                return null;
            }
            // Servers currently considered reachable.
            List<Server> upList = lb.getReachableServers();
            // Every server known to the load balancer.
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int upCount = upList.size();
            if (upCount == 0) {
                // Nothing reachable right now: give up instead of indexing
                // into an empty list.
                return null;
            }

            // Index must be bounded by the list it is used on.
            int index = chooseRandomInt(upCount);
            server = upList.get(index);

            if (server == null) {
                // Slot was empty (list may be changing underneath us):
                // let other threads run, then try again.
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Shouldn't actually happen.. but must be transient or a bug.
            server = null;
            Thread.yield();
        }

        return server;

    }

 

RoundRobinRule

public class RoundRobinRule extends AbstractLoadBalancerRule {

    private AtomicInteger nextServerCyclicCounter;
    /**
     * Selects the next server in round-robin order, trying at most 10 times.
     *
     * @param lb  load balancer to pick from; {@code null} logs a warning and
     *            yields {@code null}
     * @param key selection key; not used by this rule
     * @return a server that is alive and ready to serve, or {@code null} when
     *         no such server was found
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        // Every pass either returns a server or leaves nothing selected, so a
        // plain bounded loop is enough.
        for (int attempt = 0; attempt < 10; attempt++) {
            // Servers currently reachable, and all known servers.
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Advance the shared counter and fetch the server at that position.
            int nextServerIndex = incrementAndGetModulo(serverCount);
            Server candidate = allServers.get(nextServerIndex);

            if (candidate == null) {
                // Give up the CPU slice before the next attempt.
                Thread.yield();
                continue;
            }

            if (candidate.isAlive() && (candidate.isReadyToServe())) {
                return (candidate);
            }
            // Not usable: move on to the next position.
        }

        // Reached only after all 10 attempts failed.
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
        return null;
    }
    /**
     * Atomically advances the shared counter to {@code (current + 1) % modulo}
     * and returns the new value.
     *
     * @param modulo upper bound (exclusive) of the returned index
     * @return the counter value installed by this call
     */
    private int incrementAndGetModulo(int modulo) {
        // Compare-and-set retry loop: if another thread changed the counter
        // between the read and the write, start over with the fresh value.
        while (true) {
            final int seen = nextServerCyclicCounter.get();
            final int candidate = (seen + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(seen, candidate)) {
                return candidate;
            }
        }
    }

RetryRule

/**
 * 具有重試機制的Rule
 */
public class RetryRule extends AbstractLoadBalancerRule {
    // Rule that performs the actual selection; defaults to a round-robin rule.
    IRule subRule = new RoundRobinRule();
    // Retry budget in milliseconds: choose() adds it to System.currentTimeMillis() to get its deadline.
    long maxRetryMillis = 500;

    /**
     * Creates a retry rule that delegates selection to {@code subRule}.
     *
     * @param subRule rule to delegate to; {@code null} falls back to a new
     *                {@code RoundRobinRule}
     */
    public RetryRule(IRule subRule) {
        if (subRule == null) {
            this.subRule = new RoundRobinRule();
        } else {
            this.subRule = subRule;
        }
    }

    /**
     * Creates a retry rule with an explicit delegate and retry budget.
     *
     * @param subRule        rule to delegate to; {@code null} falls back to a
     *                       new {@code RoundRobinRule}
     * @param maxRetryMillis retry budget in milliseconds; values that are not
     *                       positive fall back to 500
     */
    public RetryRule(IRule subRule, long maxRetryMillis) {
        if (subRule == null) {
            this.subRule = new RoundRobinRule();
        } else {
            this.subRule = subRule;
        }

        if (maxRetryMillis > 0) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500;
        }
    }
    /**
     * @return the rule this retry rule delegates selection to
     */
    public IRule getRule() {
        return this.subRule;
    }

    /**
     * Asks the delegate rule for a server and, if the answer is unusable,
     * keeps asking until a usable server shows up, the deadline
     * ({@code maxRetryMillis} after entry) passes, or the thread is interrupted.
     *
     * @param lb  not used by this method; the delegate is called with the key only
     * @param key selection key forwarded to the delegate rule
     * @return an alive server, or {@code null} if none was obtained in time
     */
    public Server choose(ILoadBalancer lb, Object key) {
        final long startedAt = System.currentTimeMillis();
        // Point in time after which no further retries are attempted.
        final long deadline = startedAt + maxRetryMillis;

        Server answer = subRule.choose(key);

        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {

            // NOTE(review): InterruptTask is defined elsewhere; presumably it
            // interrupts this thread once the remaining time has elapsed - confirm.
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());

            /*
             * Background on the interrupt API used below:
             *  - Thread#interrupt() only sets the interrupt flag; a thread
             *    blocked in sleep/wait gets an InterruptedException and the
             *    flag is cleared.
             *  - Thread.interrupted() reports the flag AND clears it.
             *  - Thread#isInterrupted() reports the flag without clearing it.
             */
            while (!Thread.interrupted()) {
                answer = subRule.choose(key);

                boolean unusable = (answer == null) || (!answer.isAlive());
                if (!unusable || System.currentTimeMillis() >= deadline) {
                    break;
                }
                /* pause and retry hoping it's transient */
                Thread.yield();
            }

            task.cancel();
        }

        return ((answer == null) || (!answer.isAlive())) ? null : answer;
    }

 

/**
 * Extension of {@link RoundRobinRule} that selects servers by weight.
 *
 * <p>A timer task periodically recomputes a list of cumulative weights from
 * the servers' average response times; {@link #choose(ILoadBalancer, Object)}
 * then draws a random point in that cumulative range to pick a server. When
 * no usable weights exist it falls back to the parent's round-robin choice.
 */
public class WeightedResponseTimeRule extends RoundRobinRule {

    /**
     * (Re)starts the periodic weight calculation and computes the weights once
     * right away.
     *
     * @param lb not read by this method body
     */
    void initialize(ILoadBalancer lb) {
        // Stop a timer left over from an earlier initialization.
        if (this.serverWeightTimer != null) {
            this.serverWeightTimer.cancel();
        }

        this.serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + this.name, true);
        // Recompute the weights periodically; the period is
        // serverWeightTaskTimerInterval milliseconds (30000 by default, see
        // initWithNiwsConfig).
        this.serverWeightTimer.schedule(new DynamicServerWeightTask(), 0L, (long) this.serverWeightTaskTimerInterval);

        // Also compute the weights synchronously once.
        ServerWeight initialWeights = new ServerWeight();
        initialWeights.maintainWeights();

        // Cancel the timer when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                WeightedResponseTimeRule.logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + WeightedResponseTimeRule.this.name);
                WeightedResponseTimeRule.this.serverWeightTimer.cancel();
            }
        }));
    }

    /**
     * Picks a server using the current cumulative weights, or via the parent
     * round-robin rule when the weights are unusable.
     *
     * @param lb  load balancer to pick from; {@code null} yields {@code null}
     * @param key selection key, forwarded to the parent rule on fallback
     * @return an alive server, or {@code null} when the load balancer is
     *         {@code null}, the thread was interrupted, there are no servers,
     *         or the fallback rule found none
     */
    @SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }

        Server server = null;
        while (server == null) {
            // Take a local reference so a concurrent setWeights() cannot swap
            // the list in the middle of one pass.
            List<Double> currentWeights = this.accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }

            List<Server> allList = lb.getAllServers();
            int serverCount = allList.size();
            if (serverCount == 0) {
                return null;
            }

            // The last entry of the cumulative list is the total weight.
            double maxTotalWeight = currentWeights.isEmpty()
                    ? 0.0D
                    : currentWeights.get(currentWeights.size() - 1);

            boolean weightsUsable = maxTotalWeight >= 0.001D && serverCount == currentWeights.size();
            if (!weightsUsable) {
                // Total weight below 0.001, or the weight list does not match
                // the server list: use the parent's round-robin selection.
                server = super.choose(this.getLoadBalancer(), key);
                if (server == null) {
                    return null;
                }
            } else {
                // Draw a point in [0, maxTotalWeight) and find the first
                // cumulative weight that reaches it; index 0 is kept when
                // none does.
                double randomWeight = this.random.nextDouble() * maxTotalWeight;
                int serverIndex = 0;
                int position = 0;
                for (Double cumulative : currentWeights) {
                    if (cumulative >= randomWeight) {
                        serverIndex = position;
                        break;
                    }
                    position++;
                }
                server = allList.get(serverIndex);
            }

            if (server == null) {
                Thread.yield();
                continue;
            }
            if (server.isAlive()) {
                return server;
            }
            // Picked server is not alive: try again.
            server = null;
        }

        return server;
    }

    /**
     * Replaces the cumulative weight list used by {@code choose}.
     *
     * @param weights new cumulative weights, indexed like the server list
     */
    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    /**
     * Reads the weight-task interval from the client configuration, using
     * 30000 when the key is absent.
     *
     * @param clientConfig configuration to read from
     */
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        Integer configuredInterval = (Integer) clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, 30000);
        this.serverWeightTaskTimerInterval = configuredInterval;
    }

    /** Computes the cumulative weights and publishes them to the enclosing rule. */
    class ServerWeight {
        ServerWeight() {
        }

        /**
         * Recomputes the cumulative weight list from the load balancer's
         * statistics. Does nothing when no load balancer is set, when another
         * computation is already in progress, or when no statistics exist.
         * Any exception is logged, not propagated.
         */
        public void maintainWeights() {
            ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
            if (lb == null) {
                return;
            }
            // Only one computation at a time; the flag is reset in finally.
            if (!WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                return;
            }

            try {
                WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    return;
                }

                // Sum of the average response times of all servers.
                double totalResponseTime = 0.0D;
                for (Server server : nlb.getAllServers()) {
                    ServerStats serverStats = stats.getSingleServerStat(server);
                    totalResponseTime += serverStats.getResponseTimeAvg();
                }

                // Cumulative weights; position i corresponds to server i in
                // the load balancer's server list.
                double weightSoFar = 0.0D;
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats serverStats = stats.getSingleServerStat(server);
                    // Per-server weight = total - own average, so a faster
                    // server gets a larger share of the range.
                    double weight = totalResponseTime - serverStats.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);
                }

                WeightedResponseTimeRule.this.setWeights(finalWeights);
            } catch (Exception e) {
                WeightedResponseTimeRule.logger.error("Error calculating server weights", e);
            } finally {
                WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
            }
        }
    }

    /** Timer task that triggers a weight recomputation. */
    class DynamicServerWeightTask extends TimerTask {
        DynamicServerWeightTask() {
        }

        public void run() {
            ServerWeight serverWeight = new ServerWeight();

            try {
                serverWeight.maintainWeights();
            } catch (Exception e) {
                WeightedResponseTimeRule.logger.error("Error running DynamicServerWeightTask for {}", WeightedResponseTimeRule.this.name, e);
            }

        }
    }
}

WeightedResponseTimeRule

 

 

ClientConfigEnabledRoundRobinRule

一般不直接使用,內部也是線性輪詢,主要用於繼承擴展

/**
 * Rule that delegates every choice to an internal {@link RoundRobinRule}.
 * It exists mainly as a base class: subclasses override {@code choose} and
 * call {@code super.choose} as a round-robin fallback.
 */
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

    /** Delegate that performs the round-robin selection. */
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    /**
     * Replaces the delegate with a fresh {@link RoundRobinRule}.
     *
     * <p>If a load balancer has already been set on this rule it is handed to
     * the new delegate, so the result does not depend on whether this method
     * or {@link #setLoadBalancer} is called first.
     *
     * @param clientConfig not read by this implementation
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        RoundRobinRule fresh = new RoundRobinRule();
        ILoadBalancer current = getLoadBalancer();
        if (current != null) {
            fresh.setLoadBalancer(current);
        }
        roundRobinRule = fresh;
    }

    /**
     * Stores the load balancer on this rule and on the round-robin delegate.
     *
     * @param lb load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        roundRobinRule.setLoadBalancer(lb);
    }

    /**
     * Delegates the choice to the internal round-robin rule.
     *
     * @param key selection key forwarded to the delegate
     * @return whatever the delegate returns
     * @throws IllegalArgumentException if the delegate has been set to {@code null}
     */
    @Override
    public Server choose(Object key) {
        if (roundRobinRule != null) {
            return roundRobinRule.choose(key);
        } else {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
    }

}

BestAvailableRule

選出最空閑的服務實例

/**
 * Rule that picks the least busy server: among the servers whose circuit
 * breaker is not tripped, the one with the fewest active requests. Falls back
 * to the parent's round-robin choice when no statistics are available or no
 * server qualifies.
 */
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    /** Statistics source; {@code null} until a suitable load balancer is set. */
    private LoadBalancerStats loadBalancerStats;

    /**
     * Chooses the server with the fewest active requests.
     *
     * @param key selection key, only used on the round-robin fallback
     * @return the least busy server whose circuit breaker is not tripped, or
     *         the parent's choice when none can be determined
     */
    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        // All servers known to the load balancer.
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server : serverList) {
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // Skip servers whose circuit breaker is currently tripped.
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                // Keep the server with the smallest active-request count so far.
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        // Nothing qualified: fall back to the parent's round-robin choice.
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

    /**
     * Stores the load balancer and, when it is an {@link AbstractLoadBalancer},
     * captures its statistics. Without this the statistics field would never
     * be assigned and {@link #choose(Object)} would always take the
     * round-robin fallback.
     *
     * @param lb load balancer to use
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof AbstractLoadBalancer) {
            loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
        }
    }

}

PredicateBasedRule

先過濾服務清單,再從過濾結果中輪詢選擇

/**
 * Rule that first filters the server list with a predicate and then picks
 * round-robin among the servers that passed.
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /**
     * @return the {@link AbstractServerPredicate} used to filter servers
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Filters all servers of the load balancer through {@link #getPredicate()}
     * and returns the round-robin pick among the remaining ones.
     *
     * @param key selection key handed to the predicate
     * @return the selected server, or {@code null} when the filtered result is absent
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        // Optional/Predicate here are the Guava types: the predicate does the
        // filtering and the round-robin pick in one call.
        AbstractServerPredicate filter = getPredicate();
        Optional<Server> picked = filter.chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        return picked.isPresent() ? picked.get() : null;
    }
}
/**
 * Base class for server predicates: filters a server list and offers a
 * round-robin pick among the servers that pass.
 *
 * <p>NOTE(review): {@code getServerOnlyPredicate()} and
 * {@code incrementAndGetModulo(int)} are used here but not defined in this
 * excerpt - they are expected to exist elsewhere in the class.
 */
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {

    /**
     * Returns the servers accepted by this predicate.
     *
     * @param servers         candidates to filter
     * @param loadBalancerKey key combined with each server for the test; when
     *                        {@code null} the server-only predicate is used
     * @return the servers that passed the filter
     */
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
        } else {
            List<Server> results = Lists.newArrayList();
            for (Server server : servers) {
                // Keep only servers accepted for this key.
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;
        }
    }

    /**
     * Filters without a key, then picks round-robin among the result.
     *
     * @param servers candidates to filter
     * @return the picked server, or an absent value when none is eligible
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
        return chooseRoundRobinAfterFiltering(servers, null);
    }

    /**
     * Filters with the given key, then picks round-robin among the result.
     * This is the overload called by key-aware rules.
     *
     * @param servers         candidates to filter
     * @param loadBalancerKey key passed to {@link #getEligibleServers(List, Object)}
     * @return the picked server, or an absent value when none is eligible
     */
    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.isEmpty()) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }


}

AvailabilityFilteringRule

/**
 * Predicate-based rule that tries a handful of round-robin picks against an
 * availability predicate before falling back to the parent's
 * filter-then-pick strategy.
 */
public class AvailabilityFilteringRule extends PredicateBasedRule {

    /** Predicate used both by {@link #choose(Object)} and by the parent rule. */
    private AbstractServerPredicate predicate;

    /** Builds the rule with a predicate created without client configuration. */
    public AvailabilityFilteringRule() {
        super();
        AvailabilityPredicate availability = new AvailabilityPredicate(this, null);
        predicate = CompositePredicate.withPredicate(availability)
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Rebuilds the predicate using the given client configuration.
     *
     * @param clientConfig configuration handed to the availability predicate
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        AvailabilityPredicate availability = new AvailabilityPredicate(this, clientConfig);
        predicate = CompositePredicate.withPredicate(availability)
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Takes round-robin picks one at a time and returns the first one the
     * predicate accepts; this avoids filtering the whole list up front. If
     * none of the picks is accepted, defers to the parent implementation.
     *
     * <p>NOTE(review): per the original author's note, the predicate rejects
     * servers whose circuit breaker is tripped or whose concurrent request
     * count exceeds a limit configurable through
     * {@code <clientName>.<nameSpace>.ActiveConnectionsLimit};
     * AvailabilityPredicate is defined elsewhere - confirm there.
     *
     * @param key selection key forwarded to the underlying rules
     * @return the first accepted server, or the parent's choice
     */
    @Override
    public Server choose(Object key) {
        Server candidate = roundRobinRule.choose(key);
        // Same bound as a "count++ <= 10" loop: up to 11 predicate checks.
        for (int attempt = 0; attempt <= 10; attempt++) {
            if (predicate.apply(new PredicateKey(candidate))) {
                return candidate;
            }
            candidate = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

    /**
     * @return the predicate currently used by this rule
     */
    @Override
    public AbstractServerPredicate getPredicate() {
        return predicate;
    }
}

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM