在《高可用服務設計之二:Rate limiting 限流與降級》的應用級限流中,介紹了多種方法例如:
1、使用guava提供工具庫里的RateLimiter類(內部采用令牌捅算法實現)進行限流
2、使用Java自帶delayqueue的延遲隊列實現(編碼過程相對麻煩,此處省略代碼)
3、使用Redis實現,存儲兩個key,一個用於計時,一個用於計數。請求每調用一次,計數器增加1,若在計時器時間內計數器未超過閾值,則可以處理任務
可行性分析
最快捷且有效的方式是使用RateLimiter實現,但是這很容易踩到一個坑,單節點模式下,使用RateLimiter進行限流一點問題都沒有。但是…線上是分布式系統,布署了多個節點,而且多個節點最終調用的是同一個短信服務商接口。雖然我們對單個節點能做到將QPS限制在400/s,但是多節點條件下,如果每個節點均是400/s,那么到服務商那邊的總請求就是節點數x400/s,於是限流效果失效。使用該方案對單節點的閾值控制是難以適應分布式環境的,至少目前我還沒想到更為合適的方式。 對於第二種,使用delayqueue方式。其實主要存在兩個問題,
1:短信系統本身就用了一層消息隊列,有用kafka,或者rabitmq,如果再加一層延遲隊列,從設計上來說是不太合適的。
2:實現delayqueue的過程相對較麻煩,耗時可能比較長,而且達不到精准限流的效果
3:對於第三種,使用redis進行限流,其很好地解決了分布式環境下多實例所導致的並發問題。因為使用redis設置的計時器和計數器均是全局唯一的,不管多少個節點,它們使用的都是同樣的計時器和計數器,因此可以做到非常精准的流控。同時,這種方案編碼並不復雜,可能需要的代碼不超過10行。
回憶一下令牌桶算法:

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨着時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務.
令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量. 
last_mill_second 最后時間毫秒
curr_permits 當前可用的令牌
max_burst 令牌桶最大值
rate 每秒生成幾個令牌
app 應用
令牌桶內令牌生成借鑒Guava-RateLimiter類的設計
每次getToken根據時間戳生成token,不超過最大值
local ratelimit_info=redis.pcall("HMGET",KEYS[1],"last_mill_second","curr_permits","max_burst","rate","app")
local last_mill_second=ratelimit_info[1]
local curr_permits=tonumber(ratelimit_info[2])
local max_burst=tonumber(ratelimit_info[3])
local rate=tonumber(ratelimit_info[4])
local app=tostring(ratelimit_info[5])
if app == nil then
return 0
end
local local_curr_permits=max_burst;
if(type(last_mill_second) ~='boolean' and last_mill_second ~=nil) then
local reverse_permits=math.floor((ARGV[2]-last_mill_second)/1000)*rate
if(reverse_permits>0) then
redis.pcall("HMSET",KEYS[1],"last_mill_second",ARGV[2])
end
local expect_curr_permits=reverse_permits+curr_permits
local_curr_permits=math.min(expect_curr_permits,max_burst);
else
redis.pcall("HMSET",KEYS[1],"last_mill_second",ARGV[2])
end
local result=-1
if(local_curr_permits-ARGV[1]>0) then
result=1
redis.pcall("HMSET",KEYS[1],"curr_permits",local_curr_permits-ARGV[1])
else
redis.pcall("HMSET",KEYS[1],"curr_permits",local_curr_permits)
end
return result
Lua腳本在Redis中運行,保證了取令牌和生成令牌兩個操作的原子性。
# REDIS (RedisProperties) # Redis數據庫索引(默認為0) spring.redis.database=0 # Redis服務器地址 spring.redis.host=127.0.0.1 # Redis服務器連接端口 spring.redis.port=6379 # Redis服務器連接密碼(默認為空) spring.redis.password= # 連接池最大連接數(使用負值表示沒有限制) spring.redis.jedis.pool.max-active=8 # 連接池最大阻塞等待時間(使用負值表示沒有限制) spring.redis.jedis.pool.max-wait=-1 # 連接池中的最大空閑連接 spring.redis.jedis.pool.max-idle=8 # 連接池中的最小空閑連接 spring.redis.jedis.pool.min-idle=0 # 連接超時時間(毫秒) spring.redis.timeout=2000
java代碼1:redis連接
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {
@Override
@Bean
public KeyGenerator keyGenerator() {
return new KeyGenerator() {
@Override
public Object generate(Object target, Method method, Object... params) {
StringBuilder sb = new StringBuilder();
sb.append(target.getClass().getName());
sb.append(method.getName());
for (Object obj : params) {
sb.append(obj.toString());
}
return sb.toString();
}
};
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean("ratelimitLua")
public DefaultRedisScript getRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setLocation(new ClassPathResource("ratelimit.lua"));
redisScript.setResultType(java.lang.Long.class);
return redisScript;
}
@Bean("ratelimitInitLua")
public DefaultRedisScript getInitRedisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript();
redisScript.setLocation(new ClassPathResource("ratelimitInit.lua"));
redisScript.setResultType(java.lang.Long.class);
return redisScript;
}
}
public class Constants {
public static final String RATE_LIMIT_KEY = "ratelimit:";
}
public enum Token {
SUCCESS,
FAILED;
public boolean isSuccess(){
return this.equals(SUCCESS);
}
public boolean isFailed(){
return this.equals(FAILED);
}
}
java代碼2:如下是Java中判斷是否需要限流的代碼:
@Service
public class RateLimitClient {
@Autowired
StringRedisTemplate stringRedisTemplate;
@Qualifier("getRedisScript")
@Resource
RedisScript<Long> ratelimitLua;
@Qualifier("getInitRedisScript")
@Resource
RedisScript<Long> ratelimitInitLua;
public Token initToken(String key){
Token token = Token.SUCCESS;
Long currMillSecond = stringRedisTemplate.execute(
(RedisCallback<Long>) redisConnection -> redisConnection.time()
);
/**
* redis.pcall("HMSET",KEYS[1],
"last_mill_second",ARGV[1],
"curr_permits",ARGV[2],
"max_burst",ARGV[3],
"rate",ARGV[4],
"app",ARGV[5])
*/
Long accquire = stringRedisTemplate.execute(ratelimitInitLua,
Collections.singletonList(getKey(key)), currMillSecond.toString(), "1", "10", "10", "skynet");
if (accquire == 1) {
token = Token.SUCCESS;
} else if (accquire == 0) {
token = Token.SUCCESS;
} else {
token = Token.FAILED;
}
return token;
}
/**
* 獲得key操作
*
* @param key
* @return
*/
public Token accquireToken(String key) {
return accquireToken(key, 1);
}
public Token accquireToken(String key, Integer permits) {
Token token = Token.SUCCESS;
Long currMillSecond = stringRedisTemplate.execute(
(RedisCallback<Long>) redisConnection -> redisConnection.time()
);
Long accquire = stringRedisTemplate.execute(ratelimitLua,
Collections.singletonList(getKey(key)), permits.toString(), currMillSecond.toString());
if (accquire == 1) {
token = Token.SUCCESS;
} else {
token = Token.FAILED;
}
return token;
}
public String getKey(String key) {
return Constants.RATE_LIMIT_KEY + key;
}
}
lua:
local result=1
redis.pcall("HMSET",KEYS[1],
"last_mill_second",ARGV[1],
"curr_permits",ARGV[2],
"max_burst",ARGV[3],
"rate",ARGV[4],
"app",ARGV[5])
return result

