基于SpringCloud Gateway自定义限流


目前,市面上越来越多的公司选择使用RESTful接口作为数据交换接口,通常通过网关对外暴露这些RESTful接口,为了避免有意或者无意的高并发访问,拖垮这些RESTful接口所在的业
务系统,需要对这些接口进行限流,熔断,降级之类的。这里我们重点讲解限流。

1,常见的限流算法

1.1,计数器算法

计数器是比较简单的一种算法,是一种基于时间计数的算法,假设有一个接口/user/hello接口,规定这个接口的访问量为100次/秒,每次访问该接口时都使计数器Counter递增1,当达到100次的时候,都去判断第100次和第1次的时间只差是否大于1秒,如果大于一秒,就表示这一百次访问的并发量小于100次/秒,允许这次请求通过;反之,如果时间之差小于1秒,就表示超过了100次/秒的并发访问,需要限制请求到到业务系统。

但是这种基于时间周期的计数算法有一个很大的弊端,它只能限制周期内的并发访问数。加入有100个访问是在00:00.500-00:01.000这段时间内访问的,同时又有100个访问是在00:01.000-01:00.500这段时间内访问的,那么00:00.500-01:00.500这一秒内就有200个访问,已经超过了需要限制的并发量。同JVM的引用计数算法一样,该计数算法也存在弊端。

1.2,漏桶算法

漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,注水过程对应访问过程,往桶中以一定速率流出水,以任意速率流入水,当水超过桶流量则丢弃,因为桶容量是不变的,保证了整体的速率。

1.3,令牌桶算法

令牌桶算法和漏桶算法类似,不过是将漏桶算法反转的一种算法。可以认为是从水箱取水的过程。

2,使用SpringCloud Gateway实现自定义多维度限流

2.1 SpringCloud Gateway自带限流对象

SpringCloud Gateway是SpringCloud的产物,是Spring官方用来替换Zuul的一个网关,由Spring开发。

使用SpringCloud Gateway需要引入依赖

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

SpringCloud Gateway有一个自带的限流Limiter。org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter。该对象的isAllowed方法是关键,让我们来看看这个方法的源码

	public Mono<Response> isAllowed(String routeId, String id) {
		if (!this.initialized.get()) {
			throw new IllegalStateException("RedisRateLimiter is not initialized");
		}

		Config routeConfig = loadConfiguration(routeId);

		// How many requests per second do you want a user to be allowed to do?
		int replenishRate = routeConfig.getReplenishRate();

		// How much bursting do you want to allow?
		int burstCapacity = routeConfig.getBurstCapacity();

		// How many tokens are requested per request?
		int requestedTokens = routeConfig.getRequestedTokens();

		try {
			List<String> keys = getKeys(id);

			// The arguments to the LUA script. time() returns unixtime in seconds.
			List<String> scriptArgs = Arrays.asList(replenishRate + "",
					burstCapacity + "", Instant.now().getEpochSecond() + "",
					requestedTokens + "");
			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
					scriptArgs);
			// .log("redisratelimiter", Level.FINER);
			return flux.onErrorResume(throwable -> {
				if (log.isDebugEnabled()) {
					log.debug("Error calling rate limiter lua", throwable);
				}
				return Flux.just(Arrays.asList(1L, -1L));
			}).reduce(new ArrayList<Long>(), (longs, l) -> {
				longs.addAll(l);
				return longs;
			}).map(results -> {
				boolean allowed = results.get(0) == 1L;
				Long tokensLeft = results.get(1);

				Response response = new Response(allowed,
						getHeaders(routeConfig, tokensLeft));

				if (log.isDebugEnabled()) {
					log.debug("response: " + response);
				}
				return response;
			});
		}
		catch (Exception e) {
			/*
			 * We don't want a hard dependency on Redis to allow traffic. Make sure to set
			 * an alert so you know if this is happening too much. Stripe's observed
			 * failure rate is 0.01%.
			 */
			log.error("Error determining if user allowed from redis", e);
		}
		return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
	}

由源码可知,该方法的内部使用过一个Redis的Lua脚本来控制限流的。SpringCloud Gateway使用的是令牌桶限流算法。我们来看看这个限流算法的Lua脚本,Lua脚本在spring-cloud-gateway-core.jar包的META-INF/scripts/request-rate-limiter.lua路径。源码如下

--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
--该参数是令牌桶填充速率
local rate = tonumber(ARGV[1])
--该参数是令牌桶容量 local capacity = tonumber(ARGV[2])
--该参数是访问的时间戳 local now = tonumber(ARGV[3])
--该参数每次取出的令牌数量 local requested = tonumber(ARGV[4]) local fill_time = capacity/rate local ttl = math.floor(fill_time*2) --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1]) --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2]) --redis.log(redis.LOG_WARNING, "now " .. ARGV[3]) --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4]) --redis.log(redis.LOG_WARNING, "filltime " .. fill_time) --redis.log(redis.LOG_WARNING, "ttl " .. ttl) local last_tokens = tonumber(redis.call("get", tokens_key)) if last_tokens == nil then last_tokens = capacity end --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens) local last_refreshed = tonumber(redis.call("get", timestamp_key)) if last_refreshed == nil then last_refreshed = 0 end --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed) local delta = math.max(0, now-last_refreshed) local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) local allowed = filled_tokens >= requested local new_tokens = filled_tokens local allowed_num = 0 if allowed then new_tokens = filled_tokens - requested allowed_num = 1 end --redis.log(redis.LOG_WARNING, "delta " .. delta) --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens) --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num) --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens) if ttl > 0 then redis.call("setex", tokens_key, ttl, new_tokens) redis.call("setex", timestamp_key, ttl, now) end -- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
--返回的有两个参数,allowed_num表示该算法算出来允许访问的次数,如果该值大于0,就表示可以允许这次访问通过
-- return { allowed_num, new_tokens }

该Lua脚本的输入参数有四个,分别是令牌桶填充速率,令牌桶容量,访问时间戳,取出令牌的数量;返回参数有两个,分别是允许访问的次数,新的令牌。只要允许访问的次数大于0,就表示此次访问允许通过。

2.2 自定义限流,IP限流,接口限流。

源码地址:https://github.com/chengwenqin/gateway-demo

此次实现两个较为常见的业务限流规则,我们将不同IP和接口路径的限流参数(令牌桶填充速率和令牌桶容量)配置在数据库中,并且改变参数无需重启即可生效。

1)我们需要重新写个Limiter,SpringCloud Gateway自带的Limiter是读取配置文件的限流参数的

实现如下

package net.sunmonkey.gateway.limiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Arrays;
import java.util.List;

@Component
public class MyRedisRateLimiter {

    private static final Logger logger = LoggerFactory.getLogger(MyRedisRateLimiter.class);

    private final String keyNamespace="gateway:demo:limit:";

    private final RedisTemplate<String, Long> redisTemplate;

    private final RedisScript<List<Long>> redisScript;

/**
* @param redisScript 该RedisScript对象将会自动注入进来,该对象使用的正是上面介绍的request-rate-limiter.lua脚本
*/
public MyRedisRateLimiter(RedisTemplate redisTemplate, RedisScript<List<Long>> redisScript){ this.redisTemplate = redisTemplate; this.redisScript = redisScript; } public boolean isAllowed(String key, int replenishRate, int burstCapacity){ List<String> keys = Arrays.asList(keyNamespace+key+"tokens", keyNamespace+key+"timestamp"); try { List<Long> response = this.redisTemplate.execute(this.redisScript, keys, replenishRate+"",
burstCapacity+"",
Instant.now().getEpochSecond()+"",
1+"");
if(response.get(0) ==0){ return false; }else{ return true; } }catch (Exception e){ logger.error(e.getMessage(), e); return true; } } }

2)加入过滤器,我们来自定义实现一个全局的过滤器吧

IP限流过滤器如下

package net.sunmonkey.gateway.filter;

import net.sunmonkey.db.model.IpRate;
import net.sunmonkey.gateway.limiter.MyRedisRateLimiter;
import net.sunmonkey.gateway.service.IpRateService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

/**
 * IP限流过滤器
 */
@Component
public class IpRateGlobalFilter implements Ordered, GlobalFilter {

    @Autowired
    private MyRedisRateLimiter myRedisRateLimiter;

    @Autowired
    private IpRateService ipRateService;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //获取到调用客户端的IP地址
String ip
= exchange.getRequest().getRemoteAddress().getHostName(); //从数据库中获取到该IP地址对应的限流参数 IpRate ipRate = ipRateService.get(ip); //如果允许同行,没有超过该ip的流量限制 if(myRedisRateLimiter.isAllowed("ip:"+ip+":", ipRate.getReplenishRate(), ipRate.getBurstCapacity())){ return chain.filter(exchange); }else{ exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder() { return 1; } }

接口限流的过滤器如下

package net.sunmonkey.gateway.filter;

import net.sunmonkey.db.model.PathRate;
import net.sunmonkey.gateway.limiter.MyRedisRateLimiter;
import net.sunmonkey.gateway.service.PathRateService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

/**
 * 接口限流过滤器
 */
@Component
public class PathRateGlobalFilter implements Ordered, GlobalFilter {
    @Autowired
    private MyRedisRateLimiter myRedisRateLimiter;

    @Autowired
    private PathRateService pathRateService;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        //获取到该接口路径
String path
= exchange.getRequest().getPath().value(); //从数据库中获取到该接口对于的限流参数 PathRate pathRate = pathRateService.get(path); //如果允许同行,没有超过该接口的流量限制 if(myRedisRateLimiter.isAllowed("path:"+path+":", pathRate.getReplenishRate(), pathRate.getBurstCapacity())){ return chain.filter(exchange); }
//如果不允许此次请求通过,就返回429,请求太频繁的错误
else{ exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS); return exchange.getResponse().setComplete(); } } @Override public int getOrder() { return 1; } }

至此,基于客户端IP的限流和接口的限流实现完毕。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM