dubbo過濾器
1.前言
dubbo filter的作用和web filter的作用是一樣的,在真正調用前做一些公共的處理。這也就是在重要的過程上設置攔截接口,提供擴展供業務實現。
dubbo過濾器是整個dubbo框架中非常重要的組成部分,dubbo中許多重要功能都是基於過濾器擴展而來。過濾器提供了provider和consumer調用過程的攔截,
即每次RPC調用的時候,對應的過濾器都會生效。雖然過濾器功能強大,但由於每次調用都會執行,因此在使用的時候需要注意它對性能的影響。
dubbo filter分為comsumer和provider端兩種,我們開發中對dubbo經常擴展的也是filter,因此系統的記錄下。
dubbo filter的接口是com.alibaba.dubbo.rpc.Filter
,是個SPI(擴展點),僅提供了一個invoker方法,用於rpc調用前后的功能增強。其中dubbo提供的內置filter實現如下:
provider端:AccessLogFilter、ClassloaderFilter、ContextFilter、ExecuteLimitFilter、ExceptionFilter、EchoFilter、TimeoutFilter、GenericFilter、TokenFilter、TpsFilter、TraceFilter、MonitorFilter
consumer端:ConsumerContextFilter、GenericImplFilter、ActiveLimitFilter、DeprecatedFilter、FutureFilter、MonitorFilter
以上內置filter都使用了@Activate注解,默認被激活。provider端的filter只在provider端服務啟動時加入到filter chain;consumer端的filter只在consumer端服務啟動時加入到filter chain;其中MonitorFilter特殊,它屬於consumer和provider端,那么同時會在服務暴露和引用時候被加入到filter chain。
對dubbo filter有了整體了解,下面問題來了,filter chain是如何生成的?
2.dubbo filter chain如何生成
2.1.provider端filter chain生成
provider服務在暴露時候,在com.alibaba.dubbo.registry.integration.RegistryProtocol.export(Invoker<T>) -> com.alibaba.dubbo.registry.integration.RegistryProtocol.doLocalExport(Invoker<T>)
內進行服務暴露,關鍵是
自適應對象Protocol$Adaptive根據URL上的協議dubbo獲取對應的DubboProtocol,但是由於Protocol有三個Wrapper實現,具體是ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper,那么在自適應對象獲取com.alibaba.dubbo.rpc.Protocol extension =(com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); //extName是dubbo
,實際上獲取的是Wrapper類(dubbo SPI的依賴注入功能,即類似AOP),Wrapper類封裝了最終的DubboProtocol。即最終這個Protocol對象的結構是QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol
。wrapper類的作用就是增強,這個是裝飾模式。在這里就是使用ProtocolFilterWrapper對DubboProtocol進行了增強,在DubboProtocl生成Invoker后,對Invoker功能增強生成filter chain。代碼如下
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
//invoker參數即服務端使用javaassit生成的動態代理對象
//buildInvokerChain生成filter chain,對服務進行功能增強
provider端生成的默認filter chain [EchoFilter->ClassLoaderFilter->GenericFilter->ContextFilter->TraceFilter->TimeoutFilter->MonitorFilter->ExceptionFilter]
2.2.consumer端filter chain生成
具體是在
com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(Class<T>, URL)
com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)
com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe(URL)
com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)
com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(List<URL>)
com.alibaba.dubbo.registry.integration.RegistryDirectory.toInvokers(List<URL>)
自適應對象Protocol$Adaptive引用,具體如下圖
和provider端一樣,獲取的Protoco對象結構是QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol
,在DubboProtocl生成Invoker后,ProtocolFilterWrapper對Invoker功能增強生成filter chain。代碼如下
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
//protocol.refer(type, url)即DubboProtocol.refer生成Invoker引用,即DubboInvoker
//buildInvokerChain對DubboInvoker進行功能增強,增加filter chain
對於conusmer端,默認生成的filter chain [ConsumerContextFilter->FutureFilter->MonitorFilter]
如果consumer端是泛化,默認生成的filter chain [ConsumerContextFilter->FutureFilter->MonitorFilter->GenericImplFilter]
2.3.buildInvokerChain
provider和consumer端生成filter chain都是使用的buildInvokerChain,因此分析下
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
//其它忽略
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
};
}
}
return last;
}
代碼核心在於List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
,dubbo SPI機制獲取filter的實現,並且激活對應的provider或consumer端的filter集合(並且根據Filter上的order進行了排序),然后組成filter chain。這個明白了SPI機制和自動激活就很簡單。
3.provider端filter記錄
3.1.AccessLogFilter
AccessLogFilter:默認不啟用,只有配置了dubbo.provider.accesslog="true/default/custom-access.log"才生效。具體作用是把dubbo日志輸出到應用本身的日志組件(比如logback)或者指定的文件。實現原理:如果是輸出到應用本身的日志組件,則直接info打印;如果是輸出到指定目錄文件,則把要寫入到日志內容暫存到map,然后使用schedule線程池從map獲取從而寫入文件。
3.2.ExecuteLimitFilter
ExecuteLimitFilter:默認不啟用,只有在配置了@Service(executes=10)或者@Method(executes=10)時候才生效,分表是針對接口級別和方法級別。具體作用是限制每個方法的並發執行數(即占用線程池線程數)不能超過10。在不設置或者設置了小於等於0的數值,是不會進行限制。實現原理是使用ConcurrentHashMap為每個url緩存了RpcStatus,RpcStatus又封裝了信號量Semaphore,具體就是請求先獲取信號量,信號量消耗完,拒絕執行。具體代碼中,個人認為RpcStatus.beginCount(url, methodName); 和 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
這兩行代碼應該是無用的(可能是歷史遺留,因為歷史代碼使用原子性進行控制並發數),可以直接廢棄掉。因為通過url作為key,method作為分組獲取RpcStatus,也就是獲取了信號量,沒必要再使用原子了。
3.3.ClassLoaderFilter
ClassLoaderFilter:默認啟用,此filter的作用就是切換當前線程的類加載器為Invoker的類加載器,用完還原回去。為什么這么做呢?java的類加載器和雙親委派模型我們通常都知道,類的正常加載也都是按照雙親委派模型進行加載的。但是有些時候需要破壞雙親委派模型,比如jdk的spi機制、當前線程的類加載器,就可以破壞雙親委派模型。那么為了避免當前框架的類加載器可能和Invoker的類加載器不是同一個(用戶可能對框架進行了擴展,比如自定義了一個filter,在當前線程使用了自定義的類加載器),而當前框架線程中又需要獲取Invoker的類加載器中的一些class,為了避免(class存在但實際)出現ClassNotFoundException,此時需要設置Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader()),即設置當前線程的類加載器為Invoker的類加載器。本質是為了避免當前線程和Invoker的類加載(可能)不同而做的兼容。
常見的使用例子是DubboProtocol#optimizeSerialization方法,會根據Invoker中配置的optimizer參數獲取擴展的自定義序列化處理類,這些外部引入的序列化類在框架的類加載器中肯定沒有,因此需要使用Invoker的類加載器獲取對應的類。
3.4.ContextFilter
ContextFilter:默認啟用,統一在過濾器中處理請求的上下文信息RpcContext,比如設置當前請求的上下文如Invoker信息、地址信息、端口信息等,清除異步屬性,防止傳到過濾器鏈的下一個環節等。
3.5.ExceptionFilter
ExceptionFilter:默認啟用,dubbo框架的全局異常處理器,有個缺點,我們項目中很多情況會拋出一些自定義異常來替代返回錯誤碼,但是ExceptionFilter會將我們的異常信息進行包裝,因此通常情況下我們也會重寫ExceptionFilter,從而禁用原生ExceptionFilter。當然也可以把項目組的異常定義為extends RpcException,這樣項目異常也會直接拋出(這樣也不規范,自己項目的異常怎么能是RpcContext)。
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
try {
Result result = invoker.invoke(invocation);
if (result.hasException() && GenericService.class != invoker.getInterface()) {//代碼@1
try {
Throwable exception = result.getException();
// directly throw if it's checked exception
if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {//代碼@2
return result;
}
// directly throw if the exception appears in the signature
try {//代碼@3-start
Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
Class<?>[] exceptionClassses = method.getExceptionTypes();
for (Class<?> exceptionClass : exceptionClassses) {
if (exception.getClass().equals(exceptionClass)) {
return result;
}
}
} catch (NoSuchMethodException e) {
return result;
}//代碼@3-end
// for the exception not found in method's signature, print ERROR message in server's log.
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);
// directly throw if exception class and interface class are in the same jar file.
String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());//代碼@4-start
String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
return result;
}//代碼@4-end
// directly throw if it's JDK exception
String className = exception.getClass().getName();
if (className.startsWith("java.") || className.startsWith("javax.")) {//代碼@5
return result;
}
// directly throw if it's dubbo exception
if (exception instanceof RpcException) {//代碼@6
return result;
}
// otherwise, wrap with RuntimeException and throw back to the client
return new RpcResult(new RuntimeException(StringUtils.toString(exception)));//代碼@7
} catch (Throwable e) {
logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
return result;//處理invoker異常又出現異常,直接返回
}
}
return result;
} catch (RuntimeException e) {//invoke調用異常,直接拋出異常
logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
+ ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
+ ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
throw e;
}
}
代碼@1:如果有異常並且未實現GenericService接口,進入后續判斷邏輯,否則直接返回結果。
代碼@2:不是RuntimeException類型的異常,並且是受檢異常(繼承Exception),直接返回結果。
代碼@3: 在方法簽名上有聲明這個異常,直接返回結果。就是接口方法上直接throws 異常,且調用返回的異常也是聲明的異常,直接返回。
代碼@4:如果異常類和接口類在同一個jar包中,直接返回結果。
代碼@5:以java.或javax.開頭的異常直接返回結果。
代碼@6:dubbo的自身異常RpcException,直接返回結果。
代碼@7:不滿足上述條件,會做toString處理並被封裝成RuntimeException拋出。這樣就吞了業務定義的異常
如何解決
解決方法就針對上面的幾個條件進行,有幾種方案,我們一一看一下:
1、將該異常的包名以java.或者javax.開頭
這個方案不現實,也不符合規范,所以不采用
2、業務異常繼承Exception,變為checked異常
自定義的業務異常本身屬於RuntimeException,所以也不采用
3、異常類和接口類在同一jar包里
較大的項目一般都會有一些common包,定義好異常類型,使用二方包的方式引用,所以也不適用
4、provider的api明確寫明throws XxxException
作為生產服務端,不應顯式拋出異常給客戶的進行處理,所以也不適用
最終方案
重寫ExceptionFilter,並且禁用原生的ExceptionFilter。
比如在自定義的DubboExceptionFilter內加上className.startsWith("xxx.xxx"),表明是我們項目的異常,直接return result。其它內容和ExceptionFilter保持相同。同時我們應該在加上配置 全局禁用dubbo ExceptionFilter:dubbo.provider.filter=-exception
,自定義的DubboExceptionFilter如下
@Activate(group = Constants.PROVIDER)
public class DubboExceptionFilter implements Filter {
//省略
}
備注:禁用dubbo所有的默認filter,配置如下 dubbo.provider.filter=-default
3.6.TimeoutFilter
TimeoutFilter:默認啟用,記錄每個Invoker的調用耗時,如果超過了設置的timeout時間,則會打印一條warn日志,此外別無其他功能。監控可以收集這個日志用於警告。
3.7.TokenFilter
TokenFilter:默認不啟用,如果provider不想讓consumer繞過注冊中心直連自己,則可以使用令牌驗證。原理是provider服務生成token並暴露到zk,消費者必須通過注冊中心才能獲取有令牌provider的url。TokenFilter就是provider端用於過濾consumer的調用,禁用無token的consumer調用。
配置token如下
全局設置 dubbo.provider.token=true
隨機token,uuid生成。dubbo.provider.token=123456
固定token
服務級別設置 dubbo.service.token=true
隨機token,uuid生成。dubbo.service.token=123456
固定token
協議級別設置 dubbo.protocol.token=true
隨機token,uuid生成。dubbo.protocol.token=123456
固定token
通常進行認證,級別都是全局設置。
3.8.TpsLimitFilter
TpsLimitFilter:默認不啟用,用於provider端限流。在dubbo的SPI com.alibaba.dubbo.rpc.Filter文件內沒有這個配置,如果需要使用,需要如下配置:
在resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
內增加tpsLimiter=com.alibaba.dubbo.rpc.filter.TpsLimitFilter
,接着在屬性文件內配置
dubbo.provider.parameters.tps=1000
每次發放1000個令牌
dubbo.provider.parameters.tps.interval=2000
令牌刷新間隔時間是2s,默認是60s
這樣實際就生效。
主要邏輯在com.alibaba.dubbo.rpc.filter.tps.StatItem.isAllowable()
內,設計的簡單又好用,工作中可以直接參考使用。代碼如下
//com.alibaba.dubbo.rpc.filter.tps.StatItem.isAllowable()
public boolean isAllowable() {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {//當前時間戳大於上次重置時間戳+間隔時間,說明到了下一周期,又可以獲取1000個令牌
token.set(rate);//因此重置令牌數。因為令牌數隨着使用遞減為0
lastResetTime = now;
}
int value = token.get();//獲取令牌值
boolean flag = false;
while (value > 0 && !flag) {//使用當前令牌值和flag控制循環進入
flag = token.compareAndSet(value, value - 1);//令牌減一
value = token.get();
}
return flag;//返回獲取令牌的結果。如果在2s內已經獲取了1000個令牌,則結果false,不允許請求
}
3.9.EchoFilter
EchoFilter:默認啟用,用於回聲測試。所謂回聲就是返回請求的數據。代碼具體很簡單,判斷請求方法是$echo且只有一個參數,則把請求的數據返回。具體對應consumer端生成的Invoker進行了動態代理增強,增加了EchoSerivce接口。具體是consumer端在com.alibaba.dubbo.config.ReferenceConfig.createProxy(Map<String, String>)
生成了Invoker對象,然后接着return (T) proxyFactory.getProxy(invoker);
,對生成的Invoker對象進行動態代理給Invoker增加EchoSerivce接口。具體代碼在
這樣就可以在consumer端引用的服務,強制轉換為EchoService類型了。具體作用是如果需要探測 Dubbo 服務能否通,但又不想用啟動時檢查的方式,那么調用方只需要把其中的一個服務引用強轉為 EchoService 然后$echo調用即可。
3.10.GenericFilter
GenericFilter:默認啟用,用於處理泛型調用,后續泛型時候寫。
3.11.TraceFilter
TraceFilter:默認啟用,真實情況是,只有在通過運維命令 trace $服務名
后才會真正觸發這個filter的執行。
比如我們執行telnet 192.168.1.155 20880 連接上dubbo服務,執行trace ProductService
,這樣ProductService服務的調用就會被加入TraceFilter的統計(TraceFilter#addTracer操作),dubbo官方對trace的介紹:
trace 大體的實現流程,telnet 命令會被dubbo handler 單讀處理,然后將trace命令解析出來,交給TraceTelnetHandler ,然后它經過一頓操作,找到接口暴露的invoker,然后再向TraceFilter中添加這個tracer,其實就是將要監聽接口名,方法名,監聽次數,還有channel 給TraceFilter,接下來就是調用的時候收集 執行時間,將執行時間 找到對應的channel 然后發送給telnet客戶端。
3.12.MonitorFilter
MonitorFilter:默認啟用,用於監控收集,dubbo默認的Monitor太簡陋,工作中需要進行統一收集監控信息展示,因此可以自定義Monitor,使用dubbo spi機制擴展即可。
4.consumer端filter記錄
4.1.ConsumerContextFilter
默認啟用。ConsumerContextFilter會和ContextFilter配合使用,在微服務環境中,有許多鏈式調用,如A->B->C->D,收到請求時,當前節點被看作一個provider,由ContextFilter社長上下文,當發起請求到下一個服務時候,當前服務變為一個consumer,由ConsumerContextFilter設置上下文。具體邏輯:invoker調用前,設置當前請求上下文,進行invoker調用,invoker調用完畢后,清除當前線程的上下文信息,防止再次調用被誤用。
之前遇到一個問題:A應用是web服務(dubbo consumer),B應用是dubbo服務(dubbo provider),A調用B會把用戶信息放入隱式參數,這樣B應用接收到就可以正常處理。為了方便,用戶的信息由A應用的web filter統一設置,用着沒有問題。但是,在A應用一次請求內連續多次訪問B應用時候,發現第二次請求B應用,就提示用戶信息不存在,原因就是ConsumerContextFilter在第一次后清除了當前上下文的用戶信息。因此解決辦法是定義個dubbo consumer filter,每次dubbo請求就會保存用戶信息到當前dubbo隱式參數,如此解決。
4.2.FutureFilter
這個是個鈎子,用於用戶擴展,在invoker調用前后執行consumer端定義的邏輯。具體使用方法如下:
定義個用戶bean,如下
@Component(value="mynotify")
public class Notify {
public void oninvoke(String msg){
System.out.println("oninvoke:" + msg);
}
public void onreturn(String msg) {
System.out.println("onreturn:" + msg);
}
public void onthrow(Throwable e) {
System.out.println("onthrow:" + e);
}
}
在引用的dubbo服務上加@Method注解
@Reference( methods= {@Method(name="findProduct",onreturn="mynotify.onreturn")})
private ProductService productService;
含義是在請求調用執行后,執行bean mynotify的onreturn方法。不過我在測試時候,發現dubbo2.6.8 FutureFilter對於執行這樣的鈎子有bug,具體bug是在MethodConfig的構造器內,沒有對method進行按.進行截取賦值,報錯異常如下
java.lang.IllegalStateException: service:org.pangu.api.ProductService has a onreturn callback config , but no such method found. url:dubbo://192.168.5.1:20880/org.pangu.api.ProductService?anyhost=true&application=pangu-client-consumer&bean.name=ServiceBean:org.pangu.api.ProductService&check=false&default.blueGreenTag=green&default.tps=1000&default.tps.interval=2000&dubbo=2.0.2&findProduct.return=true&generic=false&interface=org.pangu.api.ProductService&methods=findProduct,selectProduct&pid=17972&qos.enable=true®ister.ip=192.168.5.1&remote.timestamp=1627181280321&retries=0&revision=1.0-SNAPSHOT&side=consumer&timeout=900000000×tamp=1627211036576&weight=100
at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.fireReturnCallback(FutureFilter.java:137) ~[dubbo-2.6.8.jar:2.6.8]
at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.syncCallback(FutureFilter.java:67) ~[dubbo-2.6.8.jar:2.6.8]
at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:58) ~[dubbo-2.6.8.jar:2.6.8]
錯誤原因:
對MethodConfig改了后(重寫同名的MethodConfig,設置method),又發現啟動報錯,這功能個bug有點多,就不再改了。
Caused by: java.lang.IllegalStateException: java.lang.NoSuchMethodException: No such method onreturn in class class java.lang.String
at com.alibaba.dubbo.config.ReferenceConfig.getMethodByName(ReferenceConfig.java:147) ~[dubbo-2.6.8.jar:2.6.8]
at com.alibaba.dubbo.config.ReferenceConfig.checkAndConvertImplicitConfig(ReferenceConfig.java:127) ~[dubbo-2.6.8.jar:2.6.8]
at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:321) ~[dubbo-2.6.8.jar:2.6.8]
通過GitHub查看dubbo2.7代碼,發現此bug已修復。
4.3.GenericImplFilter
consumer端是泛化調用才啟用。和provider端的GenericFilter配合使用。后續泛型時候再寫。
4.4.ActiveLimitFilter
默認不啟用。和provider端的ExecuteLimitFilter功能類似,ActiveLimitFilter是消費端的filter,限制客戶端的並發數,即限制方法在每個客戶端的並發執行數(或占用連接的請求數)不能超過配置的actives。
具體邏輯:如果達到限流閾值,和provider端的ExecuteLimitFilter不同,並不是直接拋出異常,而是先等待直到超時,因為請求是有timeout屬性的。當並發數達到閾值時,會先加鎖搶占當前接口方法的RpcStatus對象,然后通過wait方法進行等待。此時有兩種結果:第一種是某個Invoker在調用結束后,並把計數器原子-1並觸發此RpcStatus對象的notify,會有一個在wait狀態的線程被喚醒並繼續執行invoker調用;第二種是wait等待超時都未被喚醒,此時直接拋出異常。具體代碼如下
//com.alibaba.dubbo.rpc.filter.ActiveLimitFilter.invoke(Invoker<?>, Invocation)
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();
if (active >= max) {//當前活躍數大於actives
synchronized (count) {//鎖的顆粒小,只加在此方法對應的RpcStatus上
while ((active = count.getActive()) >= max) {//這里又獲取一次是因為可能count.getActive()值已經小於max,因為在此過程可能有釋放。使用while是因為被喚醒
try {
count.wait(remain);//在當前方法對應的RpcStatus上等待,直至喚醒或者超時
} catch (InterruptedException e) {
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {//在timeout時間內獲取不到,拋出異常
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
RpcStatus.beginCount(url, methodName);//active +1
try {
Result result = invoker.invoke(invocation);
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);//active -1
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if (max > 0) {
synchronized (count) {
count.notify();//喚醒等待在此RpcStatus對象上的線程
}
}
}
}
5.擴展dubbo filter
dubbo filter可能是我們使用dubbo過程中最經常需要定義的,通常做一些公共的處理,自定義dubbo filter步驟如下:
1.實現filter接口,加上自動激活注解@Activate和group、order
2.在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter內增加xxx=自定義filter
擴展也是使用dubbo提供的SPI機制,擴展很簡單。