深入理解SpringCloud之Gateway 接上篇Webflux快速入門


引用於 https://www.cnblogs.com/niechen/p/11672630.html

雖然在服務網關有了zuul(在這里是zuul1),其本身還是基於servlet實現的,換言之還是同步阻塞方式的實現。就其本身來講它的最根本弊端也是再此。而非阻塞帶來的好處不言而喻,高效利用線程資源進而提高吞吐量,基於此Spring率先拿出針對於web的殺手鐧,對,就是webflux。而Gateway本身就是基於webflux基礎之上實現的。畢竟spring推出的技術,當然要得以推廣嘛。不過就國內的軟件公司而言為了穩定而選擇保守,因此就這項技術的廣度來說我本身還是在觀望中。

  1. Gateway快速上手
    添加依賴:

    implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
    這里請注意,springcloud-gateway是基於netty運行的環境,在servlet容器環境或者把它構建為war包運行的話是不允許的,因此在項目當中沒有必要添加spring-boot-starter-web。在gateway當中有三個重要的元素他們分別是:

Route 是最核心的路由元素,它定義了ID,目標URI ,predicates的集合與filter的集合,如果Predicate聚合返回真,則匹配該路由
Predicate 基於java8的函數接口Predicate,其輸入參數類型ServerWebExchange,其作用就是允許開發人員根據當前的http請求進行規則的匹配,比如說http請求頭,請求時間等,匹配的結果將決定執行哪種路由
Filter為GatewayFilter,它是由特殊的工廠構建,通過Filter可以在下層請求路由前后改變http請求與響應
我們編輯application.yaml,定義如下配置:

spring:
  application:
    name: gateway
  cloud:
    gateway:
      routes:
        - id: before_route
          uri: http://www.baidu.com
          predicates:
            - Path=/baidu
server:
  port: 8088

此時當我們訪問路徑中包含/baidu的,gateway將會幫我們轉發至百度頁面

  1. 工作流程
    在這里我貼上官網的一張圖:

在這里我想結合源代碼來說明其流程,這里面有個關鍵的類,叫RoutePredicateHandlerMapping,我們可以發現這個類有如下特點:

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
  
  // ....省略部分代碼
  @Override
	protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
		// don't handle requests on management port if set and different than server port
		if (this.managementPortType == DIFFERENT && this.managementPort != null
				&& exchange.getRequest().getURI().getPort() == this.managementPort) {
			return Mono.empty();
		}
		exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

		return lookupRoute(exchange)
				// .log("route-predicate-handler-mapping", Level.FINER) //name this
				.flatMap((Function<Route, Mono<?>>) r -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isDebugEnabled()) {
						logger.debug(
								"Mapping [" + getExchangeDesc(exchange) + "] to " + r);
					}

					exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
					return Mono.just(webHandler);
				}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
					exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
					if (logger.isTraceEnabled()) {
						logger.trace("No RouteDefinition found for ["
								+ getExchangeDesc(exchange) + "]");
					}
				})));
	}
  
  //...省略部分代碼

}

此類繼承了AbstractHandlerMapping,注意這里的是reactive包下的,也就是webflux提供的handlermapping,其作用等同於webmvc的handlermapping,其作用是將請求映射找到對應的handler來處理。
在這里處理的關鍵就是先尋找合適的route,關鍵的方法為lookupRoute():
protected Mono lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono. empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
validateRoute(route, exchange);
return route;
});

  		/*
  		 * TODO: trace logging if (logger.isTraceEnabled()) {
  		 * logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }
  		 */
  	}

其中RouteLocator的接口作用是獲取Route定義,那么在GatewayAutoConfiguaration里有相關的配置,大家可自行查閱:
@Bean
public RouteLocator routeDefinitionRouteLocator(GatewayProperties properties,
List GatewayFilters,
List predicates,
RouteDefinitionLocator routeDefinitionLocator,
@Qualifier("webFluxConversionService") ConversionService conversionService) {
return new RouteDefinitionRouteLocator(routeDefinitionLocator, predicates,
GatewayFilters, properties, conversionService);
}
然后在注釋add the current route we are testing處可以得到一個結論,其是根據Predicate的聲明條件過濾出合適的Route
最終拿到FilteringWebHandler作為它的返回值,這個類是真正意義上處理請求的類,它實現了webflux提供的WebHandler接口:
public class FilteringWebHandler implements WebHandler {

    //.....省略其它代碼
    
    @Override
  	public Mono<Void> handle(ServerWebExchange exchange) {
      //拿到當前的route
  		Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
      //獲取所有的gatewayFilter
  		List<GatewayFilter> gatewayFilters = route.getFilters();
  		//獲取全局過濾器
  		List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
  		combined.addAll(gatewayFilters);
  		// TODO: needed or cached?
  		AnnotationAwareOrderComparator.sort(combined);
  
  		if (logger.isDebugEnabled()) {
  			logger.debug("Sorted gatewayFilterFactories: " + combined);
  		}
  		//交給默認的過濾器鏈執行所有的過濾操作
  		return new DefaultGatewayFilterChain(combined).filter(exchange);
  	}
  
    //....省略其它代碼
  }

在這里可以看到它的實際處理方式是委派給過濾器鏈進行處理請求操作的

  1. Predicate
    Spring Cloud Gateway包含許多內置的Predicate Factory。所有的Predicate都匹配HTTP請求的不同屬性。如果配置類多個Predicate, 那么必須滿足所有的predicate才可以,官網上列舉的內置的Predicate,我在這里不做過多的說明,請大家參考:地址,predicate的實現可以在org.springframework.cloud.gateway.handler.predicate的包下找到。

3.1、自定義Predicate
先改一下application.yaml中的配置:

spring:
application:
name: gateway
cloud:
gateway:
routes:
- id: before_route
uri: http://www.baidu.com
predicates:
- Number=1
默認命名規則:名稱RoutePredicateFactory,在這里我們可以看到如下代碼規則用以解析Predicate的名稱,該代碼在NameUtils當中:

	public static String normalizeRoutePredicateName(
			Class<? extends RoutePredicateFactory> clazz) {
		return removeGarbage(clazz.getSimpleName()
				.replace(RoutePredicateFactory.class.getSimpleName(), ""));
	}

那么在這里我們就按照如上規則建立對應的NumberRoutePredicateFactory,代碼如下:

@Component
public class NumberRoutePredicateFactory extends AbstractRoutePredicateFactory<NumberRoutePredicateFactory.Config> {


    public NumberRoutePredicateFactory() {
        super(Config.class);
    }

    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("number");
    }

    @Override
    public ShortcutType shortcutType() {
        return ShortcutType.GATHER_LIST;
    }

    @Override
    public Predicate<ServerWebExchange> apply(Config config) {
        return new GatewayPredicate() {
            @Override
            public boolean test(ServerWebExchange serverWebExchange) {
                String number = serverWebExchange.getRequest().getQueryParams().getFirst("number");
                return config.number == Integer.parseInt(number);
            }
        };
    }

  
    public static class Config {
        private int number;

        public int getNumber() {
            return number;
        }

        public void setNumber(int number) {
            this.number = number;
        }
    }
}

該類可以繼承AbstractRoutePredicateFactory,同時需要注冊為spring的Bean
在此類當中按照規范來講,需要定義一個內部類,該類的作用用於封裝application.yaml中的配置,Number=1這個配置會按照規則進行封裝,這個規則由以下幾項決定:
ShortcutType,該值是枚舉類型,分別是
DEFAULT :按照shortcutFieldOrder順序依次賦值
GATHER_LIST:shortcutFiledOrder只能有一個值,如果參數有多個拼成一個集合
GATHER_LIST_TAIL_FLAG:shortcutFiledOrder只能有兩個值,其中最后一個值為true或者false,其余的值變成一個集合付給第一個值
shortcutFieldOrder,這個值決定了Config中配置的屬性,配置的參數都會被封裝到該屬性當中
4. Filter
Gateway中的filter可以分為(GlobalFilter)全局過濾器與普通過濾器,過濾器可以在路由到代理服務的前后改變請求與響應。在這里我會列舉兩個常見的filter給大家用作參考:

4.1、負載均衡的實現
與zuul類似,Gateway也可以作為服務端的負載均衡,那么負載均衡的處理關鍵就是與Ribbon集成,那么Gateway是利用GlobalFilter進行實現的,它的實現類是LoadBalancerClientFilter:

public class LoadBalancerClientFilter implements GlobalFilter, Ordered {

  protected final LoadBalancerClient loadBalancer;

	private LoadBalancerProperties properties;

	//....
  
  @Override
	@SuppressWarnings("Duplicates")
	public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	
		// preserve the original url
		addOriginalRequestUrl(exchange, url);

		log.trace("LoadBalancerClientFilter url before: " + url);

    //選擇一個服務實例
		final ServiceInstance instance = choose(exchange);
    
		if (instance == null) {
			throw NotFoundException.create(properties.isUse404(),
					"Unable to find instance for " + url.getHost());
		}

		URI uri = exchange.getRequest().getURI();

		// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
		// if the loadbalancer doesn't provide one.
    //判斷協議類型
		String overrideScheme = instance.isSecure() ? "https" : "http";
		if (schemePrefix != null) {
			overrideScheme = url.getScheme();
		}
		//重構uri地址
		URI requestUrl = loadBalancer.reconstructURI(
				new DelegatingServiceInstance(instance, overrideScheme), uri);
    
		//...
		return chain.filter(exchange);
	}
}

在這里我們可以看到這里它是基於Spring-Cloud-Commons規范里的LoadBalanceClient包裝實現的。

4.2、集成Hystrix
Gateway同樣也可以和Hystrix進行集成,這里面的關鍵類是HystrixGatewayFilterFactory,這里面的關鍵是RouteHystrixCommand該類繼承了HystrixObservableCommand:

@Override
		protected Observable<Void> construct() {
      // 執行過濾器鏈
			return RxReactiveStreams.toObservable(this.chain.filter(exchange));//1
		}

		@Override
		protected Observable<Void> resumeWithFallback() {
			if (this.fallbackUri == null) {
				return super.resumeWithFallback();
			}

			// TODO: copied from RouteToRequestUrlFilter
			URI uri = exchange.getRequest().getURI();
			// TODO: assume always?
			boolean encoded = containsEncodedParts(uri);
			URI requestUrl = UriComponentsBuilder.fromUri(uri).host(null).port(null)
					.uri(this.fallbackUri).scheme(null).build(encoded).toUri();//2
			exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
			addExceptionDetails();

			ServerHttpRequest request = this.exchange.getRequest().mutate()
					.uri(requestUrl).build();
			ServerWebExchange mutated = exchange.mutate().request(request).build();
			return RxReactiveStreams.toObservable(getDispatcherHandler().handle(mutated));//3
		}

在代碼1處會執行濾器鏈,寫到此處的代碼會被統一加上hystrix的保護
在代碼2處再是執行回退的方法,根據fallbackUri構建一個回退請求地址
在代碼3處獲取WebFlux的總控制器DispatcherHandler進行回退地址的處理
5、服務發現
服務發現對於Gateway來說也是個非常重要的內容,Gateway在這里定義了一個核心接口叫做:RouteDefinitionLocator,這個接口用於獲取Route的定義,服務發現的機制實現了該接口:

public class DiscoveryClientRouteDefinitionLocator implements RouteDefinitionLocator {
  
  @Override
	public Flux<RouteDefinition> getRouteDefinitions() {

	//....省略部分代碼
    
		return Flux.fromIterable(discoveryClient.getServices())//獲取所有服務
				.map(discoveryClient::getInstances) //映射轉換所有服務實例
				.filter(instances -> !instances.isEmpty()) //過濾出不為空的服務實例
				.map(instances -> instances.get(0)).filter(includePredicate)//根據properites里的include表達式過濾實例
				.map(instance -> {
          
          /*
          		構建Route的定義
          	*/
					String serviceId = instance.getServiceId();

					RouteDefinition routeDefinition = new RouteDefinition();
					routeDefinition.setId(this.routeIdPrefix + serviceId);
					String uri = urlExpr.getValue(evalCtxt, instance, String.class);
					routeDefinition.setUri(URI.create(uri));

					final ServiceInstance instanceForEval = new DelegatingServiceInstance(
							instance, properties);

          //添加Predicate
					for (PredicateDefinition original : this.properties.getPredicates()) {
						PredicateDefinition predicate = new PredicateDefinition();
						predicate.setName(original.getName());
						for (Map.Entry<String, String> entry : original.getArgs()
								.entrySet()) {
							String value = getValueFromExpr(evalCtxt, parser,
									instanceForEval, entry);
							predicate.addArg(entry.getKey(), value);
						}
						routeDefinition.getPredicates().add(predicate);
					}
					//添加filter
					for (FilterDefinition original : this.properties.getFilters()) {
						FilterDefinition filter = new FilterDefinition();
						filter.setName(original.getName());
						for (Map.Entry<String, String> entry : original.getArgs()
								.entrySet()) {
							String value = getValueFromExpr(evalCtxt, parser,
									instanceForEval, entry);
							filter.addArg(entry.getKey(), value);
						}
						routeDefinition.getFilters().add(filter);
					}

					return routeDefinition;
				});
	}
}

由此我們可以知道,這里面利用DiscoveryClient獲取所有的服務實例並將每個實例構建為一個Route,不過在此之前,在自動裝配的類GatewayDiscoveryClientAutoConfiguration里已經配置了默認的Predicate與Filter,它會預先幫我們配置默認的Predicate與Filter:

public static List<PredicateDefinition> initPredicates() {
		ArrayList<PredicateDefinition> definitions = new ArrayList<>();
		// TODO: add a predicate that matches the url at /serviceId?

		// add a predicate that matches the url at /serviceId/**
		PredicateDefinition predicate = new PredicateDefinition();
		predicate.setName(normalizeRoutePredicateName(PathRoutePredicateFactory.class));
		predicate.addArg(PATTERN_KEY, "'/'+serviceId+'/**'");
		definitions.add(predicate);
		return definitions;
	}

public static List<FilterDefinition> initFilters() {
		ArrayList<FilterDefinition> definitions = new ArrayList<>();

		// add a filter that removes /serviceId by default
		FilterDefinition filter = new FilterDefinition();
		filter.setName(normalizeFilterFactoryName(RewritePathGatewayFilterFactory.class));
		String regex = "'/' + serviceId + '/(?<remaining>.*)'";
		String replacement = "'/${remaining}'";
		filter.addArg(REGEXP_KEY, regex);
		filter.addArg(REPLACEMENT_KEY, replacement);
		definitions.add(filter);

		return definitions;
	}

這里面主要會根據ServiceId構建為 Path=/serviceId/**的Predicate和路由至對應服務前把ServiceId去掉的filter

6、總結
根據上述說明,我僅僅選取了兩個比較典型意義的Predicate與Filter代碼進行說明,由於官網上沒有說明自定義Predicate,我在這里索性寫了個簡單的例子,那么自定義Filter的例子可以參考官網地址:

這里需要吐槽一下官方 什么時候能把TODO補充完整的呢?

Gateway是基於Webflux實現的,它通過擴展HandlerMapping與WebHandler來處理用戶的請求,先通過Predicate定位到Router然后在經過FilterChain的過濾處理,最后定位到下層服務。同時官方給我們提供了許多Prdicate與Filter,比如說限流的。從這點來說它的功能比zuul還強大呢,zuul里有的服務發現,斷路保護等,Gateway分別通過GlobalFilter與Filter來實現。

最后至於Gateway能普及到什么樣的程度,亦或者能不能最終成為統一的網關標准,這個我也不能再這里有所保證,那么就交給時間來證明吧。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM