參考:https://blog.csdn.net/ErickPang/article/details/84680132
采用自帶默認網關請參照微服務架構spring cloud - gateway網關限流,參數與其唯一的區別是header中多了參數userLevel,值為A或者B
此處實現按傳入參數取到不同配置
userLvl.A.replenishRate: 10
userLvl.A.burstCapacity: 100
userLvl.B.replenishRate: 20
userLvl.B.burstCapacity: 1000
自定義限流器
package com.gatewayaop.filter; import com.iot.crm.gatewayaop.common.config.UserLevelRateLimiterConf; import org.springframework.beans.BeansException; import org.springframework.cloud.gateway.filter.ratelimit.AbstractRateLimiter; import org.springframework.cloud.gateway.filter.ratelimit.RateLimiter; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.util.ObjectUtils; import org.springframework.validation.Validator; import org.springframework.validation.annotation.Validated; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import javax.validation.constraints.Min; import java.time.Instant; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class UserLevelRedisRateLimiter extends AbstractRateLimiter<UserLevelRedisRateLimiter.Config> implements ApplicationContextAware { //這些變量全部從RedisRateLimiter復制的,都會用到。 public static final String REPLENISH_RATE_KEY = "replenishRate"; public static final String BURST_CAPACITY_KEY = "burstCapacity"; public static final String CONFIGURATION_PROPERTY_NAME = "sys-redis-rate-limiter"; public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript"; public static final String REMAINING_HEADER = "X-RateLimit-Remaining"; public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate"; public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity"; //處理速度 private static final String DEFAULT_REPLENISHRATE="default.replenishRate"; //容量 private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity"; private ReactiveRedisTemplate<String, String> redisTemplate; private RedisScript<List<Long>> script; private AtomicBoolean initialized = new AtomicBoolean(false); private String remainingHeader = REMAINING_HEADER; /** The name of the header that returns the replenish rate configuration. */ private String replenishRateHeader = REPLENISH_RATE_HEADER; /** The name of the header that returns the burst capacity configuration. */ private String burstCapacityHeader = BURST_CAPACITY_HEADER; private Config defaultConfig; public UserLevelRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script, Validator validator) { super(Config.class , CONFIGURATION_PROPERTY_NAME , validator); this.redisTemplate = redisTemplate; this.script = script; initialized.compareAndSet(false,true); } public UserLevelRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity){ super(Config.class , CONFIGURATION_PROPERTY_NAME , null); defaultConfig = new Config() .setReplenishRate(defaultReplenishRate) .setBurstCapacity(defaultBurstCapacity); } //具體限流實現,此處調用的是lua腳本 @Override public Mono<Response> isAllowed(String routeId, String id) { if (!this.initialized.get()) { throw new IllegalStateException("RedisRateLimiter is not initialized"); } if (ObjectUtils.isEmpty(rateLimiterConf) ){ throw new IllegalArgumentException("No Configuration found for route " + routeId); } //獲取的是自定義的map Map<String , Integer> rateLimitMap = rateLimiterConf.getRateLimitMap(); //緩存的key,此處routeId為userSev,Id為header參數userLevel的值(A或者B) String replenishRateKey = routeId + "." + id + "." + REPLENISH_RATE_KEY; //若map中不存在則采用默認值,存在則取值。 int replenishRate = ObjectUtils.isEmpty(rateLimitMap.get(replenishRateKey)) ? rateLimitMap.get(DEFAULT_REPLENISHRATE) : rateLimitMap.get(replenishRateKey); //容量key String burstCapacityKey = routeId + "." + id + "." + BURST_CAPACITY_KEY; //若map中不存在則采用默認值,存在則取值。 int burstCapacity = ObjectUtils.isEmpty(rateLimitMap.get(burstCapacityKey)) ? rateLimitMap.get(DEFAULT_BURSTCAPACITY) : rateLimitMap.get(burstCapacityKey); try { List<String> keys = getKeys(id); List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "", Instant.now().getEpochSecond() + "", "1"); Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs); return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))) .reduce(new ArrayList<Long>(), (longs, l) -> { longs.addAll(l); return longs; }) .map(results -> { boolean allowed = results.get(0) == 1L; Long tokensLeft = results.get(1); RateLimiter.Response response = new RateLimiter.Response(allowed, getHeaders(replenishRate , burstCapacity , tokensLeft)); return response; }); } catch (Exception e) { e.printStackTrace(); } return Mono.just(new RateLimiter.Response(true, getHeaders(replenishRate , burstCapacity , -1L))); } private UserLevelRateLimiterConf rateLimiterConf; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.rateLimiterConf = applicationContext.getBean(UserLevelRateLimiterConf.class); } public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity , Long tokensLeft) { HashMap<String, String> headers = new HashMap<>(); headers.put(this.remainingHeader, tokensLeft.toString()); headers.put(this.replenishRateHeader, String.valueOf(replenishRate)); headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity)); return headers; } static List<String> getKeys(String id) { // use `{}` around keys to use Redis Key hash tags // this allows for using redis cluster // Make a unique key per user. //此處可以自定義redis前綴信息 String prefix = "request_sys_rate_limiter.{" + id; // You need two Redis keys for Token Bucket. String tokenKey = prefix + "}.tokens"; String timestampKey = prefix + "}.timestamp"; return Arrays.asList(tokenKey, timestampKey); } @Validated public static class Config{ @Min(1) private int replenishRate; @Min(1) private int burstCapacity = 1; public int getReplenishRate() { return replenishRate; } public Config setReplenishRate(int replenishRate) { this.replenishRate = replenishRate; return this; } public int getBurstCapacity() { return burstCapacity; } public Config setBurstCapacity(int burstCapacity) { this.burstCapacity = burstCapacity; return this; } @Override public String toString() { return "Config{" + "replenishRate=" + replenishRate + ", burstCapacity=" + burstCapacity + '}'; } } }
讀取自定義配置類
package com.gatewayaop.common.config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; //使用配置文件的方式進行初始化 @Component @ConfigurationProperties(prefix = "comsumer.ratelimiter-conf") //@EnableConfigurationProperties(UserLevelRateLimiterConf.class) public class UserLevelRateLimiterConf { //處理速度 private static final String DEFAULT_REPLENISHRATE="default.replenishRate"; //容量 private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity"; //默認配置 private Map<String , Integer> rateLimitMap = new ConcurrentHashMap<String , Integer>(){ { put(DEFAULT_REPLENISHRATE , 10); put(DEFAULT_BURSTCAPACITY , 100); } }; public Map<String, Integer> getRateLimitMap() { return rateLimitMap; } public void setRateLimitMap(Map<String, Integer> rateLimitMap) { this.rateLimitMap = rateLimitMap; } }
定義限流器種類
package com.gatewayaop.common.config; import com.iot.crm.gatewayaop.filter.UserLevelRedisRateLimiter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cloud.gateway.filter.ratelimit.KeyResolver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.core.ReactiveRedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.validation.Validator; import reactor.core.publisher.Mono; import java.util.List; @Configuration public class RequestRateLimiterConfig { @Bean @Primary KeyResolver apiKeyResolver() { //按URL限流 return exchange -> Mono.just(exchange.getRequest().getPath().toString()); } @Bean KeyResolver userKeyResolver() { //按用戶限流 return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user")); } @Bean KeyResolver ipKeyResolver() { //按IP來限流 return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } @Bean KeyResolver userLevelKeyResolver() { //按IP來限流 return exchange -> Mono.just(exchange.getRequest().getHeaders().getFirst("userLevel")); } @Bean @Primary //使用自己定義的限流類 UserLevelRedisRateLimiter userLevelRedisRateLimiter( ReactiveRedisTemplate<String, String> redisTemplate, @Qualifier(UserLevelRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script, @Qualifier("defaultValidator") Validator validator){ return new UserLevelRedisRateLimiter(redisTemplate , script , validator); } }
yml配置
server: port: 9701 spring: application: name: gateway-aop-dev profiles: active: dev index: 62 cloud: gateway: discovery: locator: enabled: true # 服務名小寫 lower-case-service-id: true routes: #與customer.中key相同即是java代碼中的routeID - id: userSev # lb代表從注冊中心獲取服務,且已負載均衡方式轉發 uri: lb://hello-dev predicates: - Path=/hello-dev/** # 加上StripPrefix=1,否則轉發到后端服務時會帶上consumer前綴 filters: - StripPrefix=1 # 限流過濾器,使用gateway內置令牌算法 - name: RequestRateLimiter args: # # 令牌桶每秒填充平均速率,即行等價於允許用戶每秒處理多少個請求平均數 # redis-rate-limiter.replenishRate: 10 # # 令牌桶的容量,允許在一秒鍾內完成的最大請求數 # redis-rate-limiter.burstCapacity: 20 # 用於限流的鍵的解析器的 Bean 對象的名字。它使用 SpEL 表達式根據#{@beanName}從 Spring 容器中獲取 Bean 對象。 key-resolver: "#{@userLevelKeyResolver}" rate-limiter: "#{@userLevelRedisRateLimiter}" comsumer: ratelimiter-conf: #配置限流參數與RateLimiterConf類映射 rateLimitMap: #格式為:routeid(gateway配置routes時指定的).系統名稱.replenishRate(流速)/burstCapacity令牌桶大小 userSev.A.replenishRate: 10 userSev.A.burstCapacity: 100 userSev.B.replenishRate: 20 userSev.B.burstCapacity: 1000