本次樣例從單機層面上,采用攔截器的方式對請求限流。
資源:https://github.com/xiaozhuanfeng/rateLimiterProj
工程結構:
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.3.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- Hot restart: project changes take effect immediately. It is disabled when running a fully packaged application; an application started with java -jar or with a special classloader is treated as a production environment. --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.jetbrains</groupId> <artifactId>annotations</artifactId> <version>RELEASE</version> <scope>compile</scope> </dependency> <!-- Guava (provides RateLimiter and the local cache) --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>23.0</version> </dependency> <!-- fastjson (JSON serialization of the failure response) --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.properties
# Server port
server.port=8007
# Log output path
logging.path=/logs/rateLimiterProj
logging.config=classpath:logback-spring.xml
# Rate-limit settings read by RateLimitConfig (qps) and RateLimitInterceptor (the rest).
# rateLimit.type=acquire makes the interceptor block until a permit is granted;
# any other value (e.g. tryAcquire) makes it reject the request when no permit can be
# obtained, using the tryAcquire.permits / tryAcquire.timeOut values below.
rateLimit.qps = 1
rateLimit.type=acquire
rateLimit.tryAcquire.permits=1
rateLimit.tryAcquire.timeOut=100
1、新建抽象攔截器
package com.example.demo.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.util.IOUtils;
import com.example.demo.constant.ResponseEnum;
import com.example.demo.dto.ResponseDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Template for interceptors that either let a request through or answer it
 * themselves with a small JSON error document.
 *
 * <p>Subclasses implement {@link #handleRequest(HttpServletRequest)}; this class
 * turns its verdict into the {@code preHandle} boolean and writes the failure body.
 */
public abstract class AbstractInterceptor extends HandlerInterceptorAdapter {

    /** Logger named after the concrete subclass. */
    public final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Decides whether the request may proceed.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} to continue, any other value to stop the request
     * @throws Exception on any failure; {@code preHandle} treats it as
     *                   {@link ResponseEnum#SERVER_ERROR}
     */
    protected abstract ResponseEnum handleRequest(HttpServletRequest req) throws Exception;

    /**
     * Runs {@link #handleRequest(HttpServletRequest)} and maps its verdict.
     *
     * @return {@code true} only when the verdict is {@link ResponseEnum#OK};
     *         otherwise the failure body is written and {@code false} is returned
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
            throws Exception {
        ResponseEnum outcome;
        try {
            outcome = handleRequest(request);
        } catch (Exception e) {
            log.error("handleRequest catch an Exception>>>", e);
            outcome = ResponseEnum.SERVER_ERROR;
        }

        if (ResponseEnum.OK != outcome) {
            handleFailResponse(response, outcome);
            return false;
        }
        return true;
    }

    /**
     * Writes {@code result} to the response as a JSON-serialized {@link ResponseDTO}.
     *
     * <p>The HTTP status is always set to 200; the failure is carried by the
     * {@code code} field of the body.
     *
     * @param resp   response to write to; its writer is closed afterwards
     * @param result the verdict to report
     */
    public void handleFailResponse(HttpServletResponse resp, ResponseEnum result) {
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);

        ResponseDTO payload = new ResponseDTO(result.getCode(), result.getMesg());

        try (PrintWriter writer = resp.getWriter()) {
            writer.write(JSON.toJSONString(payload));
        } catch (IOException e) {
            log.error("handleFailResponse catch an IOException>>>", e);
        }
    }
}
2、新建RateLimiter Bean
package com.example.demo.configuration;

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Exposes the application-wide Guava {@link RateLimiter} as a Spring bean.
 */
@Configuration
public class RateLimitConfig {

    /** Rate used when the configured value cannot be used. */
    private static final double DEFAULT_QPS = 1.0D;

    /** Permits per second, read from the {@code rateLimit.qps} property. */
    @Value("${rateLimit.qps}")
    private double qps;

    /**
     * Builds the limiter from the configured rate.
     *
     * <p>{@code RateLimiter.create} rejects zero, negative and NaN rates with an
     * {@code IllegalArgumentException}, so every such value (not only exactly
     * zero) falls back to {@link #DEFAULT_QPS} instead of failing startup.
     *
     * @return a limiter issuing the configured number of permits per second
     */
    @Bean
    public RateLimiter getRateLimiter() {
        double effectiveQps = qps;
        if (Double.isNaN(effectiveQps) || effectiveQps <= 0.0D) {
            effectiveQps = DEFAULT_QPS;
        }
        return RateLimiter.create(effectiveQps);
    }
}
3、新建返回枚舉類和響應實體類

package com.example.demo.constant; /** * 自定義響應碼 */ public enum ResponseEnum { OK(200, "成功"), RATE_LIMIT(403, "訪問次數受限"), AUTHENTICATION_FAIL(401, "未授權"), SERVER_ERROR(500, "服務器錯誤"); private int code; private String mesg; ResponseEnum(int code, String mesg) { this.code = code; this.mesg = mesg; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMesg() { return mesg; } public void setMesg(String mesg) { this.mesg = mesg; } @Override public String toString() { return "ResponseEnum{" + "code=" + code + ", mesg='" + mesg + '\'' + '}'; } }

package com.example.demo.dto; public class ResponseDTO { private int code; private String mesg; public ResponseDTO() { } public ResponseDTO(int code, String mesg) { this.code = code; this.mesg = mesg; } public int getCode() { return code; } public void setCode(int code) { this.code = code; } public String getMesg() { return mesg; } public void setMesg(String mesg) { this.mesg = mesg; } @Override public String toString() { return "ResponseDTO{" + "code=" + code + ", mesg='" + mesg + '\'' + '}'; } }
4、創建請求攔截器
package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

/**
 * Interceptor that throttles requests with a shared Guava {@link RateLimiter}.
 *
 * <p>Two modes are selected by the {@code rateLimit.type} property:
 * {@code acquire} blocks the request thread until a permit is granted; any other
 * value tries to obtain permits and rejects the request when that fails.
 */
@Component("rateLimitInterceptor")
public class RateLimitInterceptor extends AbstractInterceptor {

    private static final String RATELIMIT_TYPE_ACQUIRE = "acquire";

    /** Selected mode, from {@code rateLimit.type}. */
    @Value("${rateLimit.type}")
    private String rateLimitType;

    /** Permits requested per call in try-acquire mode, from {@code rateLimit.tryAcquire.permits}. */
    @Value("${rateLimit.tryAcquire.permits}")
    private int permits;

    /** Maximum wait in try-acquire mode, in milliseconds, from {@code rateLimit.tryAcquire.timeOut}. */
    @Value("${rateLimit.tryAcquire.timeOut}")
    private long timeout;

    @Autowired
    private RateLimiter rateLimiter;

    /**
     * Applies the configured limiting mode to the request.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} when a permit was obtained,
     *         {@link ResponseEnum#RATE_LIMIT} when the request is rejected
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        if (RATELIMIT_TYPE_ACQUIRE.equals(rateLimitType)) {
            return passWaitReq(req);
        }
        return giveUpReqWhenExceedLimit(req);
    }

    /**
     * Blocks the calling thread until one permit is granted.
     *
     * @return always {@link ResponseEnum#OK}
     */
    private ResponseEnum passWaitReq(HttpServletRequest req) {
        // acquire() takes a single permit and returns the time spent waiting, in seconds.
        double waitTime = rateLimiter.acquire();
        log.info("請求成功......waitTime={}", waitTime);
        return ResponseEnum.OK;
    }

    /**
     * Tries to obtain the configured number of permits, waiting at most the
     * configured timeout, and rejects the request when that is not possible.
     *
     * <p>A permit count below one is treated as one. A non-positive timeout means
     * "do not wait"; the configured permit count is still honoured in that case.
     * The timeout is interpreted as milliseconds: the previous microsecond unit
     * made the configured {@code timeOut=100} practically indistinguishable from
     * no timeout at all.
     *
     * @return {@link ResponseEnum#OK} or {@link ResponseEnum#RATE_LIMIT}
     */
    private ResponseEnum giveUpReqWhenExceedLimit(HttpServletRequest req) {
        int wanted = Math.max(1, permits);

        if (timeout <= 0L) {
            if (!rateLimiter.tryAcquire(wanted)) {
                log.warn("限流中......");
                return ResponseEnum.RATE_LIMIT;
            }
        } else if (!rateLimiter.tryAcquire(wanted, timeout, TimeUnit.MILLISECONDS)) {
            log.warn("permits={},timeout={},限流中......", wanted, timeout);
            return ResponseEnum.RATE_LIMIT;
        }

        log.info("請求成功......");
        return ResponseEnum.OK;
    }
}
5、創建Controller,測試用

package com.example.demo.controller; import org.apache.juli.logging.LogFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController public class HelloController { private final Logger log = LoggerFactory.getLogger(this.getClass()); @RequestMapping(value="/rateLimit/getHello",method = RequestMethod.GET) public String getHello(){ log.info("rateLimit Hello ........"); try { //發呆1s Thread.sleep(100); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Get:Hello,World"; } @RequestMapping(value="/calculate/getHello",method = RequestMethod.GET) public String postHello(){ log.info("calculate Hello ........"); try { //發呆1s Thread.sleep(1000); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Post:Hello,World"; } }
6、配置請求攔截器
package com.example.demo.configuration; import com.example.demo.interceptor.CalculateReqInterceptor; import com.example.demo.interceptor.RateLimitInterceptor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; @Configuration public class WebInterceptorConfig extends WebMvcConfigurationSupport { @Bean public RateLimitInterceptor getRateLimitInterceptor() { return new RateLimitInterceptor(); } @Bean public CalculateReqInterceptor getCalculateReqInterceptor() { return new CalculateReqInterceptor(); } @Override protected void addInterceptors(InterceptorRegistry registry) { InterceptorRegistration rateLimitIcpt = registry.addInterceptor(getRateLimitInterceptor()); // 攔截配置 rateLimitIcpt.addPathPatterns("/rateLimit/**"); //需要計算請求時間的請求 //InterceptorRegistration calculateIcpt = registry.addInterceptor(getCalculateReqInterceptor()); //calculateIcpt.addPathPatterns("/calculate/**"); } }
測試:
@Test public void mulitplyReq() throws Exception { String url = "/rateLimit/getHello"; //String url = "/calculate/getHello"; for(int i = 0;i < 10;i++){ sendReq(i,url); } } private void sendReq(int batchNo,String url) throws Exception { /** * 1、mockMvc.perform執行一個請求。 * 2、MockMvcRequestBuilders.get("XXX")構造一個請求。 * 3、ResultActions.param添加請求傳值 * 4、ResultActions.accept(MediaType.TEXT_HTML_VALUE))設置返回類型 * 5、ResultActions.andExpect添加執行完成后的斷言。 * 6、ResultActions.andDo添加一個結果處理器,表示要對結果做點什么事情 * 比如此處使用MockMvcResultHandlers.print()輸出整個響應結果信息。 * 5、ResultActions.andReturn表示執行完成后返回相應的結果。 */ MvcResult mvcResult = mockMvc.perform(MockMvcRequestBuilders.get(url) .accept(MediaType.TEXT_HTML_VALUE)) // .andExpect(MockMvcResultMatchers.status().isOk()) //等同於Assert.assertEquals(200,status); // .andExpect(MockMvcResultMatchers.content().string("hello lvgang")) //等同於 Assert.assertEquals // ("hello lvgang",content); //.andDo(MockMvcResultHandlers.print())//打印 .andReturn(); MockHttpServletResponse result = mvcResult.getResponse(); System.out.println("Thread"+Thread.currentThread().getName()+">>>batchNo="+batchNo+":"+result.getContentAsString()); }
結果:
15:31:43.428 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.0
15:31:43.435 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=0:Get:Hello,World
15:31:43.629 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.0
15:31:43.629 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=1:Get:Hello,World
15:31:44.545 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.812705
15:31:44.545 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=2:Get:Hello,World
15:31:45.544 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.89766
15:31:45.544 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
...
切換為 rateLimit.type=tryAcquire、qps=10。qps 一定要設置恰當,否則用戶體驗將不太美妙。運行:
/**
 * Fires 500 requests from each of two threads at the rate-limited endpoint.
 *
 * <p>The test waits for both workers to finish (at most 60 seconds in total)
 * rather than sleeping for a fixed minute, and the first exception raised by a
 * worker is rethrown so that failures are no longer silently discarded.
 */
@Test
public void threadReq() throws Exception {
    final String url = "/rateLimit/getHello";
    final java.util.concurrent.atomic.AtomicReference<Exception> firstFailure =
            new java.util.concurrent.atomic.AtomicReference<>();

    Runnable burst = () -> {
        for (int i = 0; i < 500; i++) {
            try {
                sendReq(i, url);
            } catch (Exception e) {
                // Remember only the first failure; keep sending like the original loop did.
                firstFailure.compareAndSet(null, e);
            }
        }
    };

    Thread[] workers = {new Thread(burst), new Thread(burst)};
    for (Thread worker : workers) {
        worker.start();
    }

    // Same upper bound as the former fixed sleep, but returns as soon as the work is done.
    long deadline = System.currentTimeMillis() + 60 * 1000L;
    for (Thread worker : workers) {
        long remaining = deadline - System.currentTimeMillis();
        if (remaining > 0L) {
            worker.join(remaining);
        }
    }

    Exception failure = firstFailure.get();
    if (failure != null) {
        throw failure;
    }
}
下面簡單介紹其他應用級限流:
(1)限流總並發/連接/請求數
對於一個應用系統來說一定會有極限並發/請求數,即總有一個TPS/QPS閾值,如果超過了閾值則系統就會不響應用戶請求或響應得非常慢,因此我們最好進行過載保護,防止大量請求涌入擊垮系統。
如果你使用過Tomcat,其Connector 其中一種配置有如下幾個參數:
acceptCount:如果Tomcat的線程都忙於響應,新來的連接會進入隊列排隊,如果超出排隊大小,則拒絕連接;
maxConnections: 瞬時最大連接數,超出的會排隊等待;
maxThreads:Tomcat能啟動用來處理請求的最大線程數,如果請求處理量一直遠遠大於最大線程數則可能會僵死。
詳細的配置請參考官方文檔。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都會有類似的限制連接數的配置。
Springboot內置tomcat屬性在org.springframework.boot.autoconfigure.web.ServerProperties,可在application.properties配置屬性。
Tomcat特有配置都以“server.tomcat”作為前綴
如上面的:
#端口號設置
server.port=8007
(2)限流總資源數
如果有的資源是稀缺資源(如數據庫連接、線程),而且可能有多個系統都會去使用它,那么需要限制應用;可以使用池化技術來限制總資源數:連接池、線程池。比如分配給每個應用的數據庫連接是100,那么本應用最多可以使用100個資源,超出了可以等待或者拋異常。
(3)限流某個接口的總並發/請求數
package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Interceptor that caps the number of requests being processed at the same time.
 *
 * <p>The shared counter is incremented when a request arrives and decremented
 * again when the request is rejected or has completed. Previously the counter
 * was only ever incremented, so once {@link #LIMIT_NUM} requests had been seen
 * every later request was rejected permanently.
 *
 * <p>The check is optimistic: the counter may briefly exceed the limit while a
 * request that is about to be rejected holds its increment.
 */
@Service("limitReqNumInterceptor")
public class LimitReqNumInterceptor extends AbstractInterceptor {

    /** Maximum number of requests allowed in flight. */
    private static final long LIMIT_NUM = 10L;

    /** Number of requests currently in flight. */
    @Autowired
    private AtomicLong atomic;

    /**
     * Claims a slot for the request, or rejects it when all slots are taken.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} when a slot was claimed,
     *         {@link ResponseEnum#RATE_LIMIT} otherwise
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        if (atomic.incrementAndGet() > LIMIT_NUM) {
            // Give the slot back: a rejected request never reaches afterCompletion.
            atomic.decrementAndGet();
            log.warn("超出請求數>>>,LIMIT_NUM={}", LIMIT_NUM);
            return ResponseEnum.RATE_LIMIT;
        }
        log.warn("請求通過....");
        return ResponseEnum.OK;
    }

    /**
     * Releases the slot claimed in {@link #handleRequest(HttpServletRequest)}.
     *
     * <p>Spring invokes this callback only for interceptors whose
     * {@code preHandle} returned {@code true}, i.e. only for admitted requests.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
                                Object handler, Exception ex) throws Exception {
        atomic.decrementAndGet();
        super.afterCompletion(request, response, handler, ex);
    }
}
限流某個接口的時間窗請求數
利用 Guava Cache,以 uri 路徑作為緩存鍵的方式
/** * 過期時間 */ private static final long EXPIRE_SEC = 20L; /** * 定制 過期時間 */ private LoadingCache<String, AtomicLong> LoadingCache = CacheBuilder.newBuilder().expireAfterWrite(EXPIRE_SEC, TimeUnit.SECONDS).build(new CacheLoader<String, AtomicLong>() { //本地緩存沒有命中時,調用load,並將結果緩存 @Override public AtomicLong load(String aLong) throws Exception { return new AtomicLong(0L); } }); @Override protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception { String uri = req.getRequestURI(); AtomicLong atomic = LoadingCache.get(uri); System.out.println("uri="+uri+",當前請求數,number="+atomic.get()); if(atomic.incrementAndGet() > LIMIT_NUM) { //拒絕請求 log.warn("uri="+uri+"超出請求數>>>,LIMIT_NUM="+LIMIT_NUM); return ResponseEnum.RATE_LIMIT; } log.warn("請求通過...."); return ResponseEnum.OK; }
Controller,攔截器設置略

@RequestMapping(value="/numLimit/getHello",method = RequestMethod.GET) public String sayHello(){ log.info("numLimit Hello ........"); try { //發呆1s Thread.sleep(1000); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Say:Hello,World"; } @RequestMapping(value="/numLimit/getHello2",method = RequestMethod.GET) public String sayHello2(){ log.info("numLimit Hello2 ........"); try { //發呆1s Thread.sleep(1000); } catch (InterruptedException e) { log.error("getHello catch an InterruptedException>>",e); } return "Say:Hello,World"; }
測試:
@Test public void mulitplyReq2() throws Exception { String url = "/numLimit/getHello"; //String url = "/calculate/getHello"; for (int i = 0; i < 100; i++) { sendReq(i, url); } String url2 = "/numLimit/getHello2"; for (int i = 0; i < 15; i++) { sendReq(i, url2); } Thread.sleep(15 * 1000L); for (int i = 0; i < 15; i++) { sendReq(i, url); } }
分布式限流
1、lua+redis
package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Interceptor that counts requests per URI in Redis by running the Lua script
 * {@code lua/rateLimitReq.lua}.
 *
 * <p>The limiter fails open: if the script cannot be executed, the request is
 * let through and the error is logged. Logging uses the {@code log} field
 * inherited from {@link AbstractInterceptor}.
 */
@Service("limitReqLuaInterceptor")
public class LimitReqLuaInterceptor extends AbstractInterceptor implements InitializingBean {

    /** Maximum number of requests, passed to the script as {@code ARGV[1]}. */
    private static final String LIMIT_NUM = "10";

    /** Expiry in seconds, passed to the script as {@code ARGV[2]}. */
    private static final String EXPIRE_SEC = "20";

    /** Script result that means "reject the request". */
    private static final Long REJECTED = 0L;

    final DefaultRedisScript<Long> redisScript = new DefaultRedisScript<Long>();

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /** Loads the Lua script from the classpath once the bean is wired. */
    @Override
    public void afterPropertiesSet() throws Exception {
        redisScript.setResultType(Long.class);
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/rateLimitReq.lua")));
    }

    /**
     * Runs the script for the request URI and rejects the request when the
     * script returns {@code 0}.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#RATE_LIMIT} when the script rejects the request,
     *         {@link ResponseEnum#OK} otherwise (including when the script fails
     *         or yields no result)
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        String uri = req.getRequestURI();

        // KEYS: the granularity can be refined here, e.g. ip + uri + parameters.
        List<String> keys = Lists.newArrayList(uri);
        // ARGV
        String[] argvs = new String[]{LIMIT_NUM, EXPIRE_SEC};

        try {
            // Explicit Object[] cast: pass the array as the varargs list itself.
            Long res = stringRedisTemplate.execute(redisScript, keys, (Object[]) argvs);
            // Null-safe comparison: a missing result must not trigger an unboxing NPE.
            if (REJECTED.equals(res)) {
                log.warn("uri={}超出請求數>>>,LIMIT_NUM={}", uri, LIMIT_NUM);
                return ResponseEnum.RATE_LIMIT;
            }
        } catch (Exception e) {
            log.error("執行lua異常,受限請求皆通過>>>>", e);
        }
        return ResponseEnum.OK;
    }
}
-- Fixed-window request counter.
--   KEYS[1]  counter key
--   ARGV[1]  maximum number of requests per window
--   ARGV[2]  window length in seconds
-- Returns 1 when the request is allowed, 0 when the limit has been reached.
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local timeOut = ARGV[2]

local current = tonumber(redis.call("get", key) or "0")
if current >= limit then
    -- Limit reached: reject without counting the request.
    return 0
end

-- Count the request. The expiry is set only when the counter is created, so the
-- window has a fixed length; refreshing it on every request would keep
-- extending the window for as long as traffic keeps arriving.
local updated = redis.call("INCRBY", key, "1")
if updated == 1 then
    redis.call("expire", key, timeOut)
end
return 1
本例是限制總的請求次數,也可以采用令牌桶方法實現等
2、nginx+lua
待學習。。
參考:
https://www.cnblogs.com/xuwc/p/9123078.html