指定方法異步調用
前面我們講解了通過設置ReferenceConfig的setAsync()方法來讓整個接口里的所有方法變為異步調用,那么如何指定某些方法為異步調用呢?下面講解下如何正確地設置默寫方法為異步調用。
假如你只需要設置接口里的方法sayHello為異步調用,那么可以使用下面方式:
final List<MethodConfig> asyncMethodList = new ArrayList<MethodConfig>(); MethodConfig methodConfig = new MethodConfig(); methodConfig.setAsync(true); methodConfig.setName("sayHello"); asyncMethodList.add(methodConfig);
然后調用ReferenceConfig的setMethods(asyncMethodList)即可。另外如果異步調用的方法沒有返回值,則可以再調用methodConfig.setReturn(false); ,以便減少Dubbo內部Future對象的創建和管理成本。
關閉啟動檢查
正常情況下,在服務消費端啟動時會檢查依賴的服務是否存在,如果不存在則會拋出 throw new IllegalStateException("Failed to check the status of the service" + interfaceName + ".No provider available for the service ")異常阻止Spring初始化完成,以便上線前能及早發現問題。
可以通過調用ReferenceConfig.setCheck(false)關閉檢查,設置check為true有助於及時發現問題,那么什么時候需要設置false呢?
比如測試時,有些無關的服務啟動不了,導致整個應用都啟動不了,這時候你可以把那些無關服務的check設置為false。再比如出現了循環依賴,必須有一方先啟動,比如你給服務使用方提供了一個SDK,SDK里面使用Dubbo API遠程消費服務器提供方的服務,如果你在服務提供方的服務器里面也引入這個SDK,在服務提供方啟動時候就會拋出 No Provider available for the service異常,具體原因是服務提供方啟動時候會初始化SDK,而SDK里面初始化時候需要檢查服務提供方是否存在,而服務提供方的服務還沒提供出來。
另外需要注意的是check設置為false,總是會返回調用,當服務提供方恢復服務時,能自動連上。
如何設置均衡策略
由於Dubbo提供的一致性Hash負載均衡策略,可以允許你自定義虛擬節點個數和指定某些方法需要使用一致性Hash策略,下面具體講下如何設置:
// 虛擬節點設置為512 Map<String,String> parameters = new HashMap<String,String>(); parameters.put("hash.nodes","512"); ReferenceConfig<T> reference = new ReferenceConfig<T>(); // 設置負載均衡為一致性Hash reference.setLoadbalance(consistenthash); // 設置參數 reference.setParameters(parameters);
如下代碼設置接口的sayHello方法為一致性Hash負載均衡策略,設置saySomething方法為隨機負載均衡策略:
ReferenceConfig reference = new ReferenceConfig(); final List<MethodConfig> methodList = new ArrayList<MethodConfig>(); // 設置sayHello方法為一致性Hash負載均衡策略 MethodConfig methodConfig = new MethodConfig(); methodConfig.setName("sayHello"); methodConfig.setLoadbalance("consistenthash"); // 虛擬節點設置為512 Map<String,String> parameters = new HashMap<String,String>(); parameters.put("hash.nodes","512"); methodConfig.setParameters(parameters); methodList.add(methodConfig); // 設置saySomething方法為隨機負載均衡策略 methodConfig = new MethodConfig(); methodConfig.setName("saySomething"); methodConfig.setLoadbalance("random"); methodList.add(methodConfig); reference.setMethods(methodList);
另外,默認情況下一致性hash使用第一個參數值計算hash值,如果你需要自定義可以通過以下設置:
Map<String,String> parameters = new HashMap<String,String>(); parameters.put("hash.nodes","512"); parameters.put("hash.arguments","0,1"); // 使用第一個和第二個參數值計算hash值 methodConfig.setParameters(parameters);
注意"0,1"是一個字符串,里面使用英文","分隔。
服務消費端ReferenceConfig需要自行緩存
ReferenceConfig實例是個很重的實例,每個ReferenceConfig實例里都維護了與服務注冊中心的一個長鏈,並且維護了與所有服務提供者的長鏈。假設有一個服務注冊中心和N個服務提供者,那么每個ReferenceConfig實例里面維護了N+1個長鏈,如果頻繁地生成ReferenceConfig實例,可能會造成性能問題,甚至產生內存或者連接泄露的風險。特別是使用Dubbo API編程時候容易忽略這個問題。
為了解決這個問題,之前都是自行緩存,但自從發布Dubbo 2.4.0版本后,Dubbo提供了簡單的工具類ReferenceConfigCache用於緩存ReferenceConfig實例。使用如下:
// 創建服務消費實例 ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>(); reference.setInterface(XxxService.class); reference.setVersion("1.0.0"); ...... // 獲取Dubbo提供的緩存 ReferenceConfigCache cache = ReferenceConfigCache.getCache(); // cache.get方法中會緩存reference對象,並且調用reference.get方法啟動ReferenceConfig,並返回經過代理后的服務接口的對象 XxxService xxxService = cache.get(reference); // 使用xxxService對象 xxxService.sayHello();
需要注意的是Cache內持有ReferenceConfig對象的引用,不要在外部再調用ReferenceConfig的destroy方法了,這會導致Cache內的ReferenceConfig失效!
如果要銷毀Cache中的ReferenceConfig,將銷毀ReferenceConfig並釋放對應的資源,具體使用下面方法來銷毀:
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
cache.destroy(reference);
另外在Dubbo中唯一確定一個服務是通過 接口+分組+版本,所以默認情況下cache內是通過服務Group/接口/版本三個屬性來標示一個ReferenceConfig實例的。即以服務Group/接口/版本為緩存的key,ReferenceConfig實例為對應的value。如果你需要使用自定義的key,可以在創建cache時候調用 ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator); 方法傳遞自定義的keyGenerator。
並發控制
服務消費方並發控制
在服務消費方進行並發控制,需要設置actives參數,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000" actives="10" />
設置com.test.UserServiceBo接口中的所有方法,每個方法最多同時並發請求10個請求。
也可以使用下面方法設置接口中單個方法的並發請求個數,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000"> <dubbo:method name="sayHello" actives="10" /> </dubbo:reference>
如上設置sayHello方法的並發請求數量最大為10,如果客戶端請求該方法並發超過了10則客戶端會被阻塞,等客戶端並發請求數量少於10的時候,該請求才會被發送到服務提供方服務器。在Dubbo中客戶端並發控制使用ActiveLimitFilter過濾器來控制,代碼如下:
public class ActiveLimitFilter implements Filter{ public Result invoke(Invoker<?> invoker , Invocation invocation) throws RpcException{ URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); // 獲取設置的active值,默認為0 int max = invoker.getUrl().getMethodParameter(methodName , Constants.ACTIVES_KEY , 0); RpcStatus count = RpcStatus.getStatus(invoker.getUrl() , invocation.getMethodName()); if(max > 0){ long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName() , Constants.TIMEOUT_KEY , 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); // 如果該方法並發請求數量大於設置值,則掛起當前線程 if(active >= max){ sychronized(count){ while((active = count.getActive()) >= max){ try{ count.wait(remain); }catch(InterruptedException e){ } // 如果等待時間超時,則拋出異常 long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if(remain <= 0){ throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + " , method: " + invocation.getMethodName() + ",elapsed: " + elapsed + ",timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit:" + max); } } } } } // 沒有限流的時候,正常調用 try{ long begin = System.currentTimeMillis(); RpcStatus.beginCount(url , methodName); try{ Result result = invoker.invoke(invocation); RpcStatus.endCount(url , methodName , System.currentTimeMillis() - begin , true); return result; }catch(RuntimeException t){ RpcStatus.endCount(url , methodName , System.currentTimeMillis() - begin , false); throw t; } }finally{ if(max > 0){ synchronized(count){ count.notify(); } } } } }
由上可知,在客戶端並發控制中,如果當並發量達到指定值后,當前客戶端請求線程會被掛起,如果在等待超時期間並發請求量少了,那么阻塞的線程會被激活,然后發送請求到服務提供方,如果等待超時了,則直接拋出異常,這時服務根本都沒有發送到服務提供方服務器。
服務提供方並發控制
在服務提供方進行並發控制需要設置executes參數,如下:
<dubbo:service interface="com.test.UserServiceBo" ref="userService" group="dubbo" version="1.0.0" timeout="3000" executes="10" />
設置com.test.UserServiceBo 接口中所有方法,每個方法最多同時並發處理10個請求,這里並發是指同時在處理10個請求。
也可以使用下面方法設置接口中單個方法的並發處理個數,如下:
<dubbo:service interface="com.test.UserServiceBo" ref="userService" group="dubbo" version="1.0.0" timeout="3000"> <dubbo:method name="sayHello" executes="10" /> </dubbo:service>
如上設置sayHello方法的並發處理數量為10 。
需要注意的是,服務提供方設置並發數量后,如果同時請求數量大於了設置的executes的值,則會拋出異常,而不是像服務端設置actives時候,會等待。服務提供方並發控制是使用ExecuteLimitFilter過濾器實現的,ExecuteLimitFilter代碼如下:
public class ExecuteLimitFilter implements Filter{ public Result invoke(Invoker<?> invoker , Invocation invocation) throws RpcException{ URL url = invoker.getUrl(); // 默認不設置executes時候,其值為0 int max = url.getMethodParameter(methodName , Constants.EXECUTES_KEY , 0); if(max > 0){ // max>0說明設置了executes值 RpcStatus count = RpcStatus.getStatus(url , invocation.getMethodName()); // 可知如果並發處理數量大於設置的值,會拋出異常 executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())){ throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ",cause: The service using threads greater than <dubbo:service execute=\"" + max + "\"> limited."); } } ... try{ // 沒有限流的時候,激活filter鏈 Result result = invoker.invoke(invocation); return result; }catch(){ ... }finally{ ... } } }
所以當使用executes參數時候要注意,當並發量過大時侯,多余的請求會失敗。
改進的廣播策略
前面我們講解集群容錯時談到廣播策略,該策略主要用於對所有服務提供者廣播消息,那么有個問題需要思考,廣播是說你在客戶端調用接口一次,內部就是輪詢調用所有服務提供者的機器的服務,那么你調用一次該接口,返回值是什么呢?比如內部輪詢了10台機器,每個機器應該都有一個返回值,那么你調用的這一次返回值是10個返回值的組成?其實不是,返回的輪詢調用的最后一個機器結果,我們可以看下BroadcastClusterInvoker的主干代碼:
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T>{ private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory<T> directory){ super(directory); } @SuppressWarnings({"unchecked","rawtypes"}) public Result doInvoke(final Invocation invocation , List<Invoker<T>> invokers , LoadBalance loadbalance){ ... // 使用循環,輪詢每個機器進行調用,其中result為最后一個機器的結果 for(Invoker<T> invoker : invokers){ try{ result = invoker.invoke(invocation); }catch(RpcException e){ exception = e; logger.warn(e.getMessage() , e); }catch(Throwable e){ exception = new RpcException(e.getMessage(),e); logger.warn(e.getMessage() , e); } } if(exception != null){ throw exception; } return result; } }
如上代碼,可知使用循環輪詢調用每個機器,其中result為調用最后一個機器的結果。
如果我想獲取所有服務提供者的結果,該怎么辦呢?其實我們可以自定義一個SPI擴展,並且規定我們的服務接口的返回結果為一個map,代碼如下:
public Result doInvokePara(final Invocation invocation , List<Invoker<T>> invokers , LoadBalance loadbalance) throws RpcException{ // 用來保存所有服務提供者返回的結果 Map allResult = new ConcurrentHashMap<String , Result>(); // 保存異步調用返回的Future對象 List<Future<Result>> futureList = new ArrayList<Future<Result>>(); // 所有服務提供者的機器個數 int machineNum = invokers.size(); for(Invoker<T> invoker : invokers){ try{ // 異步調用服務提供者 Future<Result> future = paramCallPool.submit(new Callable<Result>(){ @Override public Result call() throws Exception{ try{ // 具體調用服務提供者 Result result = invoker.invoke(invocation); // 服務提供者 ip:port String url = invoker.getUrl().getAddress(); // 保存結果到map,key為服務提供者的地址,value為返回結果 allResult.put(url , result.getResult()); return result; }catch(RpcException e){ logger.warn(e.getMessage(),e); }catch(Throwable e){ logger.warn(e.getMessage(),e); } return null; } }); futureList.add(future); }catch(Exception e){ logger.warn(e.getMessage() , e); } } // 等所有調用完成 for(Future<Result> future : futureList){ try{ future.get(); }catch(InterruptedException | ExecutionException e){ e. printStackTrace(); } } // 假設服務接口返回中類型為這個 ACCSResult<Map> resultTemp = new ActionResult<Map>(true,null,null,null); // 自定義返回結果 Map finalResult = new HashMap<String,Result>(); finalResult.put("machineNum",machineNum); finalResult.put("result",result); resultTemp.setData(finalResult); // 重新設置返回值 Result result = new RpcResult(resultTemp); return result; }