前段時間,項目中需要對某些訪問量較高的路徑進行訪問並發數控制,以及有些功能,比如Excel導出下載功能,數據量很大的情況下,用戶不斷的點擊下載按鈕,重復請求數據庫,導致線上數據庫掛掉。於是在這樣的情況下,這個限流組件應運而生,也許有人會提及SpringCloud zuul,其實它的現也是借助了RateLimiter。由於項目使用的是SpringBoot,也就沒往外思考。反正最后功能實現了就行,畢竟殊途同歸啊。本文只是用代碼來快速的幫你理清整個限流的流程,至於RateLimiter中具體限流算法以及Semaphore信號量的具體實現還是得自己去深挖了,這里就不再展開了。正片時間到:
0、由於需要對限流的路徑進行后台管理,那限流實體肯定是需要的
public class RateLimit { private String rateLimitId; /** * 限流路徑,支持通配符,示例 /user/** */ private String limitPath; /** * 每秒限流頻率 */ private Integer permitsPerSecond; /** * 限流等待超時時間,單位s */ private Integer permitsTimeOut; /** * 排序 */private Integer orderNo; /** * 最大線程數 */ private Integer maxThread; /** * 創建時間 */ private Date gmtCreate; //get、set略 }
1、因為要借助RateLimiter類,所以再封裝一個限流信息類
/** * @描述: 限流信息 */ public class RateLimitInfo { private RateLimiter rateLimiter; private RateLimitVo rateLimitVo; private long lastUpdateTime; public RateLimitInfo(RateLimiter rateLimiter, RateLimitVo rateLimitVo, long lastUpdateTime) { this.rateLimiter = rateLimiter; this.rateLimitVo = rateLimitVo; this.lastUpdateTime = lastUpdateTime; } //get、set略 }
2、定義限流策略RateLimitStrategist
/** * @描述: 限流策略 */ public class RateLimitStrategist { private PathMatcher pathMatcher = new AntPathMatcher(); private final Map<String, RateLimitInfo> limiterMap = new LinkedHashMap<>(); private final Map<String, Semaphore> threadMap = new LinkedHashMap<>(); /** * 更新頻率,意為后台配置路徑后5分鍾生效 */ private static final long UPDATE_RATE = 1000*60*5; private long lastUpdateTime = 0; @Autowired private RateLimitManager rateLimitManager; public void init() { limiterMap.clear(); threadMap.clear(); List<RateLimitVo> rateLimitVos = rateLimitManager.findListForPriority(); //查詢數據庫中配置的路徑信息,需要自己實現 if(CollectionUtils.isNotEmpty(rateLimitVos)) { return; } for (RateLimitVo rateLimitVo : rateLimitVos) { RateLimiter rateLimiter = RateLimiter.create(rateLimitVo.getPermitsPerSecond()); limiterMap.put(rateLimitVo.getLimitPath(), new RateLimitInfo(rateLimiter, rateLimitVo, System.currentTimeMillis())); threadMap.put(rateLimitVo.getLimitPath(), new Semaphore(rateLimitVo.getMaxThread(), true)); } lastUpdateTime = System.currentTimeMillis(); } public boolean tryAcquire(String requestUri) { //目前設置5分鍾更新一次 if(System.currentTimeMillis() - lastUpdateTime > UPDATE_RATE) { synchronized (this) { if(System.currentTimeMillis() - lastUpdateTime > UPDATE_RATE) { init(); } } } for (Map.Entry<String, RateLimitInfo> entry : limiterMap.entrySet()) { if(!pathMatcher.match(entry.getKey(), requestUri)) { continue; } RateLimitInfo rateLimitInfo = entry.getValue(); RateLimitVo rateLimitVo = rateLimitInfo.getRateLimitVo(); RateLimiter rateLimiter = rateLimitInfo.getRateLimiter(); boolean concurrentFlag = rateLimiter.tryAcquire(1, rateLimitVo.getPermitsTimeOut(), TimeUnit.SECONDS); if(!concurrentFlag) { //驗證失敗,直接返回 return concurrentFlag; } else { if(threadMap.get(requestUri).availablePermits() != 0) { //當前路徑對應剩余可執行線程數不為0 try { //申請可執行線程 threadMap.get(requestUri).acquire(); } catch (InterruptedException e) { e.printStackTrace(); } return true; } else { return false; } } } return true; } public void setLastUpdateTime(long lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; } /** * 釋放路徑對應的線程數 * @param requestURI */ public void releaseSemaphore(String requestURI) { if(null != threadMap.get(requestURI)) { threadMap.get(requestURI).release(); } } }
3、定義攔截器RateLimitFilter,在攔截器中調用限流策略
/** * @描述: 限流過濾器,配置后生效 */ public class RateLimitFilter implements Filter { private RateLimitStrategist rateLimitStrategist; private static final Logger LOGGER = LoggerFactory.getLogger(RateLimitFilter.class); @Override public void init(FilterConfig filterConfig) throws ServletException { } @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { if(rateLimitStrategist == null) { rateLimitStrategist = InstanceFactory.getInstance(RateLimitStrategist.class); } HttpServletRequest req = (HttpServletRequest) request; HttpServletResponse res = (HttpServletResponse) response; String requestURI = req.getRequestURI(); String contextPath = req.getContextPath(); if(StringUtils.isNotBlank(contextPath)) { requestURI = StringUtils.substring(requestURI, contextPath.length()); } if(!rateLimitStrategist.tryAcquire(requestURI)) { res.setContentType("text/html;charset=UTF-8"); res.setStatus(HttpStatus.UNAUTHORIZED.value()); response.getWriter().write("當前服務器繁忙,請稍后再試!"); LOGGER.info(requestURI + "路徑請求服務器繁忙,請稍后再試"); } else { try { chain.doFilter(request, response); } catch (Exception e) { e.printStackTrace(); } finally { rateLimitStrategist.releaseSemaphore(requestURI); } } } @Override public void destroy() { } }
4、需要的配置(采用注解也可以)
先在web.xml中引入過濾器(開始處)
<filter> <filter-name>rateLimiter</filter-name> <filter-class>com.limit.filter.RateLimitFilter</filter-class> </filter> <filter-mapping> <filter-name>rateLimiter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping>
然后在context.xml中注入RateLimitStrategist
<bean id="rateLimitStrategist" class="com.limit.factory.RateLimitStrategist" />
5、代碼tag-0.0.1是項目單節點部署可用,版本tag-0.0.2為適應多節點部署改為redis來實現對處理線程數的控制
**需要源碼留郵箱**