概述
這里是 SpringCloud Gateway
實踐的第一篇,主要講過濾器的相關實現。Spring-Cloud-Gateway 是以 WebFlux
為基礎的響應式架構設計, 是異步非阻塞式的,它能夠充分利用多核 CPU 的硬件資源去處理大量的並發請求。
本篇將基於 spring-cloud-gateway 簡介 基礎環境進行改造。
工作原理
Spring-Cloud-Gateway 基於過濾器實現,同 zuul 類似,有pre和post兩種方式的 filter,分別處理前置邏輯和后置邏輯。客戶端的請求先經過pre類型的 filter,然后將請求轉發到具體的業務服務,收到業務服務的響應之后,再經過post類型的 filter 處理,最后返回響應到客戶端。
過濾器執行流程如下,order 越大,優先級越低
接下來我們來驗證下 filter
執行順序。
這里創建 3 個過濾器,分別配置不同的優先級
@Slf4j
public class AFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("AFilter前置邏輯");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("AFilter后置邏輯");
}));
}
}
@Slf4j
public class BFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("BFilter前置邏輯");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("BFilter后置邏輯");
}));
}
}
@Slf4j
public class CFilter implements GlobalFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
log.info("CFilter前置邏輯");
return chain.filter(exchange).then(Mono.fromRunnable(() -> {
log.info("CFilter后置邏輯");
}));
}
}
@Configuration
public class FilterConfig {
@Bean
@Order(-1)
public GlobalFilter a() {
return new AFilter();
}
@Bean
@Order(0)
public GlobalFilter b() {
return new BFilter();
}
@Bean
@Order(1)
public GlobalFilter c() {
return new CFilter();
}
}
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1
curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
查看網關輸出日志
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter前置邏輯
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter前置邏輯
2020-03-29 16:23:22.832 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter前置邏輯
2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.CFilter : CFilter后置邏輯
2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.BFilter : BFilter后置邏輯
2020-03-29 16:23:22.836 INFO 59326 --- [ctor-http-nio-6] cn.idea360.gateway.filter1.AFilter : AFilter后置邏輯
自定義過濾器
現在假設我們要統計某個服務的響應時間,我們可以在代碼中
long beginTime = System.currentTimeMillis();
// do something...
long elapsed = System.currentTimeMillis() - beginTime;
log.info("elapsed: {}ms", elapsed);
每次都要這么寫是不是很煩?Spring 告訴我們有個東西叫 AOP。但是我們是微服務啊,在每個服務里都寫也很煩。這時候就該網關的過濾器登台表演了。
自定義過濾器需要實現 GatewayFilter
和 Ordered
。其中 GatewayFilter
中的這個方法就是用來實現你的自定義的邏輯的
Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
而 Ordered
中的 int getOrder()
方法是來給過濾器設定優先級別的,值越大則優先級越低。
好了,讓我們來擼代碼吧.
/**
* 此過濾器功能為計算請求完成時間
*/
public class ElapsedFilter implements GatewayFilter, Ordered {
private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
return chain.filter(exchange).then(
Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
if (startTime != null) {
System.out.println(exchange.getRequest().getURI().getRawPath() + ": " + (System.currentTimeMillis() - startTime) + "ms");
}
})
);
}
/*
*過濾器存在優先級,order越大,優先級越低
*/
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
}
我們在請求剛剛到達時,往 ServerWebExchange
中放入了一個屬性 elapsedTimeBegin
,屬性值為當時的毫秒級時間戳。然后在請求執行結束后,又從中取出我們之前放進去的那個時間戳,與當前時間的差值即為該請求的耗時。因為這是與業務無關的日志所以將 Ordered
設為 Integer.MAX_VALUE
以降低優先級。
現在再來看我們之前的問題:怎么來區分是 “pre” 還是 “post” 呢?其實就是 chain.filter(exchange)
之前的就是 “pre” 部分,之后的也就是 then
里邊的是 “post” 部分。
創建好 Filter 之后我們將它添加到我們的 Filter Chain 里邊
@Configuration
public class FilterConfig {
/**
* http://localhost:8100/filter/provider
* @param builder
* @return
*/
@Bean
public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
// @formatter:off
// 可以對比application.yml中關於路由轉發的配置
return builder.routes()
.route(r -> r.path("/filter/**")
.filters(f -> f.stripPrefix(1)
.filter(new ElapsedFilter()))
.uri("lb://idc-cloud-provider")
.order(0)
.id("filter")
)
.build();
// @formatter:on
}
}
基於全局過濾器實現審計功能
// AdaptCachedBodyGlobalFilter
@Component
public class LogFilter implements GlobalFilter, Ordered {
private Logger log = LoggerFactory.getLogger(LogFilter.class);
private final ObjectMapper objectMapper = new ObjectMapper();
private static final String START_TIME = "startTime";
private static final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 請求路徑
String path = request.getPath().pathWithinApplication().value();
// 請求schema: http/https
String scheme = request.getURI().getScheme();
// 請求方法
HttpMethod method = request.getMethod();
// 路由服務地址
URI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
// 請求頭
HttpHeaders headers = request.getHeaders();
// 設置startTime
exchange.getAttributes().put(START_TIME, System.currentTimeMillis());
// 獲取請求地址
InetSocketAddress remoteAddress = request.getRemoteAddress();
MultiValueMap<String, String> formData = null;
AccessRecord accessRecord = new AccessRecord();
accessRecord.setPath(path);
accessRecord.setSchema(scheme);
accessRecord.setMethod(method.name());
accessRecord.setTargetUri(targetUri.toString());
accessRecord.setRemoteAddress(remoteAddress.toString());
accessRecord.setHeaders(headers);
if (method == HttpMethod.GET) {
formData = request.getQueryParams();
accessRecord.setFormData(formData);
writeAccessRecord(accessRecord);
}
if (method == HttpMethod.POST) {
Mono<Void> voidMono = null;
if (headers.getContentType().equals(MediaType.APPLICATION_JSON)) {
// JSON
voidMono = readBody(exchange, chain, accessRecord);
}
if (headers.getContentType().equals(MediaType.APPLICATION_FORM_URLENCODED)) {
// x-www-form-urlencoded
voidMono = readFormData(exchange, chain, accessRecord);
}
if (voidMono != null) {
return voidMono;
}
}
return chain.filter(exchange);
}
private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
return null;
}
private Mono<Void> readBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessRecord accessRecord) {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux.defer(() -> {
DataBuffer buffer = exchange.getResponse().bufferFactory().wrap(bytes);
DataBufferUtils.retain(buffer);
return Mono.just(buffer);
});
// 重寫請求體,因為請求體數據只能被消費一次
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
ServerWebExchange mutatedExchange = exchange.mutate().request(mutatedRequest).build();
return ServerRequest.create(mutatedExchange, messageReaders)
.bodyToMono(String.class)
.doOnNext(objectValue -> {
accessRecord.setBody(objectValue);
writeAccessRecord(accessRecord);
}).then(chain.filter(mutatedExchange));
});
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
/**
* TODO 異步日志
* @param accessRecord
*/
private void writeAccessRecord(AccessRecord accessRecord) {
log.info("\n\n start------------------------------------------------- \n " +
"請求路徑:{}\n " +
"scheme:{}\n " +
"請求方法:{}\n " +
"目標服務:{}\n " +
"請求頭:{}\n " +
"遠程IP地址:{}\n " +
"表單參數:{}\n " +
"請求體:{}\n " +
"end------------------------------------------------- \n ",
accessRecord.getPath(), accessRecord.getSchema(), accessRecord.getMethod(), accessRecord.getTargetUri(), accessRecord.getHeaders(), accessRecord.getRemoteAddress(), accessRecord.getFormData(), accessRecord.getBody());
}
}
curl -X POST -H "Content-Type:application/json" -d '{"name": "admin"}' http://192.168.124.5:2000/p/provider1
curl -X GET -G -d "username=admin" http://192.168.124.5:2000/p/provider1/1
輸出結果
start-------------------------------------------------
請求路徑:/provider1
scheme:http
請求方法:POST
目標服務:http://192.168.124.5:2001/provider1
請求頭:[Content-Type:"application/json", User-Agent:"PostmanRuntime/7.22.0", Accept:"*/*", Cache-Control:"no-cache", Postman-Token:"2a4ce04d-8449-411d-abd8-247d20421dc2", Host:"192.168.124.5:2000", Accept-Encoding:"gzip, deflate, br", Content-Length:"16", Connection:"keep-alive"]
遠程IP地址:/192.168.124.5:49969
表單參數:null
請求體:{"name":"admin"}
end-------------------------------------------------
接下來,我們來配置日志,方便日志系統提取日志。SpringBoot 默認的日志為 logback。
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="LOGS" value="/Users/cuishiying/Documents/spring-cloud-learning/logs" />
<appender name="Console" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%black(%d{ISO8601}) %highlight(%-5level) [%blue(%t)] %yellow(%C{1.}): %msg%n%throwable
</Pattern>
</layout>
</appender>
<appender name="RollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOGS}/spring-boot-logger.log</file>
<encoder
class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<Pattern>%d %p %C{1.} [%t] %m%n</Pattern>
</encoder>
<rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- rollover daily and when the file reaches 10 MegaBytes -->
<fileNamePattern>${LOGS}/archived/spring-boot-logger-%d{yyyy-MM-dd}.%i.log
</fileNamePattern>
<timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
<maxFileSize>10MB</maxFileSize>
</timeBasedFileNamingAndTriggeringPolicy>
</rollingPolicy>
</appender>
<!-- LOG everything at INFO level -->
<root level="info">
<!--<appender-ref ref="RollingFile" />-->
<appender-ref ref="Console" />
</root>
<!-- LOG "cn.idea360*" at TRACE level additivity:是否向上級loger傳遞打印信息。默認是true-->
<logger name="cn.idea360.gateway" level="info" additivity="false">
<appender-ref ref="RollingFile" />
<appender-ref ref="Console" />
</logger>
</configuration>
這樣 console 和日志目錄下就都有日志了。
自定義過濾器工廠
如果你看過靜態路由的配置,你應該對如下配置有印象。
filters:
- StripPrefix=1
- AddResponseHeader=X-Response-Default-Foo, Default-Bar
StripPrefix
、AddResponseHeader
這兩個實際上是兩個過濾器工廠(GatewayFilterFactory),用這種配置的方式更靈活方便。
我們就將之前的那個 ElapsedFilter
改造一下,讓它能接收一個 boolean
類型的參數,來決定是否將請求參數也打印出來。
public class ElapsedGatewayFilterFactory extends AbstractGatewayFilterFactory<ElapsedGatewayFilterFactory.Config> {
private static final Log log = LogFactory.getLog(GatewayFilter.class);
private static final String ELAPSED_TIME_BEGIN = "elapsedTimeBegin";
private static final String KEY = "withParams";
public List<String> shortcutFieldOrder() {
return Arrays.asList(KEY);
}
public ElapsedGatewayFilterFactory() {
super(Config.class);
}
public GatewayFilter apply(Config config) {
return (exchange, chain) -> {
exchange.getAttributes().put(ELAPSED_TIME_BEGIN, System.currentTimeMillis());
return chain.filter(exchange).then(
Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(ELAPSED_TIME_BEGIN);
if (startTime != null) {
StringBuilder sb = new StringBuilder(exchange.getRequest().getURI().getRawPath())
.append(": ")
.append(System.currentTimeMillis() - startTime)
.append("ms");
if (config.isWithParams()) {
sb.append(" params:").append(exchange.getRequest().getQueryParams());
}
log.info(sb.toString());
}
})
);
};
}
public static class Config {
private boolean withParams;
public boolean isWithParams() {
return withParams;
}
public void setWithParams(boolean withParams) {
this.withParams = withParams;
}
}
}
過濾器工廠的頂級接口是 GatewayFilterFactory
,我們可以直接繼承它的兩個抽象類來簡化開發 AbstractGatewayFilterFactory
和 AbstractNameValueGatewayFilterFactory
,這兩個抽象類的區別就是前者接收一個參數(像 StripPrefix
和我們創建的這種),后者接收兩個參數(像 AddResponseHeader
)。
GatewayFilter apply(Config config)
方法內部實際上是創建了一個 GatewayFilter
的匿名類,具體實現和之前的幾乎一樣,就不解釋了。
靜態內部類 Config
就是為了接收那個 boolean
類型的參數服務的,里邊的變量名可以隨意寫,但是要重寫 List shortcutFieldOrder()
這個方法。
這里注意一下,一定要調用一下父類的構造器把 Config
類型傳過去,否則會報 ClassCastException
public ElapsedGatewayFilterFactory() {
super(Config.class);
}
工廠類我們有了,再把它注冊到 Spring 當中
@Bean
public ElapsedGatewayFilterFactory elapsedGatewayFilterFactory() {
return new ElapsedGatewayFilterFactory();
}
然后添加配置(主要改動在 default-filters
配置)
server:
port: 2000
spring:
application:
name: idc-gateway
redis:
host: localhost
port: 6379
timeout: 6000ms # 連接超時時長(毫秒)
jedis:
pool:
max-active: 1000 # 連接池最大連接數(使用負值表示沒有限制)
max-wait: -1ms # 連接池最大阻塞等待時間(使用負值表示沒有限制)
max-idle: 10 # 連接池中的最大空閑連接
min-idle: 5 # 連接池中的最小空閑連接
cloud:
consul:
host: localhost
port: 8500
gateway:
discovery:
locator:
enabled: true
# 修改在這里。gateway可以通過開啟以下配置來打開根據服務的serviceId來匹配路由,默認是大寫
default-filters:
- Elapsed=true
routes:
- id: provider # 路由 ID,保持唯一
uri: lb://idc-provider1 # uri指目標服務地址,lb代表從注冊中心獲取服務
predicates: # 路由條件。Predicate 接受一個輸入參數,返回一個布爾值結果。該接口包含多種默認方法來將 Predicate 組合成其他復雜的邏輯(比如:與,或,非)
- Path=/p/**
filters:
- StripPrefix=1 # 過濾器StripPrefix,作用是去掉請求路徑的最前面n個部分截取掉。StripPrefix=1就代表截取路徑的個數為1,比如前端過來請求/test/good/1/view,匹配成功后,路由到后端的請求路徑就會變成http://localhost:8888/good/1/view
結語
本文到此結束。關於 Webflux
的學習剛入門,覺得可以像 Rxjava
那樣在 onNext
中拿到異步數據,然而在 post
獲取 body 中沒生效。經測試可知 getBody
獲得的數據輸出為 null,而自己通過 Flux.create
創建的數據可以在訂閱者中獲取到。此處還有待研究,希望拋磚引玉,大家有研究出來的不吝賜教。同時,希望大家關注公眾號【當我遇上你】。