上一篇分析了Ribbon如何發送出去一個自帶負載均衡效果的HTTP請求,本節就重點分析各個算法都是如何實現。
負載均衡整體是從IRule進去的:
public interface IRule{
/*
* choose one alive server from lb.allServers or
* lb.upServers according to key
*
* @return choosen Server object. NULL is returned if none
* server is available
*/
public Server choose(Object key);
public void setLoadBalancer(ILoadBalancer lb);
public ILoadBalancer getLoadBalancer();
}
通過 choose方法選擇指定的算法。
完整的算法包含如下:
- RandomRule:隨機算法實現;
- RoundRobinRule:輪詢負載均衡策略,依次輪詢所有可用服務器列表,遇到第一個可用的即返回;
- RetryRule :先按照RoundRobinRule策略獲取服務,如果獲取服務失敗會在指定時間內重試;
- AvaliabilityFilteringRule: 過濾掉那些因為一直連接失敗的被標記為circuit tripped的后端server,並過濾掉那些高並發的的后端server(active connections 超過配置的閾值) ;
- BestAvailableRule :會先過濾掉由於多次訪問故障二處於斷路器跳閘狀態的服務,然后選擇一個並發量最小的服務;
- WeightedResponseTimeRule: 根據響應時間分配一個weight,響應時間越長,weight越小,被選中的可能性越低;
- ZoneAvoidanceRule: 復合判斷server所在區域的性能和server的可用性選擇server
下面我們一起分析每一個算法的實現。
RandomRule
public class RandomRule extends AbstractLoadBalancerRule {
Random rand;
public RandomRule() {
rand = new Random();
}
/**
* Randomly choose from all living servers
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
//獲取所有 可用的節點
List<Server> upList = lb.getReachableServers();
//獲取所有節點,不區分是否可用
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}
//使用 隨機數算法,將 總結點數作為種子
int index = rand.nextInt(serverCount);
//在所有可用的節點中隨機算則一個節點
server = upList.get(index);
if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}
return server;
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
// TODO Auto-generated method stub
}
}
隨機算法的實現原理很簡單,將當前總節點數作為種子,生成一個隨機數,在可用節點中選擇一個節點返回即可。
RoundRobinRule
輪詢負載均衡策略,該算法順序查找所有服務列表,直到遇到第一個可用的服務就返回。限制了最多只查詢10次,超過10次還未查到可用服務直接返回空。
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter;
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0);
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
//最多嘗試10次,如果都沒有找到可用的服務器 就返回null
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
//1,2,3...... 順序獲取 index
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
/**
* nextServerCyclicCounter 初始值為0,modulo 為所有服務器總數
* next值 為 1,2,3......
* 正常情況下 current 和 next 肯定是相等的
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
WeightedResponseTimeRule
響應時間作為選取權重的負載均衡策略,響應時間越短的服務被選中的可能性大。
public class WeightedResponseTimeRule extends RoundRobinRule {
public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey<Integer>() {
@Override
public String key() {
return "ServerWeightTaskTimerInterval";
}
@Override
public String toString() {
return key();
}
@Override
public Class<Integer> type() {
return Integer.class;
}
};
public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;
private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;
private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);
// holds the accumulated weight from index 0 to current index
// for example, element at index 2 holds the sum of weight of servers from 0 to 2
private volatile List<Double> accumulatedWeights = new ArrayList<Double>();
private final Random random = new Random();
protected Timer serverWeightTimer = null;
protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);
String name = "unknown";
public WeightedResponseTimeRule() {
super();
}
public WeightedResponseTimeRule(ILoadBalancer lb) {
super(lb);
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof BaseLoadBalancer) {
name = ((BaseLoadBalancer) lb).getName();
}
initialize(lb);
}
void initialize(ILoadBalancer lb) {
if (serverWeightTimer != null) {
serverWeightTimer.cancel();
}
serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
+ name, true);
serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
serverWeightTaskTimerInterval);
// do a initial run
ServerWeight sw = new ServerWeight();
sw.maintainWeights();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
logger
.info("Stopping NFLoadBalancer-serverWeightTimer-"
+ name);
serverWeightTimer.cancel();
}
}));
}
public void shutdown() {
if (serverWeightTimer != null) {
logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
serverWeightTimer.cancel();
}
}
List<Double> getAccumulatedWeights() {
return Collections.unmodifiableList(accumulatedWeights);
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
@Override
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
// accumulatedWeights 里面封裝的是已經計算完畢權重的所有服務器,具體在 ServerWeight類中
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
int serverIndex = 0;
// 取出已經計算完權重的服務器列表中的最后一個權重,見下面解釋,最后一個權重為 當前所有權重之和
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// 如果沒有命中任何一個服務器或者是服務器列表權重還沒有被初始化
// 那么就使用 默認的 RoundRobinRule 算法重新進行選擇
if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// 生成一個隨機權重 0 <= randomWeight < maxTotalWeight
double randomWeight = random.nextDouble() * maxTotalWeight;
// 看當前的 randomWeight 在哪個區間,那么該區間對應的服務器即為被選中
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}
server = allList.get(serverIndex);
}
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
// Next.
server = null;
}
return server;
}
class DynamicServerWeightTask extends TimerTask {
public void run() {
ServerWeight serverWeight = new ServerWeight();
try {
serverWeight.maintainWeights();
} catch (Exception e) {
logger.error("Error running DynamicServerWeightTask for {}", name, e);
}
}
}
//進行服務器的權重設置
class ServerWeight {
public void maintainWeights() {
ILoadBalancer lb = getLoadBalancer();
if (lb == null) {
return;
}
if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
return;
}
try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
//在AbstractLoadBalancer中維護了一個服務器列表,里面有當前服務器的統計信息
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// 如果沒有統計信息,返回
return;
}
//循環所有服務器,將所有服務器的平均響應時間 相加
double totalResponseTime = 0;
for (Server server : nlb.getAllServers()) {
// 取出某個服務器的統計信息
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// 計算權重的方式是: 權重 = totalResponseTime - 該服務器的響應時間
// 即響應時間越長的服務器,權重就會越小,所以被選擇的機會就越小
Double weightSoFar = 0.0;
// 這個for循環就是按照上述方法來計算每個服務器的權重
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
//這里的值,相當於是一個區間段,起始是0.0,往后每一個數都比前面大當前的weight
//eg:0.0--5---10---15 ,那么最后一個數就是當前所有權重的總和
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}
}
}
void setWeights(List<Double> weights) {
this.accumulatedWeights = weights;
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
super.initWithNiwsConfig(clientConfig);
serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);
}
}
既然是按照響應時間權重來選擇服務,那么先整理一下權重算法是怎么做的。
觀察initialize方法,啟動了定時器定時執行DynamicServerWeightTask的run來調用計算服務權重,計算權重是通過內部類ServerWeight的maintainWeights方法來進行。
整理一下maintainWeights方法的邏輯,里面有兩個for循環,第一個for循環拿到所有服務的總響應時間,第二個for循環計算每個服務的權重以及總權重。
第一個for循環。
假設有4個服務,每個服務的響應時間(ms):
A: 200
B: 500
C: 30
D: 1200
總響應時間:
200+500+30+1200=1930ms
接下來第二個for循環,計算每個服務的權重。
服務的權重=總響應時間-服務自身的響應時間:
A: 1930-200=1730
B: 1930-500=1430
C: 1930-30=1900
D: 1930-1200=730
總權重:
1730+1430+1900+730=5790
響應時間及權重計算結果示意圖:
結果就是響應時間越短的服務,它的權重就越大。
再看一下choose方法。
重點在while循環的第3個if這里。
首先如果判定沒有服務或者權重還沒計算出來時,會采用父類RoundRobinRule以線性輪詢的方式選擇服務器。
有服務,有權重計算結果后,就是以總權重值為限制,拿到一個隨機數,然后看隨機數落到哪個區間,就選擇對應的服務。
所以選取服務的結論就是:
響應時間越短的服務,它的權重就越大,被選中的可能性就越大。
AvaliabilityFilteringRule
//抽象策略,繼承自ClientConfigEnabledRoundRobinRule
//基於Predicate的策略
//Predicateshi Google Guava Collection工具對集合進行過濾的條件接口
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
//定義了一個抽象函數來獲取AbstractServerPredicate
public abstract AbstractServerPredicate getPredicate();
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
//通過AbstractServerPredicate的chooseRoundRobinAfterFiltering函數來選出具體的服務實例
//AbstractServerPredicate的子類實現的Predicate邏輯來過濾一部分服務實例
//然后在以輪詢的方式從過濾后的實例中選出一個
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
public abstract class AbstractServerPredicate implements Predicate<PredicateKey> {
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers) {
//先通過內部定義的getEligibleServers函數來獲取備選清單(實現了過濾)
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
//如果返回的清單為空,則用Optional.absent()來表示不存在
return Optional.absent();
}
//以線性輪詢的方式從備選清單中獲取一個實例
return Optional.of(eligible.get(random.nextInt(eligible.size())));
}
public List<Server> getEligibleServers(List<Server> servers) {
return getEligibleServers(servers, null);
}
/**
* Get servers filtered by this predicate from list of servers.
*/
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
if (loadBalancerKey == null) {
return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));
} else {
List<Server> results = Lists.newArrayList();
//遍歷服務清單,使用apply方法來判斷實例是否需要保留,如果是,就添加到結果列表中
//所以apply方法需要在子類中實現,子類就可實現高級策略
for (Server server: servers) {
if (this.apply(new PredicateKey(loadBalancerKey, server))) {
results.add(server);
}
}
return results;
}
}
}
public Server choose(Object key) {
int count = 0;
//通過輪詢選擇一個server
Server server = roundRobinRule.choose(key);
//嘗試10次如果都不滿足要求,就放棄,采用父類的choose
//這里為啥嘗試10次?
//1. 輪詢結果相互影響,可能導致某個請求每次調用輪詢返回的都是同一個有問題的server
//2. 集群很大時,遍歷整個集群判斷效率低,我們假設集群中健康的實例要比不健康的多,如果10次找不到,就用父類的choose,這也是一種快速失敗機制
while (count++ <= 10) {
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
return super.choose(key);
}
判定規則:
以下兩項有一項成立,就表示該服務不可用,不能使用該服務。
配置項niws.loadbalancer.availabilityFilteringRule.filterCircuitTripped為true(未配置則默認為true),並且已經觸發斷路。
服務的活動請求數 > 配置項niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit(未配則默認為Interge.MAX_VALUE)。
在AvailabilityFilteringRule的choose中無法選出服務的情況下,會調用父類PredicateBasedRule的choose,PredicateBasedRule采用先過濾后線性輪行方法選擇服務,不過,用來判定的predicate還是AvailabilityPredicate,所以過濾用的判定規則和上面是一樣的。
public class AvailabilityFilteringRule extends PredicateBasedRule {
private AbstractServerPredicate predicate;
public AvailabilityFilteringRule() {
super();
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE)
public int getAvailableServersCount() {
ILoadBalancer lb = getLoadBalancer();
List<Server> servers = lb.getAllServers();
if (servers == null) {
return 0;
}
return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();
}
/**
* This method is overridden to provide a more efficient implementation which does not iterate through
* all servers. This is under the assumption that in most cases, there are more available instances
* than not.
*/
@Override
public Server choose(Object key) {
int count = 0;
Server server = roundRobinRule.choose(key);
while (count++ <= 10) {
if (predicate.apply(new PredicateKey(server))) {
return server;
}
server = roundRobinRule.choose(key);
}
return super.choose(key);
}
@Override
public AbstractServerPredicate getPredicate() {
return predicate;
}
}
RetryRule
顧名思義,可重試的策略。
public class RetryRule extends AbstractLoadBalancerRule {
IRule subRule = new RoundRobinRule();
long maxRetryMillis = 500;
public RetryRule() {
}
public RetryRule(IRule subRule) {
this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
}
public RetryRule(IRule subRule, long maxRetryMillis) {
this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
this.maxRetryMillis = (maxRetryMillis > 0) ? maxRetryMillis : 500;
}
public void setRule(IRule subRule) {
this.subRule = (subRule != null) ? subRule : new RoundRobinRule();
}
public IRule getRule() {
return subRule;
}
public void setMaxRetryMillis(long maxRetryMillis) {
if (maxRetryMillis > 0) {
this.maxRetryMillis = maxRetryMillis;
} else {
this.maxRetryMillis = 500;
}
}
public long getMaxRetryMillis() {
return maxRetryMillis;
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
subRule.setLoadBalancer(lb);
}
/*
* Loop if necessary. Note that the time CAN be exceeded depending on the
* subRule, because we're not spawning additional threads and returning
* early.
*/
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
//超時時間為 當前時間+500 ms
long deadline = requestTime + maxRetryMillis;
Server answer = null;
//默認的策略是 RoundRobinRule
answer = subRule.choose(key);
//如果默認策略選出的服務器為空,或者該服務器狀態為不存活並且當前時間還在超時時間內
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
只要滿足條件,一直重試
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
//如果在最大超時時間內仍未能選出可用的服務器那就返回空
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
這個策略默認就是用RoundRobinRule策略選取服務,當然可以通過配置,在構造RetryRule的時候傳進想要的策略。
為了應對在有可能出現無法選取出服務的情況,比如瞬時斷線的情況,那么就要提供一種重試機制,在最大重試時間的限定下重復嘗試選取服務,直到選取出一個服務或者超時。
最大重試時間maxRetryMillis是可配置的。
BestAvailableRule
該策略繼承ClientConfigEnabledRoundRobinRule
,在實現中它注入了負載均衡的統計對象LoadBalancerStats
,同時在具體的choose算法中利用LoadBalancerStats
保存的實例統計信息來選擇滿足要求的實例。它通過遍歷負載均衡器中的維護的所有實例,會過濾掉故障的實例,並找出並發請求數最小的一個,所以該策略的特性時可選出最空閑的實例。
該算法核心依賴與LoadBalancerStats
統計信息,當其為空時候策略是無法執行,默認執行父類的線性輪詢機制。
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {
private LoadBalancerStats loadBalancerStats;
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
//獲取當前所有的服務器信息
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
for (Server server: serverList) {
//循環每一個服務器,獲取當前服務器的統計信息
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
//如果當前服務器沒有發生故障
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
//獲取服務器當前的並發請求量
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
//如果當前請求量小於minimalConcurrentConnections,就用當前值覆蓋
//那么最后chosen 就是並發量最小的服務器啦
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
if (lb instanceof AbstractLoadBalancer) {
loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
}
}
}
ZoneAvoidanceRule
該策略是com.netflix.loadbalancer.PredicateBasedRule
的具體實現類。它使用了CompositePredicate
來進行服務實例清單的過濾。這是一個組合過濾條件,在其構造函數中,它以ZoneAvoidancePredicate
為主要過濾條件,判斷判定一個zone的運行性能是否可用,剔除不可用的zone(所有server),AvailabilityPredicate
為次要過濾條件,用於過濾掉連接數過多的Server,初始化了組合過濾條件的實例。
查看源碼發現,ZoneAvoidanceRule並沒有重寫choose方法,而是直接使用了父類PredicateBasedRule的choose方法。
public class ZoneAvoidanceRule extends PredicateBasedRule {
private static final Random random = new Random();
//使用CompositePredicate來進行服務實例清單過濾。
private CompositePredicate compositePredicate;
public ZoneAvoidanceRule() {
super();
//判斷一個區域的服務是否可用的過濾條件
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
//判斷一個服務的連接數是否過多的過濾條件
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
//將這兩個條件組合到一起
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
//這里構造了一個兩個過濾條件的Predicate
private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
return CompositePredicate.withPredicates(p1, p2)
.addFallbackPredicate(p2)
.addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
.build();
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
}
static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
for (String zone : lbStats.getAvailableZones()) {
ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
map.put(zone, snapshot);
}
return map;
}
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}
public static Set<String> getAvailableZones(
Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
double triggeringBlackoutPercentage) {
if (snapshot.isEmpty()) {
return null;
}
Set<String> availableZones = new HashSet<String>(snapshot.keySet());
if (availableZones.size() == 1) {
return availableZones;
}
Set<String> worstZones = new HashSet<String>();
double maxLoadPerServer = 0;
boolean limitedZoneAvailability = false;
for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
String zone = zoneEntry.getKey();
ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
int instanceCount = zoneSnapshot.getInstanceCount();
if (instanceCount == 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
double loadPerServer = zoneSnapshot.getLoadPerServer();
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
} else {
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}
}
}
}
if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
// zone override is not needed here
return availableZones;
}
String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;
}
public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
double triggeringLoad, double triggeringBlackoutPercentage) {
if (lbStats == null) {
return null;
}
Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
return getAvailableZones(snapshot, triggeringLoad,
triggeringBlackoutPercentage);
}
@Override
public AbstractServerPredicate getPredicate() {
return compositePredicate;
}
}
上面的源碼中看到 在構造函數中用兩個過濾條件構造了一個CompositePredicate,那么它里面怎么做的呢?
public class CompositePredicate extends AbstractServerPredicate {
private AbstractServerPredicate delegate;
private List<AbstractServerPredicate> fallbacks = Lists.newArrayList();
private int minimalFilteredServers = 1;
private float minimalFilteredPercentage = 0;
@Override
public boolean apply(@Nullable PredicateKey input) {
return delegate.apply(input);
}
...
...
...
/**
* Get the filtered servers from primary predicate, and if the number of the filtered servers
* are not enough, trying the fallback predicates
*/
@Override
public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
////使用主過濾條件對所有實例過濾並返回過濾后的清單
List<Server> result = super.getEligibleServers(servers, loadBalancerKey);
Iterator<AbstractServerPredicate> i = fallbacks.iterator();
//依次使用次過濾條件對主過濾條件的結果進行過濾
//不論是主過濾條件還是次過濾條件,都需要判斷下面兩個條件
//只要有一個條件符合,就不再過濾,將當前結果返回供線性輪詢
//算法選擇
//第1個條件:過濾后的實例總數>=最小過濾實例數(默認為1)
//第2個條件:過濾互的實例比例>最小過濾百分比(默認為0)
while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage))
&& i.hasNext()) {
AbstractServerPredicate predicate = i.next();
result = predicate.getEligibleServers(servers, loadBalancerKey);
}
return result;
}