Spring Cloud Gateway 限流
限流的目的是通過對並發訪問/請求進行限速或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則可由拒絕服務,就是定向到錯誤頁或友好的展示頁,排隊或等待
Gateway內置過濾器工廠限流
Spring Cloud Gateway官方就提供了RequestRateLimiterGatewayFilterFactory這個類,適用Redis和lua腳本實現了令牌桶的方式。具體實現邏輯在RequestRateLimiterGatewayFilterFactory類中
pom文件中引入gateway的起步依賴和redis的reactive依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifatId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
配置
server:
port: 8081
spring:
cloud:
gateway:
routes:
- id: limit_route
uri: http://httpbin.org:80/get
predicates:
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
filters:
- name: RequestRateLimiter
args:
key-resolver: '#{@hostAddrKeyResolver}'
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 3
application:
name: gateway-limiter
redis:
host: localhost
port: 6379
database: 0
配置文件,指定程序的端口為8081,配置了 redis的信息,並配置了RequestRateLimiter的限流過濾器,該過濾器需要配置三個參數:
- burstCapacity,令牌桶總容量。
- replenishRate,令牌桶每秒填充平均速率。
- key-resolver,用於限流的鍵的解析器的 Bean 對象的名字。它使用 SpEL 表達式根據#{@beanName}從 Spring 容器中獲取 Bean 對象。
KeyResolver需要實現resolve方法,比如根據Hostname進行限流,則需要用hostAddress去判斷。實現完KeyResolver之后,需要將這個類的Bean注冊到Ioc容器中
public class HostAddrKeyResolver implements KeyResolver {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}
@Bean
public HostAddrKeyResolver hostAddrKeyResolver() {
return new HostAddrKeyResolver();
}
根據uri去限流,這時KeyResolver代碼如下:
public class UriKeyResolver implements KeyResolver {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getURI().getPath());
}
}
@Bean
public UriKeyResolver uriKeyResolver() {
return new UriKeyResolver();
}
以用戶的維度去限流
@Bean
KeyResolver userKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
}
自定義限流
Spring Cloud Gateway實現自定義限流,需要編寫一個過濾器。Guava中的RateLimiter,Bucket4j,RateLimitJ限流都是基於令牌桶實現的。
下面使用Bucket4j實現限流。
pom
<!-- Spring Cloud Gateway的依賴-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<!-- Bucket4j限流依賴-->
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-core</artifactId>
<version>4.0.0</version>
</dependency>
自定義過濾器需要實現GatewayFilter,Ordered接口,實現對ip的限流
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.Refill;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 自定義過濾器進行ip限流
*/
public class GatewayRateLimitFilterByIp implements GatewayFilter, Ordered {
private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByIp.class);
/**
* 單機網關限流用一個ConcurrentHashMap來存儲 bucket,
* 如果是分布式集群限流的話,可以采用 Redis等分布式解決方案
*/
private static final Map<String, Bucket> LOCAL_CACHE = new ConcurrentHashMap<>();
/**
* 桶的最大容量,即能裝載 Token 的最大數量
*/
int capacity;
/**
* 每次 Token 補充量
*/
int refillTokens;
/**
*補充 Token 的時間間隔
*/
Duration refillDuration;
public GatewayRateLimitFilterByIp() {
}
public GatewayRateLimitFilterByIp(int capacity, int refillTokens, Duration refillDuration) {
this.capacity = capacity;
this.refillTokens = refillTokens;
this.refillDuration = refillDuration;
}
private Bucket createNewBucket() {
Refill refill = Refill.of(refillTokens, refillDuration);
Bandwidth limit = Bandwidth.classic(capacity, refill);
return Bucket4j.builder().addLimit(limit).build();
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String ip = exchange.getRequest().getRemoteAddress().getAddress().getHostAddress();
Bucket bucket = LOCAL_CACHE.computeIfAbsent(ip, k -> createNewBucket());
log.debug("IP:{} ,令牌通可用的Token數量:{} " ,ip,bucket.getAvailableTokens());
if (bucket.tryConsume(1)) {
return chain.filter(exchange);
} else {
//當可用的令牌書為0是,進行限流返回429狀態碼
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}
}
@Override
public int getOrder() {
return -1000;
}
public static Map<String, Bucket> getLocalCache() {
return LOCAL_CACHE;
}
public int getCapacity() {
return capacity;
}
public void setCapacity(int capacity) {
this.capacity = capacity;
}
public int getRefillTokens() {
return refillTokens;
}
public void setRefillTokens(int refillTokens) {
this.refillTokens = refillTokens;
}
public Duration getRefillDuration() {
return refillDuration;
}
public void setRefillDuration(Duration refillDuration) {
this.refillDuration = refillDuration;
}
}
代碼配置
@Bean
public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route(r -> r.path("/test/rateLimit")
.filters(f -> f.filter(new GatewayRateLimitFilterByIp(10,1,Duration.ofSeconds(1))))
.uri("http://localhost:8000/hello/rateLimit")
.id("rateLimit_route")
).build();
}
基於CPU使用率進行限流
通過Spring Boot Actuator 提供的Metrics獲取當前CPU的使用情況,進行限流
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.metrics.MetricsEndpoint;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Objects;
/**
* 根據CPU的使用情況限流
**/
@Component
public class GatewayRateLimitFilterByCpu implements GatewayFilter, Ordered {
private final Logger log = LoggerFactory.getLogger(GatewayRateLimitFilterByCpu.class);
@Autowired
private MetricsEndpoint metricsEndpoint;
private static final String METRIC_NAME = "system.cpu.usage";
private static final double MAX_USAGE = 0.50D;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//獲取網關所在機器的CPU使用情況
Double systemCpuUsage = metricsEndpoint.metric(METRIC_NAME, null)
.getMeasurements()
.stream()
.filter(Objects::nonNull)
.findFirst()
.map(MetricsEndpoint.Sample::getValue)
.filter(Double::isFinite)
.orElse(0.0D);
boolean isOpenRateLimit = systemCpuUsage >MAX_USAGE;
log.debug("system.cpu.usage: {}, isOpenRateLimit:{} ",systemCpuUsage , isOpenRateLimit);
if (isOpenRateLimit) {
//當CPU的使用超過設置的最大閥值開啟限流
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
} else {
return chain.filter(exchange);
}
}
@Override
public int getOrder() {
return 0;
}
}
代碼配置
@Autowired
private GatewayRateLimitFilterByCpu gatewayRateLimitFilterByCpu;
@Bean
public RouteLocator customerRouteLocator(RouteLocatorBuilder builder) {
return builder.routes()
.route(r -> r.path("/test/rateLimit")
.filters(f -> f.filter(gatewayRateLimitFilterByCpu))
.uri("http://localhost:8000/hello/rateLimit")
.id("rateLimit_route")
).build();
}