也來說說redis+lua實現高並發限流---redis限流器


我們的靈活用工系統調用優付渠道API做用戶簽約或資金下發時,優付系統增加了API接口請求的限流策略。

針對每一個商戶的每種類型的API請求做限流。比如:同一商戶,每秒鍾只允許20次簽約請求。當每秒請求超過20次時,會提示“客戶請求簽約接口次數超限”。

那么,我們作為API調用方,就要對並發進行控制,以防出現無效請求。

 

最常用的並發限流方案是借助redis/jedis。為了保證原子性,這里,我使用Redis+LUA腳本的方式來控制。

那么,

對於服務提供方來說,當請求量超出設定的限流閾值,則直接返回錯誤碼/錯誤提示,並終止對請求的處理。

而對於調用方來說呢,我們要做的是:當並發請求超出了限定閾值時,要延遲請求,而不是直接丟棄。

 

話不多說,上代碼吧。 

redis限流器

如下RedisLimiter類定義了Redis限流器的api:服務提供方使用limit方法實現限流;服務調用方使用limitWait方法實現限流等待(如需)。

package jstudy.redislimit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Redis+Lua實現高並發限流
 */
@Slf4j
@Component
public class RedisLimiter {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * 達到限流時,則等待,直到新的間隔。
     *
     * @param key
     * @param limitCount
     * @param limitSecond
     */
    public void limitWait(String key, int limitCount, int limitSecond) {
        boolean ok;//放行標志
        do {
            ok = limit(key, limitCount, limitSecond);
            log.info("放行標志={}", ok);
            if (!ok) {
                Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS);
                if (null != ttl && ttl > 0) {
                    try {
                        Thread.sleep(ttl);
                        log.info("sleeped:{}", ttl);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } while (!ok);
    }

    /**
     * 限流方法    true-放行;false-限流
     *
     * @param key
     * @param limitCount
     * @param limitSecond
     * @return
     */
    public boolean limit(String key, int limitCount, int limitSecond) {
        List<String> keys = Collections.singletonList(key);
        String luaScript = buildLuaScript();
        RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class);
        Number count = redisTemplate.execute(redisScript, keys, limitCount, limitSecond);
        log.info("Access try count is {} for key = {}", count, key);
        if (count != null && count.intValue() <= limitCount) {
            return true;//放行
        } else {
            return false;//限流
//            throw new RuntimeException("You have been dragged into the blacklist");
        }
    }

    /**
     * 編寫 redis Lua 限流腳本
     */
    public String buildLuaScript() {
        StringBuilder lua = new StringBuilder();
        lua.append("local c");
        lua.append("\nc = redis.call('get',KEYS[1])");
        // 實際調用次數超過閾值,則直接返回
        lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then");
        lua.append("\nreturn c;");
        lua.append("\nend");
        // 執行計算器自加
        lua.append("\nc = redis.call('incr',KEYS[1])");
        lua.append("\nif tonumber(c) == 1 then");
        // 從第一次調用開始限流,設置對應鍵值的過期
        lua.append("\nredis.call('expire',KEYS[1],ARGV[2])");
        lua.append("\nend");
        lua.append("\nreturn c;");
        return lua.toString();
    }

}

 

 

SpringBoot自動注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定義如下:

package jstudy.redislimit;

import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
@EnableCaching // 開啟緩存支持
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * RedisTemplate配置
     *
     * @param lettuceConnectionFactory
     * @return
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 設置序列化
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, Visibility.ANY);
        om.enableDefaultTyping(DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(lettuceConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringSerializer);// key序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化
        redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}
View Code

 

 

並發測試通過,如下是testcase:

package jstudy.redislimit;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
public class RedisLimiterTest {
    @Autowired
    private RedisLimiter redisLimiter;

    @Test
    public void testLimitWait() throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        log.info("--------{}", redisTemplate.opsForValue().get("abc"));
        for (int j = 1; j <= 5; j++) {
            int i=j;
            pool.execute(() -> {
                Thread.currentThread().setName( Thread.currentThread().getName().replace("-","_"));
                redisLimiter.limitWait("abc", 3, 1);
                log.info(i + ":" + true + " ttl:" + redisTemplate.getExpire("abc", TimeUnit.MILLISECONDS));
                    try {
                        // 線程等待,模擬執行業務邏輯
                        Thread.sleep(new Random().nextInt(100));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
            });
        }
        pool.shutdown();
        pool.awaitTermination(2,TimeUnit.SECONDS);
    }
}
View Code

 

jedis限流算法

如下jedis算法與上面lua腳本相比,實現算法殊途同歸。不過,因為不具備原子性,高並發可能會出現冒的情況。所以,要實現精確限流,還是借助上面的lua。

public class JedisLimiter {
    private static JedisPool jedisPool = SpringContextUtils.getBean(JedisPool.class);

    /**
     * 訪問頻率限制
     *
     * @param key
     * @param seconds
     * @param number
     * @return
     */
    public static boolean isOverLimit(String key, int seconds, int number) {
        Jedis jedis = null;
        try {
            jedis = getResource();
            String value = jedis.get(key);
            if (value == null) {
                jedis.set(key, "1");
                jedis.expire(key, seconds);
                return false;
            } else {
                Long ttl = jedis.ttl(key);
                if (ttl.longValue() > 0) {
                    int parseInt = Integer.parseInt(value);
                    if (parseInt > number) {
                        return true;
                    }
                    jedis.incr(key);
                }
            }
            return false;
        } catch (Exception e) {
            log.warn("jedis限流器異常", e);
        } finally {
            returnResource(jedis);
        }
        return false;
    }
}

 

Jedis限流算法改造,利用Lua腳本保證原子性

public class JedisLimiter {
    private static JedisPool jedisPool = SpringContextUtils.getBean(JedisPool.class);

    /**
     * 【redis限流器】請求頻次是否超出限制
     *
     * @param key       限流key
     * @param timeRange 時間范圍
     * @param limitNum  在限制時間內可以請求的次數閾值
     * @return 是否訪問超限
     */
    public static boolean isOverLimit(String key, String timeRange, String limitNum) {
        StringBuffer script = new StringBuffer();
//        LUA腳本--如果超限,返回1;否則返回0
//        set的返回值說明:"OK"-key不存在,設置成功;null-key已存在
        script.append("local ok = redis.call('set', KEYS[1], 1, 'NX', 'EX', tonumber(ARGV[1])) \n")
                .append("if (ok) then  \n")
                .append("return 0 \n")
                .append("end \n")
                .append("local reqNum = redis.call('incr',KEYS[1])  \n")
                .append("if reqNum> tonumber(ARGV[2]) then  \n")
                .append("return 1  \n")
                .append("end  \n")
                .append("return 0  \n");
        Jedis jedis = null;
        long result;
        try {
            jedis = getResource();
//          KEYS[1]:key ARGV[1]=timeRange ARGV[2]=limitNum
            result = (long) jedis.eval(script.toString(), 1, key, timeRange, limitNum);
//          執行lua腳本返回1,表示訪問超限
            if (result == 1) {
                logger.info("訪問超限,當前訪問次數={}", jedis.get(key));
                return true;
            }
        } catch (JedisException e) {
            logger.error("JedisException=", e);
            return true;
        } catch (Exception e) {
            logger.error("jedis限流器異常", e);
            return true;
        } finally {
            returnResource(jedis);
        }
        return false;
    }
}

 

 

關於TTL(Time to Live)

不管是redis還是jedis,其實都是利用了消息的ttl(Time to Live),即,當消息的ttl=0時,消息會自動過期。ttl還見諸於RabbitMQ的死信隊列,隊列里的消息會延遲消費,當等待ttl指定的時間后,才會自動轉移到實時隊列。

redis是使用RedisTemplate.expire來設置ttl;使用RedisTemplate.getExpire(key)或RedisTemplate.getExpire(key,TimeUnit)方法來獲取ttl。當然,對於並發限流,我們需要使用后者指定時間單位為TimeUnit.MILLISECONDS來得到精確的剩余毫秒數。

jedis是使用Jedis.expire來設置ttl;使用Jedis.ttl(key)方法來獲取ttl,返回的時間是毫秒。

getExpire/ttl返回值:

  • -2:key不存在
  • -1:未設置ttl
  • n:實際的剩余ttl

 

redis.incr指令說明

關於redis的increment :

  • 當key不存在時,創建key,默認值是delta值(不指定delta的話,則為1)。
  • 當key存在時,按delta來遞增。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM