Dubbo學習筆記11:使用Dubbo中需要注意的一些事情


指定方法異步調用

前面我們講解了通過設置ReferenceConfig的setAsync()方法來讓整個接口里的所有方法變為異步調用,那么如何指定某些方法為異步調用呢?下面講解下如何正確地設置默寫方法為異步調用。

假如你只需要設置接口里的方法sayHello為異步調用,那么可以使用下面方式:

final List<MethodConfig> asyncMethodList = new ArrayList<MethodConfig>();
MethodConfig methodConfig = new MethodConfig();
methodConfig.setAsync(true);
methodConfig.setName("sayHello");
asyncMethodList.add(methodConfig);

然后調用ReferenceConfig的setMethods(asyncMethodList)即可。另外如果異步調用的方法沒有返回值,則可以再調用methodConfig.setReturn(false); ,以便減少Dubbo內部Future對象的創建和管理成本。

關閉啟動檢查

正常情況下,在服務消費端啟動時會檢查依賴的服務是否存在,如果不存在則會拋出 throw new IllegalStateException("Failed to check the status of the service" + interfaceName + ".No provider available for the service ")異常阻止Spring初始化完成,以便上線前能及早發現問題。

可以通過調用ReferenceConfig.setCheck(false)關閉檢查,設置check為true有助於及時發現問題,那么什么時候需要設置false呢?

比如測試時,有些無關的服務啟動不了,導致整個應用都啟動不了,這時候你可以把那些無關服務的check設置為false。再比如出現了循環依賴,必須有一方先啟動,比如你給服務使用方提供了一個SDK,SDK里面使用Dubbo API遠程消費服務器提供方的服務,如果你在服務提供方的服務器里面也引入這個SDK,在服務提供方啟動時候就會拋出 No Provider available for the service異常,具體原因是服務提供方啟動時候會初始化SDK,而SDK里面初始化時候需要檢查服務提供方是否存在,而服務提供方的服務還沒提供出來。

另外需要注意的是check設置為false,總是會返回調用,當服務提供方恢復服務時,能自動連上。

如何設置均衡策略

由於Dubbo提供的一致性Hash負載均衡策略,可以允許你自定義虛擬節點個數和指定某些方法需要使用一致性Hash策略,下面具體講下如何設置:

// 虛擬節點設置為512
Map<String,String> parameters = new HashMap<String,String>();
parameters.put("hash.nodes","512");
ReferenceConfig<T> reference = new ReferenceConfig<T>();
// 設置負載均衡為一致性Hash
reference.setLoadbalance(consistenthash);
// 設置參數
reference.setParameters(parameters);

如下代碼設置接口的sayHello方法為一致性Hash負載均衡策略,設置saySomething方法為隨機負載均衡策略:

ReferenceConfig reference = new ReferenceConfig();
final List<MethodConfig> methodList = new ArrayList<MethodConfig>();
// 設置sayHello方法為一致性Hash負載均衡策略
MethodConfig methodConfig = new MethodConfig();
methodConfig.setName("sayHello");
methodConfig.setLoadbalance("consistenthash");
// 虛擬節點設置為512
Map<String,String> parameters = new HashMap<String,String>();
parameters.put("hash.nodes","512");
methodConfig.setParameters(parameters);
methodList.add(methodConfig);
// 設置saySomething方法為隨機負載均衡策略
methodConfig = new MethodConfig();
methodConfig.setName("saySomething");
methodConfig.setLoadbalance("random");
methodList.add(methodConfig);
reference.setMethods(methodList);

另外,默認情況下一致性hash使用第一個參數值計算hash值,如果你需要自定義可以通過以下設置:

Map<String,String> parameters = new HashMap<String,String>();
parameters.put("hash.nodes","512");
parameters.put("hash.arguments","0,1");    // 使用第一個和第二個參數值計算hash值
methodConfig.setParameters(parameters);
    

注意"0,1"是一個字符串,里面使用英文","分隔。

服務消費端ReferenceConfig需要自行緩存

ReferenceConfig實例是個很重的實例,每個ReferenceConfig實例里都維護了與服務注冊中心的一個長鏈,並且維護了與所有服務提供者的長鏈。假設有一個服務注冊中心和N個服務提供者,那么每個ReferenceConfig實例里面維護了N+1個長鏈,如果頻繁地生成ReferenceConfig實例,可能會造成性能問題,甚至產生內存或者連接泄露的風險。特別是使用Dubbo API編程時候容易忽略這個問題。

為了解決這個問題,之前都是自行緩存,但自從發布Dubbo 2.4.0版本后,Dubbo提供了簡單的工具類ReferenceConfigCache用於緩存ReferenceConfig實例。使用如下:

// 創建服務消費實例
ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
reference.setInterface(XxxService.class);
reference.setVersion("1.0.0");
......
// 獲取Dubbo提供的緩存
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
// cache.get方法中會緩存reference對象,並且調用reference.get方法啟動ReferenceConfig,並返回經過代理后的服務接口的對象
XxxService xxxService = cache.get(reference);
// 使用xxxService對象
xxxService.sayHello();     

需要注意的是Cache內持有ReferenceConfig對象的引用,不要在外部再調用ReferenceConfig的destroy方法了,這會導致Cache內的ReferenceConfig失效!

如果要銷毀Cache中的ReferenceConfig,將銷毀ReferenceConfig並釋放對應的資源,具體使用下面方法來銷毀:

ReferenceConfigCache cache = ReferenceConfigCache.getCache();
cache.destroy(reference);

另外在Dubbo中唯一確定一個服務是通過 接口+分組+版本,所以默認情況下cache內是通過服務Group/接口/版本三個屬性來標示一個ReferenceConfig實例的。即以服務Group/接口/版本為緩存的key,ReferenceConfig實例為對應的value。如果你需要使用自定義的key,可以在創建cache時候調用 ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator); 方法傳遞自定義的keyGenerator。

並發控制

服務消費方並發控制

在服務消費方進行並發控制,需要設置actives參數,如下:

<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000" actives="10" />

設置com.test.UserServiceBo接口中的所有方法,每個方法最多同時並發請求10個請求。

也可以使用下面方法設置接口中單個方法的並發請求個數,如下:

<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000">
    <dubbo:method name="sayHello" actives="10" />
</dubbo:reference>

如上設置sayHello方法的並發請求數量最大為10,如果客戶端請求該方法並發超過了10則客戶端會被阻塞,等客戶端並發請求數量少於10的時候,該請求才會被發送到服務提供方服務器。在Dubbo中客戶端並發控制使用ActiveLimitFilter過濾器來控制,代碼如下:

public class ActiveLimitFilter implements Filter{
    public Result invoke(Invoker<?> invoker , Invocation invocation) throws RpcException{
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        // 獲取設置的active值,默認為0
        int max = invoker.getUrl().getMethodParameter(methodName , Constants.ACTIVES_KEY , 0);
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl() , invocation.getMethodName());
        if(max > 0){
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName() , Constants.TIMEOUT_KEY , 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            // 如果該方法並發請求數量大於設置值,則掛起當前線程
            if(active >= max){
                sychronized(count){
                    while((active = count.getActive()) >= max){
                        try{
                            count.wait(remain);
                        }catch(InterruptedException e){
                            
                        }
                        // 如果等待時間超時,則拋出異常
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if(remain <= 0){
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + " , method: " + invocation.getMethodName() + ",elapsed: " + elapsed + ",timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit:" + max);
                        }
                    }
                }
            }

        }
        // 沒有限流的時候,正常調用
        try{
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url , methodName);
            try{
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url , methodName , System.currentTimeMillis() - begin , true);
                return result;
            }catch(RuntimeException t){
                RpcStatus.endCount(url , methodName , System.currentTimeMillis() - begin , false);
                throw t;
            }
        }finally{
            if(max > 0){
                synchronized(count){
                    count.notify();
                }
            }
        }
    }
}

由上可知,在客戶端並發控制中,如果當並發量達到指定值后,當前客戶端請求線程會被掛起,如果在等待超時期間並發請求量少了,那么阻塞的線程會被激活,然后發送請求到服務提供方,如果等待超時了,則直接拋出異常,這時服務根本都沒有發送到服務提供方服務器。

服務提供方並發控制

在服務提供方進行並發控制需要設置executes參數,如下:

<dubbo:service interface="com.test.UserServiceBo" ref="userService" group="dubbo" version="1.0.0" timeout="3000" executes="10" />

設置com.test.UserServiceBo 接口中所有方法,每個方法最多同時並發處理10個請求,這里並發是指同時在處理10個請求。

也可以使用下面方法設置接口中單個方法的並發處理個數,如下:

<dubbo:service interface="com.test.UserServiceBo" ref="userService" group="dubbo" version="1.0.0" timeout="3000">
    <dubbo:method name="sayHello" executes="10" />
</dubbo:service>

如上設置sayHello方法的並發處理數量為10 。

需要注意的是,服務提供方設置並發數量后,如果同時請求數量大於了設置的executes的值,則會拋出異常,而不是像服務端設置actives時候,會等待。服務提供方並發控制是使用ExecuteLimitFilter過濾器實現的,ExecuteLimitFilter代碼如下:

public class ExecuteLimitFilter implements Filter{
    public Result invoke(Invoker<?> invoker , Invocation invocation) throws RpcException{
        URL url = invoker.getUrl();
        // 默認不設置executes時候,其值為0
        int max = url.getMethodParameter(methodName , Constants.EXECUTES_KEY , 0);
        if(max > 0){    // max>0說明設置了executes值
            RpcStatus count = RpcStatus.getStatus(url , invocation.getMethodName());
            // 可知如果並發處理數量大於設置的值,會拋出異常
            executesLimit = count.getSemaphore(max);
            if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())){
                throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ",cause: The service using threads greater than <dubbo:service execute=\"" + max + "\"> limited.");
            }
        }
        ...
        try{    // 沒有限流的時候,激活filter鏈
            Result result = invoker.invoke(invocation);
            return result;
        }catch(){
            ...
        }finally{
            ...
        }

    }
}

所以當使用executes參數時候要注意,當並發量過大時侯,多余的請求會失敗。

改進的廣播策略

前面我們講解集群容錯時談到廣播策略,該策略主要用於對所有服務提供者廣播消息,那么有個問題需要思考,廣播是說你在客戶端調用接口一次,內部就是輪詢調用所有服務提供者的機器的服務,那么你調用一次該接口,返回值是什么呢?比如內部輪詢了10台機器,每個機器應該都有一個返回值,那么你調用的這一次返回值是10個返回值的組成?其實不是,返回的輪詢調用的最后一個機器結果,我們可以看下BroadcastClusterInvoker的主干代碼:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T>{
    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);

    public BroadcastClusterInvoker(Directory<T> directory){
        super(directory);
    }

    @SuppressWarnings({"unchecked","rawtypes"})
    public Result doInvoke(final Invocation invocation , List<Invoker<T>> invokers , LoadBalance loadbalance){
        ...
        // 使用循環,輪詢每個機器進行調用,其中result為最后一個機器的結果
        for(Invoker<T> invoker : invokers){
            try{
                result = invoker.invoke(invocation);
            }catch(RpcException e){
                exception = e;
                logger.warn(e.getMessage() , e);
            }catch(Throwable e){
                exception = new RpcException(e.getMessage(),e);
                logger.warn(e.getMessage() , e);
            }
        }
        if(exception != null){
            throw exception;
        }
        return result;
    }
}        

如上代碼,可知使用循環輪詢調用每個機器,其中result為調用最后一個機器的結果。

如果我想獲取所有服務提供者的結果,該怎么辦呢?其實我們可以自定義一個SPI擴展,並且規定我們的服務接口的返回結果為一個map,代碼如下:

public Result doInvokePara(final Invocation invocation , List<Invoker<T>> invokers , LoadBalance loadbalance) throws RpcException{
    // 用來保存所有服務提供者返回的結果
    Map allResult = new ConcurrentHashMap<String , Result>();
    // 保存異步調用返回的Future對象
    List<Future<Result>> futureList = new ArrayList<Future<Result>>();
    // 所有服務提供者的機器個數
    int machineNum = invokers.size();
    for(Invoker<T> invoker : invokers){
        try{
            // 異步調用服務提供者
            Future<Result> future = paramCallPool.submit(new Callable<Result>(){
                @Override
                public Result call() throws Exception{
                    try{
                        // 具體調用服務提供者
                        Result result = invoker.invoke(invocation);
                        // 服務提供者 ip:port
                        String url = invoker.getUrl().getAddress();
                        // 保存結果到map,key為服務提供者的地址,value為返回結果
                        allResult.put(url , result.getResult());
                        return result;
                    }catch(RpcException e){
                        logger.warn(e.getMessage(),e);
                    }catch(Throwable e){
                        logger.warn(e.getMessage(),e);
                    }
                    return null;
                }
            });
            futureList.add(future);
        }catch(Exception e){
            logger.warn(e.getMessage() , e);
        }
    }
    // 等所有調用完成
    for(Future<Result> future : futureList){
        try{
            future.get();
        }catch(InterruptedException | ExecutionException e){
            e. printStackTrace();   
        }
    }

    // 假設服務接口返回中類型為這個
    ACCSResult<Map> resultTemp = new ActionResult<Map>(true,null,null,null);
    // 自定義返回結果
    Map finalResult = new HashMap<String,Result>();
    finalResult.put("machineNum",machineNum);
    finalResult.put("result",result);
    resultTemp.setData(finalResult);
    // 重新設置返回值
    Result result = new RpcResult(resultTemp);
    return result;    
}

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM