我們的靈活用工系統調用優付渠道API做用戶簽約或資金下發時,優付系統增加了API接口請求的限流策略。
針對每一個商戶的每種類型的API請求做限流。比如:同一商戶,每秒鍾只允許20次簽約請求。當每秒請求超過20次時,會提示“客戶請求簽約接口次數超限”。
那么,我們作為API調用方,就要對並發進行控制,以防出現無效請求。
最常用的並發限流方案是借助redis/jedis。為了保證原子性,這里,我使用Redis+LUA腳本的方式來控制。
那么,
對於服務提供方來說,當請求量超出設定的限流閾值,則直接返回錯誤碼/錯誤提示,並終止對請求的處理。
而對於調用方來說呢,我們要做的是:當並發請求超出了限定閾值時,要延遲請求,而不是直接丟棄。
話不多說,上代碼吧。
redis限流器
如下RedisLimiter類定義了Redis限流器的api:服務提供方使用limit方法實現限流;服務調用方使用limitWait方法實現限流等待(如需)。
package jstudy.redislimit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; /** * Redis+Lua實現高並發限流 */ @Slf4j @Component public class RedisLimiter { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 達到限流時,則等待,直到新的間隔。 * * @param key * @param limitCount * @param limitSecond */ public void limitWait(String key, int limitCount, int limitSecond) { boolean ok;//放行標志 do { ok = limit(key, limitCount, limitSecond); log.info("放行標志={}", ok); if (!ok) { Long ttl = redisTemplate.getExpire(key, TimeUnit.MILLISECONDS); if (null != ttl && ttl > 0) { try { Thread.sleep(ttl); log.info("sleeped:{}", ttl); } catch (InterruptedException e) { e.printStackTrace(); } } } } while (!ok); } /** * 限流方法 true-放行;false-限流 * * @param key * @param limitCount * @param limitSecond * @return */ public boolean limit(String key, int limitCount, int limitSecond) { List<String> keys = Collections.singletonList(key); String luaScript = buildLuaScript(); RedisScript<Number> redisScript = new DefaultRedisScript<>(luaScript, Number.class); Number count = redisTemplate.execute(redisScript, keys, limitCount, limitSecond); log.info("Access try count is {} for key = {}", count, key); if (count != null && count.intValue() <= limitCount) { return true;//放行 } else { return false;//限流 // throw new RuntimeException("You have been dragged into the blacklist"); } } /** * 編寫 redis Lua 限流腳本 */ public String buildLuaScript() { StringBuilder lua = new StringBuilder(); lua.append("local c"); lua.append("\nc = redis.call('get',KEYS[1])"); // 實際調用次數超過閾值,則直接返回 lua.append("\nif c and tonumber(c) > tonumber(ARGV[1]) then"); lua.append("\nreturn c;"); lua.append("\nend"); // 執行計算器自加 lua.append("\nc = redis.call('incr',KEYS[1])"); lua.append("\nif tonumber(c) == 1 then"); // 從第一次調用開始限流,設置對應鍵值的過期 lua.append("\nredis.call('expire',KEYS[1],ARGV[2])"); lua.append("\nend"); lua.append("\nreturn c;"); return lua.toString(); } }
SpringBoot自動注入的RedisTemplate是RedisTemplate<Object,Object>泛型, 上面class使用RedisTemplate<String, Object>,bean定義如下:
package jstudy.redislimit; import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration @EnableCaching // 開啟緩存支持 public class RedisConfig extends CachingConfigurerSupport { /** * RedisTemplate配置 * * @param lettuceConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { // 設置序列化 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, Visibility.ANY); om.enableDefaultTyping(DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); // 配置redisTemplate RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(lettuceConnectionFactory); RedisSerializer<?> stringSerializer = new StringRedisSerializer(); redisTemplate.setKeySerializer(stringSerializer);// key序列化 redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);// value序列化 redisTemplate.setHashKeySerializer(stringSerializer);// Hash key序列化 redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);// Hash value序列化 redisTemplate.afterPropertiesSet(); return redisTemplate; } }
並發測試通過,如下是testcase:
package jstudy.redislimit; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j @SpringBootTest @RunWith(SpringRunner.class) public class RedisLimiterTest { @Autowired private RedisLimiter redisLimiter; @Test public void testLimitWait() throws InterruptedException { ExecutorService pool = Executors.newCachedThreadPool(); log.info("--------{}", redisTemplate.opsForValue().get("abc")); for (int j = 1; j <= 5; j++) { int i=j; pool.execute(() -> { Thread.currentThread().setName( Thread.currentThread().getName().replace("-","_")); redisLimiter.limitWait("abc", 3, 1); log.info(i + ":" + true + " ttl:" + redisTemplate.getExpire("abc", TimeUnit.MILLISECONDS)); try { // 線程等待,模擬執行業務邏輯 Thread.sleep(new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } }); } pool.shutdown(); pool.awaitTermination(2,TimeUnit.SECONDS); } }
jedis限流算法
如下jedis算法與上面lua腳本相比,實現算法殊途同歸。不過,因為不具備原子性,高並發可能會出現冒的情況。所以,要實現精確限流,還是借助上面的lua。

public class JedisLimiter { private static JedisPool jedisPool = SpringContextUtils.getBean(JedisPool.class); /** * 訪問頻率限制 * * @param key * @param seconds * @param number * @return */ public static boolean isOverLimit(String key, int seconds, int number) { Jedis jedis = null; try { jedis = getResource(); String value = jedis.get(key); if (value == null) { jedis.set(key, "1"); jedis.expire(key, seconds); return false; } else { Long ttl = jedis.ttl(key); if (ttl.longValue() > 0) { int parseInt = Integer.parseInt(value); if (parseInt > number) { return true; } jedis.incr(key); } } return false; } catch (Exception e) { log.warn("jedis限流器異常", e); } finally { returnResource(jedis); } return false; } }
Jedis限流算法改造,利用Lua腳本保證原子性
public class JedisLimiter { private static JedisPool jedisPool = SpringContextUtils.getBean(JedisPool.class); /** * 【redis限流器】請求頻次是否超出限制 * * @param key 限流key * @param timeRange 時間范圍 * @param limitNum 在限制時間內可以請求的次數閾值 * @return 是否訪問超限 */ public static boolean isOverLimit(String key, String timeRange, String limitNum) { StringBuffer script = new StringBuffer(); // LUA腳本--如果超限,返回1;否則返回0 // set的返回值說明:"OK"-key不存在,設置成功;null-key已存在 script.append("local ok = redis.call('set', KEYS[1], 1, 'NX', 'EX', tonumber(ARGV[1])) \n") .append("if (ok) then \n") .append("return 0 \n") .append("end \n") .append("local reqNum = redis.call('incr',KEYS[1]) \n") .append("if reqNum> tonumber(ARGV[2]) then \n") .append("return 1 \n") .append("end \n") .append("return 0 \n"); Jedis jedis = null; long result; try { jedis = getResource(); // KEYS[1]:key ARGV[1]=timeRange ARGV[2]=limitNum result = (long) jedis.eval(script.toString(), 1, key, timeRange, limitNum); // 執行lua腳本返回1,表示訪問超限 if (result == 1) { logger.info("訪問超限,當前訪問次數={}", jedis.get(key)); return true; } } catch (JedisException e) { logger.error("JedisException=", e); return true; } catch (Exception e) { logger.error("jedis限流器異常", e); return true; } finally { returnResource(jedis); } return false; } }
關於TTL(Time to Live)
不管是redis還是jedis,其實都是利用了消息的ttl(Time to Live),即,當消息的ttl=0時,消息會自動過期。ttl還見諸於RabbitMQ的死信隊列,隊列里的消息會延遲消費,當等待ttl指定的時間后,才會自動轉移到實時隊列。
redis是使用RedisTemplate.expire來設置ttl;使用RedisTemplate.getExpire(key)或RedisTemplate.getExpire(key,TimeUnit)方法來獲取ttl。當然,對於並發限流,我們需要使用后者指定時間單位為TimeUnit.MILLISECONDS來得到精確的剩余毫秒數。
jedis是使用Jedis.expire來設置ttl;使用Jedis.ttl(key)方法來獲取ttl,返回的時間是毫秒。
getExpire/ttl返回值:
- -2:key不存在
- -1:未設置ttl
- n:實際的剩余ttl
redis.incr指令說明
關於redis的increment :
- 當key不存在時,創建key,默認值是delta值(不指定delta的話,則為1)。
- 當key存在時,按delta來遞增。
