基於SpringCloud Gateway自定義限流


目前,市面上越來越多的公司選擇使用RESTful接口作為數據交換接口,通常通過網關對外暴露這些RESTful接口,為了避免有意或者無意的高並發訪問,拖垮這些RESTful接口所在的業
務系統,需要對這些接口進行限流,熔斷,降級之類的。這里我們重點講解限流。

1,常見的限流算法

1.1,計數器算法

計數器是比較簡單的一種算法,是一種基於時間計數的算法,假設有一個接口/user/hello接口,規定這個接口的訪問量為100次/秒,每次訪問該接口時都使計數器Counter遞增1,當達到100次的時候,都去判斷第100次和第1次的時間只差是否大於1秒,如果大於一秒,就表示這一百次訪問的並發量小於100次/秒,允許這次請求通過;反之,如果時間之差小於1秒,就表示超過了100次/秒的並發訪問,需要限制請求到到業務系統。

但是這種基於時間周期的計數算法有一個很大的弊端,它只能限制周期內的並發訪問數。加入有100個訪問是在00:00.500-00:01.000這段時間內訪問的,同時又有100個訪問是在00:01.000-01:00.500這段時間內訪問的,那么00:00.500-01:00.500這一秒內就有200個訪問,已經超過了需要限制的並發量。同JVM的引用計數算法一樣,該計數算法也存在弊端。

1.2,漏桶算法

漏桶算法其實很簡單,可以粗略的認為就是注水漏水過程,注水過程對應訪問過程,往桶中以一定速率流出水,以任意速率流入水,當水超過桶流量則丟棄,因為桶容量是不變的,保證了整體的速率。

1.3,令牌桶算法

令牌桶算法和漏桶算法類似,不過是將漏桶算法反轉的一種算法。可以認為是從水箱取水的過程。

2,使用SpringCloud Gateway實現自定義多維度限流

2.1 SpringCloud Gateway自帶限流對象

SpringCloud Gateway是SpringCloud的產物,是Spring官方用來替換Zuul的一個網關,由Spring開發。

使用SpringCloud Gateway需要引入依賴

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

SpringCloud Gateway有一個自帶的限流Limiter。org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter。該對象的isAllowed方法是關鍵,讓我們來看看這個方法的源碼

	public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}

		Config routeConfig = loadConfiguration(routeId);

		// How many requests per second do you want a user to be allowed to do?
		int replenishRate = routeConfig.getReplenishRate();

		// How much bursting do you want to allow?
		int burstCapacity = routeConfig.getBurstCapacity();

		// How many tokens are requested per request?
		int requestedTokens = routeConfig.getRequestedTokens();

		try {
			List<String> keys = getKeys(id);

			// The arguments to the LUA script. time() returns unixtime in seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "",
					burstCapacity + "", Instant.now().getEpochSecond() + "",
					requestedTokens + "");
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
					scriptArgs);
			// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> {
				if (log.isDebugEnabled()) {
					log.debug("Error calling rate limiter lua", throwable);
				}
				return Flux.just(Arrays.asList(1L, -1L));
			}).reduce(new ArrayList<Long>(), (longs, l) -> {
				longs.addAll(l);
				return longs;
			}).map(results -> {
				boolean allowed = results.get(0) == 1L;
				Long tokensLeft = results.get(1);

				Response response = new Response(allowed,
						getHeaders(routeConfig, tokensLeft));

				if (log.isDebugEnabled()) {
					log.debug("response: " + response);
				}
				return response;
			});
		}
		catch (Exception e) {
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
			 * an alert so you know if this is happening too much. Stripe's observed
			 * failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
	}

由源碼可知,該方法的內部使用過一個Redis的Lua腳本來控制限流的。SpringCloud Gateway使用的是令牌桶限流算法。我們來看看這個限流算法的Lua腳本,Lua腳本在spring-cloud-gateway-core.jar包的META-INF/scripts/request-rate-limiter.lua路徑。源碼如下

--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
--該參數是令牌桶填充速率
local rate = tonumber(ARGV[1])
--該參數是令牌桶容量 local capacity = tonumber(ARGV[2])
--該參數是訪問的時間戳 local now = tonumber(ARGV[3])
--該參數每次取出的令牌數量 local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) if ttl > 0 then redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) end -- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
--返回的有兩個參數,allowed_num表示該算法算出來允許訪問的次數,如果該值大於0,就表示可以允許這次訪問通過
-- return { allowed_num, new_tokens }

該Lua腳本的輸入參數有四個,分別是令牌桶填充速率,令牌桶容量,訪問時間戳,取出令牌的數量;返回參數有兩個,分別是允許訪問的次數,新的令牌。只要允許訪問的次數大於0,就表示此次訪問允許通過。

2.2 自定義限流,IP限流,接口限流。

源碼地址:https://github.com/chengwenqin/gateway-demo

此次實現兩個較為常見的業務限流規則,我們將不同IP和接口路徑的限流參數(令牌桶填充速率和令牌桶容量)配置在數據庫中,並且改變參數無需重啟即可生效。

1)我們需要重新寫個Limiter,SpringCloud Gateway自帶的Limiter是讀取配置文件的限流參數的

實現如下

package net.sunmonkey.gateway.limiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;

@Component
public class MyRedisRateLimiter {

    private static final Logger logger = LoggerFactory.getLogger(MyRedisRateLimiter.class);

    private final String keyNamespace="gateway:demo:limit:";

    private final RedisTemplate<String, Long> redisTemplate;

    private final RedisScript<List<Long>> redisScript;

/**
* @param redisScript 該RedisScript對象將會自動注入進來,該對象使用的正是上面介紹的request-rate-limiter.lua腳本
*/
public MyRedisRateLimiter(RedisTemplate redisTemplate, RedisScript<List<Long>> redisScript){ this.redisTemplate = redisTemplate; this.redisScript = redisScript; } public boolean isAllowed(String key, int replenishRate, int burstCapacity){ List<String> keys = Arrays.asList(keyNamespace+key+"tokens", keyNamespace+key+"timestamp"); try { List<Long> response = this.redisTemplate.execute(this.redisScript, keys, replenishRate+"",
burstCapacity+"",
Instant.now().getEpochSecond()+"",
1+"");
if(response.get(0) ==0){ return false; }else{ return true; } }catch (Exception e){ logger.error(e.getMessage(), e); return true; } } }

2)加入過濾器,我們來自定義實現一個全局的過濾器吧

IP限流過濾器如下

package net.sunmonkey.gateway.filter;

import net.sunmonkey.db.model.IpRate;
import net.sunmonkey.gateway.limiter.MyRedisRateLimiter;
import net.sunmonkey.gateway.service.IpRateService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

/**
 * IP限流過濾器
 */
@Component
public class IpRateGlobalFilter implements Ordered, GlobalFilter {

    @Autowired
    private MyRedisRateLimiter myRedisRateLimiter;

    @Autowired
    private IpRateService ipRateService;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //獲取到調用客戶端的IP地址
String ip
= exchange.getRequest().getRemoteAddress().getHostName(); //從數據庫中獲取到該IP地址對應的限流參數 IpRate ipRate = ipRateService.get(ip); //如果允許同行,沒有超過該ip的流量限制 if(myRedisRateLimiter.isAllowed("ip:"+ip+":", ipRate.getReplenishRate(), ipRate.getBurstCapacity())){ return chain.filter(exchange); }else{ exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder() { return 1; } }

接口限流的過濾器如下

package net.sunmonkey.gateway.filter;

import net.sunmonkey.db.model.PathRate;
import net.sunmonkey.gateway.limiter.MyRedisRateLimiter;
import net.sunmonkey.gateway.service.PathRateService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

/**
 * 接口限流過濾器
 */
@Component
public class PathRateGlobalFilter implements Ordered, GlobalFilter {
    @Autowired
    private MyRedisRateLimiter myRedisRateLimiter;

    @Autowired
    private PathRateService pathRateService;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //獲取到該接口路徑
String path
= exchange.getRequest().getPath().value(); //從數據庫中獲取到該接口對於的限流參數 PathRate pathRate = pathRateService.get(path); //如果允許同行,沒有超過該接口的流量限制 if(myRedisRateLimiter.isAllowed("path:"+path+":", pathRate.getReplenishRate(), pathRate.getBurstCapacity())){ return chain.filter(exchange); }
//如果不允許此次請求通過,就返回429,請求太頻繁的錯誤
else{ exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder() { return 1; } }

至此,基於客戶端IP的限流和接口的限流實現完畢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM