依賴
compile 'org.springframework.cloud:spring-cloud-starter-gateway'
包結構
SpringBoot - SPI機制
GatewayClassPathWarningAutoConfiguration.class
ClassPath 檢測:如果 classpath 中存在 Spring MVC 的 org.springframework.web.servlet.DispatcherServlet,或者缺少 WebFlux 的 org.springframework.web.reactive.DispatcherHandler,則輸出警告日誌。
/**
 * Emits start-up warnings about the classpath: one when Spring MVC's
 * {@code DispatcherServlet} is present and one when WebFlux's
 * {@code DispatcherHandler} is missing.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
public class GatewayClassPathWarningAutoConfiguration {

    private static final Log log = LogFactory.getLog(GatewayClassPathWarningAutoConfiguration.class);

    private static final String BORDER = "\n\n**********************************************************\n\n";

    /** Registered only when {@code DispatcherServlet} can be found on the classpath. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(name = "org.springframework.web.servlet.DispatcherServlet")
    protected static class SpringMvcFoundOnClasspathConfiguration {

        /** Logs the "Spring MVC found" warning, framed by {@code BORDER}. */
        public SpringMvcFoundOnClasspathConfiguration() {
            String message = "Spring MVC found on classpath, which is incompatible with Spring Cloud Gateway at this time. "
                    + "Please remove spring-boot-starter-web dependency.";
            log.warn(BORDER + message + BORDER);
        }

    }

    /** Registered only when {@code DispatcherHandler} cannot be found on the classpath. */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingClass("org.springframework.web.reactive.DispatcherHandler")
    protected static class WebfluxMissingFromClasspathConfiguration {

        /** Logs the "Webflux missing" warning, framed by {@code BORDER}. */
        public WebfluxMissingFromClasspathConfiguration() {
            String message = "Spring Webflux is missing from the classpath, "
                    + "which is required for Spring Cloud Gateway at this time. "
                    + "Please add spring-boot-starter-webflux dependency.";
            log.warn(BORDER + message + BORDER);
        }

    }

}
GatewayAutoConfiguration.class
核心配置類:RouteLocator,RouteDefinitionLocator,RouteRefreshListener,GlobalCorsProperties,ForwardPathFilter,以及一些 Predicate Beans,GatewayFilter Factory beans 等等配置。
@Configuration(proxyBeanMethods = false) @ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true) @EnableConfigurationProperties @AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class }) @AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class }) @ConditionalOnClass(DispatcherHandler.class) public class GatewayAutoConfiguration { @Bean public RouteLocatorBuilder routeLocatorBuilder(ConfigurableApplicationContext context) { return new RouteLocatorBuilder(context); } @Bean @Primary public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) { return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators)); } @Bean public RouteRefreshListener routeRefreshListener(ApplicationEventPublisher publisher) { return new RouteRefreshListener(publisher); } @Bean public GlobalCorsProperties globalCorsProperties() { return new GlobalCorsProperties(); } @Bean public ForwardPathFilter forwardPathFilter() { return new ForwardPathFilter(); } // Predicate Factory beans @Bean public AfterRoutePredicateFactory afterRoutePredicateFactory() { return new AfterRoutePredicateFactory(); } @Bean public BeforeRoutePredicateFactory beforeRoutePredicateFactory() { return new BeforeRoutePredicateFactory(); } @Bean public BetweenRoutePredicateFactory betweenRoutePredicateFactory() { return new BetweenRoutePredicateFactory(); } @Bean public CookieRoutePredicateFactory cookieRoutePredicateFactory() { return new CookieRoutePredicateFactory(); } @Bean public HeaderRoutePredicateFactory headerRoutePredicateFactory() { return new HeaderRoutePredicateFactory(); } @Bean public HostRoutePredicateFactory hostRoutePredicateFactory() { return new HostRoutePredicateFactory(); } @Bean public MethodRoutePredicateFactory methodRoutePredicateFactory() { return new MethodRoutePredicateFactory(); } @Bean public 
PathRoutePredicateFactory pathRoutePredicateFactory() { return new PathRoutePredicateFactory(); } @Bean public QueryRoutePredicateFactory queryRoutePredicateFactory() { return new QueryRoutePredicateFactory(); } // GatewayFilter Factory beans @Bean public AddRequestHeaderGatewayFilterFactory addRequestHeaderGatewayFilterFactory() { return new AddRequestHeaderGatewayFilterFactory(); } @Bean public MapRequestHeaderGatewayFilterFactory mapRequestHeaderGatewayFilterFactory() { return new MapRequestHeaderGatewayFilterFactory(); } @Bean public AddRequestParameterGatewayFilterFactory addRequestParameterGatewayFilterFactory() { return new AddRequestParameterGatewayFilterFactory(); } @Bean public AddResponseHeaderGatewayFilterFactory addResponseHeaderGatewayFilterFactory() { return new AddResponseHeaderGatewayFilterFactory(); } @Bean public ModifyRequestBodyGatewayFilterFactory modifyRequestBodyGatewayFilterFactory( ServerCodecConfigurer codecConfigurer) { return new ModifyRequestBodyGatewayFilterFactory(codecConfigurer.getReaders()); } @Bean public DedupeResponseHeaderGatewayFilterFactory dedupeResponseHeaderGatewayFilterFactory() { return new DedupeResponseHeaderGatewayFilterFactory(); } // 限流 @Bean(name = PrincipalNameKeyResolver.BEAN_NAME) @ConditionalOnBean(RateLimiter.class) @ConditionalOnMissingBean(KeyResolver.class) public PrincipalNameKeyResolver principalNameKeyResolver() { return new PrincipalNameKeyResolver(); } @Bean @ConditionalOnBean({ RateLimiter.class, KeyResolver.class }) public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory( RateLimiter rateLimiter, KeyResolver resolver) { return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver); } // Netty網絡通信配置 @Configuration(proxyBeanMethods = false) @ConditionalOnClass(HttpClient.class) protected static class NettyConfiguration { ... 
省略 } // 斷路器配置 @Configuration(proxyBeanMethods = false) @ConditionalOnClass({ HystrixObservableCommand.class, RxReactiveStreams.class }) protected static class HystrixConfiguration { ... 省略 } // 健康檢查配置 @Configuration(proxyBeanMethods = false) @ConditionalOnClass(Health.class) protected static class GatewayActuatorConfiguration { ... 省略 } }
GatewayHystrixCircuitBreakerAutoConfiguration.class: 熔斷器實例化
spring:
  cloud:
    gateway:
      # Default filters, applied to every route
      default-filters:
        - name: Hystrix
          args:
            name: fallbackcmd
            fallbackUri: forward:/myfallback
      routes:
        - id: test-service
          uri: lb://test
          predicates:
            - Path=/test/**
          # Filters for this route only
          filters:
            - name: Hystrix
              args:
                name: fallbackcmd
                fallbackUri: forward:/myfallback

# Circuit breaker (Hystrix) settings
hystrix:
  command:
    default:
      execution:
        isolation:
          strategy: SEMAPHORE
          thread:
            timeoutInMilliseconds: 5000
  shareSecurityContext: true
如果請求超過 5 秒還沒返回,則觸發熔斷;熔斷之後執行降級處理,即轉發到 fallbackUri 指定的位址(本例為 forward:/myfallback)。其中 fallbackcmd 是 Hystrix 命令的名稱,並不是降級方法本身。
GatewayLoadBalancerClientAutoConfiguration.class:引入ribbon時的負載均衡實例化
/**
 * Auto-configuration applied when {@code LoadBalancerClient}, {@code RibbonAutoConfiguration}
 * and {@code DispatcherHandler} are all on the classpath; ordered after
 * {@code RibbonAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ LoadBalancerClient.class, RibbonAutoConfiguration.class, DispatcherHandler.class })
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class GatewayLoadBalancerClientAutoConfiguration {

    /**
     * Registers the blocking load-balancer filter when a {@code LoadBalancerClient} bean
     * exists and neither a {@code LoadBalancerClientFilter} nor a
     * {@code ReactiveLoadBalancerClientFilter} bean has been defined already.
     */
    @Bean
    @ConditionalOnBean(LoadBalancerClient.class)
    @ConditionalOnMissingBean({ LoadBalancerClientFilter.class, ReactiveLoadBalancerClientFilter.class })
    public LoadBalancerClientFilter loadBalancerClientFilter(LoadBalancerClient client,
            LoadBalancerProperties properties) {
        LoadBalancerClientFilter filter = new LoadBalancerClientFilter(client, properties);
        return filter;
    }

}
LoadBalancerClientFilter 過濾器 實現 GlobalFilter, Ordered。order 值越小,優先級越高
GatewayNoLoadBalancerClientAutoConfiguration.class:沒有引入ribbon時的負載均衡實例化
@Configuration(proxyBeanMethods = false) @ConditionalOnMissingClass("org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration") @ConditionalOnMissingBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerProperties.class) @AutoConfigureAfter(GatewayLoadBalancerClientAutoConfiguration.class) public class GatewayNoLoadBalancerClientAutoConfiguration { @Bean @ConditionalOnMissingBean(LoadBalancerClientFilter.class) public NoLoadBalancerClientFilter noLoadBalancerClientFilter(LoadBalancerProperties properties) { return new NoLoadBalancerClientFilter(properties.isUse404()); } protected static class NoLoadBalancerClientFilter implements GlobalFilter, Ordered { ...省略 @Override @SuppressWarnings("Duplicates") public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); // 如果沒有指定url或者 url不是以lb為前綴的:lb://test,否則異常 if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } throw NotFoundException.create(use404, "Unable to find instance for " + url.getHost()); } } }
GatewayMetricsAutoConfiguration.class:監控指標實例化
/**
 * Metrics auto-configuration: registers the gateway tag providers and, when a
 * {@code MeterRegistry} bean is available, the {@code GatewayMetricsFilter}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties(GatewayMetricsProperties.class)
@AutoConfigureBefore(HttpHandlerAutoConfiguration.class)
@AutoConfigureAfter({ MetricsAutoConfiguration.class, CompositeMeterRegistryAutoConfiguration.class })
@ConditionalOnClass({ DispatcherHandler.class, MeterRegistry.class, MetricsAutoConfiguration.class })
public class GatewayMetricsAutoConfiguration {

    @Bean
    public GatewayHttpTagsProvider gatewayHttpTagsProvider() {
        GatewayHttpTagsProvider httpTags = new GatewayHttpTagsProvider();
        return httpTags;
    }

    @Bean
    public GatewayRouteTagsProvider gatewayRouteTagsProvider() {
        GatewayRouteTagsProvider routeTags = new GatewayRouteTagsProvider();
        return routeTags;
    }

    /** Tags provider built from the tags configured in {@code GatewayMetricsProperties}. */
    @Bean
    public PropertiesTagsProvider propertiesTagsProvider(
            GatewayMetricsProperties gatewayMetricsProperties) {
        PropertiesTagsProvider configuredTags = new PropertiesTagsProvider(gatewayMetricsProperties.getTags());
        return configuredTags;
    }

    /**
     * Created when a {@code MeterRegistry} bean exists and
     * {@code spring.cloud.gateway.metrics.enabled} is true or not set.
     */
    @Bean
    @ConditionalOnBean(MeterRegistry.class)
    @ConditionalOnProperty(name = "spring.cloud.gateway.metrics.enabled", matchIfMissing = true)
    public GatewayMetricsFilter gatewayMetricFilter(MeterRegistry meterRegistry,
            List<GatewayTagsProvider> tagsProviders) {
        GatewayMetricsFilter metricsFilter = new GatewayMetricsFilter(meterRegistry, tagsProviders);
        return metricsFilter;
    }

}
GatewayRedisAutoConfiguration.class:redis限流實例化
/**
 * Redis rate-limiter auto-configuration: registers the Lua request-rate-limiter script and
 * a {@code RedisRateLimiter} that uses it. Applied only when a {@code ReactiveRedisTemplate}
 * bean exists.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass({ RedisTemplate.class, DispatcherHandler.class })
class GatewayRedisAutoConfiguration {

    /** Loads {@code META-INF/scripts/request_rate_limiter.lua} with a {@code List} result type. */
    @Bean
    @SuppressWarnings("unchecked")
    public RedisScript redisRequestRateLimiterScript() {
        ClassPathResource luaFile = new ClassPathResource("META-INF/scripts/request_rate_limiter.lua");
        DefaultRedisScript script = new DefaultRedisScript<>();
        script.setScriptSource(new ResourceScriptSource(luaFile));
        script.setResultType(List.class);
        return script;
    }

    /** Rate limiter bean, created only when no other bean of this type is defined. */
    @Bean
    @ConditionalOnMissingBean
    public RedisRateLimiter redisRateLimiter(ReactiveStringRedisTemplate redisTemplate,
            @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
            ConfigurationService configurationService) {
        RedisRateLimiter limiter = new RedisRateLimiter(redisTemplate, redisScript, configurationService);
        return limiter;
    }

}
RedisRateLimiter+request_rate_limiter.lua腳本
GatewayDiscoveryClientAutoConfiguration.class:服務發現
@Configuration(proxyBeanMethods = false) @ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true) @AutoConfigureBefore(GatewayAutoConfiguration.class) @AutoConfigureAfter(CompositeDiscoveryClientAutoConfiguration.class) @ConditionalOnClass({ DispatcherHandler.class }) @EnableConfigurationProperties public class GatewayDiscoveryClientAutoConfiguration { public static List<PredicateDefinition> initPredicates() { ArrayList<PredicateDefinition> definitions = new ArrayList<>(); // TODO: add a predicate that matches the url at /serviceId? // add a predicate that matches the url at /serviceId/** PredicateDefinition predicate = new PredicateDefinition(); predicate.setName(normalizeRoutePredicateName(PathRoutePredicateFactory.class)); predicate.addArg(PATTERN_KEY, "'/'+serviceId+'/**'"); definitions.add(predicate); return definitions; } public static List<FilterDefinition> initFilters() { ArrayList<FilterDefinition> definitions = new ArrayList<>(); // add a filter that removes /serviceId by default FilterDefinition filter = new FilterDefinition(); filter.setName(normalizeFilterFactoryName(RewritePathGatewayFilterFactory.class)); String regex = "'/' + serviceId + '/(?<remaining>.*)'"; String replacement = "'/${remaining}'"; filter.addArg(REGEXP_KEY, regex); filter.addArg(REPLACEMENT_KEY, replacement); definitions.add(filter); return definitions; } @Bean public DiscoveryLocatorProperties discoveryLocatorProperties() { DiscoveryLocatorProperties properties = new DiscoveryLocatorProperties(); properties.setPredicates(initPredicates()); properties.setFilters(initFilters()); return properties; } @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled", matchIfMissing = true) public static class ReactiveDiscoveryClientRouteDefinitionLocatorConfiguration { @Bean @ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled") public DiscoveryClientRouteDefinitionLocator 
discoveryClientRouteDefinitionLocator( ReactiveDiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) { return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties); } } // 本機反應式服務發現功能 @Configuration(proxyBeanMethods = false) @Deprecated @ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled", havingValue = "false") public static class BlockingDiscoveryClientRouteDefinitionLocatorConfiguration { @Bean @ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled") public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator( DiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) { return new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties); } } }
SimpleUrlHandlerMappingGlobalCorsAutoConfiguration.class:跨域
@Configuration(proxyBeanMethods = false) @ConditionalOnClass(SimpleUrlHandlerMapping.class) @ConditionalOnProperty( name = "spring.cloud.gateway.globalcors.add-to-simple-url-handler-mapping", matchIfMissing = false) public class SimpleUrlHandlerMappingGlobalCorsAutoConfiguration { @Autowired private GlobalCorsProperties globalCorsProperties; @Autowired private SimpleUrlHandlerMapping simpleUrlHandlerMapping; // 在實例化之后初始化之前執行,並且只會被服務器調用一次 @PostConstruct void config() { simpleUrlHandlerMapping.setCorsConfigurations(globalCorsProperties.getCorsConfigurations()); } }
GatewayReactiveLoadBalancerClientAutoConfiguration.class:反應式負載均衡
與GatewayLoadBalancerClientAutoConfiguration 中的 LoadBalancerClientFilter 互斥;LoadBalancerClientFilter與ReactiveLoadBalancerClientFilter只會實例化一個
/**
 * Reactive load-balancer auto-configuration; ordered before
 * {@code GatewayLoadBalancerClientAutoConfiguration} and after
 * {@code LoadBalancerAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ ReactiveLoadBalancer.class, LoadBalancerAutoConfiguration.class, DispatcherHandler.class })
@AutoConfigureBefore(GatewayLoadBalancerClientAutoConfiguration.class)
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class GatewayReactiveLoadBalancerClientAutoConfiguration {

    /**
     * Registers the reactive load-balancer filter when a {@code LoadBalancerClientFactory}
     * bean exists, no {@code ReactiveLoadBalancerClientFilter} bean is defined yet, and
     * {@code OnNoRibbonDefaultCondition} matches.
     */
    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @ConditionalOnMissingBean(ReactiveLoadBalancerClientFilter.class)
    @Conditional(OnNoRibbonDefaultCondition.class)
    public ReactiveLoadBalancerClientFilter gatewayLoadBalancerClientFilter(
            LoadBalancerClientFactory clientFactory, LoadBalancerProperties properties) {
        ReactiveLoadBalancerClientFilter filter = new ReactiveLoadBalancerClientFilter(clientFactory, properties);
        return filter;
    }

    /**
     * Matches when any nested condition matches: either
     * {@code spring.cloud.loadbalancer.ribbon.enabled} is {@code false}, or
     * {@code RibbonLoadBalancerClient} is absent from the classpath.
     */
    private static final class OnNoRibbonDefaultCondition extends AnyNestedCondition {

        private OnNoRibbonDefaultCondition() {
            super(ConfigurationPhase.REGISTER_BEAN);
        }

        @ConditionalOnProperty(value = "spring.cloud.loadbalancer.ribbon.enabled", havingValue = "false")
        static class RibbonNotEnabled {

        }

        @ConditionalOnMissingClass("org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient")
        static class RibbonLoadBalancerNotPresent {

        }

    }

}
流程圖
執行過程
DispatcherHandler.class
public class DispatcherHandler implements WebHandler, ApplicationContextAware { public DispatcherHandler(ApplicationContext applicationContext) { initStrategies(applicationContext); } protected void initStrategies(ApplicationContext context) { // 獲取所有HandlerMapping接口的實現類 Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerMapping.class, true, false); ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values()); AnnotationAwareOrderComparator.sort(mappings); // 將轉換后的Mappings轉換為一個不能修改,只讀的List this.handlerMappings = Collections.unmodifiableList(mappings); // 獲取所有HandlerAdapter的實現類 Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerAdapter.class, true, false); this.handlerAdapters = new ArrayList<>(adapterBeans.values()); AnnotationAwareOrderComparator.sort(this.handlerAdapters); // 獲取所有HandlerResultHandler的實現類 Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors( context, HandlerResultHandler.class, true, false); this.resultHandlers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(this.resultHandlers); } // 處理請求 @Override public Mono<Void> handle(ServerWebExchange exchange) { // 如果 handlerMappings 為空,創建異常返回 if (this.handlerMappings == null) { return createNotFoundError(); } // 不為空,就執行handlerMapping的getHandler方法 return Flux.fromIterable(this.handlerMappings) .concatMap(mapping -> mapping.getHandler(exchange)) .next() .switchIfEmpty(createNotFoundError()) .flatMap(handler -> invokeHandler(exchange, handler)) .flatMap(result -> handleResult(exchange, result)); } }
AbstractHandlerMapping.class
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport implements HandlerMapping, Ordered, BeanNameAware { @Override public Mono<Object> getHandler(ServerWebExchange exchange) { // 獲取內部處理 handler return getHandlerInternal(exchange).map(handler -> { if (logger.isDebugEnabled()) { logger.debug(exchange.getLogPrefix() + "Mapped to " + handler); } ServerHttpRequest request = exchange.getRequest(); if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) { CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null); CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange); config = (config != null ? config.combine(handlerConfig) : handlerConfig); if (config != null) { config.validateAllowCredentials(); } if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) { return REQUEST_HANDLED_HANDLER; } } return handler; }); } //抽象方法,由每個繼承類定義具體的實現 protected abstract Mono<?> getHandlerInternal(ServerWebExchange var1); }
RoutePredicateHandlerMapping.class
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping { @Override protected Mono<?> getHandlerInternal(ServerWebExchange exchange) { // 如果請求的端口是管理端口則不處理 if (this.managementPortType == DIFFERENT && this.managementPort != null && exchange.getRequest().getURI().getPort() == this.managementPort) { return Mono.empty(); } // 將 RoutePredicateHandlerMapping 緩存在 exchange 中 exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName()); // 查找路由 return lookupRoute(exchange) // .log("route-predicate-handler-mapping", Level.FINER) //name this .flatMap((Function<Route, Mono<?>>) r -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isDebugEnabled()) { logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r); } // 將 route 緩存 exchange 中 exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r); // webHandler處理 return Mono.just(webHandler); }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> { exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR); if (logger.isTraceEnabled()) { logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]"); } }))); } }
FilteringWebHandler.class
public class FilteringWebHandler implements WebHandler { // 處理所有全局過濾器 public FilteringWebHandler(List<GlobalFilter> globalFilters) { this.globalFilters = loadFilters(globalFilters); } private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) { // 如果實現了Ordered接口,包裝成 OrderedGatewayFilter; 如果沒有實現則用 GatewayFilterAdapter 包裝 return filters.stream().map(filter -> { GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter); if (filter instanceof Ordered) { int order = ((Ordered) filter).getOrder(); return new OrderedGatewayFilter(gatewayFilter, order); } return gatewayFilter; }).collect(Collectors.toList()); } // 過濾器鏈處理 @Override public Mono<Void> handle(ServerWebExchange exchange) { // 獲取 exchange 中的 route 路由信息 Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR); // 獲取路由中的過濾器 List<GatewayFilter> gatewayFilters = route.getFilters(); // 全局過濾器 + 路由中的過濾器 List<GatewayFilter> combined = new ArrayList<>(this.globalFilters); combined.addAll(gatewayFilters); // 排序 AnnotationAwareOrderComparator.sort(combined); if (logger.isDebugEnabled()) { logger.debug("Sorted gatewayFilterFactories: " + combined); } // 創建網關過濾器鏈,遞歸調用過濾器鏈進行過濾處理 return new DefaultGatewayFilterChain(combined).filter(exchange); } // 默認網關過濾器鏈 private static class DefaultGatewayFilterChain implements GatewayFilterChain { DefaultGatewayFilterChain(List<GatewayFilter> filters) { this.filters = filters; this.index = 0; } @Override public Mono<Void> filter(ServerWebExchange exchange) { // 遞歸調用每個過濾器filter方法 return Mono.defer(() -> { if (this.index < filters.size()) { GatewayFilter filter = filters.get(this.index); DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1); return filter.filter(exchange, chain); } else { return Mono.empty(); // complete } }); } } }
1.GatewayMetricsFilter:度量監控過濾器
該過濾器主要用於做網關度量監控的,要啟用,需添加spring-boot-starter-actuator依賴。
默認情況下,只要屬性 spring.cloud.gateway.metrics.enabled 沒有被設置為 false,GatewayMetricsFilter 就會運行。
這些指標隨后可從/actuator/metrics/gateway.requests中進行抓取。
public class GatewayMetricsFilter implements GlobalFilter, Ordered { // 啟動計時器並報告度量標准事件 @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { Sample sample = Timer.start(meterRegistry); return chain.filter(exchange).doOnSuccess(aVoid -> endTimerRespectingCommit(exchange, sample)) .doOnError(throwable -> endTimerRespectingCommit(exchange, sample)); } private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) { ServerHttpResponse response = exchange.getResponse(); if (response.isCommitted()) { endTimerInner(exchange, sample); } else { response.beforeCommit(() -> { endTimerInner(exchange, sample); return Mono.empty(); }); } } private void endTimerInner(ServerWebExchange exchange, Sample sample) { Tags tags = compositeTagsProvider.apply(exchange); if (log.isTraceEnabled()) { log.trace(metricsPrefix + ".requests tags: " + tags); } sample.stop(meterRegistry.timer(metricsPrefix + ".requests", tags)); } }
2.ReactiveLoadBalancerClientFilter:負載均衡過濾器
ReactiveLoadBalancerClientFilter 會在 exchange 的 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 屬性中查找 URI。如果該 URI 的 scheme 為 lb(例如 lb://myservice),過濾器會把服務名稱(本例為 myservice)解析為實際的主機和端口,並替換同一屬性中的 URI。此外,過濾器還會檢查 exchange 的 GATEWAY_SCHEME_PREFIX_ATTR 屬性是否等於 lb;如果是,則適用相同的規則。
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered { @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR); // url 為空或者scheme不是 lb if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) { return chain.filter(exchange); } // 保留原始 url 路徑 addOriginalRequestUrl(exchange, url); if (log.isTraceEnabled()) { log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url); } URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR); String serviceId = requestUri.getHost(); // 獲取支持的生命周期處理器 Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class); // 構建獲取實例請求 DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext( new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint()))); return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> { // 返回結果中沒有實例則異常處理 if (!response.hasServer()) { supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response))); throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost()); } // 獲取實例 ServiceInstance retrievedInstance = response.getServer(); URI uri = exchange.getRequest().getURI(); // 如果使用了 lb:<scheme>機制,則使用<scheme>作為默認值, // 如果loadbalancer沒有提供 String overrideScheme = retrievedInstance.isSecure() ? 
"https" : "http"; if (schemePrefix != null) { overrideScheme = url.getScheme(); } // 代理實例 DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme); // 重建url,並緩存url 與 結果 URI requestUrl = reconstructURI(serviceInstance, uri); if (log.isTraceEnabled()) { log.trace("LoadBalancerClientFilter url chosen: " + requestUrl); } exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl); exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response); supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response)); }).then(chain.filter(exchange)) .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>( CompletionContext.Status.FAILED, throwable, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR))))) .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>( CompletionContext.Status.SUCCESS, lbRequest, exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR), new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest())))))); } // 負載均衡獲取一個實例 private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId, Set<LoadBalancerLifecycle> supportedLifecycleProcessors) { ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class); if (loadBalancer == null) { throw new NotFoundException("No loadbalancer available for " + serviceId); } supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest)); return loadBalancer.choose(lbRequest); } }
3.NettyRoutingFilter:路由全局過濾器
如果 exchange 屬性中的 URL 的 scheme 為 http 或 https,則會運行 NettyRoutingFilter。它使用 Netty HttpClient 發出下游代理請求,並把響應放入 exchange 的屬性中,供後面的過濾器使用。
public class NettyRoutingFilter implements GlobalFilter, Ordered { @Override @SuppressWarnings("Duplicates") public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { // 獲取請求url URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR); String scheme = requestUrl.getScheme(); // 如果請求已路由或者不是http或https的schema,跳過當前過濾器繼續執行過濾器鏈 if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) { return chain.filter(exchange); } // 設置 GATEWAY_ALREADY_ROUTED_ATTR 標識已路由 setAlreadyRouted(exchange); ServerHttpRequest request = exchange.getRequest(); final HttpMethod method = HttpMethod.valueOf(request.getMethodValue()); final String url = requestUrl.toASCIIString(); HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange); // 把請求頭設置到 new DefaultHttpHeaders final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders(); filtered.forEach(httpHeaders::set); boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false); Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR); // 構建帶超時的HttpClient,設置 headers Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> { headers.add(httpHeaders); // 先清掉所有host頭,再獲取第一個host頭,添加進去 headers.remove(HttpHeaders.HOST); if (preserveHost) { String host = request.getHeaders().getFirst(HttpHeaders.HOST); headers.add(HttpHeaders.HOST, host); } // 請求 }).request(method).uri(url).send((req, nettyOutbound) -> { if (log.isTraceEnabled()) { nettyOutbound.withConnection(connection -> log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix())); } return nettyOutbound.send(request.getBody().map(this::getByteBuf)); }).responseConnection((res, connection) -> { // 推遲提交響應,直到所有路由篩選器都運行為止。 // 將客戶端響應作為ServerWebExchange屬性,並在以后寫入響應NettyWriteResponseFilter exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection); 
ServerHttpResponse response = exchange.getResponse(); // 設置標題和狀態,以便篩選器可以修改響應 HttpHeaders headers = new HttpHeaders(); res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue())); // 獲取到contentType的值,如果有就緩存在 exchange 中 String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE); if (StringUtils.hasLength(contentTypeValue)) { exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue); } setResponseStatus(res, response); // 確保 HttpHeadersFilter 過濾器在設置狀態后運行,以便在響應時可用 HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE); if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING) && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) { // 同時具有傳輸編碼頭和內容長度頭是無效的。 // 如果存在內容長度標頭,請在響應中刪除傳輸編碼標頭 response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING); } // 緩存 exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet()); response.getHeaders().putAll(filteredResponseHeaders); // 返回結果 return Mono.just(res); }); Duration responseTimeout = getResponseTimeout(route); if (responseTimeout != null) { responseFlux = responseFlux .timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))) .onErrorMap(TimeoutException.class, th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th)); } // 繼續執行過濾器鏈 return responseFlux.then(chain.filter(exchange)); } }
注:在程序啟動加載 spring.factories 時,內置的全局過濾器(GlobalFilter)和過濾器工廠(GatewayFilter Factory),謂詞工廠(PredicateFactory)都會實例化。
在 RoutePredicateHandlerMapping 的 getHandlerInternal 方法中會將 route 路由信息緩存起來,最后在 FilteringWebHandler 的 handle 方法中獲取 過濾器鏈,進行過濾處理。