本文基於 spring cloud gateway 2.0.1
1、簡介
GlobalGilter 全局過濾器接口與 GatewayFilter 網關過濾器接口具有相同的方法定義。全局過濾器是一系列特殊的過濾器,會根據條件應用到所有路由中。網關過濾器是更細粒度的過濾器,作用於指定的路由中。
從類圖中可以看到 GlobalFilter 有十一個實現類,包括路由轉發、負載均衡、ws 路由、netty 路由等全局過濾器。下面我們就分別介紹一下這些全局路由過濾器的實現。
2、ForwardRoutingFilter 轉發路由過濾器
ForwardRoutingFilter 在交換屬性 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 中 查找 URL, 如果 URL 為轉發模式即 forward:/// localendpoint, 它將使用Spring DispatcherHandler 來處 理請求。 未修改的原始 URL 將保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 屬性的列表中。
public class ForwardRoutingFilter implements GlobalFilter, Ordered {
private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class);
private final ObjectProvider<DispatcherHandler> dispatcherHandler;
public ForwardRoutingFilter(ObjectProvider<DispatcherHandler> dispatcherHandler) {
this.dispatcherHandler = dispatcherHandler;
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
//獲取請求URI的請求結構
String scheme = requestUrl.getScheme();
//該路由已經被處理或者URI格式不是forward則繼續其它過濾器
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
//TODO: translate url?
if (log.isTraceEnabled()) {
log.trace("Forwarding to URI: "+requestUrl);
}
// 使用dispatcherHandler進行處理
return this.dispatcherHandler.getIfAvailable().handle(exchange);
}
}
轉發路由過濾器實現比較簡單,構造函數傳入請求的分發處理器DispatcherHandler。過濾器執行時,首先獲取請求地址的url前綴,然后判斷該請求是否已被路由處理或者URL的前綴不是forward,則繼續執行過濾器鏈;否則設置路由處理狀態並交由DispatcherHandler進行處理。
請求路由是否被處理的判斷如下:
// ServerWebExchangeUtils.java
public static void setAlreadyRouted(ServerWebExchange exchange) {
exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
}
public static boolean isAlreadyRouted(ServerWebExchange exchange) {
return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
}
兩個 方法 定義 在 ServerWebExchangeUtils 中, 這 兩個 方法 用於 修改 與 查詢 ServerWebExchange 中的 Map< String, Object> getAttributes(),# getAttributes 方法 返回 當前 exchange 所請 求 屬性 的 可變 映射。
這兩個方法定義在 ServerWebExchangeUtils 中,分別用於修改和查詢 GATEWAY_ALREADY_ROUTED_ATTR 狀態。
3、LoadBalancerClientFilter 負載均衡客戶端過濾器
spring:
cloud:
gateway:
routes:
- id: myRoute
uri: lb://service
predicates:
- Path=/service/**
LoadBalancerClientFilter 在交換屬性 GATEWAY_ REQUEST_ URL_ ATTR 中查找URL, 如果URL有一個 lb 前綴 ,即 lb:// myservice,將使用 LoadBalancerClient 將名稱 解析為實際的主機和端口,如示例中的 myservice。 未修改的原始 URL將保存到 GATEWAY_ ORIGINAL_ REQUEST_ URL_ ATTR 屬性的列表中。過濾器還將查看ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR屬性以查看它是否等於lb,然后應用相同的規則。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
//保留原始url
addOriginalRequestUrl(exchange, url);
log.trace("LoadBalancerClientFilter url before: " + url);
//負載均衡到具體服務實例
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw new NotFoundException("Unable to find instance for " + url.getHost());
}
URI uri = exchange.getRequest().getURI();
//如果沒有提供前綴的話,則會使用默認的'< scheme>',否則使用' lb:< scheme>' 機制。
String overrideScheme = null;
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
//根據獲取的服務實例信息,重新組裝請求的 url
URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);
// Routing 相關 的 GatewayFilter 會 通過 GATEWAY_ REQUEST_ URL_ ATTR 屬性, 發起 請求。
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
從過濾器執行方法中可以看出,負載均衡客戶端過濾器的實現步驟如下:
1、構造函數傳入負載均衡客戶端,依賴中添加 Spring Cloud Netflix Ribbon 即可 注入 該 Bean。
2、獲取請求的 URL 及其前綴,如果 URL 不為空且前綴為lb或者網關請求的前綴是 lb,則保存原始的URL,負載到具體的服務實例並根據獲取的服務實例信息,重新組裝請求的URL。
3、最后,添加請求的URL到GATEWAY_ REQUEST_ URL_ ATTR,並提交到過濾器鏈中繼續執行
在組裝請求的地址時,如果loadbalancer沒有提供前綴的話,則使用默認的,即overrideScheme 為null,否則的話使用 lb:
4、NettyRoutingFilter 和 NettyWriteResponseFilter
如果 ServerWebExchangeUtils.GATEWAY_ REQUEST_ URL_ ATTR 請求屬性中的URL 具有http或https前綴,NettyRoutingFilter 路由過濾器將運行,它使用 Netty HttpClient 代理對下游的請求。響應信息放在ServerWebExchangeUtils.CLIENT_ RESPONSE_ ATTR 屬性中,在過濾器鏈中進行傳遞。
該過濾器實際處理 和客戶端負載均衡的實現方式類似:
首先獲取請求的URL及前綴,判斷前綴是不是http或者https,如果該請求已經被路由或者前綴不合法,則調用過濾器鏈直接向后傳遞;否則正常對頭部進行過濾操作。
public class NettyRoutingFilter implements GlobalFilter, Ordered {
private final HttpClient httpClient;
private final ObjectProvider<List<HttpHeadersFilter>> headersFilters;
private final HttpClientProperties properties;
public NettyRoutingFilter(HttpClient httpClient,
ObjectProvider<List<HttpHeadersFilter>> headersFilters,
HttpClientProperties properties) {
this.httpClient = httpClient;
this.headersFilters = headersFilters;
this.properties = properties;
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
final String url = requestUrl.toString();
HttpHeaders filtered = filterRequest(this.headersFilters.getIfAvailable(),
exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
Mono<HttpClientResponse> responseMono = this.httpClient.request(method, url, req -> {
final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
.headers(httpHeaders)
.chunkedTransfer(chunkedTransfer)
.failOnServerError(false)
.failOnClientError(false);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
proxyRequest.header(HttpHeaders.HOST, host);
}
if (properties.getResponseTimeout() != null) {
proxyRequest.context(ctx -> ctx.addHandlerFirst(
new ReadTimeoutHandler(properties.getResponseTimeout().toMillis(), TimeUnit.MILLISECONDS)));
}
return proxyRequest.sendHeaders() //I shouldn't need this
.send(request.getBody().map(dataBuffer ->
((NettyDataBuffer) dataBuffer).getNativeBuffer()));
});
return responseMono.doOnNext(res -> {
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
}
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
this.headersFilters.getIfAvailable(), headers, exchange, Type.RESPONSE);
response.getHeaders().putAll(filteredResponseHeaders);
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
} else if (response instanceof AbstractServerHttpResponse) {
// https://jira.spring.io/browse/SPR-16748
((AbstractServerHttpResponse) response).setStatusCodeValue(res.status().code());
} else {
throw new IllegalStateException("Unable to set status code on response: " +res.status().code()+", "+response.getClass());
}
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
})
.onErrorMap(t -> properties.getResponseTimeout() != null && t instanceof ReadTimeoutException,
t -> new TimeoutException("Response took longer than timeout: " +
properties.getResponseTimeout()))
.then(chain.filter(exchange));
}
}
NettyRoutingFilter 過濾器的構造函數有三個參數:
HttpClient httpClient : 基於 Netty 實現的 HttpClient,通過該屬性請求后端 的 Http 服務
ObjectProvider<List> headersFilters: ObjectProvider 類型 的 headersFilters,用於頭部過濾
HttpClientProperties properties: Netty HttpClient 的配置屬性
4.1、NettyRoutingFilter ## HttpHeadersFilter 頭部過濾器接口
filterRequest 用於對請求頭部的信息進行處理,是定義在接口 HttpHeadersFilter 中的默認方法,該接口有三個實現類,請求頭部將會經過這三個頭部過濾器,並最終返回修改之后的頭部。
public interface HttpHeadersFilter {
enum Type {
REQUEST, RESPONSE
}
/**
* Filters a set of Http Headers
*
* @param input Http Headers
* @param exchange
* @return filtered Http Headers
*/
HttpHeaders filter(HttpHeaders input, ServerWebExchange exchange);
static HttpHeaders filterRequest(List<HttpHeadersFilter> filters,
ServerWebExchange exchange) {
HttpHeaders headers = exchange.getRequest().getHeaders();
return filter(filters, headers, exchange, Type.REQUEST);
}
static HttpHeaders filter(List<HttpHeadersFilter> filters, HttpHeaders input,
ServerWebExchange exchange, Type type) {
HttpHeaders response = input;
if (filters != null) {
HttpHeaders reduce = filters.stream()
.filter(headersFilter -> headersFilter.supports(type))
.reduce(input,
(headers, filter) -> filter.filter(headers, exchange),
(httpHeaders, httpHeaders2) -> {
httpHeaders.addAll(httpHeaders2);
return httpHeaders;
});
return reduce;
}
return response;
}
default boolean supports(Type type) {
return type.equals(Type.REQUEST);
}
}
HttpHeadersFilter 接口的三個實現類:
-
ForwardedHeadersFilter:
增加 Forwarded頭部,頭部值為協議類型、host和目標地址
-
XForwardedHeadersFilter:
增加 X- Forwarded- For、 X- Forwarded- Host、 X- Forwarded- Port 和 X- Forwarded- Proto 頭部。 代理轉發時,用以自定義的頭部信息向下游傳遞。
-
RemoveHopByHopHeadersFilter:
為了定義緩存和非緩存代理的行為,我們將HTTP頭字段分為兩類:端到端的頭部字段,發送給請求或響應的最終接收人;逐跳頭部字段,對單個傳輸級別連接有意義,並且不被緩存存儲或由代理轉發。
所以該頭部過濾器會移除逐跳頭部字段,包括以下8個字段:
Proxy- Authenticate
Proxy- Authorization
TE
Trailer
Transfer- Encoding
Upgrade
proxy- connection
content- length
4.2、NettyWriteResponseFilter
NettyWriteResponseFilter 與 NettyRoutingFilter 成對使用。“ 預” 過濾階段沒有任何內容,因為 CLIENT_ RESPONSE_ ATTR 在 WebHandler 運行之前不會被添加。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
// until the WebHandler is run
return chain.filter(exchange).then(Mono.defer(() -> {
HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
if (clientResponse == null) {
return Mono.empty();
}
log.trace("NettyWriteResponseFilter start");
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
//TODO: what if it's not netty
final Flux<NettyDataBuffer> body = clientResponse.receive()
.retain() //TODO: needed?
.map(factory::wrap);
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
} catch (Exception e) {
log.trace("invalid media type", e);
}
return (isStreamingMediaType(contentType) ?
response.writeAndFlushWith(body.map(Flux::just)) : response.writeWith(body));
}));
}
如果 CLIENT_ RESPONSE_ ATTR 請求 屬性 中 存在 Netty HttpClientResponse, 則 會應用 NettyWriteResponseFilter。 它在其他過濾器完成后運行,並將代理響應寫回 網關客戶端響應。成對出現的 WebClientHttpRoutingFilter 和 WebClientWriteResponseFilter 過濾器,與基於Nettty 的路由和響應過濾器執行相同 的功能,但不需要使用Netty。
5、RouteToRequestUrlFilter 路由到指定url的過濾器
如果 ServerWebExchangeUtils.GATEWAY_ ROUTE_ ATTR 請求屬性中有Route對象, 則 會運行 RouteToRequestUrlFilter 過濾器。他會根據請求URI創建一個新的URI。 新的 URI 位於 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 請求屬性中。該過濾器會組裝成發送到代理服務的URL地址,向后傳遞到路由轉發的過濾器。
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();
if (hasAnotherScheme(routeUri)) {
// this is a special url, save scheme to special attribute
// replace routeUri with schemeSpecificPart
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
// .uri(routeUri)
.scheme(routeUri.getScheme())
.host(routeUri.getHost())
.port(routeUri.getPort())
.build(encoded)
.toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
-
首先獲取請求中的 Route, 如 果為 空 則 直接 提交 過濾器 鏈; 否則 獲取 routeUri, 並 判斷 routeUri 是否 特殊, 如果 是 則需 要 處理 URL, 保存 前綴 到 GATEWAY_SCHEME_PREFIX_ATTR, 並將 routeUri 替換
-
首先獲取請求中的Route,如果為空則直接提交給過濾器鏈
-
獲取routeUri並判斷是否特殊,如果是則需要處理URL,保存前綴到GATEWAY_SCHEME_PREFIX_ATTR,並將routeUri 替換為schemeSpecificPart
-
然后拼接requestUrl,將請求的URI轉換為路由定義的routeUri
-
最后,提交到過濾器鏈繼續執行
6、WebsocketRoutingFilter
如果請求中的ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 屬性對應的URL前綴為 ws 或 wss,則啟用Websocket 路由過濾器。它使用Spring Web Socket 作為底層通信組件向下游轉發 WebSocket 請求。Websocket 可以通過添加前綴 lb來實現負載均衡,如 lb:ws://serviceid
如果您使用SockJS作為普通http的回調,則應配置正常的HTTP路由以及Websocket路由
spring: cloud: gateway: routes: # SockJS route - id: websocket_sockjs_route uri: http://localhost:3001 predicates: - Path=/websocket/info/** # Normwal Websocket route - id: websocket_route uri: ws://localhost:3001 predicates: - Path=/websocket/**
Websocket 路由過濾器進行處理時,首先獲取請求的URL及其前綴,判斷是否滿足 Websocket 過濾器啟用的條件;對於未被路由處理且請求前綴為ws或wss的請求,設置路由處理狀態位,構造過濾后的頭部。最后將請求通過代理轉發。
// WebsocketRoutingFilter.java
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//檢查websocket 是否是 upgrade
changeSchemeIfIsWebSocketUpgrade(exchange);
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
//判斷是否滿足websocket啟用條件
if (isAlreadyRouted(exchange) || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
HttpHeaders headers = exchange.getRequest().getHeaders();
HttpHeaders filtered = filterRequest(getHeadersFilters(),
exchange);
List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
if (protocols != null) {
protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream()
.flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header)))
.map(String::trim)
.collect(Collectors.toList());
}
//將請求代理轉發
return this.webSocketService.handleRequest(exchange,
new ProxyWebSocketHandler(requestUrl, this.webSocketClient,
filtered, protocols));
}
ProxyWebSocketHandler 是 WebSocketHandler 的實現類,處理客戶端 WebSocket Session。 下面看一下代理 WebSocket 處理器的具體實現:
// WebsocketRoutingFilter.java
private static class ProxyWebSocketHandler implements WebSocketHandler {
private final WebSocketClient client;
private final URI url;
private final HttpHeaders headers;
private final List<String> subProtocols;
public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers, List<String> protocols) {
this.client = client;
this.url = url;
this.headers = headers;
if (protocols != null) {
this.subProtocols = protocols;
} else {
this.subProtocols = Collections.emptyList();
}
}
@Override
public List<String> getSubProtocols() {
return this.subProtocols;
}
@Override
public Mono<Void> handle(WebSocketSession session) {
// pass headers along so custom headers can be sent through
return client.execute(url, this.headers, new WebSocketHandler() {
@Override
public Mono<Void> handle(WebSocketSession proxySession) {
// Use retain() for Reactor Netty
Mono<Void> proxySessionSend = proxySession
.send(session.receive().doOnNext(WebSocketMessage::retain));
// .log("proxySessionSend", Level.FINE);
Mono<Void> serverSessionSend = session
.send(proxySession.receive().doOnNext(WebSocketMessage::retain));
// .log("sessionSend", Level.FINE);
return Mono.zip(proxySessionSend, serverSessionSend).then();
}
/**
* Copy subProtocols so they are available downstream.
* @return
*/
@Override
public List<String> getSubProtocols() {
return ProxyWebSocketHandler.this.subProtocols;
}
});
}
}
-
WebSocketClient# execute 方法連接后端被代理的 WebSocket 服務。
-
連接成功后,回調WebSocketHandler實現的內部類的handle( WebSocketSession session)方法
-
WebSocketHandler 實現的內部類實現對消息的轉發: 客戶端=> 具體業務服務=> 客戶 端; 然后合並代理服務的會話信息 proxySessionSend 和業務服務的會話信息serverSessionSend。
7、其它過濾器
AdaptCachedBodyGlobalFilter— 用於緩存請求體的過濾器,在全局過濾器中的優先級較高。
ForwardPathFilter— 請求中的 gatewayRoute 屬性對應 Route 對象,當 Route 中的 URI scheme 為 forward 模式 時, 該過濾器用於設置請求的 URI 路徑為 Route 對象 中的 URI 路徑。