近期業務大量突增微服務性能優化總結-1.改進客戶端負載均衡算法


最近,業務增長的很迅猛,對於我們后台這塊也是一個不小的挑戰,這次遇到的核心業務接口的性能瓶頸,並不是單獨的一個問題導致的,而是幾個問題揉在一起:我們解決一個之后,發上線,之后發現還有另一個的性能瓶頸問題。這也是我經驗不足,導致沒能一下子定位解決;而我又對我們后台整個團隊有着固執的自尊,不想通過大量水平擴容這種方式挺過壓力高峰,導致線上連續幾晚都出現了不同程度的問題,肯定對於我們的業務增長是有影響的。這也是我不成熟和要反思的地方。這系列文章主要記錄下我們針對這次業務增長,對於我們后台微服務系統做的通用技術優化,針對業務流程和緩存的優化由於只適用於我們的業務,這里就不再贅述了。本系列會分為如下幾篇:

  1. 改進客戶端負載均衡算法
  2. 開發日志輸出異常堆棧的過濾插件
  3. 針對 x86 雲環境改進異步日志等待策略
  4. 增加對於同步微服務的 HTTP 請求等待隊列的監控以及雲上部署,需要小心達到實例網絡流量上限導致的請求響應緩慢
  5. 針對系統關鍵業務增加必要的侵入式監控

改進客戶端負載均衡算法

Spring Cloud LoadBalancer 內置輪詢算法以及問題

我們是用 Spring Cloud 作為我們的微服務體系,並且針對其中很多組件做了優化改造,請參考我的另一系列。之前我們的客戶端負載均衡算法,是不同請求之間相互獨立的輪詢。由於我們實現的微服務框架會針對可以重試的請求進行重試,重試需要重試與之前不同的實例。沒有重試,無法實現在線發布對於用戶無感知,並且我們部署同一個微服務的不同實例是處於不同的可用區,並且微服務不會每次都全部出問題,而是某些實例出問題,有重試可以讓某些實例出問題的時候,對用戶無感知。當某些實例壓力過大時,重試也可以讓請求重試壓力比較小的實例。使用 Spring Cloud LoadBalancer 的內置的負載均衡算法均無法滿足我們的需求,所以我們針對其中的輪詢算法進行了改進。原有的流程是:

  1. 獲取服務實例列表
  2. 所有線程共用同一個原子變量 position,每次請求原子加 1
  3. position 對實例個數取余,返回對應下標的實例進行調用

這樣的算法問題是:假設有微服務 A 有兩個實例:實例 1 和實例 2。請求 A 到達時,RoundRobinLoadBalancer 返回實例 1,這時有請求 B 到達,RoundRobinLoadBalancer 返回實例 2。然后如果請求 A 失敗重試,RoundRobinLoadBalancer 又返回了實例 1。這不是我們期望看到的。

本次優化前,我們的負載均衡算法以及問題

在本次業務突增很多的改進之前,我們第一版改進后的流程是:

  1. 獲取服務實例列表,將實例列表按照 ip 端口排序,如果不排序即使 position 是下一個可能也代表的是之前已經調用過的實例
  2. 根據請求中的 traceId,從本地緩存中以 traceId 為 key 獲取一個初始值為隨機數的原子變量 position,這樣防止所有請求都從第一個實例開始調用,之后第二個、第三個這樣。
  3. position 原子加一,之后對實例個數取余,返回對應下標的實例進行調用

其中請求包含 traceId 是來自於我們使用了 spring-cloud-sleuth 鏈路追蹤,基於這種機制我們能保證請求不會重試到之前已經調用過的實例。源碼是:

//一定必須是實現ReactorServiceInstanceLoadBalancer
//而不是ReactorLoadBalancer<ServiceInstance>
//因為注冊的時候是ReactorServiceInstanceLoadBalancer
@Log4j2
public class RoundRobinWithRequestSeparatedPositionLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    private final ServiceInstanceListSupplier serviceInstanceListSupplier;
    //每次請求算上重試不會超過1分鍾
    //對於超過1分鍾的,這種請求肯定比較重,不應該重試
    private final LoadingCache<Long, AtomicInteger> positionCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
            //隨機初始值,防止每次都是從第一個開始調用
            .build(k -> new AtomicInteger(ThreadLocalRandom.current().nextInt(0, 1000)));
    private final String serviceId;
    private final Tracer tracer;


    public RoundRobinWithRequestSeparatedPositionLoadBalancer(ServiceInstanceListSupplier serviceInstanceListSupplier, String serviceId, Tracer tracer) {
        this.serviceInstanceListSupplier = serviceInstanceListSupplier;
        this.serviceId = serviceId;
        this.tracer = tracer;
    }
    
    //每次重試,其實都會調用這個 choose 方法重新獲取一個實例
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        return serviceInstanceListSupplier.get().next().map(serviceInstances -> getInstanceResponse(serviceInstances));
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        return getInstanceResponseByRoundRobin(serviceInstances);
    }

    private Response<ServiceInstance> getInstanceResponseByRoundRobin(List<ServiceInstance> serviceInstances) {
        if (serviceInstances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        }
        //為了解決原始算法不同調用並發可能導致一個請求重試相同的實例
        //從 sleuth 的 Tracer 中獲取當前請求的上下文
        Span currentSpan = tracer.currentSpan();
        //如果上下文不存在,則可能不是前端用戶請求,而是其他某些機制觸發,我們就創建一個新的上下文
        if (currentSpan == null) {
            currentSpan = tracer.newTrace();
        }
        //從請求上下文中獲取請求的 traceId,用來唯一標識一個請求
        long l = currentSpan.context().traceId();
        AtomicInteger seed = positionCache.get(l);
        int s = seed.getAndIncrement();
        int pos = s % serviceInstances.size();
        log.info("position {}, seed: {}, instances count: {}", pos, s, serviceInstances.size());
        return new DefaultResponse(serviceInstances.stream()
                //實例返回列表順序可能不同,為了保持一致,先排序再取
                .sorted(Comparator.comparing(ServiceInstance::getInstanceId))
                .collect(Collectors.toList()).get(pos));
    }
}

但是在這次請求突增很多的時候,這種負載均衡算法還是給我們帶來了問題。

首先,本次突增,我們並沒有采取擴容,導致本次的性能壓力對於壓力的均衡分布非常敏感。舉個例子是,假設微服務 A 有 9 個實例,在業務高峰點來的時候,最理想的情況是保證無論何時這 9 個負載壓力都完全均衡,但是由於我們使用了初始值為隨機數的原子變量 position,雖然從一天的總量上來看,負責均衡壓力肯定是均衡,但是在某一小段時間內,很可能壓力全都跑到了某幾個實例上,導致這幾個實例被壓垮,熔斷,然后又都跑到了另外的幾個實例上,又被壓垮,熔斷,如此惡性循環。

然后,我們部署采用的是 k8s 部署,同一個虛擬機上面可能會跑很多微服務的 pod。在某些情況下,同一個微服務的多個 pod 可能會跑到同一個虛擬機 Node 上,這個可以從pod 的 ip 網段上看出來:例如某個微服務有如下 7 個實例:10.238.13.12:8181,10.238.13.24:8181,10.238.15.12:8181,10.238.17.12:8181,10.238.20.220:8181,10.238.21.31:8181,10.238.21.121:8181,那么 10.238.13.12:8181 與 10.238.13.24:8181 很可能在同一個 Node 上,10.238.21.31:8181 和 10.238.21.121:8181 很可能在同一個 Node 上。我們重試,需要優先重試與之前重試過的實例盡量不在同一個 Node 上的實例,因為同一個 Node 上的實例只要有一個有問題或者壓力過大,其他的基本上也有問題或者壓力過大。

最后,如果調用某個實例一直失敗,那么這個實例的調用優先級需要排在其他正常的實例后面。這個對於減少快速刷新發布(一下子啟動很多實例之后停掉多個老實例,實例個數大於重試次數配置)對於用戶的影響,以及某個可用區突然發生異常導致多個實例下線對用戶的影響,以及業務壓力已經過去,壓力變小后,需要關掉不再需要的實例,導致大量實例發生遷移的時候對用戶的影響,有很大的作用。

針對以上問題的優化方案

我們針對上面三個問題,提出了一種優化后的解決方案:

  1. 針對每次請求,記錄:
  2. 本次請求已經調用過哪些實例 -> 請求調用過的實例緩存
  3. 調用的實例,當前有多少請求在處理中 -> 實例運行請求數
  4. 調用的實例,最近請求錯誤率 -> 實例請求錯誤率
  5. 隨機將實例列表打亂,防止在以上三個指標都相同時,總是將請求發給同一個實例。
  6. 按照 當前請求沒有調用過靠前 -> 錯誤率越小越靠前 的順序排序 -> 實例運行請求數越小越靠前
  7. 取排好序之后的列表第一個實例作為本次負載均衡的實例

具體實現是:以下的代碼來自於:https://github.com/JoJoTec/spring-cloud-parent

我們使用了依賴:

<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
</dependency>

記錄實例數據的緩存類:

@Log4j2
public class ServiceInstanceMetrics {
	private static final String CALLING = "-Calling";
	private static final String FAILED = "-Failed";

	private MetricRegistry metricRegistry;

	ServiceInstanceMetrics() {
	}

	public ServiceInstanceMetrics(MetricRegistry metricRegistry) {
		this.metricRegistry = metricRegistry;
	}

	/**
	 * 記錄調用實例
	 * @param serviceInstance
	 */
	public void recordServiceInstanceCall(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).inc();
	}
	/**
	 * 記錄調用實例結束
	 * @param serviceInstance
	 * @param isSuccess 是否成功
	 */
	public void recordServiceInstanceCalled(ServiceInstance serviceInstance, boolean isSuccess) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		metricRegistry.counter(key + CALLING).dec();
		if (!isSuccess) {
			//不成功則記錄失敗
			metricRegistry.meter(key + FAILED).mark();
		}
	}

	/**
	 * 獲取正在運行的調用次數
	 * @param serviceInstance
	 * @return
	 */
	public long getCalling(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		long count = metricRegistry.counter(key + CALLING).getCount();
		log.debug("ServiceInstanceMetrics-getCalling: {} -> {}", key, count);
		return count;
	}

	/**
	 * 獲取最近一分鍾調用失敗次數分鍾速率,其實是滑動平均數
	 * @param serviceInstance
	 * @return
	 */
	public double getFailedInRecentOneMin(ServiceInstance serviceInstance) {
		String key = serviceInstance.getHost() + ":" + serviceInstance.getPort();
		double rate = metricRegistry.meter(key + FAILED).getOneMinuteRate();
		log.debug("ServiceInstanceMetrics-getFailedInRecentOneMin: {} -> {}", key, rate);
		return rate;
	}
}

負載均衡核心代碼:

private final LoadingCache<Long, Set<String>> calledIpPrefixes = Caffeine.newBuilder()
        .expireAfterAccess(3, TimeUnit.MINUTES)
        .build(k -> Sets.newConcurrentHashSet());
private final String serviceId;
private final Tracer tracer;
private final ServiceInstanceMetrics serviceInstanceMetrics;

//每次重試,其實都會調用這個 choose 方法重新獲取一個實例
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
    Span span = tracer.currentSpan();
    return serviceInstanceListSupplier.get().next()
            .map(serviceInstances -> {
                //保持 span 和調用 choose 的 span 一樣
                try (Tracer.SpanInScope cleared = tracer.withSpanInScope(span)) {
                    return getInstanceResponse(serviceInstances);
                }
            });
}


private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> serviceInstances) {
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    //讀取 spring-cloud-sleuth 的對於當前請求的鏈路追蹤上下文,獲取對應的 traceId
    Span currentSpan = tracer.currentSpan();
    if (currentSpan == null) {
        currentSpan = tracer.newTrace();
    }
    long l = currentSpan.context().traceId();
    return getInstanceResponseByRoundRobin(l, serviceInstances);
}

@VisibleForTesting
public Response<ServiceInstance> getInstanceResponseByRoundRobin(long traceId, List<ServiceInstance> serviceInstances) {
    //首先隨機打亂列表中實例的順序
    Collections.shuffle(serviceInstances);
    //需要先將所有參數緩存起來,否則 comparator 會調用多次,並且可能在排序過程中參數發生改變(針對實例的請求統計數據一直在並發改變)
    Map<ServiceInstance, Integer> used = Maps.newHashMap();
    Map<ServiceInstance, Long> callings = Maps.newHashMap();
    Map<ServiceInstance, Double> failedInRecentOneMin = Maps.newHashMap();
    serviceInstances = serviceInstances.stream().sorted(
            Comparator
                    //之前已經調用過的網段,這里排后面
                    .<ServiceInstance>comparingInt(serviceInstance -> {
                        return used.computeIfAbsent(serviceInstance, k -> {
                            return calledIpPrefixes.get(traceId).stream().anyMatch(prefix -> {
                                return serviceInstance.getHost().contains(prefix);
                            }) ? 1 : 0;
                        });
                    })
                    //當前錯誤率最少的
                    .thenComparingDouble(serviceInstance -> {
                        return failedInRecentOneMin.computeIfAbsent(serviceInstance, k -> {
                            double value = serviceInstanceMetrics.getFailedInRecentOneMin(serviceInstance);
                            //由於使用的是移動平均值(EMA),需要忽略過小的差異(保留兩位小數,不是四舍五入,而是直接舍棄)
                            return ((int) (value * 100)) / 100.0;
                        });
                    })
                    //當前負載請求最少的
                    .thenComparingLong(serviceInstance -> {
                        return callings.computeIfAbsent(serviceInstance, k ->
                                serviceInstanceMetrics.getCalling(serviceInstance)
                        );
                    })
    ).collect(Collectors.toList());
    if (serviceInstances.isEmpty()) {
        log.warn("No servers available for service: " + this.serviceId);
        return new EmptyResponse();
    }
    ServiceInstance serviceInstance = serviceInstances.get(0);
    //記錄本次返回的網段
    calledIpPrefixes.get(traceId).add(serviceInstance.getHost().substring(0, serviceInstance.getHost().lastIndexOf(".")));
    //目前記錄這個只為了兼容之前的單元測試(調用次數測試)
    positionCache.get(traceId).getAndIncrement();
    return new DefaultResponse(serviceInstance);
}

一些組內關於方案設計的取舍 Q&A

1. 為何沒有使用所有微服務共享的緩存來保存調用數據,來讓這些數據更加准確

共享緩存的可選方案包括將這些數據記錄放入 Redis,或者是 Apache Ignite 這樣的內存網格中。但是有兩個問題:

  1. 如果數據記錄放入 Redis 這樣的額外存儲,如果 Redis 不可用會導致所有的負載均衡都無法執行。如果放入 Apache Ignite,如果對應的節點下線,那么對應的負載均衡也無法執行。這些都是不能接受的。
  2. 假設微服務 A 需要調用微服務 B,可能 A 的某個實例調用 B 的某個實例有問題,但是 A 的其他實例調用 B 的這個實例卻沒有問題,例如當某個可用區與另一個可用區網絡擁塞的時候。如果用同一個緩存 Key 記錄 A 所有的實例調用 B 這個實例的數據,顯然是不准確的。

每個微服務使用本地緩存,記錄自己調用其他實例的數據,在我們這里看來,不僅是更容易實現,也是更准確的做法。

2. 采用 EMA 的方式而不是請求窗口的方式統計最近錯誤率

采用請求窗口的方式統計,肯定是最准確的,例如我們統計最近一分鍾的錯誤率,就將最近一分鍾的請求緩存起來,讀取的時候,將緩存起來的請求數據加在一起取平均數即可。但是這種方式在請求突增的時候,可能會占用很多很多內存來緩存這些請求。同時計算錯誤率的時候,隨着緩存請求數的增多也會消耗更大量的 CPU 進行計算。這樣做很不值得。

EMA 這種滑動平均值的計算方式,常見於各種性能監控統計場景,例如 JVM 中 TLAB 大小的動態計算,G1 GC Region 大小的伸縮以及其他很多 JVM 需要動態得出合適值的地方,都用這種計算方式。他不用將請求緩存起來,而是直接用最新值乘以一個比例之后加上老值乘以 (1 - 這個比例),這個比例一般高於 0.5,表示 EMA 和當前最新值更加相關。

但是 EMA 也帶來另一個問題,我們會發現隨着程序運行小數點位數會非常多,會看到類似於如下的值:0.00000000123, 0.120000001, 0.120000003, 為了忽略過於細致差異的影響(其實這些影響也來自於很久之前的錯誤請求),我們只保留兩位小數進行排序

微信搜索“我的編程喵”關注公眾號,每日一刷,輕松提升技術,斬獲各種offer


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM