ServiceBean與ReferenceBean是dubbo與spring整合后的調用的核心類。
提供者在Spring中以ServiceBean的形式存在,消費者需要引用的服務由ReferenceBean構建,兩種Bean中記錄了dubbo調用的信息,在底層調用時
@EnableDubbo
SpringBoot項目如果需要開啟dubbo則必須配置該注解,該注解繼承了@DubboComponentScan,該注解會引入DubboComponentScanRegistrar,該Registrar會注冊ServiceAnnotationBeanPostProcessor以及ReferenceAnnotationBeanPostProcessor兩個后置處理器就是完成ServiceBean以及ReferenceBean創建的入口
ReferenceBean
第一步:獲取類中被@Reference注解修飾的方法或者屬性
ReferenceAnnotationBeanPostProcessor中有如下方法,其代碼相對簡單這里不展開敘述。
1 @Override 2 3 public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) { 4 5 if (beanType != null) { 6 7 // 獲取類中被@Reference注解引用的方法或者屬性並包裝成metadata 8 9 // 將metadata放入緩存 10 11 InjectionMetadata metadata = findReferenceMetadata(beanName, beanType, null); 12 13 metadata.checkConfigMembers(beanDefinition); 14 15 } 16 17 }
第二步:獲取將引用實例化並且注入到對應的屬性或方法中
ReferenceAnnotationBeanPostProcessor中有如下方法。
1 @Override 2 3 public PropertyValues postProcessPropertyValues( 4 5 PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException { 6 7 // 獲取類中被@Reference注解引用的方法或者屬性並包裝成metadata 8 9 // 由於在第一步中已經獲取完畢並且放入緩存,所以這里直接從緩存中就可以取到 10 11 InjectionMetadata metadata = findReferenceMetadata(beanName, bean.getClass(), pvs); 12 13 try { 14 15 // 向bean中注入引用實例 16 17 metadata.inject(bean, beanName, pvs); 18 19 } catch (BeanCreationException ex) { 20 21 throw ex; 22 23 } catch (Throwable ex) { 24 25 throw new BeanCreationException(beanName, "Injection of @Reference dependencies failed", ex); 26 27 } 28 29 return pvs; 30 31 }
我們順着metadata#inject方法調用鏈向下,最后會進入到ReferenceBean創建的流程。
第三步:創建ReferenceBean
ReferenceAnnotationBeanPostProcessor$ReferenceFieldElement中有如下方法。
1 @Override 2 3 protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable { 4 5 6 7 Class<?> referenceClass = field.getType(); 8 9 // 創建ReferenceBean 10 11 referenceBean = buildReferenceBean(reference, referenceClass); 12 13 14 15 ReflectionUtils.makeAccessible(field); 16 17 // 創建服務引用實例,並將實例注入到屬性中 18 19 field.set(bean, referenceBean.getObject()); 20 21 22 23 }
創建ReferenceBean中會將各種信息注冊到ReferenceBean中,這里不做介紹,主要是最后一行,ReferenceBean#getObject方法,該方法是真正實例化服務引用的方法。我們可以看到ReferenceBean並沒有作為Spring的bean存在,其只是用於實例化服務引用的工具類而已。
ps:dubbo2.7.0之前實例化引用不會是在最后一行這里,因為ReferenceBean繼承自了來自AbstractConfig的toString方法,該方法是利用反射原理,將所有方法名以get開頭的方法調用一遍,這里就會調用到getObject方法。而在buildReferenceBean方法中會日志會打印出構建好的ReferenceBean,導致提前實例化了服務對象。不過不影響結果,只是在debug時會造成疑惑。
第四步:實例化服務引用
ReferenceBean#getObject最終會調到ReferenceConfig#init方法,該方法中包含了完整的實例化流程。
1 private void init() { 2 3 // 省略其他,前面的所有代碼都是在初始化、構建屬性 4 5 // 實例化服務引用 6 7 // 看方法名就知道,實際的服務引用是一個動態生成的代理類 8 9 ref = createProxy(map); 10 11 } 12 13 14 15 // 協議,SPI,動態獲取自適應類 16 17 private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 18 19 // 集群,SPI,動態獲取自適應類 20 21 private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); 22 23 // 代理生成工廠,SPI,動態獲取自適應類 24 25 private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); 26 27 28 29 private T createProxy(Map<String, String> map) { 30 31 // 省略前面的代碼 32 33 34 35 // 4.1 獲取invoker,invoker就是實際遠程調用工具 36 37 if (urls.size() == 1) { 38 39 invoker = refprotocol.refer(interfaceClass, urls.get(0)); 40 41 } else { 42 43 List<Invoker<?>> invokers = new ArrayList<Invoker<?>>(); 44 45 URL registryURL = null; 46 47 for (URL url : urls) { 48 49 invokers.add(refprotocol.refer(interfaceClass, url)); 50 51 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { 52 53 registryURL = url; // use last registry url 54 55 } 56 57 } 58 59 if (registryURL != null) { // registry url is available 60 61 // use AvailableCluster only when register's cluster is available 62 63 URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); 64 65 invoker = cluster.join(new StaticDirectory(u, invokers)); 66 67 } else { // not a registry url 68 69 invoker = cluster.join(new StaticDirectory(invokers)); 70 71 } 72 73 } 74 75 } 76 77 78 79 // 省略部分 80 81 82 83 // 4.2 創建代理類 84 85 return (T) proxyFactory.getProxy(invoker); 86 87 }
4.1:獲取invoker
refprotocol是一個SPI,urls中存的是注冊中心的url,其protocol是registry,所以這里會調用到RegistryProtocol。
1 @Override 2 3 @SuppressWarnings("unchecked") 4 5 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { 6 7 // 將真實的protocol設置到protocol屬性中 8 9 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); 10 11 // 4.1.1 獲取registry 12 13 Registry registry = registryFactory.getRegistry(url); 14 15 if (RegistryService.class.equals(type)) { 16 17 return proxyFactory.getInvoker((T) registry, type, url); 18 19 } 20 21 22 23 // group="a,b" or group="*" 24 25 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); 26 27 String group = qs.get(Constants.GROUP_KEY); 28 29 if (group != null && group.length() > 0) { 30 31 if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 32 33 || "*".equals(group)) { 34 35 return doRefer(getMergeableCluster(), registry, type, url); 36 37 } 38 39 } 40 41 return doRefer(cluster, registry, type, url); 42 43 } 44 45 46 47 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { 48 49 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); 50 51 directory.setRegistry(registry); 52 53 directory.setProtocol(protocol); 54 55 // all attributes of REFER_KEY 56 57 Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); 58 59 // 4.1.2 構建訂閱url 60 61 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); 62 63 if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) 64 65 && url.getParameter(Constants.REGISTER_KEY, true)) { 66 67 // 4.1.3 注冊 68 69 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, 70 71 Constants.CHECK_KEY, String.valueOf(false))); 72 73 } 74 75 // 4.1.4 訂閱 76 77 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 78 79 Constants.PROVIDERS_CATEGORY 80 81 + "," + Constants.CONFIGURATORS_CATEGORY 82 83 + "," + Constants.ROUTERS_CATEGORY)); 84 85 86 87 Invoker invoker = cluster.join(directory); 88 89 ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); 90 91 return invoker; 92 93 }
首先獲取注冊中心,這里使用的是zookeeper,所以這里獲取的ZooKeeperRegistry。
然后是構建訂閱的url並注冊到zookeeper,即向zookeeper寫入數據。
最后是訂閱服務提供者的目錄。然后會將RegistryDirectory作為listener包裝后注冊到zookeeper中。在訂閱結束之后也會顯式回調listener的notify方法。在該方法中會創建服務的invoker,其流程比較簡單這里就不展開敘述。
我們使用的是dubbo協議,所以最終在創建invoker時會使用dubboProtocol,所以在dubboProtocol的refer方法打個斷點,你就能看到整個訂閱流程,這是一個相當長的調用鏈。詳細介紹會在《服務注冊與調用流程》中介紹。
4.2:創建服務引用代理類
1 proxyFactory是一個SPI,默認使用JavassistProxyFactory,所以這里就以JavassistProxyFactory為例。getProxy方法最終會調用到Proxy的getProxy方法。 2 3 public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { 4 5 return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); 6 7 } 8 9 10 11 public static Proxy getProxy(ClassLoader cl, Class<?>... ics) { 12 13 14 15 ClassGenerator ccp = null, ccm = null; 16 17 try { 18 19 // 省略前面的代碼,前面的代碼是構建代理類的各種屬性 20 21 22 23 // 服務引用實例的代理類,它實現了我們自己的接口 24 25 String pcn = pkg + ".proxy" + id; 26 27 ccp.setClassName(pcn); 28 29 ccp.addField("public static java.lang.reflect.Method[] methods;"); 30 31 ccp.addField("private " + InvocationHandler.class.getName() + " handler;"); 32 33 ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;"); 34 35 ccp.addDefaultConstructor(); 36 37 Class<?> clazz = ccp.toClass(); 38 39 clazz.getField("methods").set(null, methods.toArray(new Method[0])); 40 41 42 43 // 創建代理類,該代理類用於創建服務引用實例,即創建上面的類 44 45 String fcn = Proxy.class.getName() + id; 46 47 ccm = ClassGenerator.newInstance(cl); 48 49 ccm.setClassName(fcn); 50 51 ccm.addDefaultConstructor(); 52 53 ccm.setSuperClass(Proxy.class); 54 55 // pcn就是上面類的名字 56 57 ccm.addMethod("public Object newInstance(" 58 59 + InvocationHandler.class.getName() + 60 61 " h){ return new " + pcn + "($1); }"); 62 63 Class<?> pc = ccm.toClass(); 64 65 proxy = (Proxy) pc.newInstance(); 66 67 } catch (RuntimeException e) { 68 69 throw e; 70 71 } catch (Exception e) { 72 73 throw new RuntimeException(e.getMessage(), e); 74 75 } finally { 76 77 // release ClassGenerator 78 79 if (ccp != null) 80 81 ccp.release(); 82 83 if (ccm != null) 84 85 ccm.release(); 86 87 synchronized (cache) { 88 89 if (proxy == null) 90 91 cache.remove(key); 92 93 else 94 95 cache.put(key, new WeakReference<Proxy>(proxy)); 96 97 cache.notifyAll(); 98 99 } 100 101 } 102 103 return proxy; 104 105 }
其中ccp就是實際服務引用實例的代理類,我們程序中使用的對象就是該類。
mInterfaces中包含了我們自己的接口service.Service。
mFields中就包含了一個handler對象,該handler持有了之前創建的invoker對象。
mMethods中就包含了我們自己接口中的方法sayHello,該方法的實現就是直接使用handler進行遠程調用。
ccm是用來生成ccp的代理類,調用ccp的newInstances方法會生成ccp的對象。
類中只包含了一個newInstance方法,用於創建ccp實例。
以上便完成了引用代理對象的創建與注入。
ServiceBean
第一步:獲取類中被@Service注解修飾的對象
ServiceAnnotationBeanProcessor實現了BeanDefinitionRegistryPostProcessor,所以在bean注冊完之后會調用該類的postProcessBeanDefinitionRegistry方法。
整個ServiceBean的注冊流程非常簡單,無非是掃描包,找到@Service注解修飾的對象,然后構建一個ServiceBean對象注入到Spring容器中。這一流程就不再過多敘述。
第二步:加載配置
第一步只完成了ServiceBean的注冊,而核心創建流程就在ServiceBean中。
ServiceBean實現了ApplicationListener接口,Spring容器加載完之后會通知ApplicationListener的實現類,即調用如下方法:
1 @Override 2 3 public void onApplicationEvent(ContextRefreshedEvent event) { 4 5 if (isDelay() && !isExported() && !isUnexported()) { 6 7 if (logger.isInfoEnabled()) { 8 9 logger.info("The service ready on spring started. service: " + getInterface()); 10 11 } 12 13 // 發布流程 14 15 export(); 16 17 } 18 19 }
由於這一步比較簡單,讀者可以自行閱讀export()方法的源碼。
第三步:發布
跟着export()方法流程走,會進入到ServiceConfig#doExportUrls方法中,其部分代碼如下:
1 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { 2 3 4 5 // export to local if the config is not remote (export to remote only when config is remote) 6 7 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { 8 9 exportLocal(url); 10 11 } 12 13 // export to remote if the config is not local (export to local only when config is local) 14 15 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { 16 17 18 19 if (registryURLs != null && !registryURLs.isEmpty()) { 20 21 for (URL registryURL : registryURLs) { 22 23 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); 24 25 URL monitorUrl = loadMonitor(registryURL); 26 27 if (monitorUrl != null) { 28 29 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); 30 31 } 32 33 if (logger.isInfoEnabled()) { 34 35 logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); 36 37 } 38 39 // 3.1 獲取invoker 40 41 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); 42 43 // 3.2 將invoker與當前ServiceBean綁定 44 45 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 46 47 // 3.3 發布 48 49 Exporter<?> exporter = protocol.export(wrapperInvoker); 50 51 exporters.add(exporter); 52 53 } 54 55 } else { 56 57 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); 58 59 DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); 60 61 62 63 Exporter<?> exporter = protocol.export(wrapperInvoker); 64 65 exporters.add(exporter); 66 67 } 68 69 } 70 71 }
首先是獲取invoker,該invoker包含了注冊中心的url,然后將invoker將當前ServiceBean綁定,這樣就能夠通過該invoker獲取服務、注冊中心、配置等必須的數據,然后發布。
注冊中心url的protocol是registry,所以這里發布調用的是RegistryProtocol。
1 @Override 2 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { 3 // 3.4 發布到本地 即綁定本地端口 4 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); 5 6 URL registryUrl = getRegistryUrl(originInvoker); 7 8 // 3.5 獲取注冊中心 9 final Registry registry = getRegistry(originInvoker); 10 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); 11 12 13 boolean register = registedProviderUrl.getParameter("register", true); 14 15 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); 16 17 // 3.6 注冊 18 if (register) { 19 register(registryUrl, registedProviderUrl); 20 ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); 21 } 22 23 24 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); 25 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); 26 overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); 27 // 3.7 訂閱 28 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); 29 return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); 30 }
到這里,整個流程已經相當清晰了。其中的細節比較簡單,都是一些zookeeper操作,所以這里不仔細展開了。
首先將當前服務綁定到本地端口用於監聽遠程調用。
其次獲取注冊中心,我們使用zookeeper,所以這里獲取的是ZooKeeperRegistry。
然后想注冊中心中注冊我們的服務,即向zookeeper寫數據。
最后訂閱路徑,當有新的服務加入時會覆蓋zk上的數據,這里就會收到回調。
以上便完成了ServiceBean的創建與注冊。