寫在前面
在互聯網應用中,高並發系統會面臨一個重大的挑戰,那就是大量流高並發訪問,比如:天貓的雙十一、京東618、秒殺、搶購促銷等,這些都是典型的大流量高並發場景。關於秒殺,小伙伴們可以參見我的另一篇文章《【高並發】高並發秒殺系統架構解密,不是所有的秒殺都是秒殺!》
關於【冰河技術】微信公眾號,解鎖更多【高並發】專題文章。
注意:由於原文篇幅比較長,所以被拆分為:理論、算法、實戰(HTTP接口實戰+分布式限流實戰)三大部分。
理論篇:《【高並發】如何實現億級流量下的分布式限流?這些理論你必須掌握!!》
算法篇:《【高並發】如何實現億級流量下的分布式限流?這些算法你必須掌握!!》
項目源碼已提交到github:https://github.com/sunshinelyz/mykit-ratelimiter
本文是在《【高並發】億級流量場景下如何為HTTP接口限流?看完我懂了!!》一文的基礎上進行實現,有關項目的搭建可參見《【高並發】億級流量場景下如何為HTTP接口限流?看完我懂了!!》一文的內容。小伙伴們可以關注【冰河技術】微信公眾號來閱讀上述文章。
前面介紹的限流方案有一個缺陷就是:它不是全局的,不是分布式的,無法很好的應對分布式場景下的大流量沖擊。那么,接下來,我們就介紹下如何實現億級流量下的分布式限流。
分布式限流的關鍵就是需要將限流服務做成全局的,統一的。可以采用Redis+Lua技術實現,通過這種技術可以實現高並發和高性能的限流。
Lua是一種輕量小巧的腳本編程語言,用標准的C語言編寫的開源腳本,其設計的目的是為了嵌入到應用程序中,為應用程序提供靈活的擴展和定制功能。
Redis+Lua腳本實現分布式限流思路
我們可以使用Redia+Lua腳本的方式來對我們的分布式系統進行統一的全局限流,Redis+Lua實現的Lua腳本:
local key = KEYS[1] --限流KEY(一秒一個)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --如果超出限流大小
return 0
else --請求數+1,並設置2秒過期
redis.call("INCRBY", key, "1")
redis.call("expire", key "2")
return 1
end
我們可以按照如下的思路來理解上述Lua腳本代碼。
(1)在Lua腳本中,有兩個全局變量,用來接收Redis應用端傳遞的鍵和其他參數,分別為:KEYS、ARGV;
(2)在應用端傳遞KEYS時是一個數組列表,在Lua腳本中通過索引下標方式獲取數組內的值。
(3)在應用端傳遞ARGV時參數比較靈活,可以是一個或多個獨立的參數,但對應到Lua腳本中統一用ARGV這個數組接收,獲取方式也是通過數組下標獲取。
(4)以上操作是在一個Lua腳本中,又因為我當前使用的是Redis 5.0版本(Redis 6.0支持多線程),執行的請求是單線程的,因此,Redis+Lua的處理方式是線程安全的,並且具有原子性。
這里,需要注意一個知識點,那就是原子性操作:如果一個操作時不可分割的,是多線程安全的,我們就稱為原子性操作。
接下來,我們可以使用如下Java代碼來判斷是否需要限流。
//List設置Lua的KEYS[1]
String key = "ip:" + System.currentTimeMillis() / 1000;
List<String> keyList = Lists.newArrayList(key);
//List設置Lua的ARGV[1]
List<String> argvList = Lists.newArrayList(String.valueOf(value));
//調用Lua腳本並執行
List result = stringRedisTemplate.execute(redisScript, keyList, argvList)
至此,我們簡單的介紹了使用Redis+Lua腳本實現分布式限流的總體思路,並給出了Lua腳本的核心代碼和Java程序調用Lua腳本的核心代碼。接下來,我們就動手寫一個使用Redis+Lua腳本實現的分布式限流案例。
Redis+Lua腳本實現分布式限流案例
這里,我們和在《【高並發】億級流量場景下如何為HTTP接口限流?看完我懂了!!》一文中的實現方式類似,也是通過自定義注解的形式來實現分布式、大流量場景下的限流,只不過這里我們使用了Redis+Lua腳本的方式實現了全局統一的限流模式。接下來,我們就一起手動實現這個案例。
創建注解
首先,我們在項目中,定義個名稱為MyRedisLimiter的注解,具體代碼如下所示。
package io.mykit.limiter.annotation;
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.*;
/**
* @author binghe
* @version 1.0.0
* @description 自定義注解實現分布式限流
*/
@Target(value = ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MyRedisLimiter {
@AliasFor("limit")
double value() default Double.MAX_VALUE;
double limit() default Double.MAX_VALUE;
}
在MyRedisLimiter注解內部,我們為value屬性添加了別名limit,在我們真正使用@MyRedisLimiter注解時,即可以使用@MyRedisLimiter(10),也可以使用@MyRedisLimiter(value=10),還可以使用@MyRedisLimiter(limit=10)。
創建切面類
創建注解后,我們就來創建一個切面類MyRedisLimiterAspect,MyRedisLimiterAspect類的作用主要是解析@MyRedisLimiter注解,並且執行限流的規則。這樣,就不需要我們在每個需要限流的方法中執行具體的限流邏輯了,只需要我們在需要限流的方法上添加@MyRedisLimiter注解即可,具體代碼如下所示。
package io.mykit.limiter.aspect;
import com.google.common.collect.Lists;
import io.mykit.limiter.annotation.MyRedisLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletResponse;
import java.io.PrintWriter;
import java.util.List;
/**
* @author binghe
* @version 1.0.0
* @description MyRedisLimiter注解的切面類
*/
@Aspect
@Component
public class MyRedisLimiterAspect {
private final Logger logger = LoggerFactory.getLogger(MyRedisLimiter.class);
@Autowired
private HttpServletResponse response;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private DefaultRedisScript<List> redisScript;
@PostConstruct
public void init(){
redisScript = new DefaultRedisScript<List>();
redisScript.setResultType(List.class);
redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(("limit.lua"))));
}
@Pointcut("execution(public * io.mykit.limiter.controller.*.*(..))")
public void pointcut(){
}
@Around("pointcut()")
public Object process(ProceedingJoinPoint proceedingJoinPoint) throws Throwable{
MethodSignature signature = (MethodSignature) proceedingJoinPoint.getSignature();
//使用反射獲取MyRedisLimiter注解
MyRedisLimiter myRedisLimiter = signature.getMethod().getDeclaredAnnotation(MyRedisLimiter.class);
if(myRedisLimiter == null){
//正常執行方法
return proceedingJoinPoint.proceed();
}
//獲取注解上的參數,獲取配置的速率
double value = myRedisLimiter.value();
//List設置Lua的KEYS[1]
String key = "ip:" + System.currentTimeMillis() / 1000;
List<String> keyList = Lists.newArrayList(key);
//List設置Lua的ARGV[1]
List<String> argvList = Lists.newArrayList(String.valueOf(value));
//調用Lua腳本並執行
List result = stringRedisTemplate.execute(redisScript, keyList, String.valueOf(value));
logger.info("Lua腳本的執行結果:" + result);
//Lua腳本返回0,表示超出流量大小,返回1表示沒有超出流量大小。
if("0".equals(result.get(0).toString())){
fullBack();
return null;
}
//獲取到令牌,繼續向下執行
return proceedingJoinPoint.proceed();
}
private void fullBack() {
response.setHeader("Content-Type" ,"text/html;charset=UTF8");
PrintWriter writer = null;
try{
writer = response.getWriter();
writer.println("回退失敗,請稍后閱讀。。。");
writer.flush();
}catch (Exception e){
e.printStackTrace();
}finally {
if(writer != null){
writer.close();
}
}
}
}
上述代碼會讀取項目classpath目錄下的limit.lua腳本文件來確定是否執行限流的操作,調用limit.lua文件執行的結果返回0則表示執行限流邏輯,否則不執行限流邏輯。既然,項目中需要使用Lua腳本,那么,接下來,我們就需要在項目中創建Lua腳本。
創建limit.lua腳本文件
在項目的classpath目錄下創建limit.lua腳本文件,文件的內容如下所示。
local key = KEYS[1] --限流KEY(一秒一個)
local limit = tonumber(ARGV[1]) --限流大小
local current = tonumber(redis.call('get', key) or "0")
if current + 1 > limit then --如果超出限流大小
return 0
else --請求數+1,並設置2秒過期
redis.call("INCRBY", key, "1")
redis.call("expire", key "2")
return 1
end
limit.lua腳本文件的內容比較簡單,這里就不再贅述了。
接口添加注解
注解類、解析注解的切面類、Lua腳本文件都已經准備好。那么,接下來,我們在PayController類中在sendMessage2()方法上添加@MyRedisLimiter注解,並且將limit屬性設置為10,如下所示。
@MyRedisLimiter(limit = 10)
@RequestMapping("/boot/send/message2")
public String sendMessage2(){
//記錄返回接口
String result = "";
boolean flag = messageService.sendMessage("恭喜您成長值+1");
if (flag){
result = "短信發送成功!";
return result;
}
result = "哎呀,服務器開小差了,請再試一下吧";
return result;
}
此處,我們限制了sendMessage2()方法,每秒鍾最多只能處理10個請求。那么。接下來,我們就使用JMeter對sendMessage2()進行測試。
測試分布式限流
此時,我們使用JMeter進行壓測,這里,我們配置的線程數為50,也就是說:會有50個線程同時訪問我們寫的接口。JMeter的配置如下所示。
保存並運行Jemeter,如下所示。
運行完成后,我們來查看下JMeter的測試結果,如下所示。
從測試結果可以看出,測試中途有部分接口的訪問返回了“哎呀,服務器開小差了,請再試一下吧”,說明接口被限流了。而再往后,又有部分接口成功返回了“短信發送成功!”的字樣。這是因為我們設置的是接口每秒最多接受10次請求,在第一秒內訪問接口時,前面的10次請求成功返回“短信發送成功!”的字樣,后面再訪問接口就會返回“哎呀,服務器開小差了,請再試一下吧”。而后面的請求又返回了“短信發送成功!”的字樣,說明后面的請求已經是在第二秒的時候調用的接口。
我們使用Redis+Lua腳本的方式實現的限流方式,可以將Java程序進行集群部署,這種方式實現的是全局的統一的限流,無論客戶端訪問的是集群中的哪個節點,都會對訪問進行計數並實現最終的限流效果。
這種思想就有點像分布式鎖了,小伙伴們可以關注【冰河技術】微信公眾號閱讀我寫的一篇《【高並發】高並發分布式鎖架構解密,不是所有的鎖都是分布式鎖!!》來深入理解如何實現真正線程安全的分布式鎖,此文章,以循序漸進的方式深入剖析了實現分布式鎖過程中的各種坑和解決方案,讓你真正理解什么才是分布式鎖。
Nginx+Lua實現分布式限流
Nginx+Lua實現分布式限流,通常會用在應用的入口處,也就是對系統的流量入口進行限流。這里,我們也以一個實際案例的形式來說明如何使用Nginx+Lua來實現分布式限流。
首先,我們需要創建一個Lua腳本,腳本文件的內容如下所示。
local locks = require "resty.lock"
local function acquire()
local lock =locks:new("locks")
local elapsed, err =lock:lock("limit_key") --互斥鎖
local limit_counter =ngx.shared.limit_counter --計數器
local key = "ip:" ..os.time()
local limit = 5 --限流大小
local current =limit_counter:get(key)
if current ~= nil and current + 1> limit then --如果超出限流大小
lock:unlock()
return 0
end
if current == nil then
limit_counter:set(key, 1, 1) --第一次需要設置過期時間,設置key的值為1,過期時間為1秒
else
limit_counter:incr(key, 1) --第二次開始加1即可
end
lock:unlock()
return 1
end
ngx.print(acquire())
實現中我們需要使用lua-resty-lock互斥鎖模塊來解決原子性問題(在實際工程中使用時請考慮獲取鎖的超時問題),並使用ngx.shared.DICT共享字典來實現計數器。如果需要限流則返回0,否則返回1。使用時需要先定義兩個共享字典(分別用來存放鎖和計數器數據)。
接下來,需要在Nginx的nginx.conf配置文件中定義數據字典,如下所示。
http {
……
lua_shared_dict locks 10m;
lua_shared_dict limit_counter 10m;
}
靈魂拷問
說到這里,相信有很多小伙伴可能會問:如果應用並發量非常大,那么,Redis或者Nginx能不能扛的住呢?
可以這么說:Redis和Nginx基本都是高性能的互聯網組件,對於一般互聯網公司的高並發流量是完全沒有問題的。為什么這么說呢?咱們繼續往下看。
如果你的應用流量真的非常大,可以通過一致性哈希將分布式限流進行分片,還可以將限流降級為應用級限流;解決方案也非常多,可以根據實際情況進行調整,使用Redis+Lua的方式進行限流,是可以穩定達到對上億級別的高並發流量進行限流的(筆者親身經歷)。
需要注意的是:面對高並發系統,尤其是這種流量上千萬、上億級別的高並發系統,我們不可能只用限流這一招,還要加上其他的一些措施,
對於分布式限流,目前遇到的場景是業務上的限流,而不是流量入口的限流。對於流量入口的限流,應該在接入層來完成。
對於秒殺場景來說,可以在流量入口處進行限流,小伙伴們可以關注【冰河技術】微信公眾號,來閱讀我寫的《【高並發】高並發秒殺系統架構解密,不是所有的秒殺都是秒殺!》一文,來深入理解如何架構一個高並發秒殺系統
重磅福利
關注「 冰河技術 」微信公眾號,后台回復 “設計模式” 關鍵字領取《深入淺出Java 23種設計模式》PDF文檔。回復“Java8”關鍵字領取《Java8新特性教程》PDF文檔。回復“限流”關鍵字獲取《億級流量下的分布式限流解決方案》PDF文檔,三本PDF均是由冰河原創並整理的超硬核教程,面試必備!!
好了,今天就聊到這兒吧!別忘了點個贊,給個在看和轉發,讓更多的人看到,一起學習,一起進步!!
寫在最后
如果你覺得冰河寫的還不錯,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習高並發、分布式、微服務、大數據、互聯網和雲原生技術,「 冰河技術 」微信公眾號更新了大量技術專題,每一篇技術文章干貨滿滿!不少讀者已經通過閱讀「 冰河技術 」微信公眾號文章,吊打面試官,成功跳槽到大廠;也有不少讀者實現了技術上的飛躍,成為公司的技術骨干!如果你也想像他們一樣提升自己的能力,實現技術能力的飛躍,進大廠,升職加薪,那就關注「 冰河技術 」微信公眾號吧,每天更新超硬核技術干貨,讓你對如何提升技術能力不再迷茫!