高並發之限流實現(三)


本次樣例從單機層面上,采用攔截器的方式對請求限流。

資源:https://github.com/xiaozhuanfeng/rateLimiterProj

 工程結構:

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Hot reload: project changes take effect immediately.
          It is disabled when running from a fully packaged build:
          starting with java -jar or through a special classloader is treated as production.
          -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <optional>true</optional>
        </dependency>
        <!-- NOTE(review): RELEASE is a floating version, so the resolved artifact can differ
             between builds; consider pinning an explicit version. -->
        <dependency>
            <groupId>org.jetbrains</groupId>
            <artifactId>annotations</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>

        <!-- Guava (provides RateLimiter and the local cache used by the interceptors) -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.0</version>
        </dependency>

        <!-- fastjson (used to serialize the failure response body) -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

application.properties

# Server port
server.port=8007
# Log output path
logging.path=/logs/rateLimiterProj
logging.config=classpath:logback-spring.xml

# Custom rate-limiting mode. With rateLimit.type=acquire the RateLimitInterceptor blocks until a
# permit is granted; with any other value it gives up the request once the QPS limit is exceeded.
# The permits/timeOut values are only read by the give-up (tryAcquire) mode.
rateLimit.qps = 1
rateLimit.type=acquire
rateLimit.tryAcquire.permits=1
rateLimit.tryAcquire.timeOut=100

1、新建抽象攔截器

package com.example.demo.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.util.IOUtils;
import com.example.demo.constant.ResponseEnum;
import com.example.demo.dto.ResponseDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.handler.HandlerInterceptorAdapter;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

/**
 * Base class for request interceptors that either let a request through or answer it
 * themselves with a JSON body describing why it was refused.
 *
 * <p>Subclasses implement {@link #handleRequest(HttpServletRequest)}; everything else
 * (error handling, writing the refusal) is done here.
 */
public abstract class AbstractInterceptor extends HandlerInterceptorAdapter {
    public final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Decides whether the given request may proceed.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} to let the request continue, any other value to refuse it
     * @throws Exception on any failure; the caller maps it to {@link ResponseEnum#SERVER_ERROR}
     */
    protected abstract ResponseEnum handleRequest(HttpServletRequest req) throws Exception;

    /**
     * Runs {@link #handleRequest(HttpServletRequest)} and continues the handler chain only
     * when it reports {@link ResponseEnum#OK}.
     *
     * @return {@code true} when the request may proceed, {@code false} after a refusal has
     *         been written to the response
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        ResponseEnum outcome;
        try {
            outcome = handleRequest(request);
        } catch (Exception e) {
            log.error("handleRequest catch an Exception>>>", e);
            // A failing check is reported to the client as a server error.
            outcome = ResponseEnum.SERVER_ERROR;
        }

        if (ResponseEnum.OK != outcome) {
            handleFailResponse(response, outcome);
            return false;
        }
        return true;
    }

    /**
     * Writes the refusal as a JSON {@link ResponseDTO}. The HTTP status stays 200; the
     * application-level code travels inside the body.
     *
     * @param resp   response to write to
     * @param result the refusal reason whose code and message are sent
     */
    public void handleFailResponse(HttpServletResponse resp, ResponseEnum result) {
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.setContentType(MediaType.APPLICATION_JSON_UTF8_VALUE);

        ResponseDTO payload = new ResponseDTO(result.getCode(), result.getMesg());
        PrintWriter writer = null;
        try {
            writer = resp.getWriter();
            writer.write(JSON.toJSONString(payload));
        } catch (IOException e) {
            log.error("handleFailResponse catch an IOException>>>", e);
        } finally {
            // Null-safe, exception-swallowing close.
            IOUtils.close(writer);
        }
    }
}

2、新建RateLimiter Bean

package com.example.demo.configuration;

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Exposes the application-wide Guava {@link RateLimiter} as a Spring bean.
 */
@Configuration
public class RateLimitConfig {

    /** Rate used when the configured value is not a usable positive number. */
    private static final double DEFAULT_QPS = 1.0D;

    @Value("${rateLimit.qps}")
    private double qps;

    /**
     * Builds the limiter from {@code rateLimit.qps}.
     *
     * <p>{@link RateLimiter#create(double)} rejects zero, negative and NaN rates with an
     * {@link IllegalArgumentException}, so every such value falls back to
     * {@link #DEFAULT_QPS} instead of failing application start-up.
     *
     * @return a limiter issuing {@code qps} permits per second
     */
    @Bean
    public RateLimiter getRateLimiter() {
        // "!(qps > 0)" is true for 0, negatives and NaN alike.
        if (!(this.qps > 0.0D)) {
            this.qps = DEFAULT_QPS;
        }
        return RateLimiter.create(this.qps);
    }

}

3、新建返回枚舉類和響應實體類

package com.example.demo.constant;

/**
 * 自定義響應碼
 */
/**
 * Application-level response codes, each paired with a human-readable message.
 */
public enum ResponseEnum {
    OK(200, "成功"),
    RATE_LIMIT(403, "訪問次數受限"),
    AUTHENTICATION_FAIL(401, "未授權"),
    SERVER_ERROR(500, "服務器錯誤");

    /** Numeric code sent to the client. */
    private int code;

    /** Message sent to the client alongside the code. */
    private String mesg;

    ResponseEnum(int code, String mesg) {
        this.mesg = mesg;
        this.code = code;
    }

    /** @return the numeric code of this constant */
    public int getCode() {
        return this.code;
    }

    /** @param code the new numeric code for this constant */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the message of this constant */
    public String getMesg() {
        return this.mesg;
    }

    /** @param mesg the new message for this constant */
    public void setMesg(String mesg) {
        this.mesg = mesg;
    }

    /** @return a debug representation of the form {@code ResponseEnum{code=..., mesg='...'}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ResponseEnum{");
        text.append("code=").append(code);
        text.append(", mesg='").append(mesg).append('\'');
        text.append('}');
        return text.toString();
    }
}
View Code
package com.example.demo.dto;

/**
 * Body returned to the client when an interceptor refuses a request:
 * a numeric code plus a message.
 */
public class ResponseDTO {
    /** Application-level result code. */
    private int code;

    /** Human-readable description of the result. */
    private String mesg;

    /** Creates an empty DTO ({@code code == 0}, {@code mesg == null}). */
    public ResponseDTO() {
        this(0, null);
    }

    /**
     * @param code application-level result code
     * @param mesg human-readable description
     */
    public ResponseDTO(int code, String mesg) {
        this.mesg = mesg;
        this.code = code;
    }

    /** @return the result code */
    public int getCode() {
        return this.code;
    }

    /** @param code the result code */
    public void setCode(int code) {
        this.code = code;
    }

    /** @return the result message */
    public String getMesg() {
        return this.mesg;
    }

    /** @param mesg the result message */
    public void setMesg(String mesg) {
        this.mesg = mesg;
    }

    /** @return a debug representation of the form {@code ResponseDTO{code=..., mesg='...'}} */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ResponseDTO{");
        text.append("code=").append(code);
        text.append(", mesg='").append(mesg).append('\'');
        text.append('}');
        return text.toString();
    }
}
View Code

4、創建請求攔截器

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import com.google.common.util.concurrent.RateLimiter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.TimeUnit;

/**
 * Interceptor that throttles requests with the shared Guava {@link RateLimiter}.
 *
 * <p>Two modes are selected through {@code rateLimit.type}:
 * <ul>
 *   <li>{@code acquire}: the request thread blocks until a permit is available;</li>
 *   <li>anything else: the request is refused with {@link ResponseEnum#RATE_LIMIT} when no
 *       permit can be obtained (optionally after waiting up to the configured timeout).</li>
 * </ul>
 */
@Component("rateLimitInterceptor")
public class RateLimitInterceptor extends AbstractInterceptor{

    private static final String RATELIMIT_TYPE_ACQUIRE = "acquire";

    @Value("${rateLimit.type}")
    private String rateLimitType;

    /** Number of permits requested per call in the give-up mode. */
    @Value("${rateLimit.tryAcquire.permits}")
    private int permits;

    /** Maximum wait for a permit in the give-up mode, in milliseconds. */
    @Value("${rateLimit.tryAcquire.timeOut}")
    private long timeout;

    @Autowired
    private RateLimiter rateLimiter;

    /**
     * Dispatches to the blocking or the give-up strategy according to {@code rateLimit.type}.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} when a permit was obtained, otherwise
     *         {@link ResponseEnum#RATE_LIMIT}
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        if (RATELIMIT_TYPE_ACQUIRE.equals(rateLimitType)) {
            return passWaitReq(req);
        }
        return giveUpReqWhenExceedLimit(req);
    }

    /**
     * Blocks the calling thread until a single permit is granted.
     *
     * @return always {@link ResponseEnum#OK}
     */
    private ResponseEnum passWaitReq(HttpServletRequest req){
        // acquire() returns the time spent waiting, in seconds.
        double waitTime = rateLimiter.acquire();
        log.info("請求成功......waitTime={}", waitTime);
        return ResponseEnum.OK;
    }

    /**
     * Tries to obtain a permit and gives the request up when that is not possible.
     *
     * <p>When {@code permits} or {@code timeout} is not positive, a single permit is requested
     * without waiting. Guava rejects non-positive permit counts with an
     * {@link IllegalArgumentException}, so such values must not reach the timed variant.
     * Otherwise {@code permits} permits are requested, waiting up to {@code timeout}
     * milliseconds.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} when the permit(s) were obtained, otherwise
     *         {@link ResponseEnum#RATE_LIMIT}
     */
    private ResponseEnum giveUpReqWhenExceedLimit(HttpServletRequest req){
        if (permits <= 0 || timeout <= 0L) {
            if (!rateLimiter.tryAcquire()) {
                log.warn("限流中......");
                return ResponseEnum.RATE_LIMIT;
            }
        } else if (!rateLimiter.tryAcquire(permits, timeout, TimeUnit.MILLISECONDS)) {
            // The unit was MICROSECONDS before, which made the configured wait (100)
            // practically indistinguishable from not waiting at all.
            log.warn("permits={},timeout={},限流中......", permits, timeout);
            return ResponseEnum.RATE_LIMIT;
        }

        log.info("請求成功......");
        return ResponseEnum.OK;
    }
}


5、創建Controller,測試用

package com.example.demo.controller;

import org.apache.juli.logging.LogFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test endpoints used to exercise the interceptors. Each handler sleeps for a while to
 * simulate work before answering.
 */
@RestController
public class HelloController {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    /**
     * Endpoint under {@code /rateLimit/**}; simulates 100 ms of work.
     *
     * @return a fixed greeting
     */
    @RequestMapping(value="/rateLimit/getHello",method = RequestMethod.GET)
    public String getHello(){
        log.info("rateLimit Hello ........");

        try {
            // Simulate 100 ms of work.
            Thread.sleep(100);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the flag so the container can still observe the interruption.
            Thread.currentThread().interrupt();
        }

        return "Get:Hello,World";
    }

    /**
     * Endpoint under {@code /calculate/**}; simulates one second of work. Despite the
     * method name it is mapped to GET.
     *
     * @return a fixed greeting
     */
    @RequestMapping(value="/calculate/getHello",method = RequestMethod.GET)
    public String postHello(){
        log.info("calculate Hello ........");

        try {
            // Simulate 1 s of work.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the flag so the container can still observe the interruption.
            Thread.currentThread().interrupt();
        }

        return "Post:Hello,World";
    }
}
View Code

6、配置請求攔截器

package com.example.demo.configuration;

import com.example.demo.interceptor.CalculateReqInterceptor;
import com.example.demo.interceptor.RateLimitInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;

/**
 * Registers the request interceptors and the URL patterns they apply to.
 */
@Configuration
public class WebInterceptorConfig extends WebMvcConfigurationSupport {

    /** @return the interceptor that rate-limits matching requests */
    @Bean
    public RateLimitInterceptor getRateLimitInterceptor() {
        return new RateLimitInterceptor();
    }

    /** @return the interceptor for requests whose duration should be measured */
    @Bean
    public CalculateReqInterceptor getCalculateReqInterceptor() {
        return new CalculateReqInterceptor();
    }

    /**
     * Attaches the rate-limit interceptor to everything under {@code /rateLimit/}.
     *
     * @param registry the registry interceptors are added to
     */
    @Override
    protected void addInterceptors(InterceptorRegistry registry) {
        // Rate-limited paths.
        registry.addInterceptor(getRateLimitInterceptor())
                .addPathPatterns("/rateLimit/**");

        // Requests whose processing time should be measured (currently disabled):
        //InterceptorRegistration calculateIcpt = registry.addInterceptor(getCalculateReqInterceptor());
        //calculateIcpt.addPathPatterns("/calculate/**");
    }
}

測試:

  @Test
    public void mulitplyReq() throws Exception {
        // Alternative target: "/calculate/getHello"
        final String url = "/rateLimit/getHello";

        // Fire ten sequential requests, numbered 0..9.
        int batchNo = 0;
        while (batchNo < 10) {
            sendReq(batchNo, url);
            batchNo++;
        }
    }
     private void sendReq(int batchNo, String url) throws Exception {
        /*
         * How the MockMvc call below is put together:
         * 1. mockMvc.perform executes a request.
         * 2. MockMvcRequestBuilders.get("XXX") builds the request.
         * 3. ResultActions.param adds request parameters.
         * 4. ResultActions.accept(MediaType.TEXT_HTML_VALUE) sets the accepted response type.
         * 5. ResultActions.andExpect adds an assertion evaluated after execution.
         * 6. ResultActions.andDo adds a result handler, e.g. MockMvcResultHandlers.print()
         *    to dump the whole response.
         * 7. ResultActions.andReturn returns the result once execution has finished.
         */
        String body = mockMvc.perform(MockMvcRequestBuilders.get(url)
                        .accept(MediaType.TEXT_HTML_VALUE))
                // Optional checks that can be chained here:
                // .andExpect(MockMvcResultMatchers.status().isOk())             // same as Assert.assertEquals(200,status);
                // .andExpect(MockMvcResultMatchers.content().string("hello lvgang"))    // same as Assert.assertEquals
                // ("hello lvgang",content);
                //.andDo(MockMvcResultHandlers.print()) // print the result
                .andReturn()
                .getResponse()
                .getContentAsString();

        String threadName = Thread.currentThread().getName();
        System.out.println("Thread" + threadName + ">>>batchNo=" + batchNo + ":" + body);
    }

結果:

15:31:43.428 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.0
15:31:43.435 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=0:Get:Hello,World
15:31:43.629 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.0
15:31:43.629 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=1:Get:Hello,World
15:31:44.545 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.812705
15:31:44.545 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........
Threadmain>>>batchNo=2:Get:Hello,World
15:31:45.544 [main] INFO c.e.d.i.RateLimitInterceptor - 請求成功......waitTime=0.89766
15:31:45.544 [main] INFO c.e.demo.controller.HelloController - rateLimit Hello ........

...

切換下rateLimit.type=tryAcquire,qps=10 ,qps一定要設置恰當,否則用戶體驗將不太美妙。運行:

@Test
    public void threadReq() throws Exception {
        String url = "/rateLimit/getHello";

        // Both workers run the same loop, so they share one task definition.
        Runnable worker = () -> {
            for (int i = 0; i < 500; i++) {
                try {
                    sendReq(i, url);
                } catch (Exception e) {
                    // Report the failure instead of silently dropping it; keep going so the
                    // remaining requests are still sent.
                    System.err.println("Thread" + Thread.currentThread().getName()
                            + ">>>batchNo=" + i + " failed: " + e);
                }
            }
        };

        Thread first = new Thread(worker);
        Thread second = new Thread(worker);
        first.start();
        second.start();

        // Wait exactly as long as the workers need rather than sleeping a fixed 60 seconds.
        first.join();
        second.join();
    }


下面簡單介紹其他應用級限流:

(1)限流總並發/連接/請求數

對於一個應用系統來說一定會有極限並發/請求數,即總有一個TPS/QPS閾值,如果超了閾值則系統就會不響應用戶請求或響應的非常慢,因此我們最好進行過載保護,防止大量請求涌入擊垮系統。

如果你使用過Tomcat,其Connector 其中一種配置有如下幾個參數:

acceptCount:如果Tomcat的線程都忙於響應,新來的連接會進入隊列排隊,如果超出排隊大小,則拒絕連接;

maxConnections: 瞬時最大連接數,超出的會排隊等待;

maxThreads:Tomcat能啟動用來處理請求的最大線程數,如果請求處理量一直遠遠大於最大線程數則可能會僵死。

詳細的配置請參考官方文檔。另外如Mysql(如max_connections)、Redis(如tcp-backlog)都會有類似的限制連接數的配置。

Springboot內置tomcat屬性在org.springframework.boot.autoconfigure.web.ServerProperties,可在application.properties配置屬性。

Tomcat特有配置都以”server.tomcat”作為前綴

如上面的:

#端口號設置

server.port=8007

(2)限流總資源數

如果有的資源是稀缺資源(如數據庫連接、線程),而且可能有多個系統都會去使用它,那么需要限制應用;可以使用池化技術來限制總資源數:連接池、線程池。比如分配給每個應用的數據庫連接是100,那么本應用最多可以使用100個資源,超出了可以等待或者拋異常。

(3)限流某個接口的總並發/請求數

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Interceptor that refuses requests once a shared counter has passed {@code LIMIT_NUM}.
 */
@Service("limitReqNumInterceptor")
public class LimitReqNumInterceptor extends AbstractInterceptor{

    private static final long LIMIT_NUM = 10L;

    @Autowired
    private AtomicLong atomic;

    /**
     * Counts the request and refuses it when the count exceeds {@code LIMIT_NUM}.
     *
     * <p>The counter is only ever incremented here (refused requests are counted too), so
     * once the limit has been passed every request is refused until the counter is reset
     * elsewhere. Which paths are limited is decided by the interceptor registration.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} while within the limit, otherwise
     *         {@link ResponseEnum#RATE_LIMIT}
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        long seen = atomic.incrementAndGet();
        if (seen <= LIMIT_NUM) {
            log.warn("請求通過....");
            return ResponseEnum.OK;
        }

        // Over the limit: refuse.
        log.warn("超出請求數>>>,LIMIT_NUM="+LIMIT_NUM);
        return ResponseEnum.RATE_LIMIT;
    }

    // Disabled periodic reset of the counter (would run every 6 seconds):
    //@Scheduled(cron="*/6 * * * * ?")
   /* private void process(){
        if(atomic.get() > LIMIT_NUM) {
            log.warn("請求達到上限,定時清理請求數>>>"+atomic.get());
            // scheduled task: clear the request count
            atomic.set(0L);
            log.warn("定時清理后>>>"+atomic.get());
        }
    }*/

}

限流某個接口的時間窗請求數

利用guava cache,緩存uri路徑的方式

/**
     * Lifetime of a cache entry, in seconds.
     */
    private static final long EXPIRE_SEC = 20L;

    /**
     * Per-key request counters. An entry is dropped EXPIRE_SEC seconds after it was written
     * into the cache, and a missing key is populated with a fresh counter starting at 0.
     */
    private LoadingCache<String, AtomicLong> LoadingCache =
            CacheBuilder.newBuilder().expireAfterWrite(EXPIRE_SEC, TimeUnit.SECONDS).build(new CacheLoader<String,
                    AtomicLong>() {
                // Called on a cache miss; the returned counter is stored under the key.
                @Override
                public AtomicLong load(String aLong) throws Exception {
                    return new AtomicLong(0L);
                }
            });

    /**
     * Counts the request against the counter cached for its URI and refuses it once that
     * counter exceeds {@code LIMIT_NUM}.
     *
     * @param req the incoming request
     * @return {@link ResponseEnum#OK} while within the limit, otherwise
     *         {@link ResponseEnum#RATE_LIMIT}
     */
    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        final String uri = req.getRequestURI();
        final AtomicLong counter = LoadingCache.get(uri);
        System.out.println("uri="+uri+",當前請求數,number="+counter.get());

        long seen = counter.incrementAndGet();
        if (seen <= LIMIT_NUM) {
            log.warn("請求通過....");
            return ResponseEnum.OK;
        }

        // Over the limit for this URI: refuse.
        log.warn("uri="+uri+"超出請求數>>>,LIMIT_NUM="+LIMIT_NUM);
        return ResponseEnum.RATE_LIMIT;
    }

Controller,攔截器設置略

/**
     * Test endpoint under {@code /numLimit/}; simulates one second of work.
     *
     * @return a fixed greeting
     */
@RequestMapping(value="/numLimit/getHello",method = RequestMethod.GET)
    public String sayHello(){
        log.info("numLimit Hello ........");

        try {
            // Simulate 1 s of work.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the flag so the container can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return "Say:Hello,World";
    }

    /**
     * Second test endpoint under {@code /numLimit/}; simulates one second of work.
     *
     * @return a fixed greeting
     */
    @RequestMapping(value="/numLimit/getHello2",method = RequestMethod.GET)
    public String sayHello2(){
        log.info("numLimit Hello2 ........");

        try {
            // Simulate 1 s of work.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            log.error("getHello catch an InterruptedException>>",e);
            // Restore the flag so the container can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        return "Say:Hello,World";
    }
View Code

測試:

@Test
    public void mulitplyReq2() throws Exception {
        // Alternative target: "/calculate/getHello"
        final String url = "/numLimit/getHello";
        final String url2 = "/numLimit/getHello2";

        // First URI: far more requests than the limit allows.
        fireNumLimitRequests(100, url);
        // Second URI: counted separately from the first.
        fireNumLimitRequests(15, url2);

        // Pause, then hit the first URI again.
        Thread.sleep(15 * 1000L);
        fireNumLimitRequests(15, url);
    }

    /** Sends {@code count} sequential requests to {@code url}, numbered from 0. */
    private void fireNumLimitRequests(int count, String url) throws Exception {
        for (int batchNo = 0; batchNo < count; batchNo++) {
            sendReq(batchNo, url);
        }
    }

分布式限流

1、lua+redis

package com.example.demo.interceptor;

import com.example.demo.constant.ResponseEnum;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.scripting.support.ResourceScriptSource;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

@Service("limitReqLuaInterceptor")
public class LimitReqLuaInterceptor extends AbstractInterceptor implements InitializingBean {

    private final Logger log = LoggerFactory.getLogger(this.getClass());

    private static final String LIMIT_NUM = "10";

    /**
     * 過期時間
     */
    private static final String EXPIRE_SEC = "20";

    DefaultRedisScript<Long> redisScript = new DefaultRedisScript<Long>();

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void afterPropertiesSet() throws Exception {
        redisScript.setResultType(Long.class);
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("lua/rateLimitReq.lua")));
    }

    @Override
    protected ResponseEnum handleRequest(HttpServletRequest req) throws Exception {
        String uri = req.getRequestURI();

        //KEY  這里可以控制細粒度,如:ip+uri+參數
        List<String> keys = Lists.newArrayList(uri);
        //ARGV
        String[] argvs = new String[]{LIMIT_NUM, EXPIRE_SEC};

        try {
            Long res = (Long) stringRedisTemplate.execute(redisScript, keys, argvs);

            if (0L == res) {
                //拒絕請求
                log.warn("uri=" + uri + "超出請求數>>>,LIMIT_NUM=" + LIMIT_NUM);
                return ResponseEnum.RATE_LIMIT;
            }
        } catch (Exception e) {
            log.error("執行lua異常,受限請求皆通過>>>>", e);
        }

        return ResponseEnum.OK;
    }

}
-- Fixed-window request counter.
--   KEYS[1]  counter key (e.g. the request URI)
--   ARGV[1]  maximum number of requests allowed per window
--   ARGV[2]  window length in seconds
-- Returns 1 when the request is counted and allowed, 0 when the limit has been reached.
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local timeOut = tonumber(ARGV[2])
local current = tonumber(redis.call("get", key) or "0")

if current >= limit then
    -- Limit reached for the current window.
    return 0
end

redis.call("INCR", key)
-- Start the window only when the key has no expiry yet (i.e. it was just created).
-- Refreshing the expiry on every request would keep extending the window, so a slow but
-- steady stream of requests would eventually be blocked without the counter ever resetting.
if redis.call("TTL", key) < 0 then
    redis.call("EXPIRE", key, timeOut)
end
return 1

本例是限制總的請求次數,也可以采用令牌桶方法實現等

2、nginx+lua

待學習。。

參考:

https://www.cnblogs.com/xuwc/p/9123078.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM