Ribbon提供的負載均衡算法IRule(四)


 

 

 

 

一、Ribbon算法的介紹

 

Ribbon的源碼地址:https://github.com/Netflix/ribbon

IRule:根據特定算法中從服務器列表中選取一個要訪問的服務,Ribbon默認的算法為ZoneAvoidanceRule;

Ribbon中的7中負載均衡算法:

(1)RoundRobinRule:輪詢;

(2)RandomRule:隨機;

(3)AvailabilityFilteringRule:會先過濾掉由於多次訪問故障而處於斷路器狀態的服務,還有並發的連接數量超過閾值的服務,然后對剩余的服務列表按照輪詢策略進行訪問;

(4)WeightedResponseTimeRule:根據平均響應時間計算所有服務的權重,響應時間越快的服務權重越大被選中的概率越大。剛啟動時如果統計信息不足,則使用RoundRobinRule(輪詢)策略,等統計信息足夠,會切換到WeightedResponseTimeRule;

(5)RetryRule:先按照RoundRobinRule(輪詢)策略獲取服務,如果獲取服務失敗則在指定時間內進行重試,獲取可用的服務;

(6)BestAvailableRule:會先過濾掉由於多次訪問故障而處於斷路器跳閘狀態的服務,然后選擇一個並發量最小的服務;

(7)ZoneAvoidanceRule:復合判斷Server所在區域的性能和Server的可用性選擇服務器,在沒有Zone的情況下是類似輪詢的算法;

ribbion的負載均衡算法結構:

 

 

二、配置指定的負載均衡算法

1、打開消費者工程,增加如下的配置

@Configuration
public class ConfigBean
{
    @Bean
    @LoadBalanced //Ribbon 是客戶端負載均衡的工具;
    public RestTemplate getRestTemplate()
    {
        return new RestTemplate();
    }

    //配置負載均衡的策略為隨機,默認算法為輪詢算法
    @Bean
    public IRule myRule()
    {
        //return new RoundRobinRule();
        return new RandomRule();  
    }      
}

 2、啟動類增加 @EnableEurekaClient 注解

@SpringBootApplication
@EnableEurekaClient     //本服務啟動后自動注冊到eureka中(如果用了注冊中心記得加)
public class DeptProvider8001_App
{
    public static void main(String[] args)
    {
        SpringApplication.run(DeptProvider8001_App.class, args);
    }
}

3、然后重啟這個消費者服務,訪問;可以查看到隨機訪問生產者服務。

三、RetryRule(重試)

//具備重試機制的實例選擇功能
public class RetryRule extends AbstractLoadBalancerRule {
    //默認使用RoundRobinRule實例
    IRule subRule = new RoundRobinRule();
    //閾值為500ms
    long maxRetryMillis = 500;
 
    public RetryRule() {
    }
 
    public RetryRule(IRule subRule) {
        this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
    }
 
    public RetryRule(IRule subRule, long maxRetryMillis) {
        this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
        this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500;
    }
 
    public void setRule(IRule subRule) {
        this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
    }
 
    public IRule getRule() {
        return subRule;
    }
 
    public void setMaxRetryMillis(long maxRetryMillis) {
        if (maxRetryMillis > 0) {
            this.maxRetryMillis = maxRetryMillis;
        } else {
            this.maxRetryMillis = 500;
        }
    }
 
    public long getMaxRetryMillis() {
        return maxRetryMillis;
    }
 
    
    
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {        
        super.setLoadBalancer(lb);
        subRule.setLoadBalancer(lb);
    }
 
 
    public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + maxRetryMillis;
 
        Server answer = null;
 
        answer = subRule.choose(key);
 
        if (((answer == null) || (!answer.isAlive()))
                && (System.currentTimeMillis() < deadline)) {
 
            InterruptTask task = new InterruptTask(deadline
                    - System.currentTimeMillis());
            //反復重試
            while (!Thread.interrupted()) {
                //選擇實例
                answer = subRule.choose(key);
                //500ms內沒選擇到就返回null
                if (((answer == null) || (!answer.isAlive()))
                        && (System.currentTimeMillis() < deadline)) {
                    /* pause and retry hoping it's transient */
                    Thread.yield();
                } 
                else //若能選擇到實例,就返回
                {
                    break;
                }
            }
 
            task.cancel();
        }
 
        if ((answer == null) || (!answer.isAlive())) {
            return null;
        } else {
            return answer;
        }
    }
 
    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }
 
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
 

四、WeightedResponseTimeRule(權重)

          WeightedResponseTimeRule這個策略每30秒計算一次服務器響應時間,以響應時間作為權重,響應時間越短的服務器被選中的概率越大。它有一個LoadBalancerStats類,這里面有三個緩存,它的作用是記住發起請求服務提供者的一些參數,例如響應時間

 

 

 有了緩存數據后權重是怎么處理的呢,下面看WeightedResponseTimeRule,它有一個定時任務,定時去計算權重

//該策略是對RoundRobinRule的擴展,增加了根據實例的運行情況來計算權重
//並根據權重來挑選實例,以達到更優的分配效果
public class WeightedResponseTimeRule extends RoundRobinRule {
 
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
        @Override
        public String key() {
            return "ServerWeightTaskTimerInterval";
        }
        
        @Override
        public String toString() {
            return key();
        }
 
        @Override
        public Class<Integer> type() {
            return Integer.class;
        }
    };
    //默認30秒執行一次
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
    
    private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;
 
    private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);
    
    // 存儲權重的對象,該List中每個權重所處的位置對應了負載均衡器維護實例清單中所有實例在
    //清單中的位置。
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
    
 
    private final Random random = new Random();
 
    protected Timer serverWeightTimer = null;
 
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
 
    String name = "unknown";
 
    public WeightedResponseTimeRule() {
        super();
    }
 
    public WeightedResponseTimeRule(ILoadBalancer lb) {
        super(lb);
    }
    
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            name = ((BaseLoadBalancer) lb).getName();
        }
        initialize(lb);
    }
 
    void initialize(ILoadBalancer lb) {        
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        //啟動一個定時任務,用來為每個服務實例計算權重,默認30秒執行一次,調用DynamicServerWeighTask方法,向下找
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do a initial run
        ServerWeight sw = new ServerWeight(); 
        sw.maintainWeights();
 
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                + name);
                serverWeightTimer.cancel();
            }
        }));
    }
 
    public void shutdown() {
        if (serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
            serverWeightTimer.cancel();
        }
    }
 
    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(accumulatedWeights);
    }
 
    /*
    第一步:生成一個[0,maxTotalWeight]的隨機值
    第二步:遍歷權重列表,比較權重值與隨機數的大小,如果權重值大於隨機數,就拿當前權重列表
    的索引值去服務實例表獲取具體的實例。
    */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;
 
        while (server == null) {
            // get hold of the current reference in case it is changed from the other thread
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();
 
            int serverCount = allList.size();
 
            if (serverCount == 0) {
                return null;
            }
 
            int serverIndex = 0;
 
            // 獲取最后一個實例的權重
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
            // 如果最后一個實例的權重小於0.001,則采用父類實現的現象輪詢的策略
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {
                    return server;
                }
            } else {
                // 產生一個[0,maxTotalWeight]的隨機值
                double randomWeight = random.nextDouble() * maxTotalWeight;
                int n = 0;
                for (Double d : currentWeights) {
                    //如果遍歷維護的權重清單,若權重值大於隨機得到的數值,就選擇這個實例
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }
 
                server = allList.get(serverIndex);
            }
 
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
 
            if (server.isAlive()) {
                return (server);
            }
 
            // Next.
            server = null;
        }
        return server;
    }
 
    class DynamicServerWeightTask extends TimerTask {
        public void run() {
            ServerWeight serverWeight = new ServerWeight();
            try {
//點擊maintainWeights可以進入權重計算的方法 serverWeight.maintainWeights(); }
catch (Exception e) { logger.error("Error running DynamicServerWeightTask for {}", name, e); } } } class ServerWeight { /*該函數主要分為兩個步驟 1 根據LoadBalancerStats中記錄的每個實例的統計信息,累計所有實例的平均響應時間, 得到總的平均響應時間totalResponseTime,該值用於后面的計算。 2 為負載均衡器中維護的實例清單逐個計算權重(從第一個開始),計算規則為: weightSoFar+totalResponseTime-實例平均相應時間,其中weightSoFar初始化為0,並且 每計算好一個權重需要累加到weightSoFar上供下一次計算使用。 示例:4個實例A、B、C、D,它們的平均響應時間為10,40,80,100,所以總的響應時間為 230,每個實例的權重為總響應時間與實例自身的平均響應時間的差的累積所得,所以實例A B,C,D的權重分別為: A:230-10=220 B:220+230-40=410 C:410+230-80=560 D:560+230-100=690 需要注意的是,這里的權重值只是表示各實例權重區間的上限,並非某個實例的優先級,所以不 是數值越大被選中的概率就越大。而是由實例的權重區間來決定選中的概率和優先級。 A:[0,220] B:(220,410] C:(410,560] D:(560,690) 實際上每個區間的寬度就是:總的平均響應時間-實例的平均響應時間,所以實例的平均響應時間越短 ,權重區間的寬度越大,而權重區間寬度越大被選中的概率就越大。 */ public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; } if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) { return; } try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
//根據負載均衡器的狀態信息進行計算 LoadBalancerStats stats
= nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } double totalResponseTime = 0; // 計算所有實例的平均響應時間的總和 for (Server server : nlb.getAllServers()) { // 如果服務實例的狀態快照不在緩存中,那么這里會進行自動加載 ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // 逐個計算每個實例的權重 Double weightSoFar = 0.0; // create new list and hot swap the reference List<Double> finalWeights = new ArrayList<Double>(); //weightSoFar+totalResponseTime-實例平均相應時間 for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); }
//將計算結果進行保存 setWeights(finalWeights); }
catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); } } } void setWeights(List<Double> weights) { this.accumulatedWeights = weights; } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { super.initWithNiwsConfig(clientConfig); serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL); } }

五、利用配置來重設負載均衡算法

     

 

 根據文檔要求進行配置權重算法

 

 Debugger一下,可以看到進入了自己定義的權重里面來了,而且服務節點數什么都是對的,所以如果Ribbon提供的算法你覺得不夠好,你就可以自己定義一個,其實和我之前自定義一個GhyPing一樣,自己定義一個類然后繼承抽象類 AbstractLoadBalancerRule,實現它的抽象方法

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM