JAVA WEB項目中開啟流量控制Filter


Flow Control:控流的概念
  • 主要是用來限定server所能承載的最大(高並發)流量峰值,以免在峰值是Server過載而宕機,對於WEB系統而言
  • 通常是分布式部署,如果請求並發量很大,會導致整個集群崩潰,也就是通常所說的“雪崩效應”。
  • 所以,我們不僅在網絡代理層面(比如nginx)設置流量控制以抵抗、拒止溢出流量,
  • 我們還應該在application server層面有一定的自我保護策略,確保當前JVM的負載應該在可控范圍之內,對於JVM承載能力之外的請求,應該被合理管理。

本文開發了一個分布式流量控制Filter,來限定application的並發量:

 
         

    1)對於過量的請求,首先將請求buffer在隊列中。

 
         

    2)當buffer隊列滿時,多余的請求將會被直接拒絕。(過載請求量)

 
         

    3)那些buffer中被阻塞的請求,等待一定時間后任然無法被執行,則直接返回錯誤URL。(溢出請求量)

 
         

    4)我們設定一個允許的並發量,通過java中Semaphore控制。只有獲取“鎖”的請求,才能繼續執行。




web.xml配置

<
filter> <filter-name>flowControlFilter</filter-name> <filter-class>com.demo.security.FlowControlFilter</filter-class> <init-param> <param-name>permits</param-name> <param-value>128</param-value> </init-param> <init-param> <param-name>timeout</param-name> <param-value>15000</param-value> </init-param> <init-param> <param-name>bufferSize</param-name> <param-value>500</param-value> </init-param> <init-param> <param-name>errorUrl</param-name> <param-value>/error.html</param-value> </init-param> </filter>

<filter-mapping>  
    <filter-name>flowControlFilter</filter-name>  
    <url-pattern>/*</url-pattern>  
</filter-mapping>  

 

Java代碼:

package com.src.java.filter;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;

/**
 * 
 * @ClassName: FlowControlFilter
 * @Description: 分布式系統流量控制
 * @author chinasoft_liuhanlin
 * @date 2017年6月1日 下午3:57:08
 */
public class FlowControlFilter implements Filter {

    /**
     * 最大並發量 默認為500
     */
    private int permits = Runtime.getRuntime().availableProcessors() + 1;

    /**
     * 當並發量達到permits后,新的請求將會被buffer,buffer最大尺寸 如果buffer已滿,則直接拒絕
     */
    private int bufferSize = 500;
    /**
     * buffer中的請求被阻塞,此值用於控制最大阻塞時間 默認阻塞時間
     */
    private long timeout = 30000;
    /**
     * 跳轉的錯誤頁面
     */
    private String errorUrl;

    private BlockingQueue<Node> waitingQueue;

    private Thread selectorThread;
    private Semaphore semaphore;

    private Object lock = new Object();

    @Override
    public void destroy() {

    }

    /**
     * <p>
     * Title: doFilter
     * </p>
     * <p>
     * Description:
     * </p>
     * 
     * @param request
     * @param response
     * @param chain
     * @throws IOException
     * @throws ServletException
     * @see javax.servlet.Filter#doFilter(javax.servlet.ServletRequest,
     *      javax.servlet.ServletResponse, javax.servlet.FilterChain)
     */
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        checkSelector();
        Thread t = Thread.currentThread();
        HttpServletResponse httpServletResponse = (HttpServletResponse) response;

        Node node = new Node(t, false);
        boolean buffered = waitingQueue.offer(node);
        // 如果buffer已滿
        if (!buffered) {
            if (errorUrl != null) {
                httpServletResponse.sendRedirect(errorUrl);
            }
            return;
        }
        long deadline = System.currentTimeMillis() + timeout;
        // 進入等待隊列后,當前線程阻塞
        LockSupport.parkNanos(this, TimeUnit.MICROSECONDS.toNanos(timeout));
        if (t.isInterrupted()) {
            // 如果線程是中斷返回
            t.interrupted();// clear status

        }
        // 如果等待過期,則直接返回
        if (deadline >= System.currentTimeMillis()) {
            if (errorUrl != null) {
                httpServletResponse.sendRedirect(errorUrl);
            }
            // 對信號量進行補充
            synchronized (lock) {
                if (node.dequeued) {
                    semaphore.release();
                } else {
                    node.dequeued = true;
                }
            }
            return;
        }
        // 繼續執行
        try {
            chain.doFilter(request, response);
        } finally {
            semaphore.release();
            checkSelector();
        }
    }

    /**
     * <p>
     * Title: init
     * </p>
     * <p>
     * Description:
     * </p>
     * 
     * @param filterConfig
     * @throws ServletException
     * @see javax.servlet.Filter#init(javax.servlet.FilterConfig)
     */
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
        String p = filterConfig.getInitParameter("permits");
        if (p != null) {
            permits = Integer.parseInt(p);
            if (permits < 0) {
                throw new IllegalArgumentException("FlowControlFilter,permits parameter should be greater than 0 !");
            }
        }

        String t = filterConfig.getInitParameter("timeout");
        if (t != null) {
            timeout = Long.parseLong(t);
            if (timeout < 1) {
                throw new IllegalArgumentException("FlowControlFilter,timeout parameter should be greater than 0 !");
            }
        }

        String b = filterConfig.getInitParameter("bufferSize");
        if (b != null) {
            bufferSize = Integer.parseInt(b);
            if (bufferSize < 0) {
                throw new IllegalArgumentException("FlowControlFilter,bufferSize parameter should be greater than 0 !");
            }
        }

        errorUrl = filterConfig.getInitParameter("errorUrl");

        waitingQueue = new LinkedBlockingQueue<>(bufferSize);
        semaphore = new Semaphore(permits);

        selectorThread = new Thread(new SelectorRunner());
        selectorThread.setDaemon(true);
        selectorThread.start();

    }

    /**
     * @Title: checkSelector
     * @Description: TODO
     * @param:
     * @return: void
     * @throws
     */
    private void checkSelector() {
        if (selectorThread != null && selectorThread.isAlive()) {
            return;
        }
        synchronized (lock) {
            if (selectorThread != null && selectorThread.isAlive()) {
                return;
            }
            selectorThread = new Thread(new SelectorRunner());
            selectorThread.setDaemon(true);
            selectorThread.start();
        }
    }

    /**
     * 
     * @ClassName: SelectorRunner
     * @Description: TODO
     * @author chinasoft_liuhanlin
     * @date 2017年6月1日 下午3:59:11
     */
    private class SelectorRunner implements Runnable {

        @Override
        public void run() {
            try {
                while (true) {
                    Node node = waitingQueue.take();
                    // 如果t,阻塞逃逸,只能在pack超時后退出
                    synchronized (lock) {
                        if (node.dequeued) {
                            // 如果此線程已經park過期而退出了,則直接忽略
                            continue;
                        } else {
                            node.dequeued = true;
                        }

                    }
                    semaphore.acquire();
                    LockSupport.unpark(node.currentThread);
                }
            } catch (Exception e) {
                //
            } finally {
                // 全部釋放阻塞
                Queue<Node> queue = new LinkedList<>();
                waitingQueue.drainTo(queue);
                for (Node n : queue) {
                    if (!n.dequeued) {
                        LockSupport.unpark(n.currentThread);
                    }
                }
            }
        }
    }

    private class Node {
        Thread currentThread;
        boolean dequeued;// 是否已經出隊

        public Node(Thread t, boolean dequeued) {
            this.currentThread = t;
            this.dequeued = dequeued;
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM