dubbo系列五、dubbo過濾器


dubbo過濾器

1.前言

dubbo filter的作用和web filter的作用是一樣的,在真正調用前做一些公共的處理。這也就是在重要的過程上設置攔截接口,提供擴展供業務實現。

dubbo過濾器是整個dubbo框架中非常重要的組成部分,dubbo中許多重要功能都是基於過濾器擴展而來。過濾器提供了provider和consumer調用過程的攔截,

即每次RPC調用的時候,對應的過濾器都會生效。雖然過濾器功能強大,但由於每次調用都會執行,因此在使用的時候需要注意它對性能的影響。

dubbo filter分為comsumer和provider端兩種,我們開發中對dubbo經常擴展的也是filter,因此系統的記錄下。

dubbo filter的接口是com.alibaba.dubbo.rpc.Filter,是個SPI(擴展點),僅提供了一個invoker方法,用於rpc調用前后的功能增強。其中dubbo提供的內置filter實現如下:

provider端:AccessLogFilter、ClassloaderFilter、ContextFilter、ExecuteLimitFilter、ExceptionFilter、EchoFilter、TimeoutFilter、GenericFilter、TokenFilter、TpsFilter、TraceFilter、MonitorFilter

consumer端:ConsumerContextFilter、GenericImplFilter、ActiveLimitFilter、DeprecatedFilter、FutureFilter、MonitorFilter

以上內置filter都使用了@Activate注解,默認被激活。provider端的filter只在provider端服務啟動時加入到filter chain;consumer端的filter只在consumer端服務啟動時加入到filter chain;其中MonitorFilter特殊,它屬於consumer和provider端,那么同時會在服務暴露和引用時候被加入到filter chain。

對dubbo filter有了整體了解,下面問題來了,filter chain是如何生成的?

2.dubbo filter chain如何生成

2.1.provider端filter chain生成

provider服務在暴露時候,在com.alibaba.dubbo.registry.integration.RegistryProtocol.export(Invoker<T>) -> com.alibaba.dubbo.registry.integration.RegistryProtocol.doLocalExport(Invoker<T>) 內進行服務暴露,關鍵是

image-20210718171601843

自適應對象Protocol$Adaptive根據URL上的協議dubbo獲取對應的DubboProtocol,但是由於Protocol有三個Wrapper實現,具體是ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper,那么在自適應對象獲取com.alibaba.dubbo.rpc.Protocol extension =(com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); //extName是dubbo,實際上獲取的是Wrapper類(dubbo SPI的依賴注入功能,即類似AOP),Wrapper類封裝了最終的DubboProtocol。即最終這個Protocol對象的結構是QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol。wrapper類的作用就是增強,這個是裝飾模式。在這里就是使用ProtocolFilterWrapper對DubboProtocol進行了增強,在DubboProtocl生成Invoker后,對Invoker功能增強生成filter chain。代碼如下

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
//invoker參數即服務端使用javaassit生成的動態代理對象
//buildInvokerChain生成filter chain,對服務進行功能增強

provider端生成的默認filter chain [EchoFilter->ClassLoaderFilter->GenericFilter->ContextFilter->TraceFilter->TimeoutFilter->MonitorFilter->ExceptionFilter]

2.2.consumer端filter chain生成

具體是在

com.alibaba.dubbo.registry.integration.RegistryProtocol.refer(Class<T>, URL)
com.alibaba.dubbo.registry.integration.RegistryProtocol.doRefer(Cluster, Registry, Class<T>, URL)
com.alibaba.dubbo.registry.integration.RegistryDirectory.subscribe(URL)
com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)
com.alibaba.dubbo.registry.integration.RegistryDirectory.refreshInvoker(List<URL>)
com.alibaba.dubbo.registry.integration.RegistryDirectory.toInvokers(List<URL>)

自適應對象Protocol$Adaptive引用,具體如下圖

image-20210718173816425

和provider端一樣,獲取的Protoco對象結構是QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->DubboProtocol,在DubboProtocl生成Invoker后,ProtocolFilterWrapper對Invoker功能增強生成filter chain。代碼如下

@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
//protocol.refer(type, url)即DubboProtocol.refer生成Invoker引用,即DubboInvoker
//buildInvokerChain對DubboInvoker進行功能增強,增加filter chain

對於conusmer端,默認生成的filter chain [ConsumerContextFilter->FutureFilter->MonitorFilter]

如果consumer端是泛化,默認生成的filter chain [ConsumerContextFilter->FutureFilter->MonitorFilter->GenericImplFilter]

2.3.buildInvokerChain

provider和consumer端生成filter chain都是使用的buildInvokerChain,因此分析下

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
				//其它忽略
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }
            };
        }
    }
    return last;
}

代碼核心在於List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);,dubbo SPI機制獲取filter的實現,並且激活對應的provider或consumer端的filter集合(並且根據Filter上的order進行了排序),然后組成filter chain。這個明白了SPI機制和自動激活就很簡單。

3.provider端filter記錄

3.1.AccessLogFilter

AccessLogFilter:默認不啟用,只有配置了dubbo.provider.accesslog="true/default/custom-access.log"才生效。具體作用是把dubbo日志輸出到應用本身的日志組件(比如logback)或者指定的文件。實現原理:如果是輸出到應用本身的日志組件,則直接info打印;如果是輸出到指定目錄文件,則把要寫入到日志內容暫存到map,然后使用schedule線程池從map獲取從而寫入文件。

3.2.ExecuteLimitFilter

ExecuteLimitFilter:默認不啟用,只有在配置了@Service(executes=10)或者@Method(executes=10)時候才生效,分表是針對接口級別和方法級別。具體作用是限制每個方法的並發執行數(即占用線程池線程數)不能超過10。在不設置或者設置了小於等於0的數值,是不會進行限制。實現原理是使用ConcurrentHashMap為每個url緩存了RpcStatus,RpcStatus又封裝了信號量Semaphore,具體就是請求先獲取信號量,信號量消耗完,拒絕執行。具體代碼中,個人認為RpcStatus.beginCount(url, methodName); 和 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);這兩行代碼應該是無用的(可能是歷史遺留,因為歷史代碼使用原子性進行控制並發數),可以直接廢棄掉。因為通過url作為key,method作為分組獲取RpcStatus,也就是獲取了信號量,沒必要再使用原子了。

3.3.ClassLoaderFilter

ClassLoaderFilter:默認啟用,此filter的作用就是切換當前線程的類加載器為Invoker的類加載器,用完還原回去。為什么這么做呢?java的類加載器和雙親委派模型我們通常都知道,類的正常加載也都是按照雙親委派模型進行加載的。但是有些時候需要破壞雙親委派模型,比如jdk的spi機制、當前線程的類加載器,就可以破壞雙親委派模型。那么為了避免當前框架的類加載器可能和Invoker的類加載器不是同一個(用戶可能對框架進行了擴展,比如自定義了一個filter,在當前線程使用了自定義的類加載器),而當前框架線程中又需要獲取Invoker的類加載器中的一些class,為了避免(class存在但實際)出現ClassNotFoundException,此時需要設置Thread.currentThread().setContextClassLoader(invoker.getInterface().getClassLoader()),即設置當前線程的類加載器為Invoker的類加載器。本質是為了避免當前線程和Invoker的類加載(可能)不同而做的兼容。

常見的使用例子是DubboProtocol#optimizeSerialization方法,會根據Invoker中配置的optimizer參數獲取擴展的自定義序列化處理類,這些外部引入的序列化類在框架的類加載器中肯定沒有,因此需要使用Invoker的類加載器獲取對應的類。

3.4.ContextFilter

ContextFilter:默認啟用,統一在過濾器中處理請求的上下文信息RpcContext,比如設置當前請求的上下文如Invoker信息、地址信息、端口信息等,清除異步屬性,防止傳到過濾器鏈的下一個環節等。

3.5.ExceptionFilter

ExceptionFilter:默認啟用,dubbo框架的全局異常處理器,有個缺點,我們項目中很多情況會拋出一些自定義異常來替代返回錯誤碼,但是ExceptionFilter會將我們的異常信息進行包裝,因此通常情況下我們也會重寫ExceptionFilter,從而禁用原生ExceptionFilter。當然也可以把項目組的異常定義為extends RpcException,這樣項目異常也會直接拋出(這樣也不規范,自己項目的異常怎么能是RpcContext)。

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    try {
        Result result = invoker.invoke(invocation);
        if (result.hasException() && GenericService.class != invoker.getInterface()) {//代碼@1
            try {
                Throwable exception = result.getException();

                // directly throw if it's checked exception
                if (!(exception instanceof RuntimeException) && (exception instanceof Exception)) {//代碼@2
                    return result;
                }
                // directly throw if the exception appears in the signature
                try {//代碼@3-start
                    Method method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
                    Class<?>[] exceptionClassses = method.getExceptionTypes();
                    for (Class<?> exceptionClass : exceptionClassses) {
                        if (exception.getClass().equals(exceptionClass)) {
                            return result;
                        }
                    }
                } catch (NoSuchMethodException e) {
                    return result;
                }//代碼@3-end

                // for the exception not found in method's signature, print ERROR message in server's log.
                logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                             + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                             + ", exception: " + exception.getClass().getName() + ": " + exception.getMessage(), exception);

                // directly throw if exception class and interface class are in the same jar file.
                String serviceFile = ReflectUtils.getCodeBase(invoker.getInterface());//代碼@4-start
                String exceptionFile = ReflectUtils.getCodeBase(exception.getClass());
                if (serviceFile == null || exceptionFile == null || serviceFile.equals(exceptionFile)) {
                    return result;
                }//代碼@4-end
                // directly throw if it's JDK exception
                String className = exception.getClass().getName();
                if (className.startsWith("java.") || className.startsWith("javax.")) {//代碼@5
                    return result;
                }
                // directly throw if it's dubbo exception
                if (exception instanceof RpcException) {//代碼@6
                    return result;
                }

                // otherwise, wrap with RuntimeException and throw back to the client
                return new RpcResult(new RuntimeException(StringUtils.toString(exception)));//代碼@7
            } catch (Throwable e) {
                logger.warn("Fail to ExceptionFilter when called by " + RpcContext.getContext().getRemoteHost()
                            + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                            + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
                return result;//處理invoker異常又出現異常,直接返回
            }
        }
        return result;
    } catch (RuntimeException e) {//invoke調用異常,直接拋出異常
        logger.error("Got unchecked and undeclared exception which called by " + RpcContext.getContext().getRemoteHost()
                     + ". service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName()
                     + ", exception: " + e.getClass().getName() + ": " + e.getMessage(), e);
        throw e;
    }
}

代碼@1:如果有異常並且未實現GenericService接口,進入后續判斷邏輯,否則直接返回結果。

代碼@2:不是RuntimeException類型的異常,並且是受檢異常(繼承Exception),直接返回結果。

代碼@3: 在方法簽名上有聲明這個異常,直接返回結果。就是接口方法上直接throws 異常,且調用返回的異常也是聲明的異常,直接返回。

代碼@4:如果異常類和接口類在同一個jar包中,直接返回結果。

代碼@5:以java.或javax.開頭的異常直接返回結果。

代碼@6:dubbo的自身異常RpcException,直接返回結果。

代碼@7:不滿足上述條件,會做toString處理並被封裝成RuntimeException拋出。這樣就吞了業務定義的異常

如何解決

解決方法就針對上面的幾個條件進行,有幾種方案,我們一一看一下:

1、將該異常的包名以java.或者javax.開頭

​ 這個方案不現實,也不符合規范,所以不采用

2、業務異常繼承Exception,變為checked異常

​ 自定義的業務異常本身屬於RuntimeException,所以也不采用

3、異常類和接口類在同一jar包里

​ 較大的項目一般都會有一些common包,定義好異常類型,使用二方包的方式引用,所以也不適用

4、provider的api明確寫明throws XxxException

​ 作為生產服務端,不應顯式拋出異常給客戶的進行處理,所以也不適用

最終方案

​ 重寫ExceptionFilter,並且禁用原生的ExceptionFilter。

比如在自定義的DubboExceptionFilter內加上className.startsWith("xxx.xxx"),表明是我們項目的異常,直接return result。其它內容和ExceptionFilter保持相同。同時我們應該在加上配置 全局禁用dubbo ExceptionFilter:dubbo.provider.filter=-exception,自定義的DubboExceptionFilter如下

@Activate(group = Constants.PROVIDER)
public class DubboExceptionFilter implements Filter {
	//省略
}

備注:禁用dubbo所有的默認filter,配置如下 dubbo.provider.filter=-default

3.6.TimeoutFilter

TimeoutFilter:默認啟用,記錄每個Invoker的調用耗時,如果超過了設置的timeout時間,則會打印一條warn日志,此外別無其他功能。監控可以收集這個日志用於警告。

3.7.TokenFilter

TokenFilter:默認不啟用,如果provider不想讓consumer繞過注冊中心直連自己,則可以使用令牌驗證。原理是provider服務生成token並暴露到zk,消費者必須通過注冊中心才能獲取有令牌provider的url。TokenFilter就是provider端用於過濾consumer的調用,禁用無token的consumer調用。

配置token如下

全局設置 dubbo.provider.token=true 隨機token,uuid生成。dubbo.provider.token=123456 固定token

服務級別設置 dubbo.service.token=true 隨機token,uuid生成。dubbo.service.token=123456 固定token

協議級別設置 dubbo.protocol.token=true 隨機token,uuid生成。dubbo.protocol.token=123456 固定token

通常進行認證,級別都是全局設置。

3.8.TpsLimitFilter

TpsLimitFilter:默認不啟用,用於provider端限流。在dubbo的SPI com.alibaba.dubbo.rpc.Filter文件內沒有這個配置,如果需要使用,需要如下配置:

resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter內增加tpsLimiter=com.alibaba.dubbo.rpc.filter.TpsLimitFilter,接着在屬性文件內配置

dubbo.provider.parameters.tps=1000 每次發放1000個令牌

dubbo.provider.parameters.tps.interval=2000 令牌刷新間隔時間是2s,默認是60s

這樣實際就生效。

主要邏輯在com.alibaba.dubbo.rpc.filter.tps.StatItem.isAllowable()內,設計的簡單又好用,工作中可以直接參考使用。代碼如下

//com.alibaba.dubbo.rpc.filter.tps.StatItem.isAllowable()
public boolean isAllowable() {
    long now = System.currentTimeMillis();
    if (now > lastResetTime + interval) {//當前時間戳大於上次重置時間戳+間隔時間,說明到了下一周期,又可以獲取1000個令牌
        token.set(rate);//因此重置令牌數。因為令牌數隨着使用遞減為0
        lastResetTime = now;
    }

    int value = token.get();//獲取令牌值
    boolean flag = false;
    while (value > 0 && !flag) {//使用當前令牌值和flag控制循環進入
        flag = token.compareAndSet(value, value - 1);//令牌減一
        value = token.get();
    }

    return flag;//返回獲取令牌的結果。如果在2s內已經獲取了1000個令牌,則結果false,不允許請求
}

3.9.EchoFilter

EchoFilter:默認啟用,用於回聲測試。所謂回聲就是返回請求的數據。代碼具體很簡單,判斷請求方法是$echo且只有一個參數,則把請求的數據返回。具體對應consumer端生成的Invoker進行了動態代理增強,增加了EchoSerivce接口。具體是consumer端在com.alibaba.dubbo.config.ReferenceConfig.createProxy(Map<String, String>)生成了Invoker對象,然后接着return (T) proxyFactory.getProxy(invoker);,對生成的Invoker對象進行動態代理給Invoker增加EchoSerivce接口。具體代碼在

image-20210720232357036

這樣就可以在consumer端引用的服務,強制轉換為EchoService類型了。具體作用是如果需要探測 Dubbo 服務能否通,但又不想用啟動時檢查的方式,那么調用方只需要把其中的一個服務引用強轉為 EchoService 然后$echo調用即可。

3.10.GenericFilter

GenericFilter:默認啟用,用於處理泛型調用,后續泛型時候寫。

3.11.TraceFilter

TraceFilter:默認啟用,真實情況是,只有在通過運維命令 trace $服務名 后才會真正觸發這個filter的執行。

比如我們執行telnet 192.168.1.155 20880 連接上dubbo服務,執行trace ProductService,這樣ProductService服務的調用就會被加入TraceFilter的統計(TraceFilter#addTracer操作),dubbo官方對trace的介紹:

image-20210720235808855

trace 大體的實現流程,telnet 命令會被dubbo handler 單讀處理,然后將trace命令解析出來,交給TraceTelnetHandler ,然后它經過一頓操作,找到接口暴露的invoker,然后再向TraceFilter中添加這個tracer,其實就是將要監聽接口名,方法名,監聽次數,還有channel 給TraceFilter,接下來就是調用的時候收集 執行時間,將執行時間 找到對應的channel 然后發送給telnet客戶端。

3.12.MonitorFilter

MonitorFilter:默認啟用,用於監控收集,dubbo默認的Monitor太簡陋,工作中需要進行統一收集監控信息展示,因此可以自定義Monitor,使用dubbo spi機制擴展即可。

4.consumer端filter記錄

4.1.ConsumerContextFilter

默認啟用。ConsumerContextFilter會和ContextFilter配合使用,在微服務環境中,有許多鏈式調用,如A->B->C->D,收到請求時,當前節點被看作一個provider,由ContextFilter社長上下文,當發起請求到下一個服務時候,當前服務變為一個consumer,由ConsumerContextFilter設置上下文。具體邏輯:invoker調用前,設置當前請求上下文,進行invoker調用,invoker調用完畢后,清除當前線程的上下文信息,防止再次調用被誤用。

之前遇到一個問題:A應用是web服務(dubbo consumer),B應用是dubbo服務(dubbo provider),A調用B會把用戶信息放入隱式參數,這樣B應用接收到就可以正常處理。為了方便,用戶的信息由A應用的web filter統一設置,用着沒有問題。但是,在A應用一次請求內連續多次訪問B應用時候,發現第二次請求B應用,就提示用戶信息不存在,原因就是ConsumerContextFilter在第一次后清除了當前上下文的用戶信息。因此解決辦法是定義個dubbo consumer filter,每次dubbo請求就會保存用戶信息到當前dubbo隱式參數,如此解決。

4.2.FutureFilter

這個是個鈎子,用於用戶擴展,在invoker調用前后執行consumer端定義的邏輯。具體使用方法如下:

定義個用戶bean,如下

@Component(value="mynotify")
public class Notify {
    public void oninvoke(String msg){
        System.out.println("oninvoke:" + msg);
    }
    public void onreturn(String msg) {
        System.out.println("onreturn:" + msg);
    }
    public void onthrow(Throwable e) {
        System.out.println("onthrow:" + e);
    }
}

在引用的dubbo服務上加@Method注解

@Reference( methods= {@Method(name="findProduct",onreturn="mynotify.onreturn")})
private ProductService productService;

含義是在請求調用執行后,執行bean mynotify的onreturn方法。不過我在測試時候,發現dubbo2.6.8 FutureFilter對於執行這樣的鈎子有bug,具體bug是在MethodConfig的構造器內,沒有對method進行按.進行截取賦值,報錯異常如下

java.lang.IllegalStateException: service:org.pangu.api.ProductService has a onreturn callback config , but no such method found. url:dubbo://192.168.5.1:20880/org.pangu.api.ProductService?anyhost=true&application=pangu-client-consumer&bean.name=ServiceBean:org.pangu.api.ProductService&check=false&default.blueGreenTag=green&default.tps=1000&default.tps.interval=2000&dubbo=2.0.2&findProduct.return=true&generic=false&interface=org.pangu.api.ProductService&methods=findProduct,selectProduct&pid=17972&qos.enable=true&register.ip=192.168.5.1&remote.timestamp=1627181280321&retries=0&revision=1.0-SNAPSHOT&side=consumer&timeout=900000000&timestamp=1627211036576&weight=100
	at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.fireReturnCallback(FutureFilter.java:137) ~[dubbo-2.6.8.jar:2.6.8]
	at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.syncCallback(FutureFilter.java:67) ~[dubbo-2.6.8.jar:2.6.8]
	at com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter.invoke(FutureFilter.java:58) ~[dubbo-2.6.8.jar:2.6.8]

錯誤原因:

image-20210725191138113

對MethodConfig改了后(重寫同名的MethodConfig,設置method),又發現啟動報錯,這功能個bug有點多,就不再改了。

Caused by: java.lang.IllegalStateException: java.lang.NoSuchMethodException: No such method onreturn in class class java.lang.String
	at com.alibaba.dubbo.config.ReferenceConfig.getMethodByName(ReferenceConfig.java:147) ~[dubbo-2.6.8.jar:2.6.8]
	at com.alibaba.dubbo.config.ReferenceConfig.checkAndConvertImplicitConfig(ReferenceConfig.java:127) ~[dubbo-2.6.8.jar:2.6.8]
	at com.alibaba.dubbo.config.ReferenceConfig.init(ReferenceConfig.java:321) ~[dubbo-2.6.8.jar:2.6.8]

通過GitHub查看dubbo2.7代碼,發現此bug已修復。

4.3.GenericImplFilter

consumer端是泛化調用才啟用。和provider端的GenericFilter配合使用。后續泛型時候再寫。

4.4.ActiveLimitFilter

默認不啟用。和provider端的ExecuteLimitFilter功能類似,ActiveLimitFilter是消費端的filter,限制客戶端的並發數,即限制方法在每個客戶端的並發執行數(或占用連接的請求數)不能超過配置的actives。

具體邏輯:如果達到限流閾值,和provider端的ExecuteLimitFilter不同,並不是直接拋出異常,而是先等待直到超時,因為請求是有timeout屬性的。當並發數達到閾值時,會先加鎖搶占當前接口方法的RpcStatus對象,然后通過wait方法進行等待。此時有兩種結果:第一種是某個Invoker在調用結束后,並把計數器原子-1並觸發此RpcStatus對象的notify,會有一個在wait狀態的線程被喚醒並繼續執行invoker調用;第二種是wait等待超時都未被喚醒,此時直接拋出異常。具體代碼如下

//com.alibaba.dubbo.rpc.filter.ActiveLimitFilter.invoke(Invoker<?>, Invocation)
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    if (max > 0) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        int active = count.getActive();
        if (active >= max) {//當前活躍數大於actives
            synchronized (count) {//鎖的顆粒小,只加在此方法對應的RpcStatus上
                while ((active = count.getActive()) >= max) {//這里又獲取一次是因為可能count.getActive()值已經小於max,因為在此過程可能有釋放。使用while是因為被喚醒
                    try {
                        count.wait(remain);//在當前方法對應的RpcStatus上等待,直至喚醒或者超時
                    } catch (InterruptedException e) {
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain <= 0) {//在timeout時間內獲取不到,拋出異常
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                               + invoker.getInterface().getName() + ", method: "
                                               + invocation.getMethodName() + ", elapsed: " + elapsed
                                               + ", timeout: " + timeout + ". concurrent invokes: " + active
                                               + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }
    }
    try {
        long begin = System.currentTimeMillis();
        RpcStatus.beginCount(url, methodName);//active +1
        try {
            Result result = invoker.invoke(invocation);
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);//active -1
            return result;
        } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
            throw t;
        }
    } finally {
        if (max > 0) {
            synchronized (count) {
                count.notify();//喚醒等待在此RpcStatus對象上的線程
            }
        }
    }
}

5.擴展dubbo filter

dubbo filter可能是我們使用dubbo過程中最經常需要定義的,通常做一些公共的處理,自定義dubbo filter步驟如下:

1.實現filter接口,加上自動激活注解@Activate和group、order

2.在META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter內增加xxx=自定義filter

擴展也是使用dubbo提供的SPI機制,擴展很簡單。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM