Spring cloud Gateway(二) 一個Http請求的流程解析


Spring cloud Gateway(二) 一個Http請求的流程解析


簡介

    通過一個簡單示例,debug出Spring Cloud Gateway的一個HTTP請求的處理流程

思路整理

    在上篇文章中大致了解了SpringCloudGateway的大致作用和關鍵模塊(路由匹配和Filter),在這篇文章中將梳理出一個HTTP請求的處理鏈路

    目前先不關心其具體細節,主要梳理出其路由匹配的核心處理類,請求和響應的處理流向核心類

    需要的一些前置知識:netty相關使用經驗,因為分析大量用到了netty相關概念的類比和猜測

示例代碼

    配置啟動一個簡單的程序,請求轉發到后台服務器,在請求和響應上添加一些東西,程序代碼大致如下:

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	public RouteLocator myRoutes(RouteLocatorBuilder builder) {
		return builder.routes()
				.route(p -> p.path("/")
						.filters(f -> f.addRequestParameter("test", "test")
								.addResponseHeader("return", "return"))
						.uri("http://localhost:8082/"))
				.build();
	}
}

相關處理類查找

路由匹配

    首先在沒有打任何斷點的情況下運行一次程序:把程序跑起來,訪問 http://localhost:8080/,成功得到結果。通過查看日志,發現有下面關於路由匹配的語句:

o.s.c.g.h.p.RoutePredicateFactory        : Pattern "[/image/webp]" does not match against value "/"
o.s.c.g.h.p.RoutePredicateFactory        : Pattern "/" matches against value "/"
o.s.c.g.h.RoutePredicateHandlerMapping   : Route matched: 606b3b86-7ef4-4538-bbcb-b512c411c325

    一眼便看到 Route matched,還有類名帶有 RoutePredicate,這大概率是路由匹配的核心處理類了,於是我們直接搜索打開這個類,看到下面的關鍵內容:

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
    // 這 lookupRoute 方法名一看就是妥妥的路由查找
	protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
		return this.routeLocator.getRoutes()
				// individually filter routes so that filterWhen error delaying is not a
				// problem
				.concatMap(route -> Mono.just(route).filterWhen(r -> {
					// add the current route we are testing
					exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
					return r.getPredicate().apply(exchange);
				})
						// instead of immediately stopping main flux due to error, log and
						// swallow it
						.doOnError(e -> logger.error(
								"Error applying predicate for route: " + route.getId(),
								e))
						.onErrorResume(e -> Mono.empty()))
				// .defaultIfEmpty() put a static Route not found
				// or .switchIfEmpty()
				// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
				.next()
				// TODO: error handling
				.map(route -> {
					if (logger.isDebugEnabled()) {
						logger.debug("Route matched: " + route.getId());
					}
					validateRoute(route, exchange);
					return route;
				});
	}
}

    路由匹配找到這個就差不多了,具體的比如路由如何加載、查找后如何調整到處理鏈,到后面在此處打斷點,通過調用棧應該可以看到

Filter處理

Request處理發送

    一樣通過查看debug日志,可以看到下面明顯的日志:

o.s.c.g.handler.FilteringWebHandler      : Sorted gatewayFilterFactories: 
o.s.c.g.filter.RouteToRequestUrlFilter   : RouteToRequestUrlFilter start

    首先查看 RouteToRequestUrlFilter,發現其中沒有明顯的處理鏈(根據寫網關的Filter的經驗或者Netty的pipeline,應該入口是一個列表的循環處理或者定義的地方)。

    於是我們打開另外一個類:FilteringWebHandler,很幸運,看到filters,還有其get方法,很像,大致如下:

public class FilteringWebHandler implements WebHandler {
    ......

	private static class DefaultGatewayFilterChain implements GatewayFilterChain {

		@Override
		public Mono<Void> filter(ServerWebExchange exchange) {
			return Mono.defer(() -> {
				if (this.index < filters.size()) {
					GatewayFilter filter = filters.get(this.index);
					DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
							this.index + 1);
					return filter.filter(exchange, chain);
				}
				else {
					return Mono.empty(); // complete
				}
			});
		}

	}

    ......
}

    這里應該就是Filter處理的核心了,接下來在這個函數進行打斷點進行調試

    在程序debug停在此處的時候,我們可以查看 filters 的值是啥,大致的內容如下:

0 RemoveCachedBodyFilter
1 AdaptCachedBodyGlobalFilter
2 NettyWriteResponseFilter
3 ForwardPathFilter
4 GatewayMetricsFilter
5 [[AddRequestParameter test = 'test'], order = 0]
6 [[AddResponseHeader return = 'return'], order = 0]
7 RouteToRequestUrlFilter
8 LoadBalancerClientFilter
9 WebsocketRoutingFilter
10 NettyRoutingFilter
11 ForwardRoutingFilter

    此時我們沿着這條鏈路一直debug下去,進入上面所有的 filter 走一遍

    在這次的debug過程中,能清晰的看到每個類都走了一遍,但是在最后一個filter:ForwardRoutingFilter,它的關鍵代碼如下:

public class ForwardRoutingFilter implements GlobalFilter, Ordered {
    ......

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		String scheme = requestUrl.getScheme();
		if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
			return chain.filter(exchange);
		}

		// TODO: translate url?

		if (log.isTraceEnabled()) {
			log.trace("Forwarding to URI: " + requestUrl);
		}

		return this.getDispatcherHandler().handle(exchange);
	}

    ......
}

    目前的猜測是最后一個filter,發送請求到后台服務器,但是這個filter完全沒有看到這個類似的代碼,於是又瞎debug了第二遍和第三遍,發現了一個及其可疑的類: NettyRoutingFilter,在這個類中發現了請求發送相關代碼,大致如下:

public class NettyRoutingFilter implements GlobalFilter, Ordered {
    ......

	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);

		String scheme = requestUrl.getScheme();
		if (isAlreadyRouted(exchange)
				|| (!"http".equals(scheme) && !"https".equals(scheme))) {
			return chain.filter(exchange);
		}

        ......

		Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
				.headers(headers -> {
					headers.add(httpHeaders);
					// Will either be set below, or later by Netty
					headers.remove(HttpHeaders.HOST);
					if (preserveHost) {
						String host = request.getHeaders().getFirst(HttpHeaders.HOST);
						headers.add(HttpHeaders.HOST, host);
					}
				}).request(method).uri(url).send((req, nettyOutbound) -> {
					if (log.isTraceEnabled()) {
                        ///////////////////////////////////////////////////////////////////
                        // 熟悉的netty outbound
						nettyOutbound
								.withConnection(connection -> log.trace("outbound route: "
										+ connection.channel().id().asShortText()
										+ ", inbound: " + exchange.getLogPrefix()));
					}
                    // 使用outbound send 確定發送請求無疑了
					return nettyOutbound.send(request.getBody().map(this::getByteBuf));
                    /////////////////////////////////////////////////////////////////////////
				}).responseConnection((res, connection) -> {
                    ......
				});

        ......
		return responseFlux.then(chain.filter(exchange));
	}
    ......
}

    在這里看到netty熟悉的outbound和send函數,那就確定是在這個類里面進行數據的發送,request的流程就走完了

Response響應處理

    通過上面的步驟,我們找到了request的流程,但是發現好像把所有的filter都走完一遍了,如果類比到netty,那就應該有inbound和outbound。outbound的部分我們在上面找到了,現在要找inbound的部分。

    所有filter都走了一遍,代碼中也沒有類似inbound和outbound屬性的判斷,則假設這些filter是雙工的,同時兼備inbound和outbound的類似功能。通過查看這些filter的代碼,我們在下面那個地方找到了response相關的代碼:

    在第二個類 NettyWriteResponseFilter,清晰的看到了熟悉的 writeAndFlushWith,到這里就確定了response響應的最后是到了writeAndFlushWith。但是,整個流程是怎么樣的目前還是不清楚,沒有看到鏈表逆序或者反向傳播的相關代碼

    判斷當前有些關鍵的組件知識欠缺,在這些filter類代碼中,仔細查看相關的代碼,看着像是lamda表達式,但是核心思想是逐個調用處理,依照這個思路來梳理下面的代碼:

public class NettyRoutingFilter implements GlobalFilter, Ordered {
    ......

	@Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ......

		Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange)
                // 猜測設置header
				.headers(headers -> {
                    ......
                // 發送request
				}).request(method).uri(url).send((req, nettyOutbound) -> {
                    ......
                // 接收到返回的response
				}).responseConnection((res, connection) -> {
                    // 這里看到將結果放入了,exchange中,而這個exchange貫穿了這個請求流程,感覺這個就類似netty的ctx
					exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
					exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

					ServerHttpResponse response = exchange.getResponse();
					// put headers and status so filters can modify the response
					HttpHeaders headers = new HttpHeaders();

					res.responseHeaders().forEach(
							entry -> headers.add(entry.getKey(), entry.getValue()));

					String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
					if (StringUtils.hasLength(contentTypeValue)) {
						exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
								contentTypeValue);
					}

					setResponseStatus(res, response);

					// make sure headers filters run after setting status so it is
					// available in response
					HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
							getHeadersFilters(), headers, exchange, Type.RESPONSE);

					if (!filteredResponseHeaders
							.containsKey(HttpHeaders.TRANSFER_ENCODING)
							&& filteredResponseHeaders
									.containsKey(HttpHeaders.CONTENT_LENGTH)) {
						// It is not valid to have both the transfer-encoding header and
						// the content-length header.
						// Remove the transfer-encoding header in the response if the
						// content-length header is present.
						response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
					}

					exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
							filteredResponseHeaders.keySet());

					response.getHeaders().putAll(filteredResponseHeaders);

					return Mono.just(res);
				});

		return responseFlux.then(chain.filter(exchange));
	}
}



// 在下面這個類的filter方法中,我們看到response的獲取
// 看到了屬性的writeAndFlushWith
public class NettyWriteResponseFilter implements GlobalFilter, Ordered {

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 通過debug可以看到 filter是觸發下一個filter類的執行,filter執行完后執行then,里面有明顯的發送響應到客戶端的代碼
		return chain.filter(exchange)
				.doOnError(throwable -> cleanup(exchange))
				.then(Mono.defer(() -> {
					Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);

					if (connection == null) {
						return Mono.empty();
					}
					if (log.isTraceEnabled()) {
						log.trace("NettyWriteResponseFilter start inbound: "
								+ connection.channel().id().asShortText() + ", outbound: "
								+ exchange.getLogPrefix());
					}
					ServerHttpResponse response = exchange.getResponse();

					// TODO: needed?
					final Flux<DataBuffer> body = connection
							.inbound()
							.receive()
							.retain()
							.map(byteBuf -> wrap(byteBuf, response));

					MediaType contentType = null;
					try {
						contentType = response.getHeaders().getContentType();
					}
					catch (Exception e) {
						if (log.isTraceEnabled()) {
							log.trace("invalid media type", e);
						}
					}
					return (isStreamingMediaType(contentType)
							? response.writeAndFlushWith(body.map(Flux::just))
							: response.writeWith(body));
				})).doOnCancel(() -> cleanup(exchange));
	}
}

    到這我們就大致摸清楚了response響應的數據流向

請求數據流向

    數據流圖如下,圖中使用序號代替filter處理器

0 RemoveCachedBodyFilter
1 AdaptCachedBodyGlobalFilter
2 NettyWriteResponseFilter
3 ForwardPathFilter
4 GatewayMetricsFilter
5 AddRequestParameterGatewayFilterFactory -- [[AddRequestParameter test = 'test'], order = 0]
6 AddResponseHeaderGatewayFilterFactory -- [[AddResponseHeader return = 'return'], order = 0]
7 RouteToRequestUrlFilter
8 LoadBalancerClientFilter
9 WebsocketRoutingFilter
10 NettyRoutingFilter
11 ForwardRoutingFilter

在這里插入圖片描述

    通過debug梳理下來,我們大致看到數據如何在整個filter鏈中流動的。從中也大致體會到響應式編程的一點點思想,感覺還挺好用

    AddResponseHeaderGatewayFilterFactory 添加 response的 header,通過查看代碼,發現思想還有點巧妙:在 exchange 中初始化了響應response,所有的響應的相應的修改都在這個上面進行修改,在拿到服務器的響應以后,直接將這些放到里面去。這個思想還挺巧妙的,很有借鑒意義

總結

    request每個filter都會走一遍,但並不是每個filter都會對request進行處理,類似下面的代碼,通過判斷是不是websocket的前綴來判斷是否需要進行處理。這里好像通過這個實現了不同請求類型的轉發,也就是 HTTP(NettyRoutingFilter)和 Web(WebsocketRoutingFilter)結構上應該是同級的,這個思想挺有意思

public class WebsocketRoutingFilter implements GlobalFilter, Ordered {

	@Override
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
		changeSchemeIfIsWebSocketUpgrade(exchange);

		URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
		String scheme = requestUrl.getScheme();

        // 這里對請求的類型進行判斷
		if (isAlreadyRouted(exchange)
				|| (!"ws".equals(scheme) && !"wss".equals(scheme))) {
			return chain.filter(exchange);
		}
		setAlreadyRouted(exchange);

		HttpHeaders headers = exchange.getRequest().getHeaders();
		HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

		List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
		if (protocols != null) {
			protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(
					header -> Arrays.stream(commaDelimitedListToStringArray(header)))
					.map(String::trim).collect(Collectors.toList());
		}

		return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(
				requestUrl, this.webSocketClient, filtered, protocols));
	}
}

    還有一個就是 response 修改的實現也是很巧妙,感覺收獲了很多


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM