背景
隨着微服務架構的流行,服務按照不同的維度進行拆分,一次請求往往需要涉及到多個服務。而諸多的服務可能分布在了幾千台服務器,橫跨多個不同的數據中心。為了快速定位和解決故障,應用性能進行分析,全鏈路監控組件就在這樣的問題背景下產生了。最出名的是谷歌公開的論文提到的Google Dapper。想要在這個上下文中理解分布式系統的行為,就需要監控那些橫跨了不同的應用、不同的服務器之間的關聯動作。
1.1
全鏈路原理
通過業務調用過程中添加並傳遞調用鏈ID,實現應用間生成鏈路數據,最終串聯成一條完整的調用鏈。
其中整個調用過程中每個請求都要透傳TxId、SpanId和pSpanId。

1.2
Spring Cloud Gateway
作為Spring Cloud官方推出的第二代網關框架,Spring cloud gateway是基於Spring 5.0、Spring Boot2.0和Reactor等技術開發的網關,采用了NIO模型進行通信。

1.2.1
Spring Webflux
Spring Boot 2.0 包括一個新的 spring-webflux 模塊,名稱中的 Flux 來源於 Reactor 中的類 Flux。該模塊包含對響應式 HTTP 和 WebSocket 客戶端的支持,以及對 REST、HTML 和 WebSocket 交互等程序的支持。
一般來說,Spring MVC 用於同步處理;Spring Webflux 用於異步處理。

1.2.2
Mono & Flux
Mono表示的是包含 0 或者 1 個元素的異步序列,即要么成功發布元素,要么錯誤

Flux 表示的是包含 0 到 N 個元素的異步序列 ,即要么成功發布 0 到 N 個元素,要么錯誤

Flux和Mono之間可以相互轉換,比如對一個 Flux 序列進行計數操作,得到的結果是一個 Mono對象,或者把兩個 Mono 序列合並在一起,得到的是一個 Flux 對象。
2
Spring Cloud Gateway不做監控
Spring Cloud Gateway作為入口網關,主要負責服務的路由轉發。如果網關沒有進行監控,則全鏈路會缺失網關節點,直接展示為用戶訪問后續應用;不能有效定位用戶請求慢是網關問題還是后續節點。

3
Spring Cloud Gateway進行監控
由於Spring Cloud Gateway采用了Reactor框架非阻塞式調用,任務之間會跨線程執行,導致HTTP頭信息所需的調用鏈ID不好傳遞。
· Gateway接收線程
· Gateway返回線程
· 路由轉發線程
3.1
Spring Cloud Gateway流程
現對Spring Cloud Gateway的流程進行梳理,本篇由於只涉及到跨線程服務調度,不討論路由過程。
|
|
Gateway Client 發起請求 |
| Gateway Handler Mapping 確定請求與路由匹配 |
|
| Gateway Web Handler 處理請求並經過一系列的Filter鏈。 在Filter鏈中,通過虛線分割Pre Filter和Post Filter。所有的Pre類型的Filter執行完畢之后,才會轉發請求到被代理的服務處理。被代理的服務把所有請求完畢之后,才會執行Post類型的過濾器。 |
3.1.1
Spring Cloud Gateway的請求入口
org.springframework.http.server.reactive.ReactorHttpHandlerAdapter#apply
先將接收到的HttpServerRequest或者最終需要返回的HttpServerResponse包裝轉換為ReactorServerHttpRequest和ReactorServerHttpResponse,再處理請求。
| public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) { NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.alloc()); ServerHttpRequest adaptedRequest; ServerHttpResponse adaptedResponse; try { adaptedRequest = new ReactorServerHttpRequest(request, bufferFactory); adaptedResponse = new ReactorServerHttpResponse(response, bufferFactory); } catch (URISyntaxException ex) { logger.error("Invalid URL " + ex.getMessage(), ex); response.status(HttpResponseStatus.BAD_REQUEST); return Mono.empty(); } if (adaptedRequest.getMethod() == HttpMethod.HEAD) { adaptedResponse = new HttpHeadResponseDecorator(adaptedResponse); } return this.httpHandler.handle(adaptedRequest, adaptedResponse) .doOnError(ex -> logger.error("Handling completed with error", ex)) .doOnSuccess(aVoid -> logger.debug("Handling completed with success")); } |
3.1.2
構造網關上下文
org.springframework.web.server.adapter.HttpWebHandlerAdapter#handle
· createExchange()構造網關上下文ServerWebExchange
· getDelegate()通過委托的方式獲取一系列需要處理的WebHandler
| public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) { ServerWebExchange exchange = createExchange(request, response); return getDelegate().handle(exchange) .onErrorResume(ex -> handleFailure(request, response, ex)) .then(Mono.defer(response::setComplete)); }
protected ServerWebExchange createExchange(ServerHttpRequest request, ServerHttpResponse response) { return new DefaultServerWebExchange(request, response, this.sessionManager, getCodecConfigurer(), getLocaleContextResolver(), this.applicationContext); } |
3.1.3
進入Filter鏈
org.springframework.cloud.gateway.handler.FilteringWebHandler#handle
獲得 GatewayFilter 數組,並根據獲得的 GatewayFilter 數組創建DefaultGatewayFilterChain,過濾處理請求。
| public Mono<Void> handle(ServerWebExchange exchange) { Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); List<GatewayFilter> gatewayFilters = route.getFilters(); List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); combined.addAll(gatewayFilters);
AnnotationAwareOrderComparator.sort(combined); logger.debug("Sorted gatewayFilterFactories: "+ combined); return new DefaultGatewayFilterChain(combined).filter(exchange); } |
3.1.4
執行Filter鏈
org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain#filter
過濾器的鏈式調用
| public Mono<Void> filter(ServerWebExchange exchange) { return Mono.defer(() -> { if (this.index < filters.size()) { GatewayFilter filter = filters.get(this.index); DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); // complete } }); } |
3.1.5
Gateway Filter適配器
org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter#filter
GatewayFilterAdapter是GlobalFilter過濾器的包裝類,最終委托Global Filter進行執行。
| private static class GatewayFilterAdapter implements GatewayFilter { private final GlobalFilter delegate; public GatewayFilterAdapter(GlobalFilter delegate) { this.delegate = delegate; }
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { return this.delegate.filter(exchange, chain); } } |
3.1.6
Netty路由網關過濾器
org.springframework.cloud.gateway.filter.NettyRoutingFilter#filter

GlobalFilter實現有很多,此處只分析NettyRoutingFIlter和NettyWriteResponseFilter。而NettyRoutingFilter負責使用 Netty HttpClient 代理對下游的請求。
| public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 獲得 requestUrl URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// 判斷是否能夠處理,http或https前綴 String scheme = requestUrl.getScheme(); if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } // 設置已經路由 setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest(); // 創建Netty Request Method對象 final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString()); final String url = requestUrl.toString(); HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set);
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING); boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding); boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); // 請求后端服務 return this.httpClient.request(method, url, req -> { final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach) .headers(httpHeaders) .chunkedTransfer(chunkedTransfer) .failOnServerError(false) .failOnClientError(false); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); proxyRequest.header(HttpHeaders.HOST, host); } return proxyRequest.sendHeaders() //發送請求頭 .send(request.getBody().map(dataBuffer -> // 發送請求Body ((NettyDataBuffer)dataBuffer).getNativeBuffer())); }).doOnNext(res -> { ServerHttpResponse response = exchange.getResponse(); // put headers and status so filters can modify the response HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); exchange.getAttributes().put("original_response_content_type", headers.getContentType()); HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter( this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE); response.getHeaders().putAll(filteredResponseHeaders);
HttpStatus status = HttpStatus.resolve(res.status().code()); if (status != null) { response.setStatusCode(status); } else if (response instanceof AbstractServerHttpResponse) { // https://jira.spring.io/browse/SPR-16748 ((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code()); } else { throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass()); } // Defer committing the response until all route filters have run // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); }).then(chain.filter(exchange)); } |
3.1.7
Netty 回寫響應網關過濾器
org.springframework.cloud.gateway.filter.NettyWriteResponseFilter#filter
NettyWriteResponseFilter 與NettyRoutingFilter成對出現,負責將代理響應寫回網關客戶端響應。
| public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // then方法實現After Filter邏輯 return chain.filter(exchange).then(Mono.defer(() -> { // 獲得Netty Response HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR); if (clientResponse == null) { return Mono.empty(); } log.trace("NettyWriteResponseFilter start"); ServerHttpResponse response = exchange.getResponse(); // 將Netty Response回寫給客戶端 NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory(); //TODO: what if it's not netty final Flux<NettyDataBuffer> body = clientResponse.receive() .retain() // ByteBufFlux => ByteBufFlux .map(factory::wrap); // ByteBufFlux => Flux<NettyDataBuffer> MediaType contentType = response.getHeaders().getContentType(); return (isStreamingMediaType(contentType) ? response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body)); })); } |
4
Spring Cloud Gateway進行監控
當最終將網關也進行監控,可以全局的看到應用請求流轉,網關的請求業務分流,各應用的調用負載情況。

轉自:https://mp.weixin.qq.com/s?__biz=MzI3OTQ3Mjc1OA==&mid=2247485571&idx=1&sn=cddc35e7fa124151b88f8e0850643c33&scene=21#wechat_redirect
https://blog.csdn.net/lyb9292/article/details/106562650/

