Load Balancing Strategies in Netflix Ribbon


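Spring Cloud's load-balancing strategy can be configured through Ribbon, that is, by injecting a class that implements com.netflix.loadbalancer.IRule. The strategies currently included are: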

1. RandomRule: random strategy. Inside a while loop, as long as the server list is not empty, it keeps looping until it randomly picks an available server.

@SuppressWarnings({"RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE"})
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;

            while(server == null) {
                if (Thread.interrupted()) {
                    return null;
                }

                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }

                int index = this.rand.nextInt(serverCount);
                server = (Server)upList.get(index);
                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }

                    server = null;
                    Thread.yield();
                }
            }

            return server;
        }
    }

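Somehow the very first rule already seems to have a pitfall. upList is the collection of currently available server instances; the client can keep it up to date with a scheduled task that periodically pings the target servers. allList is the collection of all server instances.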

In other words, the two lists can differ in size whenever some instances fail the ping. This can be seen in com.netflix.loadbalancer.BaseLoadBalancer:

     public boolean[] pingServers(IPing ping, Server[] servers) {
            int numCandidates = servers.length;
            boolean[] results = new boolean[numCandidates];
            BaseLoadBalancer.logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

            for(int i = 0; i < numCandidates; ++i) {
                results[i] = false;

                try {
                    if (ping != null) {
                        results[i] = ping.isAlive(servers[i]);
                    }
                } catch (Exception var7) {
                    BaseLoadBalancer.logger.error("Exception while pinging Server: '{}'", servers[i], var7);
                }
            }

            return results;
        }
     public void runPinger() throws Exception {
            if (BaseLoadBalancer.this.pingInProgress.compareAndSet(false, true)) {
                Server[] allServers = null;
                boolean[] results = null;
                Lock allLock = null;
                Lock upLock = null;

                try {
                    allLock = BaseLoadBalancer.this.allServerLock.readLock();
                    allLock.lock();
                    allServers = (Server[])BaseLoadBalancer.this.allServerList.toArray(new Server[BaseLoadBalancer.this.allServerList.size()]);
                    allLock.unlock();
                    int numCandidates = allServers.length;
                    boolean[] resultsx = this.pingerStrategy.pingServers(BaseLoadBalancer.this.ping, allServers);
                    List<Server> newUpList = new ArrayList();
                    List<Server> changedServers = new ArrayList();

                    for(int i = 0; i < numCandidates; ++i) {
                        boolean isAlive = resultsx[i];
                        Server svr = allServers[i];
                        boolean oldIsAlive = svr.isAlive();
                        svr.setAlive(isAlive);
                        if (oldIsAlive != isAlive) {
                            changedServers.add(svr);
                            BaseLoadBalancer.logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", new Object[]{BaseLoadBalancer.this.name, svr.getId(), isAlive ? "ALIVE" : "DEAD"});
                        }

                        if (isAlive) {
                            newUpList.add(svr);
                        }
                    }

                    upLock = BaseLoadBalancer.this.upServerLock.writeLock();
                    upLock.lock();
                    BaseLoadBalancer.this.upServerList = newUpList;
                    upLock.unlock();
                    BaseLoadBalancer.this.notifyServerStatusChangeListener(changedServers);
                } finally {
                    BaseLoadBalancer.this.pingInProgress.set(false);
                }
            }
        }

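So when upList and allList differ in size, server = (Server)upList.get(index); looks problematic: the index is drawn from allList.size(), so it can fall outside upList and throw an IndexOutOfBoundsException. Admittedly, by default the ping performs no real health check, so every server is treated as healthy and allList.size() == upList.size() holds. It still feels odd.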
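To make the concern concrete, here is a minimal sketch in plain Java. It does not use any Ribbon classes; RandomPickSketch, lookupFails and pickFromUpList are hypothetical names made up for illustration. It shows that an index valid for the full list can fail on a shorter reachable list, plus one possible safer variant that draws the index from the list it actually reads.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical illustration only; none of this is Ribbon code.
final class RandomPickSketch {

    // Returns true when reading upList at the given index throws,
    // which is what happens if the index was drawn from a longer allList.
    static boolean lookupFails(List<String> upList, int index) {
        try {
            upList.get(index);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    // Safer variant: the random bound comes from the list that is indexed.
    static String pickFromUpList(List<String> upList, Random rand) {
        if (upList.isEmpty()) {
            return null;
        }
        return upList.get(rand.nextInt(upList.size()));
    }

    public static void main(String[] args) {
        List<String> up = Arrays.asList("a:8080");
        // Index 2 would be valid for a three-element allList, but not for upList.
        System.out.println(lookupFails(up, 2));
        System.out.println(pickFromUpList(up, new Random()));
    }
}
```

Whether this mismatch can occur in practice depends on the configured ping, as noted above; the sketch only illustrates the indexing issue itself.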

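2. RoundRobinRule: round-robin strategy, with a limit on the number of attempts. If 10 attempts all land on unavailable servers, it logs a warning that no server is available and returns null. The selection itself is a simple modulo operation.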

  public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        } else {
            Server server = null;
            int count = 0;

            while(true) {
                if (server == null && count++ < 10) {
                    List<Server> reachableServers = lb.getReachableServers();
                    List<Server> allServers = lb.getAllServers();
                    int upCount = reachableServers.size();
                    int serverCount = allServers.size();
                    if (upCount != 0 && serverCount != 0) {
                        int nextServerIndex = this.incrementAndGetModulo(serverCount);
                        server = (Server)allServers.get(nextServerIndex);
                        if (server == null) {
                            Thread.yield();
                        } else {
                            if (server.isAlive() && server.isReadyToServe()) {
                                return server;
                            }

                            server = null;
                        }
                        continue;
                    }

                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }

                if (count >= 10) {
                    log.warn("No available alive servers after 10 tries from load balancer: " + lb);
                }

                return server;
            }
        }
    }

    private int incrementAndGetModulo(int modulo) {
        int current;
        int next;
        do {
            current = this.nextServerCyclicCounter.get();
            next = (current + 1) % modulo;
        } while(!this.nextServerCyclicCounter.compareAndSet(current, next));

        return next;
    }

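Here upCount is again little more than decoration: it is only checked against zero, and the index is still computed from, and applied to, allServers.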
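The round-robin idea with a bounded number of attempts can be sketched on its own as follows. This is a simplified stand-in, not Ribbon's implementation: BoundedRoundRobinSketch is a hypothetical class, servers are reduced to a list of alive flags, and -1 stands in for the null return.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of this is Ribbon code.
final class BoundedRoundRobinSketch {
    static final int MAX_TRIES = 10;

    private final AtomicInteger counter = new AtomicInteger(0);

    // Advance the shared counter with compare-and-set and return the new index.
    int incrementAndGetModulo(int modulo) {
        int current;
        int next;
        do {
            current = counter.get();
            next = (current + 1) % modulo;
        } while (!counter.compareAndSet(current, next));
        return next;
    }

    // alive.get(i) says whether server i is usable. Returns the chosen index,
    // or -1 when the list is empty or no live server turns up within MAX_TRIES.
    int choose(List<Boolean> alive) {
        if (alive.isEmpty()) {
            return -1;
        }
        for (int tries = 0; tries < MAX_TRIES; tries++) {
            int index = incrementAndGetModulo(alive.size());
            if (alive.get(index)) {
                return index;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        BoundedRoundRobinSketch rr = new BoundedRoundRobinSketch();
        List<Boolean> alive = Arrays.asList(true, false, true);
        System.out.println(rr.choose(alive)); // skips index 1, prints 2
        System.out.println(rr.choose(alive)); // wraps around, prints 0
    }
}
```

The compare-and-set loop lets several threads share one counter without a lock, which is the same idea as the incrementAndGetModulo method quoted above.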

3. ClientConfigEnabledRoundRobinRule: uses RoundRobinRule by default. As the name suggests, it is meant to be client-configurable, so it can serve as a parent class for extension.

 public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.roundRobinRule = new RoundRobinRule();
    }
    public Server choose(Object key) {
        if (this.roundRobinRule != null) {
            return this.roundRobinRule.choose(key);
        } else {
            throw new IllegalArgumentException("This class has not been initialized with the RoundRobinRule class");
        }
    }

4. WeightedResponseTimeRule: on initialization the instance starts a scheduled task that reads server response times and periodically maintains the weight of each server.

 public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        } else {
            Server server = null;

            while(server == null) {
                List<Double> currentWeights = this.accumulatedWeights;
                if (Thread.interrupted()) {
                    return null;
                }

                List<Server> allList = lb.getAllServers();
                int serverCount = allList.size();
                if (serverCount == 0) {
                    return null;
                }

                int serverIndex = 0;
                double maxTotalWeight = currentWeights.size() == 0 ? 0.0D : ((Double)currentWeights.get(currentWeights.size() - 1)).doubleValue();
                if (maxTotalWeight < 0.001D) {
                    server = super.choose(this.getLoadBalancer(), key);
                    if (server == null) {
                        return server;
                    }
                } else {
                    double randomWeight = this.random.nextDouble() * maxTotalWeight;
                    int n = 0;

                    for(Iterator var13 = currentWeights.iterator(); var13.hasNext(); ++n) {
                        Double d = (Double)var13.next();
                        if (d.doubleValue() >= randomWeight) {
                            serverIndex = n;
                            break;
                        }
                    }

                    server = (Server)allList.get(serverIndex);
                }

                if (server == null) {
                    Thread.yield();
                } else {
                    if (server.isAlive()) {
                        return server;
                    }

                    server = null;
                }
            }

            return server;
        }
    }
    public void maintainWeights() {
            ILoadBalancer lb = WeightedResponseTimeRule.this.getLoadBalancer();
            if (lb != null) {
                if (WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                    try {
                        WeightedResponseTimeRule.logger.info("Weight adjusting job started");
                        AbstractLoadBalancer nlb = (AbstractLoadBalancer)lb;
                        LoadBalancerStats stats = nlb.getLoadBalancerStats();
                        if (stats != null) {
                            double totalResponseTime = 0.0D; // sum of the average response times of all instances

                            ServerStats ss;
                            for(Iterator var6 = nlb.getAllServers().iterator(); var6.hasNext(); totalResponseTime += ss.getResponseTimeAvg()) {
                                // ss.getResponseTimeAvg() gives each server's average response time, which is accumulated into totalResponseTime
                                Server server = (Server)var6.next();
                                ss = stats.getSingleServerStat(server);
                            }

                            Double weightSoFar = 0.0D;
                            List<Double> finalWeights = new ArrayList();
                            Iterator var20 = nlb.getAllServers().iterator();

                            while(var20.hasNext()) {
                                Server serverx = (Server)var20.next();
                                ServerStats ssx = stats.getSingleServerStat(serverx);
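                                double weight = totalResponseTime - ssx.getResponseTimeAvg(); // weight = sum of all average response times - this server's average response time
                                weightSoFar = weightSoFar.doubleValue() + weight;
                                // Weights are stored cumulatively in a list, so each server owns a range: 0 to n, n to ..., and so on.
                                // For example, three servers with response times 10, 20, 30 get the ranges (0-50), (50-90), (90-120).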
                                finalWeights.add(weightSoFar);
                            }

                            WeightedResponseTimeRule.this.setWeights(finalWeights);
                            return;
                        }
                    } catch (Exception var16) {
                        WeightedResponseTimeRule.logger.error("Error calculating server weights", var16);
                        return;
                    } finally {
                        WeightedResponseTimeRule.this.serverWeightAssignmentInProgress.set(false);
                    }

                }
            }
        }

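When the weight list holds no data, the rule falls back to round robin, since this class extends RoundRobinRule. When weight data exists, it computes this.random.nextDouble() * maxTotalWeight, that is, a fraction in [0, 1) times the largest cumulative weight, which yields a random number inside the weight range, and uses it to select the server index. As with RandomRule, when the selected server is unhealthy the while loop simply keeps going, potentially forever.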
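The cumulative-weight arithmetic from the code comments (response times 10, 20, 30) can be worked through with a small standalone sketch. WeightedPickSketch and its two methods are hypothetical names; only the formula (total minus the server's own average, stored as running totals) is taken from the code quoted above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of this is Ribbon code.
final class WeightedPickSketch {

    // weight_i = (sum of all average response times) - (average response time of i).
    // The returned list holds running totals, i.e. the upper bound of each range.
    static List<Double> accumulatedWeights(double[] avgResponseTimes) {
        double total = 0.0;
        for (double t : avgResponseTimes) {
            total += t;
        }
        List<Double> result = new ArrayList<>();
        double soFar = 0.0;
        for (double t : avgResponseTimes) {
            soFar += total - t;
            result.add(soFar);
        }
        return result;
    }

    // Index of the first range whose upper bound is >= randomWeight.
    // Falls back to 0 when nothing matches, like serverIndex in the quoted code.
    static int indexFor(List<Double> accumulated, double randomWeight) {
        for (int i = 0; i < accumulated.size(); i++) {
            if (accumulated.get(i) >= randomWeight) {
                return i;
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        List<Double> weights = accumulatedWeights(new double[] {10, 20, 30});
        System.out.println(weights);               // [50.0, 90.0, 120.0]
        System.out.println(indexFor(weights, 75)); // 1, since 75 lies in (50, 90]
    }
}
```

The fastest server (average 10) owns the widest range, 50 out of 120, so it is picked most often.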

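5. BestAvailableRule: iterates over all instances, skips servers whose circuit breaker is tripped, and returns the one with the fewest concurrent connections. It extends ClientConfigEnabledRoundRobinRule and therefore has RoundRobinRule available: when loadBalancerStats, which tracks per-server connection information, is null, or when no server could be chosen, it falls back to round robin.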

 public Server choose(Object key) {
        if (this.loadBalancerStats == null) {
            return super.choose(key);
        } else {
            List<Server> serverList = this.getLoadBalancer().getAllServers();
            int minimalConcurrentConnections = 2147483647;
            long currentTime = System.currentTimeMillis();
            Server chosen = null;
            Iterator var7 = serverList.iterator();

            while(var7.hasNext()) {
                Server server = (Server)var7.next();
                ServerStats serverStats = this.loadBalancerStats.getSingleServerStat(server);
                if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                    int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                    if (concurrentConnections < minimalConcurrentConnections) {
                        minimalConcurrentConnections = concurrentConnections;
                        chosen = server;
                    }
                }
            }

            if (chosen == null) {
                return super.choose(key);
            } else {
                return chosen;
            }
        }
    }
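
The selection loop above boils down to a least-connections scan. A minimal standalone sketch, assuming a hypothetical Node type that carries the two values the real code reads from ServerStats:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of this is Ribbon code.
final class LeastConnectionsSketch {

    // Stand-in for a server plus the statistics the rule consults.
    static final class Node {
        final String id;
        final int activeRequests;
        final boolean circuitTripped;

        Node(String id, int activeRequests, boolean circuitTripped) {
            this.id = id;
            this.activeRequests = activeRequests;
            this.circuitTripped = circuitTripped;
        }
    }

    // Returns the id of the non-tripped node with the fewest active requests,
    // or null when every node is tripped (the caller would then fall back).
    static String choose(List<Node> nodes) {
        int minimal = Integer.MAX_VALUE;
        String chosen = null;
        for (Node node : nodes) {
            if (node.circuitTripped) {
                continue;
            }
            if (node.activeRequests < minimal) {
                minimal = node.activeRequests;
                chosen = node.id;
            }
        }
        return chosen;
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("a", 5, false),
                new Node("b", 1, true),   // fewest requests, but tripped
                new Node("c", 2, false));
        System.out.println(choose(nodes)); // c
    }
}
```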

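6. RetryRule: a retry strategy wrapped around round robin (it instantiates RoundRobinRule internally) to obtain an available server instance. The maxRetryMillis property, 500 milliseconds by default, bounds the retry time. If the first attempt returns null or a server that is not alive, it starts a timer task (InterruptTask) that fires at the deadline; if no available instance is found within that time, it returns null. (RoundRobinRule itself returns after at most 10 attempts whether or not it found an available instance, so RetryRule can be seen as an enhanced version whose number of attempts is loosely controllable.)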

  public Server choose(ILoadBalancer lb, Object key) {
        long requestTime = System.currentTimeMillis();
        long deadline = requestTime + this.maxRetryMillis;
        Server answer = null;
        answer = this.subRule.choose(key);
        if ((answer == null || !answer.isAlive()) && System.currentTimeMillis() < deadline) {
            InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());

            while(!Thread.interrupted()) {
                answer = this.subRule.choose(key);
                if (answer != null && answer.isAlive() || System.currentTimeMillis() >= deadline) {
                    break;
                }

                Thread.yield();
            }

            task.cancel();
        }

        return answer != null && answer.isAlive() ? answer : null;
    }
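
The deadline-bounded retry can be reduced to the following sketch. It is a simplification under stated assumptions: DeadlineRetrySketch is a hypothetical class, the sub-rule is modelled as a Supplier, and the loop polls the clock instead of using an interrupting timer task as the quoted code does.

```java
import java.util.function.Supplier;

// Hypothetical illustration only; none of this is Ribbon code.
final class DeadlineRetrySketch {

    // Calls attempt repeatedly until it yields a non-null value
    // or maxRetryMillis has elapsed, whichever comes first.
    static <T> T chooseWithDeadline(Supplier<T> attempt, long maxRetryMillis) {
        long deadline = System.currentTimeMillis() + maxRetryMillis;
        T answer = attempt.get();
        while (answer == null && System.currentTimeMillis() < deadline) {
            Thread.yield();
            answer = attempt.get();
        }
        return answer;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // The first two attempts find nothing, the third one succeeds.
        String server = chooseWithDeadline(() -> ++calls[0] < 3 ? null : "a:8080", 500L);
        System.out.println(server + " after " + calls[0] + " attempts");
    }
}
```

Unlike a fixed attempt count, the number of retries here depends on how long each attempt takes, which is why the bound is only loosely controllable.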

7. PredicateBasedRule: an abstract class that extends ClientConfigEnabledRoundRobinRule.

   public abstract AbstractServerPredicate getPredicate();

    public Server choose(Object key) {
        ILoadBalancer lb = this.getLoadBalancer();
        Optional<Server> server = this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        return server.isPresent() ? (Server)server.get() : null;
    }

To use it you must override getPredicate. It appears to filter out some servers first and then choose one from the rest.

// The call this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key) above ends up here
  public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = this.getEligibleServers(servers, loadBalancerKey);
        // If the filtered list is empty, return Optional.absent() (isPresent() is then false); otherwise pick by modulo, just like round robin
        return eligible.size() == 0 ? Optional.absent() : Optional.of(eligible.get(this.nextIndex.getAndIncrement() % eligible.size()));
    }
    public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); // when loadBalancerKey is null, filter with the server-only predicate, which evaluates this predicate against the server alone, without a key
        } else {
            List<Server> results = Lists.newArrayList();
            Iterator var4 = servers.iterator();

            while(var4.hasNext()) {
                Server server = (Server)var4.next();
                // check the predicate here and collect the servers that satisfy it
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }

            return results;
        }
    }

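Of course, this abstract class requires us to implement getPredicate() and return an AbstractServerPredicate as the filter condition (a predicate that always returns true would make it equivalent to plain round robin).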
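The filter-then-round-robin pattern can be sketched independently of Ribbon. In this sketch FilterThenRoundRobinSketch is a hypothetical class; it uses java.util.Optional and java.util.function.Predicate rather than the Guava types in the quoted code, and Math.floorMod is my own choice to keep the index non-negative if the counter ever overflows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical illustration only; none of this is Ribbon code.
final class FilterThenRoundRobinSketch<T> {
    private final Predicate<T> predicate;
    private final AtomicInteger nextIndex = new AtomicInteger(0);

    FilterThenRoundRobinSketch(Predicate<T> predicate) {
        this.predicate = predicate;
    }

    // Keep only the servers accepted by the predicate, then rotate over them.
    Optional<T> choose(List<T> servers) {
        List<T> eligible = new ArrayList<>();
        for (T server : servers) {
            if (predicate.test(server)) {
                eligible.add(server);
            }
        }
        if (eligible.isEmpty()) {
            return Optional.empty();
        }
        int index = Math.floorMod(nextIndex.getAndIncrement(), eligible.size());
        return Optional.of(eligible.get(index));
    }

    public static void main(String[] args) {
        FilterThenRoundRobinSketch<String> rule =
                new FilterThenRoundRobinSketch<>(s -> !s.startsWith("down"));
        List<String> servers = Arrays.asList("a", "down-b", "c");
        System.out.println(rule.choose(servers).get()); // a
        System.out.println(rule.choose(servers).get()); // c
        System.out.println(rule.choose(servers).get()); // a
    }
}
```

With a predicate that accepts everything, choose degenerates into plain round robin over the whole list.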

8. AvailabilityFilteringRule extends PredicateBasedRule. As the code shows, its composite predicate is built around a new AvailabilityPredicate, with alwaysTrue() as the fallback.

   private AbstractServerPredicate predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, (IClientConfig)null)).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig)).addFallbackPredicate(AbstractServerPredicate.alwaysTrue()).build();
    }

The filter condition:

    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = this.getLBStats();
        if (stats == null) {
            return true;
        } else {
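            return !this.shouldSkipServer(stats.getSingleServerStat(input.getServer())); // when shouldSkipServer returns true, apply returns false and the server instance is filtered out
        }
    }
   // A server is filtered out when circuit-breaker filtering is enabled and its circuit breaker is currently tripped, or when its active request count reaches or exceeds the configured limit (default 2147483647; configurable through clientConfig, in Spring Cloud via <clientName>.<nameSpace>.ActiveConnectionsLimit)
    private boolean shouldSkipServer(ServerStats stats) { // returns true if either condition holds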
        return CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped() || stats.getActiveRequestsCount() >= ((Integer)this.activeConnectionsLimit.get()).intValue();
    }
    public Server choose(Object key) {
        int count = 0;

        for(Server server = this.roundRobinRule.choose(key); count++ <= 10; server = this.roundRobinRule.choose(key)) {
            if (this.predicate.apply(new PredicateKey(server))) {
                return server;
            }
        }

        return super.choose(key);
    }

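As the choose method shows, it first picks a server instance by round robin and then checks whether that instance satisfies the predicate; if not, it moves on to the next one by round robin.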
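The skip condition is written without parentheses in the quoted code, so it relies on && binding tighter than ||. A tiny sketch with the grouping spelled out; AvailabilitySkipSketch and its parameter names are hypothetical, and plain values stand in for the dynamic properties and ServerStats:

```java
// Hypothetical illustration only; none of this is Ribbon code.
final class AvailabilitySkipSketch {

    // Skip when (filtering is enabled AND the breaker is tripped)
    // OR the active request count has reached the limit.
    static boolean shouldSkip(boolean circuitBreakerFiltering,
                              boolean circuitTripped,
                              int activeRequests,
                              int activeConnectionsLimit) {
        return (circuitBreakerFiltering && circuitTripped)
                || activeRequests >= activeConnectionsLimit;
    }

    public static void main(String[] args) {
        // A tripped breaker is ignored when circuit-breaker filtering is off.
        System.out.println(shouldSkip(false, true, 0, Integer.MAX_VALUE)); // false
        // The connection limit applies regardless of the breaker setting.
        System.out.println(shouldSkip(false, false, 5, 5)); // true
    }
}
```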

9. ZoneAvoidanceRule extends PredicateBasedRule. As the initialization code shows, it uses two predicates: judging by the names, the first filters by zone and the second by availability.

   public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        this.compositePredicate = this.createCompositePredicate(zonePredicate, availabilityPredicate);
    }

The filter condition of ZoneAvoidancePredicate is as follows:

public boolean apply(@Nullable PredicateKey input) {
        if (!ENABLED.get()) { // checks whether the niws.loadbalancer.zoneAvoidanceRule.enabled property is true (the default); if false, zone filtering is off and nothing is filtered
            return true;
        } else {
            String serverZone = input.getServer().getZone(); // the zone string configured for this server, UNKNOWN by default
            if (serverZone == null) {
                return true;
            } else {
                LoadBalancerStats lbStats = this.getLBStats();
                if (lbStats == null) { // no load balancer statistics, so nothing to filter on
                    return true;
                } else if (lbStats.getAvailableZones().size() <= 1) {
                    return true; // with at most one available zone (in Up state) there is no point in filtering
                } else {
                    Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); // keyed by the zone configured on the server instances
                    if (!zoneSnapshot.keySet().contains(serverZone)) {
                        return true; // the snapshot has no entry for this server's zone, so there is no basis for filtering it out
                    } else {
                        logger.debug("Zone snapshots: {}", zoneSnapshot);
                        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, this.triggeringLoad.get(), this.triggeringBlackoutPercentage.get()); // the available zones are selected here
                        logger.debug("Available zones: {}", availableZones);
                        return availableZones != null ? availableZones.contains(input.getServer().getZone()) : false;
                    }
                }
            }
        }
    }

After the two predicates are instantiated, they are combined via this.compositePredicate = this.createCompositePredicate(zonePredicate, availabilityPredicate);.

 
 private List<AbstractServerPredicate> fallbacks = Lists.newArrayList(); // fallback predicates are stored in this list; the primary predicate goes to the Builder
 public static CompositePredicate.Builder withPredicate(AbstractServerPredicate primaryPredicate) {
        return new CompositePredicate.Builder(primaryPredicate);
    }
    public CompositePredicate.Builder addFallbackPredicate(AbstractServerPredicate fallback) {
            this.toBuild.fallbacks.add(fallback);
            return this;
        }

ZoneAvoidanceRule extends PredicateBasedRule, so selection still goes through the parent's choose method.

  public Server choose(Object key) {
        ILoadBalancer lb = this.getLoadBalancer();
        Optional<Server> server = this.getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        return server.isPresent() ? (Server)server.get() : null;
    }

    public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = this.getEligibleServers(servers, loadBalancerKey);
        return eligible.size() == 0 ? Optional.absent() : Optional.of(eligible.get(this.nextIndex.getAndIncrement() % eligible.size()));
    }

The getEligibleServers method is overridden in CompositePredicate, a subclass of AbstractServerPredicate.

public class CompositePredicate extends AbstractServerPredicate { 
    private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
    private int minimalFilteredServers = 1;
    private float minimalFilteredPercentage = 0.0F;
   public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        List<Server> result = super.getEligibleServers(servers, loadBalancerKey);

        AbstractServerPredicate predicate;
        for(Iterator i = this.fallbacks.iterator(); (result.size() < this.minimalFilteredServers || result.size() <= (int)((float)servers.size() * this.minimalFilteredPercentage)) && i.hasNext(); result = predicate.getEligibleServers(servers, loadBalancerKey)) {
            predicate = (AbstractServerPredicate)i.next();
        }

        return result;
    }
}

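First the parent's getEligibleServers filters the list once. As far as I can tell from the Ribbon source, apply on a CompositePredicate delegates to the primary predicates it was built with, so this first pass already applies the main filter conditions.

Then the predicates stored in fallbacks are tried in order, but only while this condition holds:

(result.size() < this.minimalFilteredServers || result.size() <= (int)((float)servers.size() * this.minimalFilteredPercentage)) && i.hasNext()

That is, the loop continues only while the current result is too small and there is still a fallback left to try. "Too small" means fewer entries than minimalFilteredServers (default 1, so the result is empty) or no more than servers.size() times minimalFilteredPercentage (default 0, so again the result is empty).

This puzzled me for a while. If the first pass leaves result.size() > 0, the loop condition is false && true and the loop exits at once, so are the fallbacks just decoration? And if result.size() == 0, the condition is true && true and a fallback runs, but what is left to filter when result is already empty? The answer lies in the loop's update expression: result = predicate.getEligibleServers(servers, loadBalancerKey) filters the original servers list, not the empty result. The fallbacks are therefore looser alternatives that only come into play when the stricter filter leaves too few servers, and they are skipped whenever the first pass already yields enough.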
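The loop in CompositePredicate.getEligibleServers can be restated as a small standalone sketch. It is deliberately simplified: FallbackFilterSketch is a hypothetical class, java.util.function.Predicate replaces AbstractServerPredicate, and the percentage threshold is left out so that only minimalFilteredServers decides whether a fallback runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration only; none of this is Ribbon code.
final class FallbackFilterSketch {

    static <T> List<T> filter(List<T> servers, Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T server : servers) {
            if (predicate.test(server)) {
                kept.add(server);
            }
        }
        return kept;
    }

    // Apply the primary predicate first. While the result is too small and
    // fallbacks remain, re-filter the ORIGINAL list with the next fallback.
    static <T> List<T> eligible(List<T> servers,
                                Predicate<T> primary,
                                List<Predicate<T>> fallbacks,
                                int minimalFilteredServers) {
        List<T> result = filter(servers, primary);
        for (Predicate<T> fallback : fallbacks) {
            if (result.size() >= minimalFilteredServers) {
                break;
            }
            result = filter(servers, fallback);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> servers = Arrays.asList(1, 2, 3, 4);
        List<Predicate<Integer>> fallbacks = new ArrayList<>();
        fallbacks.add(n -> n > 3);   // looser than the primary below
        fallbacks.add(n -> true);    // last resort: accept everything
        // The primary rejects everything, so the first fallback decides.
        System.out.println(eligible(servers, n -> n > 10, fallbacks, 1)); // [4]
    }
}
```

Ending the fallback chain with an always-true predicate guarantees a non-empty result whenever the server list itself is non-empty.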

 

Summary:

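With Feign + Ribbon, Spring Cloud makes client-side load balancing easy to implement and ships a variety of load-balancing rules. You can also extend it by subclassing the abstract class AbstractLoadBalancerRule or by implementing IRule. Simple and convenient.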

 

