限流神器之-Guava RateLimiter 實戰


前段時間,項目中需要對某些訪問量較高的路徑進行訪問並發數控制,以及有些功能,比如Excel導出下載功能,數據量很大的情況下,用戶不斷的點擊下載按鈕,重復請求數據庫,導致線上數據庫掛掉。於是在這樣的情況下,這個限流組件應運而生,也許有人會提及SpringCloud zuul,其實它的現也是借助了RateLimiter。由於項目使用的是SpringBoot,也就沒往外思考。反正最后功能實現了就行,畢竟殊途同歸啊。本文只是用代碼來快速的幫你理清整個限流的流程,至於RateLimiter中具體限流算法以及Semaphore信號量的具體實現還是得自己去深挖了,這里就不再展開了。正片時間到:

0、由於需要對限流的路徑進行后台管理,那限流實體肯定是需要的

public class RateLimit {
    private String rateLimitId;
    /**
     * 限流路徑,支持通配符,示例 /user/**
     */
    private String limitPath;
    /**
     * 每秒限流頻率
     */
    private Integer permitsPerSecond;
    /**
     * 限流等待超時時間,單位s
     */
    private Integer permitsTimeOut;
    /**
     * 排序
     */private Integer orderNo;
    /**
     * 最大線程數
     */
    private Integer maxThread;
    /**
     * 創建時間
     */
    private Date gmtCreate;
   //get、set略
}

1、因為要借助RateLimiter類,所以再封裝一個限流信息類

/**
 * @描述: 限流信息
 */
public class RateLimitInfo {

    private RateLimiter rateLimiter;

    private RateLimitVo rateLimitVo;

    private long lastUpdateTime;

    public RateLimitInfo(RateLimiter rateLimiter, RateLimitVo rateLimitVo, long lastUpdateTime) {
        this.rateLimiter = rateLimiter;
        this.rateLimitVo = rateLimitVo;
        this.lastUpdateTime = lastUpdateTime;
    }
    //get、set略
}

2、定義限流策略RateLimitStrategist

/**
 * @描述: 限流策略
 */
public class RateLimitStrategist {

    private PathMatcher pathMatcher = new AntPathMatcher();

    private final Map<String, RateLimitInfo> limiterMap = new LinkedHashMap<>();

    private final Map<String, Semaphore> threadMap = new LinkedHashMap<>();

    /**
     * 更新頻率,意為后台配置路徑后5分鍾生效
     */
    private static final long UPDATE_RATE = 1000*60*5;

    private long lastUpdateTime = 0;

    @Autowired
    private RateLimitManager rateLimitManager;

    public void init() {
        limiterMap.clear();
        threadMap.clear();
        List<RateLimitVo> rateLimitVos = rateLimitManager.findListForPriority(); //查詢數據庫中配置的路徑信息,需要自己實現
        if(CollectionUtils.isNotEmpty(rateLimitVos)) {
            return;
        }
        for (RateLimitVo rateLimitVo : rateLimitVos) {
            RateLimiter rateLimiter = RateLimiter.create(rateLimitVo.getPermitsPerSecond());
            limiterMap.put(rateLimitVo.getLimitPath(), new RateLimitInfo(rateLimiter, rateLimitVo, System.currentTimeMillis()));
            threadMap.put(rateLimitVo.getLimitPath(), new Semaphore(rateLimitVo.getMaxThread(), true));
        }
        lastUpdateTime = System.currentTimeMillis();
    }

    public boolean tryAcquire(String requestUri) {
        //目前設置5分鍾更新一次
        if(System.currentTimeMillis() - lastUpdateTime > UPDATE_RATE) {
            synchronized (this) {
                if(System.currentTimeMillis() - lastUpdateTime > UPDATE_RATE) {
                    init();
                }
            }
        }

        for (Map.Entry<String, RateLimitInfo> entry : limiterMap.entrySet()) {
            if(!pathMatcher.match(entry.getKey(), requestUri)) {
                continue;
            }
            RateLimitInfo rateLimitInfo = entry.getValue();
            RateLimitVo rateLimitVo = rateLimitInfo.getRateLimitVo();
            RateLimiter rateLimiter = rateLimitInfo.getRateLimiter();
            boolean concurrentFlag = rateLimiter.tryAcquire(1, rateLimitVo.getPermitsTimeOut(), TimeUnit.SECONDS);
            if(!concurrentFlag) { //驗證失敗,直接返回
                return concurrentFlag;
            } else {
                if(threadMap.get(requestUri).availablePermits() != 0) { //當前路徑對應剩余可執行線程數不為0
                    try {
                        //申請可執行線程
                        threadMap.get(requestUri).acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return true;
                } else {
                    return false;
                }
            }
        }
        return true;
    }

    public void setLastUpdateTime(long lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
    }

    /**
     * 釋放路徑對應的線程數
     * @param requestURI
     */
    public void releaseSemaphore(String requestURI) {
        if(null != threadMap.get(requestURI)) {
            threadMap.get(requestURI).release();
        }
    }
}

3、定義攔截器RateLimitFilter,在攔截器中調用限流策略

/**
 * @描述: 限流過濾器,配置后生效
 */
public class RateLimitFilter implements Filter {

    private RateLimitStrategist rateLimitStrategist;

    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimitFilter.class);

    @Override
    public void init(FilterConfig filterConfig) throws ServletException {

    }

    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        if(rateLimitStrategist == null) {
            rateLimitStrategist = InstanceFactory.getInstance(RateLimitStrategist.class);
        }
        HttpServletRequest req = (HttpServletRequest) request;
        HttpServletResponse res = (HttpServletResponse) response;
        String requestURI = req.getRequestURI();
        String contextPath = req.getContextPath();
        if(StringUtils.isNotBlank(contextPath)) {
            requestURI = StringUtils.substring(requestURI, contextPath.length());
        }
        if(!rateLimitStrategist.tryAcquire(requestURI)) {
            res.setContentType("text/html;charset=UTF-8");
            res.setStatus(HttpStatus.UNAUTHORIZED.value());
            response.getWriter().write("當前服務器繁忙,請稍后再試!");
            LOGGER.info(requestURI + "路徑請求服務器繁忙,請稍后再試");
        } else {
            try {
                chain.doFilter(request, response);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                rateLimitStrategist.releaseSemaphore(requestURI);
            }
        }

    }

    @Override
    public void destroy() {

    }
}

4、需要的配置(采用注解也可以)

先在web.xml中引入過濾器(開始處)

<filter>
    <filter-name>rateLimiter</filter-name>
    <filter-class>com.limit.filter.RateLimitFilter</filter-class>
</filter>

<filter-mapping>
    <filter-name>rateLimiter</filter-name>
    <url-pattern>/*</url-pattern>  
</filter-mapping>

然后在context.xml中注入RateLimitStrategist

<bean id="rateLimitStrategist" class="com.limit.factory.RateLimitStrategist" />

5、代碼tag-0.0.1是項目單節點部署可用,版本tag-0.0.2為適應多節點部署改為redis來實現對處理線程數的控制

**需要源碼留郵箱**


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM