目標
- 溫故 Java8 常見函數式用法
- 再過一下 lookupRoute 方法
- 過一遍 9 默認全局 Filter
Java 8 常見函數式用法
Consumer 表達式
一個消費型的接口,通過傳入參數,然后輸出值,無返回值。接連兩個consumer
有相同的入參可以使用addThen
將兩個方法鏈接起來,然后一起accept
方法接受入參。
Consumer<Integer> add2consumer = x -> {
int a = x + 2;
System.out.println(a);
};
Consumer<Integer> add4consumer = x -> {
int a = x + 4;
System.out.println(a);
};
add2consumer.andThen(add4consumer).accept(10);
//① 使用consumer接口實現方法
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
};
Stream<String> stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
stream.forEach(consumer);
System.out.println("********************");
//② 使用lambda表達式,forEach方法需要的就是一個Consumer接口
stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
Consumer<String> consumer1 = (s) -> System.out.println(s);//lambda表達式返回的就是一個Consumer接口
stream.forEach(consumer1);
//更直接的方式
//stream.forEach((s) -> System.out.println(s));
System.out.println("********************");
//③ 使用方法引用,方法引用也是一個consumer
stream = Stream.of("aaa", "bbb", "ddd", "ccc", "fff");
Consumer consumer2 = System.out::println;
stream.forEach(consumer2);
Supplier 表達式
一個供給型的接口,可以用來存儲數據,然后可以供其他方法使用的這么一個接口。
//① 使用Supplier接口實現方法,只有一個get方法,無參數,返回一個值
Supplier<Integer> supplier = new Supplier<Integer>() {
@Override
public Integer get() {
//返回一個隨機值
return new Random().nextInt();
}
};
System.out.println(supplier.get());
System.out.println("********************");
//② 使用lambda表達式,
supplier = () -> new Random().nextInt();
System.out.println(supplier.get());
System.out.println("********************");
//③ 使用方法引用
Supplier<Double> supplier2 = Math::random;
System.out.println(supplier2.get());
// 高階用法
Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
//返回一個optional對象
Optional<Integer> first = stream.filter(i -> i > 4)
.findFirst();
//optional對象有需要Supplier接口的方法
//orElse,如果first中存在數,就返回這個數,如果不存在,就放回傳入的數
System.out.println(first.orElse(1));
System.out.println(first.orElse(7));
System.out.println("********************");
Supplier<Integer> integerSupplier = new Supplier<Integer>() {
@Override
public Integer get() {
//返回一個隨機值
return new Random().nextInt();
}
};
//orElseGet,如果first中存在數,就返回這個數,如果不存在,就返回supplier返回的值
System.out.println(first.orElseGet(integerSupplier));
Function 表達式
一個功能型接口,它的一個作用就是轉換作用,將輸入數據轉換成另一種形式的輸出數據。
//① 使用map方法,泛型的第一個參數是轉換前的類型,第二個是轉化后的類型
Function<String, Integer> function = new Function<String, Integer>() {
@Override
public Integer apply(String s) {
return s.length();//獲取每個字符串的長度,並且返回
}
};
Stream<String> stream = Stream.of("aaa", "bbbbb", "ccccccv");
Stream<Integer> stream1 = stream.map(function);
stream1.forEach(System.out::println);
System.out.println("********************");
Predicate 表達式
謂詞型接口,其實,這個就是一個類似於 bool 類型的判斷的接口。
//① 使用Predicate接口實現方法,只有一個test方法,傳入一個參數,返回一個bool值
Predicate<Integer> predicate = new Predicate<Integer>() {
@Override
public boolean test(Integer integer) {
if (integer > 5) {
return true;
}
return false;
}
};
System.out.println(predicate.test(6));
System.out.println("********************");
//② 使用lambda表達式,
predicate = (t) -> t > 5;
System.out.println(predicate.test(1));
System.out.println("********************");
需要時查看
列舉下java8中 java.util.function包下,內置所有的接口簡介和表達的意思
1 BiConsumer
2 BiFunction
3 BinaryOperator
4 BiPredicate
5 BooleanSupplier:代表了boolean值結果的提供方
6 Consumer
7 DoubleBinaryOperator:代表了作用於兩個double值操作符的操作,並且返回了一個double值的結果。
8 DoubleConsumer:代表一個接受double值參數的操作,並且不返回結果。
9 DoubleFunction
10 DoublePredicate:代表一個擁有double值參數的boolean值方法
11 DoubleSupplier:代表一個double值結構的提供方
12 DoubleToIntFunction:接受一個double類型輸入,返回一個int類型結果。
13 DoubleToLongFunction:接受一個double類型輸入,返回一個long類型結果
14 DoubleUnaryOperator:接受一個參數同為類型double,返回值類型也為double 。
15 Function
16 IntBinaryOperator:接受兩個參數同為類型int,返回值類型也為int 。
17 IntConsumer:接受一個int類型的輸入參數,無返回值 。
18 IntFunction
19 IntPredicate:接受一個int輸入參數,返回一個布爾值的結果。
20 IntSupplier:無參數,返回一個int類型結果。
21 IntToDoubleFunction:接受一個int類型輸入,返回一個double類型結果 。
22 IntToLongFunction:接受一個int類型輸入,返回一個long類型結果。
23 IntUnaryOperator:接受一個參數同為類型int,返回值類型也為int 。
24 LongBinaryOperator:接受兩個參數同為類型long,返回值類型也為long。
25 LongConsumer:接受一個long類型的輸入參數,無返回值。
26 LongFunction
27 LongPredicate:R接受一個long輸入參數,返回一個布爾值類型結果。
28 LongSupplier:無參數,返回一個結果long類型的值。
29 LongToDoubleFunction:接受一個long類型輸入,返回一個double類型結果。
30 LongToIntFunction:接受一個long類型輸入,返回一個int類型結果。
31 LongUnaryOperator:接受一個參數同為類型long,返回值類型也為long。
32 ObjDoubleConsumer
33 ObjIntConsumer
34 ObjLongConsumer
35 Predicate
36 Supplier
37 ToDoubleBiFunction
38 ToDoubleFunction
39 ToIntBiFunction
40 ToIntFunction
41 ToLongBiFunction
42 ToLongFunction
43 UnaryOperator
lookupRoute 方法如何調用?
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
logger.info("lookupRoute 開始匹配 Route 賊關鍵 -> RoutePredicateHandlerMapping#lookupRoute");
return this.routeLocator.getRoutes()
// 單獨處理,以便在出現錯誤的時候,返回空
.concatMap(route ->
Mono.just(route)
.filterWhen(r -> {
logger.info("設置當前執行的 routeId GATEWAY_PREDICATE_ROUTE_ATTR: " +
r.getId() + " -> RoutePredicateHandlerMapping#lookupRoute");
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
.doOnError(e -> logger.error(
"Error applying predicate for route: " + route.getId(),
e))
.onErrorResume(e -> Mono.empty()))
.next()
.map(route -> {
if (logger.isDebugEnabled()) {
logger.debug("Route matched: " + route.getId());
}
logger.info("校驗 Route 的有效性 -> RoutePredicateHandlerMapping#lookupRoute");
validateRoute(route, exchange);
return route;
});
}
從方法名我們可以得知,這個方法就是要找出符合規則的 Route。在實際的調用過程中,方法是先返回一個 Mono 空對象,之后自己執行r.getPredicate().apply(exchange)
使用每個Route
的謂詞規則過濾,返回匹配 Route。
public class Route implements Ordered {
...
// route 的 getPredicate 方法返回 AsyncPredicate 類型
public AsyncPredicate<ServerWebExchange> getPredicate() {
return this.predicate;
}
}
route 的 getPredicate 方法返回 AsyncPredicate 類型
class DefaultAsyncPredicate<T> implements AsyncPredicate<T> {
private final Predicate<T> delegate;
public DefaultAsyncPredicate(Predicate<T> delegate) {
this.delegate = delegate;
}
@Override
public Publisher<Boolean> apply(T t) {
return Mono.just(delegate.test(t));
}
...
}
然后執行Predicate
的test
方法返回一個boolean
值,來滿足filterWhen
方法,返回對應 Route。
9 個默認全局 Filter
訪問對應http://localhost:8080/actuator/gateway/globalfilters
地址,我們可以看到所有 GlobalFilter 以及對應的Order
順序。
{
"org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter@ffaaaf0": -2147483648,
"org.springframework.cloud.gateway.filter.AdaptCachedBodyGlobalFilter@537c8c7e": -2147482648,
"org.springframework.cloud.gateway.filter.NettyWriteResponseFilter@2459319c": -1,
"org.springframework.cloud.gateway.filter.ForwardPathFilter@33d53216": 0,
"org.springframework.cloud.gateway.filter.GatewayMetricsFilter@4f3e7344": 0,
"org.springframework.cloud.gateway.filter.RouteToRequestUrlFilter@1dc76fa1": 10000,
"org.springframework.cloud.gateway.filter.WebsocketRoutingFilter@69a2b3b6": 2147483646,
"org.springframework.cloud.gateway.filter.NettyRoutingFilter@3681037": 2147483647,
"org.springframework.cloud.gateway.filter.ForwardRoutingFilter@5eed2d86": 2147483647,
}
RemoveCachedBodyFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).doFinally(s -> {
log.info("執行到 RemoveCachedBodyFilter#filter ");
// 移除上下文中 cachedRequestBody 屬性
Object attribute = exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);
// PooledDataBuffer 是 DataBuffer 的擴展,允許共享一塊內存共享池
if (attribute != null && attribute instanceof PooledDataBuffer) {
PooledDataBuffer dataBuffer = (PooledDataBuffer) attribute;
// 如果還有占用,則釋放
if (dataBuffer.isAllocated()) {
if (log.isTraceEnabled()) {
log.trace("releasing cached body in exchange attribute");
}
dataBuffer.release();
}
}
});
}
AdaptCachedBodyGlobalFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
logger.info("執行到 AdaptCachedBodyGlobalFilter#filter");
ServerHttpRequest cachedRequest = exchange
.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);
if (cachedRequest != null) {
exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);
// 如果不為空,直接使用 DefaultServerWebExchangeBuilder 構建一個,並將 cachedRequest 加入到其中
return chain.filter(exchange.mutate().request(cachedRequest).build());
}
DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
// 如果上下文中,沒有 cachedRequestBody 緩存,並且本地方法中沒有緩存過改 Route 則繼續
if (body != null || !this.routesToCache.containsKey(route.getId())) {
return chain.filter(exchange);
}
return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {
// 如果是相同的則不 build 了
if (serverHttpRequest == exchange.getRequest()) {
return chain.filter(exchange);
}
return chain.filter(exchange.mutate().request(serverHttpRequest).build());
});
}
NettyWriteResponseFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("執行到 NettyWriteResponseFilter#filter");
// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added
// until the NettyRoutingFilter is run
// @formatter:off
return chain.filter(exchange)
.doOnError(throwable -> cleanup(exchange))
.then(Mono.defer(() -> {
// 獲取 gatewayClientResponseConnection 連接
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
if (connection == null) {
return Mono.empty();
}
if (log.isTraceEnabled()) {
log.trace("NettyWriteResponseFilter start inbound: "
+ connection.channel().id().asShortText() + ", outbound: "
+ exchange.getLogPrefix());
}
ServerHttpResponse response = exchange.getResponse();
final Flux<DataBuffer> body = connection
.inbound()
.receive()
.retain()
// byteBuf -> DataBuffer, netty 數據結構到 spring 數據結構
.map(byteBuf -> wrap(byteBuf, response));
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
}
catch (Exception e) {
if (log.isTraceEnabled()) {
log.trace("invalid media type", e);
}
}
// 根據不同類型,是否直接刷盤發送
return (isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
})).doOnCancel(() -> cleanup(exchange));
// @formatter:on
}
ForwardPathFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
logger.info("執行到 ForwardPathFilter#filter");
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
URI routeUri = route.getUri();
String scheme = routeUri.getScheme();
// 是否已經轉發過(gatewayAlreadyRouted) 或 沒有包含 forward 關鍵字
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
exchange = exchange.mutate()
.request(exchange.getRequest().mutate().path(routeUri.getPath()).build())
.build();
return chain.filter(exchange);
}
GatewayMetricsFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("執行到 GatewayMetricsFilter#filter");
Sample sample = Timer.start(meterRegistry);
// 添加對於成功或異常調用的方法
return chain.filter(exchange)
.doOnSuccess(aVoid -> endTimerRespectingCommit(exchange, sample))
.doOnError(throwable -> endTimerRespectingCommit(exchange, sample));
}
private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {
ServerHttpResponse response = exchange.getResponse();
if (response.isCommitted()) {
endTimerInner(exchange, sample);
}
else {
response.beforeCommit(() -> {
endTimerInner(exchange, sample);
return Mono.empty();
});
}
}
RouteToRequestUrlFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("執行到 RouteToRequestUrlFilter#filter");
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();
if (hasAnotherScheme(routeUri)) {
// 設置相應的 schema
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR, routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
// 對於 loadbalance 的判斷
if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
// .uri(routeUri)
.scheme(routeUri.getScheme()).host(routeUri.getHost())
.port(routeUri.getPort()).build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
WebsocketRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("執行到 -> WebsocketRoutingFilter#filter");
// 如果是 ws 則修改 gatewayRequestUrl, 這個參數在 RouteToRequestUrlFilter 中設置
changeSchemeIfIsWebSocketUpgrade(exchange);
// 然后獲取上一步設置好的 gatewayRequestUrl 值
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
// 如果已經 gatewayAlreadyRouted 已經為 true 或者 scheme 不是 ws 或 wss 則略過
if (isAlreadyRouted(exchange)
|| (!"ws".equals(scheme) && !"wss".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
HttpHeaders headers = exchange.getRequest().getHeaders();
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
if (protocols != null) {
protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(
header -> Arrays.stream(commaDelimitedListToStringArray(header)))
.map(String::trim).collect(Collectors.toList());
}
return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(
requestUrl, this.webSocketClient, filtered, protocols));
}
NettyRoutingFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
setAlreadyRouted(exchange);
ServerHttpRequest request = exchange.getRequest();
HttpMethod method = request.getMethod();
HttpHeaders filteredHeaders = filterRequest(getHeadersFilters(), exchange);
// 判斷是否 保留主機標頭屬性名稱
boolean preserveHost = exchange
.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
/*
* 在 GatewayAutoConfiguration 中有如下一塊注釋,說我們可以自定義 WebClient 來覆蓋 netty
* @Bean //TODO: default over netty? configurable public WebClientHttpRoutingFilter
* webClientHttpRoutingFilter() { //TODO: WebClient bean return new
* WebClientHttpRoutingFilter(WebClient.routes().build()); }
*
* @Bean public WebClientWriteResponseFilter webClientWriteResponseFilter() { return
* new WebClientWriteResponseFilter(); }
*/
// 使用 webClient
RequestBodySpec bodySpec = this.webClient.method(method).uri(requestUrl)
.headers(httpHeaders -> {
httpHeaders.addAll(filteredHeaders);
// TODO: can this support preserviceHostHeader?
if (!preserveHost) {
httpHeaders.remove(HttpHeaders.HOST);
}
});
RequestHeadersSpec<?> headersSpec;
if (requiresBody(method)) {
headersSpec = bodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
}
else {
headersSpec = bodySpec;
}
return headersSpec.exchange()
.log("webClient route")
.flatMap(res -> {
// 將返回的結構再次封裝到 exchange 中,同時設置 gatewayClientResponse
ServerHttpResponse response = exchange.getResponse();
response.getHeaders().putAll(res.headers().asHttpHeaders());
response.setStatusCode(res.statusCode());
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
return chain.filter(exchange);
});
}
ForwardRoutingFilter
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("執行到 -> ForwardRoutingFilter#filter");
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
String scheme = requestUrl.getScheme();
if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {
return chain.filter(exchange);
}
if (log.isTraceEnabled()) {
log.trace("Forwarding to URI: " + requestUrl);
}
return this.getDispatcherHandler().handle(exchange);
}