dubbo是如何控制並發數和限流的?


ExecuteLimitFilter

ExecuteLimitFilter ,在服務提供者,通過 <dubbo:service /> 的 "executes" 統一配置項開啟:
表示每服務的每方法最大可並行執行請求數。

ExecuteLimitFilter是通過信號量來實現的對服務端的並發數的控制。

ExecuteLimitFilter執行流程:

  1. 首先會去獲得服務提供者每服務每方法最大可並行執行請求數
  2. 如果每服務每方法最大可並行執行請求數大於零,那么就基於基於服務 URL + 方法維度獲取一個RpcStatus實例
  3. 通過RpcStatus實例獲取一個信號量,若果獲取的這個信號量調用tryAcquire返回false,則拋出異常
  4. 如果沒有拋異常,那么久調用RpcStatus靜態方法beginCount,給這個 URL + 方法維度開始計數
  5. 調用服務
  6. 調用結束后計數調用RpcStatus靜態方法endCount,計數結束
  7. 釋放信號量

ExecuteLimitFilter

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        Semaphore executesLimit = null;
        boolean acquireResult = false;
        int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
        if (max > 0) {
            RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());
//            if (count.getActive() >= max) {
            /**
             * http://manzhizhen.iteye.com/blog/2386408
             * use semaphore for concurrency control (to limit thread number)
             */
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) {
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited.");
            }
        }
        long begin = System.currentTimeMillis();
        boolean isSuccess = true;
        RpcStatus.beginCount(url, methodName);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (Throwable t) {
            isSuccess = false;
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        } finally {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if(acquireResult) {
                executesLimit.release();
            }
        }
    }

我們接下來看看RpcStatus這個類

    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    
    public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
        if (map == null) {
            METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
            map = METHOD_STATISTICS.get(uri);
        }
        RpcStatus status = map.get(methodName);
        if (status == null) {
            map.putIfAbsent(methodName, new RpcStatus());
            status = map.get(methodName);
        }
        return status;
    }

這個方法很簡單,大概就是給RpcStatus這個類里面的靜態屬性METHOD_STATISTICS里面設值。外層的map是以url為key,里層的map是以方法名為key。

    private volatile int executesPermits;
    public Semaphore getSemaphore(int maxThreadNum) {
        if(maxThreadNum <= 0) {
            return null;
        }

        if (executesLimit == null || executesPermits != maxThreadNum) {
            synchronized (this) {
                if (executesLimit == null || executesPermits != maxThreadNum) {
                    executesLimit = new Semaphore(maxThreadNum);
                    executesPermits = maxThreadNum;
                }
            }
        }

        return executesLimit;
    }

這個方法是獲取信號量,如果這個實例里面的信號量是空的,那么就添加一個,如果不是空的就返回。

TPSLimiter

TpsLimitFilter 過濾器,用於服務提供者中,提供限流的功能。

配置方式:

  1. 通過 <dubbo:parameter key="tps" value="" /> 配置項,添加到 <dubbo:service /> 或 <dubbo:provider /> 或 <dubbo:protocol /> 中開啟,例如:
dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
    <dubbo:parameter key="tps" value="100" />
</dubbo:service>
  1. 通過 <dubbo:parameter key="tps.interval" value="" /> 配置項,設置 TPS 周期。

源碼分析

TpsLimitFilter

    private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
    
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
            throw new RpcException(
                    "Failed to invoke service " +
                            invoker.getInterface().getName() +
                            "." +
                            invocation.getMethodName() +
                            " because exceed max service tps.");
        }

        return invoker.invoke(invocation);
    }

invoke方法調用了DefaultTPSLimiter的isAllowable,我們進入到isAllowable方法看一下

DefaultTPSLimiter

    private final ConcurrentMap<String, StatItem> stats
            = new ConcurrentHashMap<String, StatItem>();
    @Override
    public boolean isAllowable(URL url, Invocation invocation) {
        //獲取tps這個參數設置的大小
        int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
        //獲取tps.interval這個參數設置的大小,默認60秒
        long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                Constants.DEFAULT_TPS_LIMIT_INTERVAL);
        String serviceKey = url.getServiceKey();
        if (rate > 0) {
            StatItem statItem = stats.get(serviceKey);
            if (statItem == null) {
                stats.putIfAbsent(serviceKey,
                        new StatItem(serviceKey, rate, interval));
                statItem = stats.get(serviceKey);
            }
            return statItem.isAllowable();
        } else {
            StatItem statItem = stats.get(serviceKey);
            if (statItem != null) {
                stats.remove(serviceKey);
            }
        }

        return true;
    }

若要限流,調用 StatItem#isAllowable(url, invocation) 方法,根據 TPS 限流規則判斷是否限制此次調用。

StatItem

    private long lastResetTime;

    private long interval;

    private AtomicInteger token;

    private int rate;

    public boolean isAllowable() {
        long now = System.currentTimeMillis();
         // 若到達下一個周期,恢復可用種子數,設置最后重置時間。
        if (now > lastResetTime + interval) {
            token.set(rate);// 回復可用種子數
            lastResetTime = now;// 最后重置時間
        }
        // CAS ,直到或得到一個種子,或者沒有足夠種子
        int value = token.get();
        boolean flag = false;
        while (value > 0 && !flag) {
            flag = token.compareAndSet(value, value - 1);
            value = token.get();
        }

        return flag;
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM