Dubbo之Filter 原理


 
一、使用示例
(1)創建一個XxxFilter,並實現com.alibaba.dubbo.rpc.Filter 這個類
public class MyDubboFilter implements Filter {
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        // before filter ...
        Result result = invoker.invoke(invocation);
        // after filter ...
        return result;
    }
}

 

(2)添加META-INF/dubbo/com.alibaba.dubbo.rpc.Filter 文件,並添加如下內容
myFilter=com.test.MyDubboFilter

 

(3)配置xml
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" filter="myFilter">
    <dubbo:method name="sayHello" retries="0"></dubbo:method>
</dubbo:reference>

 

配置方式
xml配置方式
<!-- 消費方調用過程攔截 -->
<dubbo:reference filter="xxx" />
<!-- 消費方調用過程缺省攔截器,將攔截所有reference -->
<dubbo:consumer filter="xxx"/>
<!-- 提供方調用過程攔截 -->
<dubbo:service filter="xxx" />
<!-- 提供方調用過程缺省攔截器,將攔截所有service -->
<dubbo:provider filter="xxx"/>

 

注解配置方式
@Activate(group = Constants.CONSUMER, order = -10000)
public class MyDubboFilter implements Filter {
}

 

 
 
二、源碼分析
通過調試我們來看一下它的實現機制
 
(1)Filter的加載順序問題
通過@Activate注解中的order屬性,Activate表示激活
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {   
}

@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
}

@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})    // 指定消費者和生產者
public class MonitorFilter implements Filter {
}

將MyDubboFilter改成 -1000000,發現執行順序在ConsumerContextFilter之前了,如下圖所示

 
(2)原生的Filter
Dubbo原生的Filter很多,RpcContext,accesslog等功能都可以通過Dubbo來實現,下面我們來介紹一下Consumer端用於上下文傳遞的 ConsumerContextFilter
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}

此Filter記錄了調用過程中的狀態信息,並且通過invocation對象將客戶端設置的attachments參數傳遞到服務端。並且在調用完成后清除這些參數,這就是為什么請求狀態信息可以按次記錄並且進行傳遞。

 
Dubbo中已經實現的Filter大概有二十幾個,它們的入口都是ProtocolFilterWrapper,ProtocolFilterWrapper對Protocol做了Wrapper,會在加載擴展的時候被加載進來,下面我們來看下這個Filter鏈是如何構造的。
 
加載機制
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    // 獲取已經激活的Filter(調用鏈,這里的調用鏈是已經排好序的)
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (filters.size() > 0) {
        // 遍歷
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            // 典型的裝飾器模式,將invoker用filter逐層進行包裝
            last = new Invoker<T>() {

                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                public URL getUrl() {
                    return invoker.getUrl();
                }

                public boolean isAvailable() {
                    return invoker.isAvailable();
                }
                // 重點,每個filter在執行invoke方法時,會觸發其下級節點的invoke方法,最后一級節點即為最原始的服務
                public Result invoke(Invocation invocation) throws RpcException {
                    return filter.invoke(next, invocation);
                }

                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}
// 服務端暴露服務
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}

// 客戶端引用服務
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}

通過上述代碼我們可以看到,在buildInvokerChain中,先獲取所有已經激活的調用鏈,這里的調用鏈是已經排好序的。再通過Invoker來構造出一個Filter的調用鏈,最后構建出的調用鏈大致可以表示為:Filter1->Filter2->Filter3->......->Invoker,下面我們來看一下,第一步中獲取已經激活的調用鏈的詳細流程

 
調用鏈流程
public List<T> getActivateExtension(URL url, String key, String group) {
    String value = url.getParameter(key);
    return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}

public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<T>();

    List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    // 如果用戶配置的filter列表名稱中不包含-default,則加載標注了Activate注解的filter列表
    if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
        // 加載配置文件,獲取所有標注有Activate注解的類,存入cachedActivates中
        getExtensionClasses();
        for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
            String name = entry.getKey();
            Activate activate = entry.getValue();
            // Activate注解可以指定group,這里是看注解指定的group與我們要求的group是否匹配
            if (isMatchGroup(group, activate.group())) {
                T ext = getExtension(name);
                // 對於每一個dubbo中原生的filter,需要滿足以下3個條件才會被加載:
                // 1.用戶配置的filter列表中不包含該名稱的filter
                // 2.用戶配置的filter列表中不包含該名稱前加了"-"的filter
                // 3.該Activate注解被激活,具體激活條件隨后詳解                
                if (! names.contains(name) && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                    && isActive(activate, url)) {
                    exts.add(ext);
                }
            }
        }
        // 排序
        Collections.sort(exts, ActivateComparator.COMPARATOR);
    }
    // 加載用戶在spring配置文件中配置的filter列表
    List<T> usrs = new ArrayList<T>();
    for (int i = 0; i < names.size(); i ++) {
        String name = names.get(i);
        if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
            && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
            if (Constants.DEFAULT_KEY.equals(name)) {
                if (usrs.size() > 0) {
                    exts.addAll(0, usrs);
                    usrs.clear();
                }
            } else {
                T ext = getExtension(name);
                usrs.add(ext);
            }
        }
    }
    if (usrs.size() > 0) {
        exts.addAll(usrs);
    }
    return exts;
}

通過以上代碼可以看到,用戶自己配置的Filter中,有些是默認激活,有些是需要通過配置文件來激活。而所有Filter的加載順序,也是先處理Dubbo的默認Filter,再來處理用戶自己定義並且配置的Filter。通過"-"配置,可以替換掉Dubbo的原生Filter,通過這樣的設計,可以靈活地替換或者修改Filter的加載順序。

 
總結:
filter被分為兩類,一類是標注了Activate注解的filter,包括dubbo原生的和用戶自定義的;一類是用戶在spring配置文件中手動注入的filter
對標注了Activate注解的filter,可以通過before、after和order屬性來控制它們之間的相對順序,還可以通過group來區分服務端和消費端
手動注入filter時,可以用default來代表所有標注了Activate注解的filter,以此來控制兩類filter之間的順序
手動注入filter時,可以在filter名稱前加一個"-"表示排除某一個filter,比如說如果配置了一個-default的filter,將不再加載所有標注了Activate注解的filter
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM