dubbo中的Filter鏈原理及應用


轉載:https://www.jianshu.com/p/f390bb88574d

filter在dubbo中的應用非常廣泛,它可以對服務端、消費端的調用過程進行攔截,從而對dubbo進行功能上的擴展,我們所熟知的RpcContext就用到了filter。本文主要嘗試從以下3個方面來簡單介紹一下dubbo中的filter:
1.filter鏈原理
2.自定義filter
3.使用filter透傳traceId

1.filter鏈原理

dubbo中filter鏈的入口在ProtocolFilterWrapper中,這是Protocol的一個包裝類,在其服務暴露和服務引用時都進行了構建filter鏈的工作。

// 構建filter鏈 private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 獲取可用的filter列表 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; // 典型的裝飾器模式,將invoker用filter逐層進行包裝 last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } // 重點,每個filter在執行invoke方法時,會觸發其下級節點的invoke方法,最后一級節點即為最原始的服務 public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } // 服務端暴露服務 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } // 客戶端引用服務 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } 

可以看到,每一個filter節點都為原始的invoker服務增加了功能,是典型的裝飾器模式。構建filter鏈的核心在於filter列表的獲取,也就是這一行代碼:

List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); 

通過Filter的ExtendLoader實例獲取其激活的filter列表,getActivateExtension邏輯分為兩部分:
1.加載標注了Activate注解的filter列表
2.加載用戶在spring配置文件中手動注入的filter列表

public List<T> getActivateExtension(URL url, String key, String group) { // 根據key來獲取服務方/消費方自定義的filter列表 String value = url.getParameter(key); return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group); } public List<T> getActivateExtension(URL url, String[] values, String group) { List<T> exts = new ArrayList<T>(); List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values); // 如果用戶配置的filter列表名稱中不包含-default,則加載標注了Activate注解的filter列表 if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) { // 加載配置文件,獲取所有標注有Activate注解的類,存入cachedActivates中 getExtensionClasses(); for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) { String name = entry.getKey(); Activate activate = entry.getValue(); // Activate注解可以指定group,這里是看注解指定的group與我們要求的group是否匹配 if (isMatchGroup(group, activate.group())) { T ext = getExtension(name); // 對於每一個dubbo中原生的filter,需要滿足以下3個條件才會被加載: // 1.用戶配置的filter列表中不包含該名稱的filter // 2.用戶配置的filter列表中不包含該名稱前加了"-"的filter // 3.該Activate注解被激活,具體激活條件隨后詳解 if (!names.contains(name) && !names.contains(Constants.REMOVE_VALUE_PREFIX + name) && isActive(activate, url)) { exts.add(ext); } } } // 對加載的dubbo原生的filter列表進行排序,ActivateComparator排序器會根據Activate注解的before、after、order屬性對filter列表排序 Collections.sort(exts, ActivateComparator.COMPARATOR); } // 加載用戶在spring配置文件中配置的filter列表 List<T> usrs = new ArrayList<T>(); for (int i = 0; i < names.size(); i++) { String name = names.get(i); // 針對用戶配置的每一個filter,需要滿足以下兩個條件才會被加載: // 1.名稱不是以"-"開頭 // 2.用戶配置的所有filter列表中不包含-name的filter if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX) && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) { // 用戶自己配置filter列表時,可以使用default的key來代表dubbo原生的filter列表,這樣一來就可以控制dubbo原生filter列表和用戶自定義filter列表之間的相對順序 if (Constants.DEFAULT_KEY.equals(name)) { if (!usrs.isEmpty()) { exts.addAll(0, usrs); usrs.clear(); } } else { T ext = getExtension(name); usrs.add(ext); } } } if (!usrs.isEmpty()) { exts.addAll(usrs); } return exts; } 

判斷Activate注解是否被激活的邏輯是這樣的:

private boolean isActive(Activate activate, URL url) { // 如果注解沒有配置value屬性,則一定是激活的 String[] keys = activate.value(); if (keys == null || keys.length == 0) { return true; } // 對配置了value屬性的注解,如果服務的url屬性中存在與value屬性值相匹配的屬性且改屬性值不為空,則該注解也是激活的 for (String key : keys) { for (Map.Entry<String, String> entry : url.getParameters().entrySet()) { String k = entry.getKey(); String v = entry.getValue(); if ((k.equals(key) || k.endsWith("." + key)) && ConfigUtils.isNotEmpty(v)) { return true; } } } return false; } 

ActivateComparator比較器的規則如下,總結起來有這么幾條規則:
1.before指定的過濾器,該過濾器將在這些指定的過濾器之前執行
2.after指定的過濾器,該過濾器將在這些指定的過濾器之后執行
3.order數值越小,越先執行
4.order數值相等的條件下,順序將依賴於兩個filter的加載順序
5.before/after的優先級高於order

public class ActivateComparator implements Comparator<Object> { public static final Comparator<Object> COMPARATOR = new ActivateComparator(); public int compare(Object o1, Object o2) { if (o1 == null && o2 == null) { return 0; } if (o1 == null) { return -1; } if (o2 == null) { return 1; } if (o1.equals(o2)) { return 0; } // 配置了before/after屬性時,按照規則1、2進行排序,比較完直接返回,此時指定的order值將被忽略 Activate a1 = o1.getClass().getAnnotation(Activate.class); Activate a2 = o2.getClass().getAnnotation(Activate.class); if ((a1.before().length > 0 || a1.after().length > 0 || a2.before().length > 0 || a2.after().length > 0) && o1.getClass().getInterfaces().length > 0 && o1.getClass().getInterfaces()[0].isAnnotationPresent(SPI.class)) { ExtensionLoader<?> extensionLoader = ExtensionLoader.getExtensionLoader(o1.getClass().getInterfaces()[0]); if (a1.before().length > 0 || a1.after().length > 0) { String n2 = extensionLoader.getExtensionName(o2.getClass()); for (String before : a1.before()) { if (before.equals(n2)) { return -1; } } for (String after : a1.after()) { if (after.equals(n2)) { return 1; } } } if (a2.before().length > 0 || a2.after().length > 0) { String n1 = extensionLoader.getExtensionName(o1.getClass()); for (String before : a2.before()) { if (before.equals(n1)) { return 1; } } for (String after : a2.after()) { if (after.equals(n1)) { return -1; } } } } // 沒有配置before/after的條件下,按照規則3、4進行排序 int n1 = a1 == null ? 0 : a1.order(); int n2 = a2 == null ? 0 : a2.order(); // never return 0 even if n1 equals n2, otherwise, o1 and o2 will override each other in collection like HashSet return n1 > n2 ? 1 : -1; } } 

分析上面的分析,可以發現dubbo在構建filter鏈時非常靈活,有幾個關鍵點在這里做一下總結:

  • filter被分為兩類,一類是標注了Activate注解的filter,包括dubbo原生的和用戶自定義的;一類是用戶在spring配置文件中手動注入的filter
  • 對標注了Activate注解的filter,可以通過before、after和order屬性來控制它們之間的相對順序,還可以通過group來區分服務端和消費端
  • 手動注入filter時,可以用default來代表所有標注了Activate注解的filter,以此來控制兩類filter之間的順序
  • 手動注入filter時,可以在filter名稱前加一個"-"表示排除某一個filter,比如說如果配置了一個-default的filter,將不再加載所有標注了Activate注解的filter
2.自定義filter

自定義filter非常簡單,只需要實現Filter接口即可,對於Filter的某一個具體實現,有兩種方式可以在構建filter鏈時將其包含進去,但無論哪種方式,都需要在Filter對應的SPI文件中進行相應的配置
1.通過標注Activate注解來實現

@Activate(group = Constants.PROVIDER) public class ProviderFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { System.out.println("=== provider ==="); return invoker.invoke(invocation); } } 

2.在spring配置文件中通過配置filter屬性來實現

<dubbo:service interface="com.alibaba.dubbo.demo.TraceIdService" ref="traceIdService" filter="providerFilter"/> 

這兩種方式除了該filter在filter鏈中的順序不同外,其它地方都是等價的。當然,按照上面的分析,順序也是可以按照我們的要求來靈活控制的。

3.利用filter實現traceId透傳

在微服務場景下,一次調用過程常常會涉及多個應用,在定位問題時,往往需要在多個應用中查看某一次調用鏈路上的日志,為了達到這個目的,一種常見的做法是在調用入口處生成一個traceId,並基於RpcContext來實現traceId的透傳。

在開始進一步的嘗試之前,我們不妨先來看看兩個filter,大致了解下RpcContext是怎么實現traceId透傳的。
客戶端的ConsumerContextFilter:

@Activate(group = Constants.CONSUMER, order = -10000) public class ConsumerContextFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) .setLocalAddress(NetUtils.getLocalHost(), 0) .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { return invoker.invoke(invocation); } finally { RpcContext.getContext().clearAttachments(); } } } 

服務端的ContextFilter:

@Activate(group = Constants.PROVIDER, order = -10000) public class ContextFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { Map<String, String> attachments = invocation.getAttachments(); if (attachments != null) { attachments = new HashMap<String, String>(attachments); attachments.remove(Constants.PATH_KEY); attachments.remove(Constants.GROUP_KEY); attachments.remove(Constants.VERSION_KEY); attachments.remove(Constants.DUBBO_VERSION_KEY); attachments.remove(Constants.TOKEN_KEY); attachments.remove(Constants.TIMEOUT_KEY); attachments.remove(Constants.ASYNC_KEY);// Remove async property to avoid being passed to the following invoke chain. } RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) // .setAttachments(attachments) // merged from dubbox .setLocalAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // mreged from dubbox // we may already added some attachments into RpcContext before this filter (e.g. in rest protocol) if (attachments != null) { if (RpcContext.getContext().getAttachments() != null) { RpcContext.getContext().getAttachments().putAll(attachments); } else { RpcContext.getContext().setAttachments(attachments); } } if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { return invoker.invoke(invocation); } finally { RpcContext.removeContext(); } } } 

通過這兩個filter不難發現,之所以利用RpcContext可以實現traceId的透傳,是因為invocation的存在,客戶端在調用invoke方法的時候,會將當前調用的參數載體invocation透傳給服務端,而服務端會從其中取出attachments屬性進行相關處理后在重新設置到invocation中向后傳遞,因此只需要在客戶端將traceId設置到attachments中即可。

於是我們開始以下嘗試:
服務端接口:

public interface TraceIdService { void test(String key); } 

服務端實現:

public class TraceIdServiceImpl implements TraceIdService { @Override public void test(String key) { String traceId = RpcContext.getContext().getAttachment("traceId"); System.out.println("key = " + key + ", traceId = " + traceId); } } 

客戶端代碼:

public class TraceIdConsumer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"}); context.start(); TraceIdService traceIdService = (TraceIdService) context.getBean("traceIdService"); // get remote service proxy RpcContext.getContext().setAttachment("traceId", String.valueOf(System.currentTimeMillis())); System.out.println(RpcContext.getContext().getAttachments()); traceIdService.test("1"); System.out.println(RpcContext.getContext().getAttachments()); traceIdService.test("2"); } } 

以上代碼的執行結果如下:

客戶端輸出:
{traceId=1538746615202} {} 服務端輸出: key = 1, traceId = 1538746615202 key = 2, traceId = null 

我們發現,在第一次調用中,traceId確實從客戶端透傳到了服務端,但是在第二次調用時神奇的消失了!而這正是filter搗的鬼。在ConsumerContextFilter的finally子句中,我們發現attachments對象被清空了,而在服務端ContextFilter中,整個context對象都被清空了!!!

為了解決這個問題,我們需要在每次調用前都重新設置下attachments對象,也就是在客戶端給調用鏈新增一個設置attachments對象的功能。前面我們說過,dubbo中每一個filter節點都為原始的invoker服務增加了功能,是典型的裝飾器模式。看到這里你想到了什么?是的,沒錯。我們可以新增一個filter來完成這一功能。

@Activate(group = Constants.CONSUMER) public class TraceIdFilter implements Filter { @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { String traceId = String.valueOf(System.currentTimeMillis()); RpcContext.getContext().setAttachment("traceId", traceId); System.out.println("traceId = " + traceId); return invoker.invoke(invocation); } } 

此時在客戶端中注釋小設置attachments的代碼,再次執行代碼的輸出如下,此時兩次調用,traceId都可以正確地從客戶端傳遞到服務端,完美؏؏☝ᖗ乛◡乛ᖘ☝؏؏

客戶端輸出:
traceId = 1538749616953
traceId = 1538749617199

服務端輸出:
key = 1, traceId = 1538749616953
key = 2, traceId = 1538749617199



作者:shysheng
鏈接:https://www.jianshu.com/p/f390bb88574d
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM