想聊一聊流量控制,談談的重要性,解決了哪些業務問題,那我們問題來進入正題。
1、WEB容器如何流量控制?
一個Tomcat的容器,這個容器呢,部署在一台服務器上面,同時這台服務器的資源非常非常有限,這台服務器只能同時讓500個請求訪問,若是多余500個的話,這樣服務器的資源就會打滿,那么我們肯定需要想辦法這些問題的。Tomcat本身就有這樣的機制,因為每一個請求過來后,tomcat會為這個請求分配一個處理線程,所以tomcat就是來控制處理線程的數量。
server.xml
<Connector executor="tomcatThreadPool" port="8080" protocol="HTTP/1.1" connectionTimeout="8000" enableLookups="false" acceptorThreadCount="1" URIEncoding="utf-8" redirectPort="443" compression="on" compressionMinSize="1024" compressableMimeType="text/html,text/xml,text/javascript,text/css,text/plain,application/json,application/xml" /> <Executor className="StandardThreadExecutor" name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="500" minSpareThreads="100"/>
maxThreads="500" 表示最多能同時並存500個處理線程。
acceptCount="500" 表示在500個處理線程在占用的情況中,還允許500個請求的排隊。
這兩個參數基本就是Tomcat在線程保護當中的策略。
2、一個WEB容器里面如何進行具體的業務模塊的線程保護呢?
一個業務系統部署在一個Tomcat中,例如這個業務系統有兩個重要模塊(A和B模塊),這個兩個模塊的請求都需要有資源處理,而不是那一個模塊把系統的資源都占用去,例如:A模塊限制最多300請求,B模塊最多300個請求。這樣場景的出現時,我們就需要考慮說A模塊最多只能有300個處理線程,B也是這樣,那么Tomcat是可以保證資源層面的,A+B共有500個,而無法確保A/B各300個,所以有如下想法:
1、每個請求進來確定是屬於A還是屬於B。
2、當前正在運行的A/B模塊的數量。
基於上面想法的具體實現:
流量控制的業務實現(TrafficControl.java):
/** * 簡單的實現基於URL的流控 */ public class TrafficControl { //一個url請求的最大訪問數量為300 private final static int ONE_URI_MAX_CONCURRENT = 300; //所有url請求的最大訪問數量為500 private final static int ALL_URI_MAX_CONCURRENT = 500; private final static AtomicInteger all_url_concurrent = new AtomicInteger(0); private final static ConcurrentMap<String, AtomicInteger> url_concurrent_map = new ConcurrentHashMap<String, AtomicInteger>(); private final static SwitcherManager switcherManager = SwitcherManagerFactoryLoader.getSwitcherManagerFactory().getSwitcherManager(); private final static Switcher tcEnabled = switcherManager.registerSwitcher("feature.trackurl.traffic_control.enable", true); public static boolean isDisabled() { return tcEnabled.isClose(); } public static boolean isOverflow(String uri) { if (all_url_concurrent.get() > ALL_URI_MAX_CONCURRENT) { return true; } AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null && one_url_concurrent.get() > ONE_URI_MAX_CONCURRENT) { return true; } return false; } public static void startAccess(String uri) { all_url_concurrent.incrementAndGet(); AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null) { one_url_concurrent.incrementAndGet(); } else { url_concurrent_map.putIfAbsent(uri, new AtomicInteger(1)); } } public static void endAccess(String uri) { all_url_concurrent.decrementAndGet(); AtomicInteger one_url_concurrent = url_concurrent_map.get(uri); if (one_url_concurrent != null) { one_url_concurrent.decrementAndGet(); } } public static void dumpWarnLog() { String lineSeparator = System.getProperty("line.separator"); // 估算每一個URL和其計數占用32個字符 StringBuilder sb = new StringBuilder((1 + url_concurrent_map.size()) * 32); sb.append("all_url_concurrent : ").append(all_url_concurrent); for (Map.Entry<String, AtomicInteger> entry : url_concurrent_map.entrySet()) { sb.append(lineSeparator).append('\t').append(entry.getKey()).append(" : ").append(entry.getValue()); } ApiLogger.warn(sb); } }
Servlet的實現(TrafficControlServlet.java):
/** * 帶有流量控制的Servlet */ public class TrafficControlServlet extends HttpServlet { @Override protected final void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { if (TrafficControl.isDisabled()) { super.service(req, resp); return; } String uri = req.getRequestURI(); if (TrafficControl.isOverflow(uri)) { TrafficControl.dumpWarnLog(); resp.setStatus(HttpServletResponse.SC_SERVICE_UNAVAILABLE); return; } try { TrafficControl.startAccess(uri); super.service(req, resp); } finally { TrafficControl.endAccess(uri); } } }
最后執行Servlet如下,繼承於
TrafficControlServlet。
public class TestServlet extends TrafficControlServlet { private static final long serialVersionUID = 2895590140869067830L; @Override protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException { //................. } @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws IOException { doGet(request, response); }
