Spring-Cloud-Gateway-全局filter細品(五)


目標

  • 溫故 Java8 常見函數式用法
  • 再過一下 lookupRoute 方法
  • 過一遍 9 默認全局 Filter

Java 8 常見函數式用法

Consumer 表達式

一個消費型的接口,通過傳入參數,然后輸出值,無返回值。接連兩個consumer有相同的入參可以使用addThen將兩個方法鏈接起來,然后一起accept方法接受入參。

Consumer<Integer> add2consumer = x -> {
  int a = x + 2;
  System.out.println(a);
};
Consumer<Integer> add4consumer = x -> {
  int a = x + 4;
  System.out.println(a);
};
add2consumer.andThen(add4consumer).accept(10);
​
//① 使用consumer接口實現方法
Consumer<String> consumer = new Consumer<String>() {
  @Override
  public void accept(String s) {
    System.out.println(s);
  }
};
Stream<String> stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
stream.forEach(consumer);
System.out.println("********************");
​
//② 使用lambda表達式,forEach方法需要的就是一個Consumer接口
stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
Consumer<String> consumer1 = (s) -> System.out.println(s);//lambda表達式返回的就是一個Consumer接口
stream.forEach(consumer1);
//更直接的方式
//stream.forEach((s) -> System.out.println(s));
System.out.println("********************");
​
//③ 使用方法引用,方法引用也是一個consumer
stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
Consumer consumer2 = System.out::println;
stream.forEach(consumer2);

Supplier 表達式

一個供給型的接口,可以用來存儲數據,然后可以供其他方法使用的這么一個接口。

//① 使用Supplier接口實現方法,只有一個get方法,無參數,返回一個值
Supplier<Integer> supplier = new Supplier<Integer>() {
  @Override
  public Integer get() {
    //返回一個隨機值
    return new Random().nextInt();
  }
};
​
System.out.println(supplier.get());
System.out.println("********************");
​
//② 使用lambda表達式,
supplier = () -> new Random().nextInt();
System.out.println(supplier.get());
System.out.println("********************");
​
//③ 使用方法引用
Supplier<Double> supplier2 = Math::random;
System.out.println(supplier2.get());
​
// 高階用法
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
//返回一個optional對象
Optional<Integer> first = stream.filter(i -> i > 4)
  .findFirst();
​
//optional對象有需要Supplier接口的方法
//orElse,如果first中存在數,就返回這個數,如果不存在,就放回傳入的數
System.out.println(first.orElse(1));
System.out.println(first.orElse(7));
​
System.out.println("********************");
​
Supplier<Integer> integerSupplier = new Supplier<Integer>() {
  @Override
  public Integer get() {
    //返回一個隨機值
    return new Random().nextInt();
  }
};
​
//orElseGet,如果first中存在數,就返回這個數,如果不存在,就返回supplier返回的值
System.out.println(first.orElseGet(integerSupplier));

Function 表達式

一個功能型接口,它的一個作用就是轉換作用,將輸入數據轉換成另一種形式的輸出數據。

//① 使用map方法,泛型的第一個參數是轉換前的類型,第二個是轉化后的類型
Function<String, Integer> function = new Function<String, Integer>() {
  @Override
  public Integer apply(String s) {
    return s.length();//獲取每個字符串的長度,並且返回
  }
};
​
Stream<String> stream = Stream.of("aaa", "bbbbb", "ccccccv");
Stream<Integer> stream1 = stream.map(function);
stream1.forEach(System.out::println);
System.out.println("********************");

Predicate 表達式

謂詞型接口,其實,這個就是一個類似於 bool 類型的判斷的接口。

//① 使用Predicate接口實現方法,只有一個test方法,傳入一個參數,返回一個bool值
Predicate<Integer> predicate = new Predicate<Integer>() {
  @Override
  public boolean test(Integer integer) {
    if (integer > 5) {
      return true;
    }
    return false;
  }
};
System.out.println(predicate.test(6));
System.out.println("********************");
​
//② 使用lambda表達式,
predicate = (t) -> t > 5;
System.out.println(predicate.test(1));
System.out.println("********************");

需要時查看

列舉下java8中 java.util.function包下,內置所有的接口簡介和表達的意思

1 BiConsumer :代表了一個接受兩個輸入參數的操作,並且不返回任何結果

2 BiFunction :代表了一個接受兩個輸入參數的方法,並且返回一個結果

3 BinaryOperator :代表了一個作用於於兩個同類型操作符的操作,並且返回了操作符同類型的結果

4 BiPredicate :代表了一個兩個參數的boolean值方法

5 BooleanSupplier:代表了boolean值結果的提供方

6 Consumer :代表了接受一個輸入參數並且無返回的操作

7 DoubleBinaryOperator:代表了作用於兩個double值操作符的操作,並且返回了一個double值的結果。

8 DoubleConsumer:代表一個接受double值參數的操作,並且不返回結果。

9 DoubleFunction :代表接受一個double值參數的方法,並且返回結果

10 DoublePredicate:代表一個擁有double值參數的boolean值方法

11 DoubleSupplier:代表一個double值結構的提供方

12 DoubleToIntFunction:接受一個double類型輸入,返回一個int類型結果。

13 DoubleToLongFunction:接受一個double類型輸入,返回一個long類型結果

14 DoubleUnaryOperator:接受一個參數同為類型double,返回值類型也為double 。

15 Function :接受一個輸入參數,返回一個結果。

16 IntBinaryOperator:接受兩個參數同為類型int,返回值類型也為int 。

17 IntConsumer:接受一個int類型的輸入參數,無返回值 。

18 IntFunction :接受一個int類型輸入參數,返回一個結果 。

19 IntPredicate:接受一個int輸入參數,返回一個布爾值的結果。

20 IntSupplier:無參數,返回一個int類型結果。

21 IntToDoubleFunction:接受一個int類型輸入,返回一個double類型結果 。

22 IntToLongFunction:接受一個int類型輸入,返回一個long類型結果。

23 IntUnaryOperator:接受一個參數同為類型int,返回值類型也為int 。

24 LongBinaryOperator:接受兩個參數同為類型long,返回值類型也為long。

25 LongConsumer:接受一個long類型的輸入參數,無返回值。

26 LongFunction :接受一個long類型輸入參數,返回一個結果。

27 LongPredicate:R接受一個long輸入參數,返回一個布爾值類型結果。

28 LongSupplier:無參數,返回一個結果long類型的值。

29 LongToDoubleFunction:接受一個long類型輸入,返回一個double類型結果。

30 LongToIntFunction:接受一個long類型輸入,返回一個int類型結果。

31 LongUnaryOperator:接受一個參數同為類型long,返回值類型也為long。

32 ObjDoubleConsumer :接受一個object類型和一個double類型的輸入參數,無返回值。

33 ObjIntConsumer :接受一個object類型和一個int類型的輸入參數,無返回值。

34 ObjLongConsumer :接受一個object類型和一個long類型的輸入參數,無返回值。

35 Predicate :接受一個輸入參數,返回一個布爾值結果。

36 Supplier :無參數,返回一個結果。

37 ToDoubleBiFunction :接受兩個輸入參數,返回一個double類型結果

38 ToDoubleFunction :接受一個輸入參數,返回一個double類型結果

39 ToIntBiFunction :接受兩個輸入參數,返回一個int類型結果。

40 ToIntFunction :接受一個輸入參數,返回一個int類型結果。

41 ToLongBiFunction :接受兩個輸入參數,返回一個long類型結果。

42 ToLongFunction :接受一個輸入參數,返回一個long類型結果。

43 UnaryOperator :接受一個參數為類型T,返回值類型也為T。

lookupRoute 方法如何調用?

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
  logger.info("lookupRoute 開始匹配 Route 賊關鍵 -> RoutePredicateHandlerMapping#lookupRoute");
  return this.routeLocator.getRoutes()
    // 單獨處理,以便在出現錯誤的時候,返回空
    .concatMap(route ->
               Mono.just(route)
               .filterWhen(r -> {
                 logger.info("設置當前執行的 routeId GATEWAY_PREDICATE_ROUTE_ATTR: " +
                             r.getId() + " -> RoutePredicateHandlerMapping#lookupRoute");
                 exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
                 return r.getPredicate().apply(exchange);
               })
               .doOnError(e -> logger.error(
                 "Error applying predicate for route: " + route.getId(),
                 e))
               .onErrorResume(e -> Mono.empty()))
    .next()
    .map(route -> {
      if (logger.isDebugEnabled()) {
        logger.debug("Route matched: " + route.getId());
      }
      logger.info("校驗 Route 的有效性 -> RoutePredicateHandlerMapping#lookupRoute");
      validateRoute(route, exchange);
      return route;
    });
}

從方法名我們可以得知,這個方法就是要找出符合規則的 Route。在實際的調用過程中,方法是先返回一個 Mono 空對象,之后自己執行r.getPredicate().apply(exchange)使用每個Route的謂詞規則過濾,返回匹配 Route。

public class Route implements Ordered {
  ...
  // route 的 getPredicate 方法返回 AsyncPredicate 類型
  public AsyncPredicate<ServerWebExchange> getPredicate() {
    return this.predicate;
  }
}

route 的 getPredicate 方法返回 AsyncPredicate 類型

class DefaultAsyncPredicate<T> implements AsyncPredicate<T> {
​
  private final Predicate<T> delegate;
​
  public DefaultAsyncPredicate(Predicate<T> delegate) {
    this.delegate = delegate;
  }
​
  @Override
  public Publisher<Boolean> apply(T t) {
    return Mono.just(delegate.test(t));
  }
  ...
}

然后執行Predicatetest方法返回一個boolean值,來滿足filterWhen方法,返回對應 Route。

9 個默認全局 Filter

訪問對應http://localhost:8080/actuator/gateway/globalfilters地址,我們可以看到所有 GlobalFilter 以及對應的Order順序。

{
  "org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter@ffaaaf0": -2147483648,
  "org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter@537c8c7e": -2147482648,
  "org.springframework.cloud.gateway.filter.NettyWriteResponseFilter@2459319c": -1,
  "org.springframework.cloud.gateway.filter.ForwardPathFilter@33d53216": 0,
  "org.springframework.cloud.gateway.filter.GatewayMetricsFilter@4f3e7344": 0,
  "org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter@1dc76fa1": 10000,
  "org.springframework.cloud.gateway.filter.WebsocketRoutingFilter@69a2b3b6": 2147483646,
  "org.springframework.cloud.gateway.filter.NettyRoutingFilter@3681037": 2147483647,
  "org.springframework.cloud.gateway.filter.ForwardRoutingFilter@5eed2d86": 2147483647,
}

RemoveCachedBodyFilter

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  return chain.filter(exchange).doFinally(s -> {
    log.info("執行到 RemoveCachedBodyFilter#filter ");
    // 移除上下文中 cachedRequestBody 屬性
    Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
    // PooledDataBuffer 是 DataBuffer 的擴展,允許共享一塊內存共享池 
    if (attribute != null && attribute instanceof PooledDataBuffer) {
      PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute;
      // 如果還有占用,則釋放
      if (dataBuffer.isAllocated()) {
        if (log.isTraceEnabled()) {
          log.trace("releasing cached body in exchange attribute");
        }
        dataBuffer.release();
      }
    }
  });
}

AdaptCachedBodyGlobalFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  logger.info("執行到 AdaptCachedBodyGlobalFilter#filter");
  ServerHttpRequest cachedRequest = exchange
    .getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
  if (cachedRequest != null) {
    exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
    // 如果不為空,直接使用 DefaultServerWebExchangeBuilder 構建一個,並將 cachedRequest 加入到其中
    return chain.filter(exchange.mutate().request(cachedRequest).build());
  }
​
  DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
  Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
​
  // 如果上下文中,沒有 cachedRequestBody 緩存,並且本地方法中沒有緩存過改 Route 則繼續
  if (body != null || !this.routesToCache.containsKey(route.getId())) {
    return chain.filter(exchange);
  }
​
  return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
    // 如果是相同的則不 build 了
    if (serverHttpRequest == exchange.getRequest()) {
      return chain.filter(exchange);
    }
    return chain.filter(exchange.mutate().request(serverHttpRequest).build());
  });
}

NettyWriteResponseFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  log.info("執行到 NettyWriteResponseFilter#filter");
  // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
  // until the NettyRoutingFilter is run
  // @formatter:off
  return chain.filter(exchange)
    .doOnError(throwable -> cleanup(exchange))
    .then(Mono.defer(() -> {
      // 獲取 gatewayClientResponseConnection 連接
      Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
​
      if (connection == null) {
        return Mono.empty();
      }
      if (log.isTraceEnabled()) {
        log.trace("NettyWriteResponseFilter start inbound: "
                  + connection.channel().id().asShortText() + ", outbound: "
                  + exchange.getLogPrefix());
      }
      ServerHttpResponse response = exchange.getResponse();
      
      final Flux<DataBuffer> body = connection
        .inbound()
        .receive()
        .retain()
        // byteBuf -> DataBuffer, netty 數據結構到 spring 數據結構 
        .map(byteBuf -> wrap(byteBuf, response));
​
      MediaType contentType = null;
      try {
        contentType = response.getHeaders().getContentType();
      }
      catch (Exception e) {
        if (log.isTraceEnabled()) {
          log.trace("invalid media type", e);
        }
      }
      // 根據不同類型,是否直接刷盤發送
      return (isStreamingMediaType(contentType)
              ? response.writeAndFlushWith(body.map(Flux::just))
              : response.writeWith(body));
    })).doOnCancel(() -> cleanup(exchange));
  // @formatter:on
}

ForwardPathFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  logger.info("執行到 ForwardPathFilter#filter");
  Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
  URI routeUri = route.getUri();
  String scheme = routeUri.getScheme();
  // 是否已經轉發過(gatewayAlreadyRouted) 或 沒有包含 forward 關鍵字
  if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
    return chain.filter(exchange);
  }
  exchange = exchange.mutate()
    .request(exchange.getRequest().mutate().path(routeUri.getPath()).build())
    .build();
  return chain.filter(exchange);
}

GatewayMetricsFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  log.info("執行到 GatewayMetricsFilter#filter");
  Sample sample = Timer.start(meterRegistry);
  // 添加對於成功或異常調用的方法
  return chain.filter(exchange)
    .doOnSuccess(aVoid -> endTimerRespectingCommit(exchange, sample))
    .doOnError(throwable -> endTimerRespectingCommit(exchange, sample));
}
​
private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {
  ServerHttpResponse response = exchange.getResponse();
  if (response.isCommitted()) {
    endTimerInner(exchange, sample);
  }
  else {
    response.beforeCommit(() -> {
      endTimerInner(exchange, sample);
      return Mono.empty();
    });
  }
}

RouteToRequestUrlFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  log.info("執行到 RouteToRequestUrlFilter#filter");
  Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
  if (route == null) {
    return chain.filter(exchange);
  }
  log.trace("RouteToRequestUrlFilter start");
  URI uri = exchange.getRequest().getURI();
  boolean encoded = containsEncodedParts(uri);
  URI routeUri = route.getUri();
​
  if (hasAnotherScheme(routeUri)) {
    // 設置相應的 schema
    exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
    routeUri = URI.create(routeUri.getSchemeSpecificPart());
  }
​
  // 對於 loadbalance 的判斷
  if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
    throw new IllegalStateException("Invalid host: " + routeUri.toString());
  }
​
  URI mergedUrl = UriComponentsBuilder.fromUri(uri)
    // .uri(routeUri)
    .scheme(routeUri.getScheme()).host(routeUri.getHost())
    .port(routeUri.getPort()).build(encoded).toUri();
  exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
  return chain.filter(exchange);
}

WebsocketRoutingFilter

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  log.info("執行到 -> WebsocketRoutingFilter#filter");
  // 如果是 ws 則修改 gatewayRequestUrl, 這個參數在 RouteToRequestUrlFilter 中設置
  changeSchemeIfIsWebSocketUpgrade(exchange);
  // 然后獲取上一步設置好的 gatewayRequestUrl 值
  URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
  String scheme = requestUrl.getScheme();
​
  // 如果已經 gatewayAlreadyRouted 已經為 true 或者 scheme 不是 ws 或 wss 則略過
  if (isAlreadyRouted(exchange)
      || (!"ws".equals(scheme) && !"wss".equals(scheme))) {
    return chain.filter(exchange);
  }
  setAlreadyRouted(exchange);
​
  HttpHeaders headers = exchange.getRequest().getHeaders();
  HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
​
  List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
  if (protocols != null) {
    protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(
      header -> Arrays.stream(commaDelimitedListToStringArray(header)))
      .map(String::trim).collect(Collectors.toList());
  }
​
  return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(
    requestUrl, this.webSocketClient, filtered, protocols));
}

NettyRoutingFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
​
  String scheme = requestUrl.getScheme();
  if (isAlreadyRouted(exchange)
      || (!"http".equals(scheme) && !"https".equals(scheme))) {
    return chain.filter(exchange);
  }
  setAlreadyRouted(exchange);
​
  ServerHttpRequest request = exchange.getRequest();
​
  HttpMethod method = request.getMethod();
​
  HttpHeaders filteredHeaders = filterRequest(getHeadersFilters(), exchange);
​
  // 判斷是否 保留主機標頭屬性名稱
  boolean preserveHost = exchange
    .getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
​
​
  /*
    * 在 GatewayAutoConfiguration 中有如下一塊注釋,說我們可以自定義 WebClient 來覆蓋 netty
    * @Bean //TODO: default over netty? configurable public WebClientHttpRoutingFilter
    * webClientHttpRoutingFilter() { //TODO: WebClient bean return new
    * WebClientHttpRoutingFilter(WebClient.routes().build()); }
    *
    * @Bean public WebClientWriteResponseFilter webClientWriteResponseFilter() { return
    * new WebClientWriteResponseFilter(); }
    */
  // 使用 webClient 
  RequestBodySpec bodySpec = this.webClient.method(method).uri(requestUrl)
    .headers(httpHeaders -> {
      httpHeaders.addAll(filteredHeaders);
      // TODO: can this support preserviceHostHeader?
      if (!preserveHost) {
        httpHeaders.remove(HttpHeaders.HOST);
      }
    });
​
  RequestHeadersSpec<?> headersSpec;
  if (requiresBody(method)) {
    headersSpec = bodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
  }
  else {
    headersSpec = bodySpec;
  }
​
  return headersSpec.exchange()
    .log("webClient route")
    .flatMap(res -> {
      // 將返回的結構再次封裝到 exchange 中,同時設置 gatewayClientResponse
      ServerHttpResponse response = exchange.getResponse();
      response.getHeaders().putAll(res.headers().asHttpHeaders());
      response.setStatusCode(res.statusCode());
      // Defer committing the response until all route filters have run
      // Put client response as ServerWebExchange attribute and write
      // response later NettyWriteResponseFilter
      exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
      return chain.filter(exchange);
    });
}

ForwardRoutingFilter

@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   log.info("執行到 -> ForwardRoutingFilter#filter");
   URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
   String scheme = requestUrl.getScheme();
   if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
      return chain.filter(exchange);
   }
   if (log.isTraceEnabled()) {
      log.trace("Forwarding to URI: " + requestUrl);
   }
   return this.getDispatcherHandler().handle(exchange);
}

參考鏈接


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM