Gateway網關源碼


依賴

// The 'compile' configuration was removed in Gradle 7; 'implementation' is its replacement.
implementation 'org.springframework.cloud:spring-cloud-starter-gateway'

包結構

 

 

 SpringBoot - SPI機制

GatewayClassPathWarningAutoConfiguration.class

ClassPath 檢測:如果 classpath 中存在 Spring MVC 的 org.springframework.web.servlet.DispatcherServlet,或者缺少 Spring WebFlux 的 org.springframework.web.reactive.DispatcherHandler,則輸出警告日誌。

/**
 * Emits classpath warnings for the gateway. Each nested configuration is guarded by a
 * classpath condition and logs its warning from its constructor, i.e. when it is instantiated.
 * This auto-configuration is ordered before {@code GatewayAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
public class GatewayClassPathWarningAutoConfiguration {

    private static final String BORDER = "\n\n**********************************************************\n\n";

    private static final Log log = LogFactory.getLog(GatewayClassPathWarningAutoConfiguration.class);

    /**
     * Active only when {@code org.springframework.web.servlet.DispatcherServlet} is on the classpath.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(name = "org.springframework.web.servlet.DispatcherServlet")
    protected static class SpringMvcFoundOnClasspathConfiguration {

        public SpringMvcFoundOnClasspathConfiguration() {
            // Build the framed message first, then log it once.
            final String message = BORDER
                    + "Spring MVC found on classpath, which is incompatible with Spring Cloud Gateway at this time. "
                    + "Please remove spring-boot-starter-web dependency." + BORDER;
            log.warn(message);
        }
    }

    /**
     * Active only when {@code org.springframework.web.reactive.DispatcherHandler} is missing
     * from the classpath.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingClass("org.springframework.web.reactive.DispatcherHandler")
    protected static class WebfluxMissingFromClasspathConfiguration {

        public WebfluxMissingFromClasspathConfiguration() {
            // Build the framed message first, then log it once.
            final String message = BORDER + "Spring Webflux is missing from the classpath, "
                    + "which is required for Spring Cloud Gateway at this time. "
                    + "Please add spring-boot-starter-webflux dependency." + BORDER;
            log.warn(message);
        }
    }
}

GatewayAutoConfiguration.class

核心配置類:RouteLocator,RouteDefinitionLocator,RouteRefreshListener,GlobalCorsProperties,ForwardPathFilter,以及一些 Predicate Beans,GatewayFilter Factory beans 等等配置。

/**
 * Core gateway auto-configuration (excerpt). Enabled unless
 * {@code spring.cloud.gateway.enabled} is set to a non-matching value (the property is
 * allowed to be missing) and only when {@code DispatcherHandler} is on the classpath.
 * It is ordered before {@code HttpHandlerAutoConfiguration} / {@code WebFluxAutoConfiguration}
 * and after {@code GatewayLoadBalancerClientAutoConfiguration} /
 * {@code GatewayClassPathWarningAutoConfiguration}.
 *
 * The visible part registers the route locator builder, the composite route definition
 * locator, the refresh listener, CORS properties, the forward-path filter, a set of route
 * predicate factories, a set of gateway filter factories and the rate-limiter beans.
 * The bodies of the three nested configurations are elided in this excerpt.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore({ HttpHandlerAutoConfiguration.class, WebFluxAutoConfiguration.class }) @AutoConfigureAfter({ GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class })
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {

    /** Builder for programmatic route definitions, created from the application context. */
    @Bean
    public RouteLocatorBuilder routeLocatorBuilder(ConfigurableApplicationContext context) {
        return new RouteLocatorBuilder(context);
    }

    /**
     * Primary RouteDefinitionLocator: wraps all injected RouteDefinitionLocator beans
     * in a single CompositeRouteDefinitionLocator.
     */
    @Bean
    @Primary
    public RouteDefinitionLocator routeDefinitionLocator(List<RouteDefinitionLocator> routeDefinitionLocators) {
        return new CompositeRouteDefinitionLocator(Flux.fromIterable(routeDefinitionLocators));
    }

    /** Listener constructed with the application event publisher. */
    @Bean
    public RouteRefreshListener routeRefreshListener(ApplicationEventPublisher publisher) {
        return new RouteRefreshListener(publisher);
    }

    /** Holder for the global CORS settings. */
    @Bean
    public GlobalCorsProperties globalCorsProperties() {
        return new GlobalCorsProperties();
    }

    @Bean
    public ForwardPathFilter forwardPathFilter() {
        return new ForwardPathFilter();
    }

    // Predicate Factory beans
    @Bean
    public AfterRoutePredicateFactory afterRoutePredicateFactory() {
        return new AfterRoutePredicateFactory();
    }

    @Bean
    public BeforeRoutePredicateFactory beforeRoutePredicateFactory() {
        return new BeforeRoutePredicateFactory();
    }

    @Bean
    public BetweenRoutePredicateFactory betweenRoutePredicateFactory() {
        return new BetweenRoutePredicateFactory();
    }

    @Bean
    public CookieRoutePredicateFactory cookieRoutePredicateFactory() {
        return new CookieRoutePredicateFactory();
    }

    @Bean
    public HeaderRoutePredicateFactory headerRoutePredicateFactory() {
        return new HeaderRoutePredicateFactory();
    }

    @Bean
    public HostRoutePredicateFactory hostRoutePredicateFactory() {
        return new HostRoutePredicateFactory();
    }

    @Bean
    public MethodRoutePredicateFactory methodRoutePredicateFactory() {
        return new MethodRoutePredicateFactory();
    }

    @Bean
    public PathRoutePredicateFactory pathRoutePredicateFactory() {
        return new PathRoutePredicateFactory();
    }

    @Bean
    public QueryRoutePredicateFactory queryRoutePredicateFactory() {
        return new QueryRoutePredicateFactory();
    }

    // GatewayFilter Factory beans
    @Bean
    public AddRequestHeaderGatewayFilterFactory addRequestHeaderGatewayFilterFactory() {
        return new AddRequestHeaderGatewayFilterFactory();
    }

    @Bean
    public MapRequestHeaderGatewayFilterFactory mapRequestHeaderGatewayFilterFactory() {
        return new MapRequestHeaderGatewayFilterFactory();
    }

    @Bean
    public AddRequestParameterGatewayFilterFactory addRequestParameterGatewayFilterFactory() {
        return new AddRequestParameterGatewayFilterFactory();
    }

    @Bean
    public AddResponseHeaderGatewayFilterFactory addResponseHeaderGatewayFilterFactory() {
        return new AddResponseHeaderGatewayFilterFactory();
    }

    /** Built from the message readers of the injected ServerCodecConfigurer. */
    @Bean
    public ModifyRequestBodyGatewayFilterFactory modifyRequestBodyGatewayFilterFactory(
            ServerCodecConfigurer codecConfigurer) {
        return new ModifyRequestBodyGatewayFilterFactory(codecConfigurer.getReaders());
    }

    @Bean
    public DedupeResponseHeaderGatewayFilterFactory dedupeResponseHeaderGatewayFilterFactory() {
        return new DedupeResponseHeaderGatewayFilterFactory();
    }

    // Rate limiting
    // Default key resolver: registered only when a RateLimiter bean exists and no KeyResolver is defined.
    @Bean(name = PrincipalNameKeyResolver.BEAN_NAME)
    @ConditionalOnBean(RateLimiter.class)
    @ConditionalOnMissingBean(KeyResolver.class)
    public PrincipalNameKeyResolver principalNameKeyResolver() {
        return new PrincipalNameKeyResolver();
    }

    // The rate-limiter filter factory needs both a RateLimiter and a KeyResolver bean.
    @Bean
    @ConditionalOnBean({ RateLimiter.class, KeyResolver.class })
    public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(
            RateLimiter rateLimiter, KeyResolver resolver) {
        return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
    }


    // Netty network communication configuration (active when HttpClient is on the classpath; body elided)
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(HttpClient.class)
    protected static class NettyConfiguration {
        ... 省略
    }

    // Circuit-breaker (Hystrix) configuration (active when the Hystrix/RxJava classes are present; body elided)
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass({ HystrixObservableCommand.class, RxReactiveStreams.class })
    protected static class HystrixConfiguration {
        ... 省略
    }

    // Health-check (actuator) configuration (active when Health is on the classpath; body elided)
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(Health.class)
    protected static class GatewayActuatorConfiguration {
        ... 省略
    }
}

GatewayHystrixCircuitBreakerAutoConfiguration.class: 熔斷器實例化

spring:
  cloud:
    gateway:
      # Default filters, applied to every route
      default-filters:
        - name: Hystrix
          # args is a map of named arguments (name / fallbackUri), not a list
          args:
            name: fallbackcmd
            fallbackUri: forward:/myfallback
      routes:
        - id: test-service
          uri: lb://test
          predicates:
            # Single leading slash: '//test/**' would not match requests to /test/...
            - Path=/test/**
          # Filters that apply to this route only
          filters:
            - name: Hystrix
              args:
                name: fallbackcmd
                fallbackUri: forward:/myfallback

# Hystrix circuit-breaker settings
hystrix:
  command:
    default:
      execution:
        isolation:
          strategy: SEMAPHORE
          thread:
            timeoutInMilliseconds: 5000
  shareSecurityContext: true

如果請求超過 5 秒還沒返回,則觸發熔斷。熔斷之後會執行降級處理,即轉發到 fallbackUri 指定的地址(forward:/myfallback);fallbackcmd 只是 Hystrix 命令的名稱。

GatewayLoadBalancerClientAutoConfiguration.class:引入ribbon時的負載均衡實例化

/**
 * Registers the blocking load-balancer filter. Applies only when {@code LoadBalancerClient},
 * {@code RibbonAutoConfiguration} and {@code DispatcherHandler} are on the classpath, and is
 * ordered after {@code RibbonAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ LoadBalancerClient.class, RibbonAutoConfiguration.class, DispatcherHandler.class })
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class GatewayLoadBalancerClientAutoConfiguration {

    /**
     * Creates the filter when a {@code LoadBalancerClient} bean exists and neither a
     * {@code LoadBalancerClientFilter} nor a {@code ReactiveLoadBalancerClientFilter}
     * bean has been defined.
     */
    @Bean
    @ConditionalOnBean(LoadBalancerClient.class)
    @ConditionalOnMissingBean({ LoadBalancerClientFilter.class, ReactiveLoadBalancerClientFilter.class })
    public LoadBalancerClientFilter loadBalancerClientFilter(
            LoadBalancerClient client,
            LoadBalancerProperties properties) {
        final LoadBalancerClientFilter lbFilter = new LoadBalancerClientFilter(client, properties);
        return lbFilter;
    }
}

LoadBalancerClientFilter 過濾器 實現 GlobalFilter, Ordered。order 值越小,優先級越高

GatewayNoLoadBalancerClientAutoConfiguration.class:沒有引入 ribbon 時註冊 NoLoadBalancerClientFilter,遇到 lb:// 路由時直接拋出 NotFoundException(沒有可用的負載均衡器)

/**
 * Fallback configuration used when {@code RibbonAutoConfiguration} is not on the classpath
 * and no {@code LoadBalancerClient} bean exists. Ordered after
 * {@code GatewayLoadBalancerClientAutoConfiguration}. It registers a filter that rejects
 * {@code lb} requests (excerpt: part of the nested filter class is elided).
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnMissingClass("org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration")
@ConditionalOnMissingBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
@AutoConfigureAfter(GatewayLoadBalancerClientAutoConfiguration.class)
public class GatewayNoLoadBalancerClientAutoConfiguration {

    /**
     * Registered only when no {@code LoadBalancerClientFilter} bean is defined; the filter is
     * configured with the {@code use404} flag taken from the load-balancer properties.
     */
    @Bean
    @ConditionalOnMissingBean(LoadBalancerClientFilter.class)
    public NoLoadBalancerClientFilter noLoadBalancerClientFilter(LoadBalancerProperties properties) {
        return new NoLoadBalancerClientFilter(properties.isUse404());
    }

    /** Global filter that fails any request routed to an {@code lb} URI. */
    protected static class NoLoadBalancerClientFilter implements GlobalFilter, Ordered {
        ...省略
        /**
         * Continues the chain for non-{@code lb} requests; throws a NotFoundException
         * (created with the {@code use404} flag) for {@code lb} requests.
         */
        @Override
        @SuppressWarnings("Duplicates")
        public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
            URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
            String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
            // Pass through when there is no request URL, or when neither the URL scheme nor the
            // scheme prefix is "lb" (e.g. lb://test); otherwise fall through to the exception below.
            if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
                return chain.filter(exchange);
            }
            throw NotFoundException.create(use404, "Unable to find instance for " + url.getHost());
        }
    }
}

GatewayMetricsAutoConfiguration.class:監控指標實例化

/**
 * Metrics auto-configuration for the gateway. Enabled unless
 * {@code spring.cloud.gateway.enabled} is set to a non-matching value, and only when
 * {@code DispatcherHandler}, {@code MeterRegistry} and {@code MetricsAutoConfiguration}
 * are on the classpath. Registers the tag providers and the metrics filter.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties(GatewayMetricsProperties.class)
@AutoConfigureBefore(HttpHandlerAutoConfiguration.class)
@AutoConfigureAfter({ MetricsAutoConfiguration.class, CompositeMeterRegistryAutoConfiguration.class })
@ConditionalOnClass({ DispatcherHandler.class, MeterRegistry.class, MetricsAutoConfiguration.class })
public class GatewayMetricsAutoConfiguration {

    /** Tag provider bean for HTTP-related tags. */
    @Bean
    public GatewayHttpTagsProvider gatewayHttpTagsProvider() {
        final GatewayHttpTagsProvider httpTags = new GatewayHttpTagsProvider();
        return httpTags;
    }

    /** Tag provider bean for route-related tags. */
    @Bean
    public GatewayRouteTagsProvider gatewayRouteTagsProvider() {
        final GatewayRouteTagsProvider routeTags = new GatewayRouteTagsProvider();
        return routeTags;
    }

    /** Tag provider built from the tags configured in {@code GatewayMetricsProperties}. */
    @Bean
    public PropertiesTagsProvider propertiesTagsProvider(GatewayMetricsProperties gatewayMetricsProperties) {
        final PropertiesTagsProvider configuredTags =
                new PropertiesTagsProvider(gatewayMetricsProperties.getTags());
        return configuredTags;
    }

    /**
     * Metrics filter; requires a {@code MeterRegistry} bean and is active unless
     * {@code spring.cloud.gateway.metrics.enabled} is set to a non-matching value.
     */
    @Bean
    @ConditionalOnBean(MeterRegistry.class)
    @ConditionalOnProperty(name = "spring.cloud.gateway.metrics.enabled", matchIfMissing = true)
    public GatewayMetricsFilter gatewayMetricFilter(
            MeterRegistry meterRegistry,
            List<GatewayTagsProvider> tagsProviders) {
        final GatewayMetricsFilter metricsFilter = new GatewayMetricsFilter(meterRegistry, tagsProviders);
        return metricsFilter;
    }

}

GatewayRedisAutoConfiguration.class:redis限流實例化

/**
 * Redis rate-limiter auto-configuration. Requires a {@code ReactiveRedisTemplate} bean and
 * {@code RedisTemplate} / {@code DispatcherHandler} on the classpath; ordered after
 * {@code RedisReactiveAutoConfiguration} and before {@code GatewayAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@ConditionalOnBean(ReactiveRedisTemplate.class)
@ConditionalOnClass({ RedisTemplate.class, DispatcherHandler.class })
class GatewayRedisAutoConfiguration {

    /**
     * Redis script loaded from the classpath resource
     * {@code META-INF/scripts/request_rate_limiter.lua}, with {@code List} as its result type.
     * Raw types are kept deliberately so that {@code List.class} can be passed as the result type.
     */
    @Bean
    @SuppressWarnings("unchecked")
    public RedisScript redisRequestRateLimiterScript() {
        DefaultRedisScript rateLimiterScript = new DefaultRedisScript<>();
        ClassPathResource luaResource = new ClassPathResource("META-INF/scripts/request_rate_limiter.lua");
        ResourceScriptSource luaSource = new ResourceScriptSource(luaResource);
        rateLimiterScript.setScriptSource(luaSource);
        rateLimiterScript.setResultType(List.class);
        return rateLimiterScript;
    }

    /**
     * Rate limiter built from the reactive string template, the script qualified by
     * {@code RedisRateLimiter.REDIS_SCRIPT_NAME} and the configuration service.
     * Skipped when a bean of this type is already defined.
     */
    @Bean
    @ConditionalOnMissingBean
    public RedisRateLimiter redisRateLimiter(
            ReactiveStringRedisTemplate redisTemplate,
            @Qualifier(RedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> redisScript,
            ConfigurationService configurationService) {
        final RedisRateLimiter limiter = new RedisRateLimiter(redisTemplate, redisScript, configurationService);
        return limiter;
    }
}

RedisRateLimiter+request_rate_limiter.lua腳本

GatewayDiscoveryClientAutoConfiguration.class:服務發現

/**
 * Service-discovery auto-configuration for the gateway. Enabled unless
 * {@code spring.cloud.gateway.enabled} is set to a non-matching value and only when
 * {@code DispatcherHandler} is on the classpath; ordered before
 * {@code GatewayAutoConfiguration} and after {@code CompositeDiscoveryClientAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@AutoConfigureBefore(GatewayAutoConfiguration.class)
@AutoConfigureAfter(CompositeDiscoveryClientAutoConfiguration.class)
@ConditionalOnClass({ DispatcherHandler.class })
@EnableConfigurationProperties
public class GatewayDiscoveryClientAutoConfiguration {

    /**
     * Builds the default predicate list: a single Path predicate whose pattern argument is the
     * string {@code '/'+serviceId+'/**'}.
     *
     * @return a new mutable list containing the one default predicate definition
     */
    public static List<PredicateDefinition> initPredicates() {
        // TODO: add a predicate that matches the url at /serviceId?

        // Path predicate matching the url at /serviceId/**
        // NOTE(review): the pattern looks like an expression evaluated per service - confirm.
        PredicateDefinition pathPredicate = new PredicateDefinition();
        pathPredicate.setName(normalizeRoutePredicateName(PathRoutePredicateFactory.class));
        pathPredicate.addArg(PATTERN_KEY, "'/'+serviceId+'/**'");

        List<PredicateDefinition> defaults = new ArrayList<>();
        defaults.add(pathPredicate);
        return defaults;
    }

    /**
     * Builds the default filter list: a single RewritePath filter that removes the leading
     * {@code /serviceId} segment and keeps the remainder of the path.
     *
     * @return a new mutable list containing the one default filter definition
     */
    public static List<FilterDefinition> initFilters() {
        // Rewrite filter that strips /serviceId by default
        FilterDefinition stripServiceId = new FilterDefinition();
        stripServiceId.setName(normalizeFilterFactoryName(RewritePathGatewayFilterFactory.class));
        stripServiceId.addArg(REGEXP_KEY, "'/' + serviceId + '/(?<remaining>.*)'");
        stripServiceId.addArg(REPLACEMENT_KEY, "'/${remaining}'");

        List<FilterDefinition> defaults = new ArrayList<>();
        defaults.add(stripServiceId);
        return defaults;
    }

    /** Discovery locator properties pre-populated with the default predicates and filters. */
    @Bean
    public DiscoveryLocatorProperties discoveryLocatorProperties() {
        DiscoveryLocatorProperties locatorProperties = new DiscoveryLocatorProperties();
        locatorProperties.setPredicates(initPredicates());
        locatorProperties.setFilters(initFilters());
        return locatorProperties;
    }

    /**
     * Reactive variant: active when {@code spring.cloud.discovery.reactive.enabled} matches
     * or is missing.
     */
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled", matchIfMissing = true)
    public static class ReactiveDiscoveryClientRouteDefinitionLocatorConfiguration {

        /** Created only when {@code spring.cloud.gateway.discovery.locator.enabled} is set. */
        @Bean
        @ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled")
        public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator(
                ReactiveDiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) {
            final DiscoveryClientRouteDefinitionLocator locator =
                    new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
            return locator;
        }
    }

    /**
     * Blocking variant, marked deprecated: active only when
     * {@code spring.cloud.discovery.reactive.enabled} is explicitly {@code false}.
     */
    @Configuration(proxyBeanMethods = false)
    @Deprecated
    @ConditionalOnProperty(value = "spring.cloud.discovery.reactive.enabled", havingValue = "false")
    public static class BlockingDiscoveryClientRouteDefinitionLocatorConfiguration {

        /** Created only when {@code spring.cloud.gateway.discovery.locator.enabled} is set. */
        @Bean
        @ConditionalOnProperty(name = "spring.cloud.gateway.discovery.locator.enabled")
        public DiscoveryClientRouteDefinitionLocator discoveryClientRouteDefinitionLocator(
                DiscoveryClient discoveryClient, DiscoveryLocatorProperties properties) {
            final DiscoveryClientRouteDefinitionLocator locator =
                    new DiscoveryClientRouteDefinitionLocator(discoveryClient, properties);
            return locator;
        }
    }
}

SimpleUrlHandlerMappingGlobalCorsAutoConfiguration.class:跨域

/**
 * Applies the gateway's global CORS configuration to the {@code SimpleUrlHandlerMapping}.
 * Active only when {@code SimpleUrlHandlerMapping} is on the classpath and
 * {@code spring.cloud.gateway.globalcors.add-to-simple-url-handler-mapping} is set
 * (a missing property does not match).
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(SimpleUrlHandlerMapping.class)
@ConditionalOnProperty(name = "spring.cloud.gateway.globalcors.add-to-simple-url-handler-mapping", matchIfMissing = false)
public class SimpleUrlHandlerMappingGlobalCorsAutoConfiguration {

    @Autowired
    private GlobalCorsProperties globalCorsProperties;

    @Autowired
    private SimpleUrlHandlerMapping simpleUrlHandlerMapping;

    /**
     * Post-construct callback: runs once after the fields above have been injected and
     * copies the configured CORS mappings onto the handler mapping.
     */
    @PostConstruct
    void config() {
        this.simpleUrlHandlerMapping.setCorsConfigurations(this.globalCorsProperties.getCorsConfigurations());
    }
}

GatewayReactiveLoadBalancerClientAutoConfiguration.class:反應式負載均衡

與GatewayLoadBalancerClientAutoConfiguration 中的 LoadBalancerClientFilter 互斥;LoadBalancerClientFilter與ReactiveLoadBalancerClientFilter只會實例化一個

/**
 * Registers the reactive load-balancer filter. Applies only when {@code ReactiveLoadBalancer},
 * {@code LoadBalancerAutoConfiguration} and {@code DispatcherHandler} are on the classpath;
 * ordered before {@code GatewayLoadBalancerClientAutoConfiguration} and after
 * {@code LoadBalancerAutoConfiguration}.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ ReactiveLoadBalancer.class, LoadBalancerAutoConfiguration.class, DispatcherHandler.class })
@AutoConfigureBefore(GatewayLoadBalancerClientAutoConfiguration.class)
@AutoConfigureAfter(LoadBalancerAutoConfiguration.class)
@EnableConfigurationProperties(LoadBalancerProperties.class)
public class GatewayReactiveLoadBalancerClientAutoConfiguration {

    /**
     * Creates the reactive filter when a {@code LoadBalancerClientFactory} bean exists, no
     * {@code ReactiveLoadBalancerClientFilter} is already defined, and
     * {@link OnNoRibbonDefaultCondition} matches.
     */
    @Bean
    @ConditionalOnBean(LoadBalancerClientFactory.class)
    @ConditionalOnMissingBean(ReactiveLoadBalancerClientFilter.class)
    @Conditional(OnNoRibbonDefaultCondition.class)
    public ReactiveLoadBalancerClientFilter gatewayLoadBalancerClientFilter(
            LoadBalancerClientFactory clientFactory,
            LoadBalancerProperties properties) {
        final ReactiveLoadBalancerClientFilter reactiveFilter =
                new ReactiveLoadBalancerClientFilter(clientFactory, properties);
        return reactiveFilter;
    }

    /**
     * Nested "any" condition evaluated in the REGISTER_BEAN phase. Its two members are:
     * {@code spring.cloud.loadbalancer.ribbon.enabled} equals {@code false}, and
     * {@code RibbonLoadBalancerClient} missing from the classpath.
     */
    private static final class OnNoRibbonDefaultCondition extends AnyNestedCondition {

        private OnNoRibbonDefaultCondition() {
            super(ConfigurationPhase.REGISTER_BEAN);
        }

        /** Member condition: Ribbon support explicitly switched off by property. */
        @ConditionalOnProperty(value = "spring.cloud.loadbalancer.ribbon.enabled", havingValue = "false")
        static class RibbonNotEnabled {
        }

        /** Member condition: the Ribbon load-balancer client class is not present. */
        @ConditionalOnMissingClass("org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient")
        static class RibbonLoadBalancerNotPresent {
        }
    }
}

流程圖 

 

 

執行過程

DispatcherHandler.class

/**
 * Central WebFlux dispatcher (excerpt). On construction it collects the HandlerMapping,
 * HandlerAdapter and HandlerResultHandler beans from the application context; on each
 * request it asks the mappings in order for a handler, invokes it and handles the result.
 * Field declarations and the helpers createNotFoundError, invokeHandler and handleResult
 * are not shown in this excerpt.
 */
public class DispatcherHandler implements WebHandler, ApplicationContextAware {

    /** Initialises the strategy lists from the given application context. */
    public DispatcherHandler(ApplicationContext applicationContext) {
        initStrategies(applicationContext);
    }

    /**
     * Looks up all HandlerMapping, HandlerAdapter and HandlerResultHandler beans
     * (including those in ancestor contexts), sorts each group with
     * AnnotationAwareOrderComparator and stores them in the corresponding fields.
     */
    protected void initStrategies(ApplicationContext context) {
        // Collect every bean implementing HandlerMapping
        Map<String, HandlerMapping> mappingBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerMapping.class, true, false);
        ArrayList<HandlerMapping> mappings = new ArrayList<>(mappingBeans.values());
        AnnotationAwareOrderComparator.sort(mappings);
        // Expose the sorted mappings as an unmodifiable, read-only list
        this.handlerMappings = Collections.unmodifiableList(mappings);

        // Collect every bean implementing HandlerAdapter
        Map<String, HandlerAdapter> adapterBeans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerAdapter.class, true, false);
        this.handlerAdapters = new ArrayList<>(adapterBeans.values());
        AnnotationAwareOrderComparator.sort(this.handlerAdapters);

        // Collect every bean implementing HandlerResultHandler
        Map<String, HandlerResultHandler> beans = BeanFactoryUtils.beansOfTypeIncludingAncestors(
                context, HandlerResultHandler.class, true, false);
        this.resultHandlers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(this.resultHandlers);
    }

    // Handle a request
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        // handlerMappings was never initialised (null): answer with the not-found error
        if (this.handlerMappings == null) {
            return createNotFoundError();
        }
        // Otherwise query the mappings one after another (concatMap keeps their order),
        // take the first handler produced, fall back to the not-found error if none is,
        // then invoke the handler and process its result.
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange))
                .next()
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }
}

AbstractHandlerMapping.class

/**
 * Base class for handler mappings (excerpt). Subclasses supply the actual lookup through
 * getHandlerInternal; this class wraps the result with CORS processing.
 */
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport
        implements HandlerMapping, Ordered, BeanNameAware {

    /**
     * Resolves the handler for the exchange and applies CORS handling to it.
     *
     * @param exchange the current exchange
     * @return the handler found by getHandlerInternal, or REQUEST_HANDLED_HANDLER when the
     *         CORS processor rejected the request or the request is a pre-flight request
     */
    @Override
    public Mono<Object> getHandler(ServerWebExchange exchange) {
        // Delegate the lookup to the subclass-specific implementation
        return getHandlerInternal(exchange).map(handler -> {
            if (logger.isDebugEnabled()) {
                logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
            }
            ServerHttpRequest request = exchange.getRequest();
            // CORS applies when the handler carries its own CORS source or the request is a pre-flight
            if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
                // Mapping-level configuration, if a source is set on this mapping
                CorsConfiguration config = (this.corsConfigurationSource != null ? this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
                // Handler-level configuration, combined with the mapping-level one when both exist
                CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
                config = (config != null ? config.combine(handlerConfig) : handlerConfig);
                if (config != null) {
                    config.validateAllowCredentials();
                }
                // Rejected by the CORS processor, or a pre-flight: the request is considered handled
                if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                    return REQUEST_HANDLED_HANDLER;
                }
            }
            return handler;
        });
    }

    // Abstract hook: each subclass defines its own handler lookup
    protected abstract Mono<?> getHandlerInternal(ServerWebExchange var1);
}

RoutePredicateHandlerMapping.class

/**
 * Handler mapping that resolves a gateway Route for the exchange (excerpt; fields and the
 * helpers lookupRoute, getSimpleName and getExchangeDesc are not shown).
 */
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {
    /**
     * Looks up the matching route and, when one is found, stores it in the exchange
     * attributes and returns the gateway web handler; completes empty otherwise.
     */
    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        // Do not handle requests that arrive on the (separate) management port
        if (this.managementPortType == DIFFERENT && this.managementPort != null
                && exchange.getRequest().getURI().getPort() == this.managementPort) {
            return Mono.empty();
        }
        // Record this mapping's simple name in the exchange attributes
        exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
        // Look up the route
        return lookupRoute(exchange)
                // .log("route-predicate-handler-mapping", Level.FINER) //name this
                .flatMap((Function<Route, Mono<?>>) r -> {
                    // The temporary predicate-route attribute is no longer needed once a route matched
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
                    }
                    // Store the matched route in the exchange attributes
                    exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
                    // Hand the request over to the web handler
                    return Mono.just(webHandler);
                }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
                    // No route matched: clean up the attribute and trace-log; the Mono still completes empty
                    exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
                    }
                })));
    }
}

FilteringWebHandler.class

/**
 * Web handler that runs a request through the global filters plus the filters of the
 * matched route (excerpt: field declarations and the two-argument
 * DefaultGatewayFilterChain constructor used in filter() are not shown).
 */
public class FilteringWebHandler implements WebHandler {

    // Adapts all global filters once, at construction time
    public FilteringWebHandler(List<GlobalFilter> globalFilters) {
        this.globalFilters = loadFilters(globalFilters);
    }

    /**
     * Wraps every GlobalFilter in a GatewayFilterAdapter; filters that implement Ordered
     * are additionally wrapped in an OrderedGatewayFilter carrying their order value.
     */
    private static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {
        // Always adapt to GatewayFilter; add the OrderedGatewayFilter wrapper only for Ordered filters
        return filters.stream().map(filter -> {
            GatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);
            if (filter instanceof Ordered) {
                int order = ((Ordered) filter).getOrder();
                return new OrderedGatewayFilter(gatewayFilter, order);
            }
            return gatewayFilter;
        }).collect(Collectors.toList());
    }

    // Filter-chain processing
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        // Read the matched route from the exchange attributes (required)
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        // Filters configured on the route
        List<GatewayFilter> gatewayFilters = route.getFilters();
        // Global filters + route filters, combined in a fresh list
        List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);
        combined.addAll(gatewayFilters);
        // Sort the combined list
        AnnotationAwareOrderComparator.sort(combined);

        if (logger.isDebugEnabled()) {
            logger.debug("Sorted gatewayFilterFactories: " + combined);
        }
        // Build the filter chain and start it; each filter calls the next through the chain
        return new DefaultGatewayFilterChain(combined).filter(exchange);
    }

    // Default gateway filter chain
    private static class DefaultGatewayFilterChain implements GatewayFilterChain {

        /** Creates the head of the chain, positioned at the first filter. */
        DefaultGatewayFilterChain(List<GatewayFilter> filters) {
            this.filters = filters;
            this.index = 0;
        }

        @Override
        public Mono<Void> filter(ServerWebExchange exchange) {
            // Invoke the filter at the current index, handing it a chain positioned at index + 1
            return Mono.defer(() -> {
                if (this.index < filters.size()) {
                    GatewayFilter filter = filters.get(this.index);
                    DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this, this.index + 1);
                    return filter.filter(exchange, chain);
                }
                else {
                    return Mono.empty(); // complete
                }
            });
        }
    }
}

1.GatewayMetricsFilter:度量監控過濾器

該過濾器主要用於做網關度量監控的,要啟用,需添加spring-boot-starter-actuator依賴。

默認情況下,屬性spring.cloud.gateway.metrics.enabled未設置為false,GatewayMetricsFilter就會運行。

這些指標隨後可從 /actuator/metrics/gateway.requests 中進行抓取。

/**
 * Global filter that times each request through the filter chain and records the result
 * in a timer named {@code metricsPrefix + ".requests"}.
 */
public class GatewayMetricsFilter implements GlobalFilter, Ordered {

    /**
     * Starts a timer sample, runs the rest of the chain and stops the sample when the chain
     * completes, whether successfully or with an error.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        final Sample timerSample = Timer.start(meterRegistry);
        Mono<Void> downstream = chain.filter(exchange);
        return downstream
                .doOnSuccess(ignored -> endTimerRespectingCommit(exchange, timerSample))
                .doOnError(error -> endTimerRespectingCommit(exchange, timerSample));
    }

    /**
     * Stops the sample immediately when the response is already committed; otherwise
     * registers a before-commit action that stops it later.
     */
    private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {
        final ServerHttpResponse httpResponse = exchange.getResponse();
        if (!httpResponse.isCommitted()) {
            httpResponse.beforeCommit(() -> {
                endTimerInner(exchange, sample);
                return Mono.empty();
            });
            return;
        }
        endTimerInner(exchange, sample);
    }

    /** Resolves the tags for the exchange and records the sample against the requests timer. */
    private void endTimerInner(ServerWebExchange exchange, Sample sample) {
        final Tags requestTags = compositeTagsProvider.apply(exchange);
        final String timerName = metricsPrefix + ".requests";

        if (log.isTraceEnabled()) {
            log.trace(timerName + " tags: " + requestTags);
        }
        sample.stop(meterRegistry.timer(timerName, requestTags));
    }

}

 

2.ReactiveLoadBalancerClientFilter:負載均衡過濾器

ReactiveLoadBalancerClientFilter 在 exchange 的 ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR 屬性中查找 URI。

如果 URL 的 scheme 為 lb(例如 lb://myservice),將使用該名稱(在本例中為 myservice)解析出實際的主機和端口,並替換同一屬性中的 URI。

過濾器會在exchange的屬性 GATEWAY_SCHEME_PREFIX_ATTR 中查找其是否等於lb。如果是,則適用相同的規則。

/**
 * Global filter that resolves {@code lb} request URLs to a concrete service instance via
 * the reactive load balancer (excerpt: field declarations and the helpers getHint and
 * reconstructURI are not shown).
 */
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {

    /**
     * For {@code lb} requests: chooses an instance for the service named by the URL host,
     * rewrites GATEWAY_REQUEST_URL_ATTR to point at that instance, notifies the supported
     * lifecycle processors and then continues the chain. Other requests pass straight through.
     * Fails with a NotFoundException when the load balancer returns no instance.
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
        // No URL, or neither the scheme nor the scheme prefix is "lb": nothing to do here
        if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
            return chain.filter(exchange);
        }
        // Preserve the original request url
        addOriginalRequestUrl(exchange, url);

        if (log.isTraceEnabled()) {
            log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
        }

        URI requestUri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
        // The host part of the lb URL is used as the service id
        String serviceId = requestUri.getHost();
        // Resolve the lifecycle processors that support these request/response/instance types
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
                .getSupportedLifecycleProcessors(clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                        RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        // Build the load-balancer request used to choose an instance
        DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(new RequestDataContext(
                new RequestData(exchange.getRequest()), getHint(serviceId, loadBalancerProperties.getHint())));
        return choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext(response -> {
            // No instance in the response: notify the processors (DISCARD) and fail
            if (!response.hasServer()) {
                supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<>(CompletionContext.Status.DISCARD, lbRequest, response)));
                throw NotFoundException.create(properties.isUse404(), "Unable to find instance for " + url.getHost());
            }
            // The chosen instance
            ServiceInstance retrievedInstance = response.getServer();
            URI uri = exchange.getRequest().getURI();

            // if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
            // if the loadbalancer doesn't provide one.
            String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
            if (schemePrefix != null) {
                overrideScheme = url.getScheme();
            }

            // Wrap the instance so that it carries the override scheme
            DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
            // Rebuild the request URL for the instance, then store the URL and the response in the exchange
            URI requestUrl = reconstructURI(serviceInstance, uri);

            if (log.isTraceEnabled()) {
                log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
            }
            exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
            exchange.getAttributes().put(GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, response));
        }).then(chain.filter(exchange))
                // Report FAILED to every lifecycle processor when the pipeline errors
                .doOnError(throwable -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.FAILED, throwable, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR)))))
                // Report SUCCESS, including the response data, when the pipeline completes normally
                .doOnSuccess(aVoid -> supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
                        .onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                                CompletionContext.Status.SUCCESS, lbRequest,
                                exchange.getAttribute(GATEWAY_LOADBALANCER_RESPONSE_ATTR),
                                new ResponseData(exchange.getResponse(), new RequestData(exchange.getRequest()))))));
    }

    /**
     * Asks the load balancer registered for the service to choose one instance.
     * Calls onStart on every supported lifecycle processor before choosing.
     *
     * @throws NotFoundException when no load balancer is available for the service id
     */
    private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId,
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
        ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(serviceId,
                ReactorServiceInstanceLoadBalancer.class);
        if (loadBalancer == null) {
            throw new NotFoundException("No loadbalancer available for " + serviceId);
        }
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        return loadBalancer.choose(lbRequest);
    }
}

 

3.NettyRoutingFilter:路由全局過濾器

如果 exchange 屬性中的URL帶有http或https,則將運行NettyRoutingFilter。它使用Netty HttpClient發出下游代理請求。

響應將放入 exchange 屬性中,供後面的過濾器使用。

public class NettyRoutingFilter implements GlobalFilter, Ordered {

    @Override
    @SuppressWarnings("Duplicates")
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        // 獲取請求url
        URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        // 如果請求已路由或者不是http或https的schema,跳過當前過濾器繼續執行過濾器鏈
        if (isAlreadyRouted(exchange) || (!"http".equals(scheme) && !"https".equals(scheme))) {
            return chain.filter(exchange);
        }
        // 設置 GATEWAY_ALREADY_ROUTED_ATTR 標識已路由
        setAlreadyRouted(exchange);

        ServerHttpRequest request = exchange.getRequest();
        final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
        final String url = requestUrl.toASCIIString();
        HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);

        // 把請求頭設置到 new DefaultHttpHeaders
        final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
        filtered.forEach(httpHeaders::set);

        boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
        Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);

        // 構建帶超時的HttpClient,設置 headers
        Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {
            headers.add(httpHeaders);
            // 先清掉所有host頭,再獲取第一個host頭,添加進去
            headers.remove(HttpHeaders.HOST);
            if (preserveHost) {
                String host = request.getHeaders().getFirst(HttpHeaders.HOST);
                headers.add(HttpHeaders.HOST, host);
            }
            // 請求
        }).request(method).uri(url).send((req, nettyOutbound) -> {
            if (log.isTraceEnabled()) {
                nettyOutbound.withConnection(connection -> log.trace("outbound route: "
                        + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix()));
            }
            return nettyOutbound.send(request.getBody().map(this::getByteBuf));
        }).responseConnection((res, connection) -> {

            // 推遲提交響應,直到所有路由篩選器都運行為止。
            // 將客戶端響應作為ServerWebExchange屬性,並在以后寫入響應NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            ServerHttpResponse response = exchange.getResponse();
            // 設置標題和狀態,以便篩選器可以修改響應
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));

            // 獲取到contentType的值,如果有就緩存在 exchange 中
            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
                exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR, contentTypeValue);
            }

            setResponseStatus(res, response);

            // 確保 HttpHeadersFilter 過濾器在設置狀態后運行,以便在響應時可用
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange,
                    Type.RESPONSE);

            if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)
                    && filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                // 同時具有傳輸編碼頭和內容長度頭是無效的。
                // 如果存在內容長度標頭,請在響應中刪除傳輸編碼標頭
                response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }
            // 緩存
            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
            response.getHeaders().putAll(filteredResponseHeaders);
            // 返回結果
            return Mono.just(res);
        });

        Duration responseTimeout = getResponseTimeout(route);
        if (responseTimeout != null) {
            responseFlux = responseFlux
                    .timeout(responseTimeout,
                            Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout)))
                    .onErrorMap(TimeoutException.class,
                            th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th));
        }
        // 繼續執行過濾器鏈
        return responseFlux.then(chain.filter(exchange));
    }
}

 注:在程序啟動加載 spring.factories 時,內置的全局過濾器(GlobalFilter)和過濾器工廠(GatewayFilter Factory),謂詞工廠(PredicateFactory)都會實例化。

  在 RoutePredicateHandlerMapping 的 getHandlerInternal 方法中會將 route 路由信息緩存起來,最後在 FilteringWebHandler 的 handle 方法中獲取過濾器鏈,進行過濾處理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM