分布式限流之一:redis+lua 實現分布式令牌桶,高並發限流


在《高可用服務設計之二:Rate limiting 限流與降級》的應用級限流中,介紹了多種方法例如:

1、使用guava提供工具庫里的RateLimiter類(內部采用令牌捅算法實現)進行限流 
2、使用Java自帶delayqueue的延遲隊列實現(編碼過程相對麻煩,此處省略代碼) 
3、使用Redis實現,存儲兩個key,一個用於計時,一個用於計數。請求每調用一次,計數器增加1,若在計時器時間內計數器未超過閾值,則可以處理任務 

可行性分析

最快捷且有效的方式是使用RateLimiter實現,但是這很容易踩到一個坑,單節點模式下,使用RateLimiter進行限流一點問題都沒有。但是…線上是分布式系統,布署了多個節點,而且多個節點最終調用的是同一個短信服務商接口。雖然我們對單個節點能做到將QPS限制在400/s,但是多節點條件下,如果每個節點均是400/s,那么到服務商那邊的總請求就是節點數x400/s,於是限流效果失效。使用該方案對單節點的閾值控制是難以適應分布式環境的,至少目前我還沒想到更為合適的方式。 對於第二種,使用delayqueue方式。其實主要存在兩個問題,

1:短信系統本身就用了一層消息隊列,有用kafka,或者rabitmq,如果再加一層延遲隊列,從設計上來說是不太合適的。

2:實現delayqueue的過程相對較麻煩,耗時可能比較長,而且達不到精准限流的效果

3:對於第三種,使用redis進行限流,其很好地解決了分布式環境下多實例所導致的並發問題。因為使用redis設置的計時器和計數器均是全局唯一的,不管多少個節點,它們使用的都是同樣的計時器和計數器,因此可以做到非常精准的流控。同時,這種方案編碼並不復雜,可能需要的代碼不超過10行。

 

回憶一下令牌桶算法:

令牌桶算法(Token Bucket)和 Leaky Bucket 效果一樣但方向相反的算法,更加容易理解.隨着時間流逝,系統會按恆定1/QPS時間間隔(如果QPS=100,則間隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有個水龍頭在不斷的加水),如果桶已經滿了就不再加了.新請求來臨時,會各自拿走一個Token,如果沒有Token可拿了就阻塞或者拒絕服務. 
令牌桶的另外一個好處是可以方便的改變速度. 一旦需要提高速率,則按需提高放入桶中的令牌的速率. 一般會定時(比如100毫秒)往桶中增加一定數量的令牌, 有些變種算法則實時的計算應該增加的令牌的數量. 

last_mill_second 最后時間毫秒 
curr_permits 當前可用的令牌 
max_burst 令牌桶最大值 
rate 每秒生成幾個令牌 
app 應用 
令牌桶內令牌生成借鑒Guava-RateLimiter類的設計 
每次getToken根據時間戳生成token,不超過最大值

復制代碼
local ratelimit_info=redis.pcall("HMGET",KEYS[1],"last_mill_second","curr_permits","max_burst","rate","app")
local last_mill_second=ratelimit_info[1]
local curr_permits=tonumber(ratelimit_info[2])
local max_burst=tonumber(ratelimit_info[3])
local rate=tonumber(ratelimit_info[4])
local app=tostring(ratelimit_info[5])
if app == nil then
    return 0
end

local local_curr_permits=max_burst;

if(type(last_mill_second) ~='boolean' and last_mill_second ~=nil) then
    local reverse_permits=math.floor((ARGV[2]-last_mill_second)/1000)*rate
    if(reverse_permits>0) then
        redis.pcall("HMSET",KEYS[1],"last_mill_second",ARGV[2])
    end

    local expect_curr_permits=reverse_permits+curr_permits
    local_curr_permits=math.min(expect_curr_permits,max_burst);

else
    redis.pcall("HMSET",KEYS[1],"last_mill_second",ARGV[2])
end

local result=-1
if(local_curr_permits-ARGV[1]>0) then
    result=1
    redis.pcall("HMSET",KEYS[1],"curr_permits",local_curr_permits-ARGV[1])
else
    redis.pcall("HMSET",KEYS[1],"curr_permits",local_curr_permits)
end

return result
復制代碼

Lua腳本在Redis中運行,保證了取令牌和生成令牌兩個操作的原子性。

復制代碼
# REDIS (RedisProperties)
# Redis數據庫索引(默認為0)
spring.redis.database=0
# Redis服務器地址
spring.redis.host=127.0.0.1
# Redis服務器連接端口
spring.redis.port=6379
# Redis服務器連接密碼(默認為空)
spring.redis.password=
# 連接池最大連接數(使用負值表示沒有限制)
spring.redis.jedis.pool.max-active=8
# 連接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.jedis.pool.max-wait=-1
# 連接池中的最大空閑連接
spring.redis.jedis.pool.max-idle=8
# 連接池中的最小空閑連接
spring.redis.jedis.pool.min-idle=0
# 連接超時時間(毫秒)
spring.redis.timeout=2000
復制代碼
java代碼1:redis連接
復制代碼
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    @Override
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean("ratelimitLua")
    public DefaultRedisScript getRedisScript() {
        DefaultRedisScript redisScript = new DefaultRedisScript();
        redisScript.setLocation(new ClassPathResource("ratelimit.lua"));
        redisScript.setResultType(java.lang.Long.class);
        return redisScript;
    }
    @Bean("ratelimitInitLua")
    public DefaultRedisScript getInitRedisScript() {
        DefaultRedisScript redisScript = new DefaultRedisScript();
        redisScript.setLocation(new ClassPathResource("ratelimitInit.lua"));
        redisScript.setResultType(java.lang.Long.class);
        return redisScript;
    }


}


public class Constants {
    public static final String RATE_LIMIT_KEY = "ratelimit:";
}

public enum Token {
    SUCCESS,
    FAILED;
    public boolean isSuccess(){
        return this.equals(SUCCESS);
    }
    public boolean isFailed(){
        return this.equals(FAILED);
    }
}
復制代碼

 

java代碼2:如下是Java中判斷是否需要限流的代碼:
復制代碼
@Service
public class RateLimitClient {

    @Autowired
    StringRedisTemplate stringRedisTemplate;
    @Qualifier("getRedisScript")
    @Resource
    RedisScript<Long> ratelimitLua;
    @Qualifier("getInitRedisScript")
    @Resource
    RedisScript<Long> ratelimitInitLua;

    public Token initToken(String key){
        Token token = Token.SUCCESS;
        Long currMillSecond = stringRedisTemplate.execute(
                (RedisCallback<Long>) redisConnection -> redisConnection.time()
        );
        /**
         * redis.pcall("HMSET",KEYS[1],
         "last_mill_second",ARGV[1],
         "curr_permits",ARGV[2],
         "max_burst",ARGV[3],
         "rate",ARGV[4],
         "app",ARGV[5])
         */
        Long accquire = stringRedisTemplate.execute(ratelimitInitLua,
                Collections.singletonList(getKey(key)), currMillSecond.toString(), "1", "10", "10", "skynet");
        if (accquire == 1) {
            token = Token.SUCCESS;
        } else if (accquire == 0) {
            token = Token.SUCCESS;
        } else {
            token = Token.FAILED;
        }
        return token;
    }
    /**
     * 獲得key操作
     *
     * @param key
     * @return
     */
    public Token accquireToken(String key) {
        return accquireToken(key, 1);
    }

    public Token accquireToken(String key, Integer permits) {
        Token token = Token.SUCCESS;
        Long currMillSecond = stringRedisTemplate.execute(
                (RedisCallback<Long>) redisConnection -> redisConnection.time()
        );

        Long accquire = stringRedisTemplate.execute(ratelimitLua,
                Collections.singletonList(getKey(key)), permits.toString(), currMillSecond.toString());
        if (accquire == 1) {
            token = Token.SUCCESS;
        } else {
            token = Token.FAILED;
        }
        return token;
    }

    public String getKey(String key) {
        return Constants.RATE_LIMIT_KEY + key;
    }

}
復制代碼

 

lua:
復制代碼
local result=1
redis.pcall("HMSET",KEYS[1],
        "last_mill_second",ARGV[1],
        "curr_permits",ARGV[2],
        "max_burst",ARGV[3],
        "rate",ARGV[4],
        "app",ARGV[5])
return result
復制代碼

 

轉至:https://www.cnblogs.com/duanxz/p/3465559.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM