目前,市面上越来越多的公司选择使用RESTful接口作为数据交换接口,通常通过网关对外暴露这些RESTful接口,为了避免有意或者无意的高并发访问,拖垮这些RESTful接口所在的业
务系统,需要对这些接口进行限流,熔断,降级之类的。这里我们重点讲解限流。
1,常见的限流算法
1.1,计数器算法
计数器是比较简单的一种算法,是一种基于时间计数的算法,假设有一个接口/user/hello接口,规定这个接口的访问量为100次/秒,每次访问该接口时都使计数器Counter递增1,当达到100次的时候,都去判断第100次和第1次的时间只差是否大于1秒,如果大于一秒,就表示这一百次访问的并发量小于100次/秒,允许这次请求通过;反之,如果时间之差小于1秒,就表示超过了100次/秒的并发访问,需要限制请求到到业务系统。
但是这种基于时间周期的计数算法有一个很大的弊端,它只能限制周期内的并发访问数。加入有100个访问是在00:00.500-00:01.000这段时间内访问的,同时又有100个访问是在00:01.000-01:00.500这段时间内访问的,那么00:00.500-01:00.500这一秒内就有200个访问,已经超过了需要限制的并发量。同JVM的引用计数算法一样,该计数算法也存在弊端。
1.2,漏桶算法
漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,注水过程对应访问过程,往桶中以一定速率流出水,以任意速率流入水,当水超过桶流量则丢弃,因为桶容量是不变的,保证了整体的速率。
1.3,令牌桶算法
令牌桶算法和漏桶算法类似,不过是将漏桶算法反转的一种算法。可以认为是从水箱取水的过程。
2,使用SpringCloud Gateway实现自定义多维度限流
2.1 SpringCloud Gateway自带限流对象
SpringCloud Gateway是SpringCloud的产物,是Spring官方用来替换Zuul的一个网关,由Spring开发。
使用SpringCloud Gateway需要引入依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-gateway</artifactId> </dependency>
SpringCloud Gateway有一个自带的限流Limiter。org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter。该对象的isAllowed方法是关键,让我们来看看这个方法的源码
public Mono<Response> isAllowed(String routeId, String id) { if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } Config routeConfig = loadConfiguration(routeId); // How many requests per second do you want a user to be allowed to do? int replenishRate = routeConfig.getReplenishRate(); // How much bursting do you want to allow? int burstCapacity = routeConfig.getBurstCapacity(); // How many tokens are requested per request? int requestedTokens = routeConfig.getRequestedTokens(); try { List<String> keys = getKeys(id); // The arguments to the LUA script. time() returns unixtime in seconds. List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", requestedTokens + ""); // allowed, tokens_left = redis.eval(SCRIPT, keys, args) Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); // .log("redisratelimiter", Level.FINER); return flux.onErrorResume(throwable -> { if (log.isDebugEnabled()) { log.debug("Error calling rate limiter lua", throwable); } return Flux.just(Arrays.asList(1L, -1L)); }).reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }).map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft)); if (log.isDebugEnabled()) { log.debug("response: " + response); } return response; }); } catch (Exception e) { /* * We don't want a hard dependency on Redis to allow traffic. Make sure to set * an alert so you know if this is happening too much. Stripe's observed * failure rate is 0.01%. */ log.error("Error determining if user allowed from redis", e); } return Mono.just(new Response(true, getHeaders(routeConfig, -1L))); }
由源码可知,该方法的内部使用过一个Redis的Lua脚本来控制限流的。SpringCloud Gateway使用的是令牌桶限流算法。我们来看看这个限流算法的Lua脚本,Lua脚本在spring-cloud-gateway-core.jar包的META-INF/scripts/request-rate-limiter.lua路径。源码如下
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key) --该参数是令牌桶填充速率 local rate = tonumber(ARGV[1])
--该参数是令牌桶容量 local capacity = tonumber(ARGV[2])
--该参数是访问的时间戳 local now = tonumber(ARGV[3])
--该参数每次取出的令牌数量 local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) if ttl > 0 then redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) end -- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
--返回的有两个参数,allowed_num表示该算法算出来允许访问的次数,如果该值大于0,就表示可以允许这次访问通过
-- return { allowed_num, new_tokens }
该Lua脚本的输入参数有四个,分别是令牌桶填充速率,令牌桶容量,访问时间戳,取出令牌的数量;返回参数有两个,分别是允许访问的次数,新的令牌。只要允许访问的次数大于0,就表示此次访问允许通过。
2.2 自定义限流,IP限流,接口限流。
源码地址:https://github.com/chengwenqin/gateway-demo
此次实现两个较为常见的业务限流规则,我们将不同IP和接口路径的限流参数(令牌桶填充速率和令牌桶容量)配置在数据库中,并且改变参数无需重启即可生效。
1)我们需要重新写个Limiter,SpringCloud Gateway自带的Limiter是读取配置文件的限流参数的
实现如下
package net.sunmonkey.gateway.limiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.time.Instant; import java.util.Arrays; import java.util.List; @Component public class MyRedisRateLimiter { private static final Logger logger = LoggerFactory.getLogger(MyRedisRateLimiter.class); private final String keyNamespace="gateway:demo:limit:"; private final RedisTemplate<String, Long> redisTemplate; private final RedisScript<List<Long>> redisScript;
/**
* @param redisScript 该RedisScript对象将会自动注入进来,该对象使用的正是上面介绍的request-rate-limiter.lua脚本
*/ public MyRedisRateLimiter(RedisTemplate redisTemplate, RedisScript<List<Long>> redisScript){ this.redisTemplate = redisTemplate; this.redisScript = redisScript; } public boolean isAllowed(String key, int replenishRate, int burstCapacity){ List<String> keys = Arrays.asList(keyNamespace+key+"tokens", keyNamespace+key+"timestamp"); try { List<Long> response = this.redisTemplate.execute(this.redisScript, keys, replenishRate+"",
burstCapacity+"",
Instant.now().getEpochSecond()+"",
1+""); if(response.get(0) ==0){ return false; }else{ return true; } }catch (Exception e){ logger.error(e.getMessage(), e); return true; } } }
2)加入过滤器,我们来自定义实现一个全局的过滤器吧
IP限流过滤器如下
package net.sunmonkey.gateway.filter; import net.sunmonkey.db.model.IpRate; import net.sunmonkey.gateway.limiter.MyRedisRateLimiter; import net.sunmonkey.gateway.service.IpRateService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.core.Ordered; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; /** * IP限流过滤器 */ @Component public class IpRateGlobalFilter implements Ordered, GlobalFilter { @Autowired private MyRedisRateLimiter myRedisRateLimiter; @Autowired private IpRateService ipRateService; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //获取到调用客户端的IP地址
String ip = exchange.getRequest().getRemoteAddress().getHostName(); //从数据库中获取到该IP地址对应的限流参数 IpRate ipRate = ipRateService.get(ip); //如果允许同行,没有超过该ip的流量限制 if(myRedisRateLimiter.isAllowed("ip:"+ip+":", ipRate.getReplenishRate(), ipRate.getBurstCapacity())){ return chain.filter(exchange); }else{ exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder() { return 1; } }
接口限流的过滤器如下
package net.sunmonkey.gateway.filter; import net.sunmonkey.db.model.PathRate; import net.sunmonkey.gateway.limiter.MyRedisRateLimiter; import net.sunmonkey.gateway.service.PathRateService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.core.Ordered; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; /** * 接口限流过滤器 */ @Component public class PathRateGlobalFilter implements Ordered, GlobalFilter { @Autowired private MyRedisRateLimiter myRedisRateLimiter; @Autowired private PathRateService pathRateService; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { //获取到该接口路径
String path = exchange.getRequest().getPath().value(); //从数据库中获取到该接口对于的限流参数 PathRate pathRate = pathRateService.get(path); //如果允许同行,没有超过该接口的流量限制 if(myRedisRateLimiter.isAllowed("path:"+path+":", pathRate.getReplenishRate(), pathRate.getBurstCapacity())){ return chain.filter(exchange); }
//如果不允许此次请求通过,就返回429,请求太频繁的错误
else{ exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder() { return 1; } }
至此,基于客户端IP的限流和接口的限流实现完毕。