一、Dubbo服務注冊過程
先從下面這個demo開始:
@EnableDubbo(scanBasePackages = "com.alibaba.dubbo.demo.service") @PropertySource(value = "classpath:/provider-config.properties") public class DemoServiceProviderBootstrap { public static void main(String[] args) throws IOException { // AnnotationConfigApplicationContext 該類可以實現基於Java的配置類加載自定義在Spring的應用上下文的bean。 AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); // 通過使用register方法具體到注冊哪個配置類,最后刷新。 context.register(DemoServiceProviderBootstrap.class); context.refresh(); System.out.println("DemoService provider is starting..."); System.in.read(); } }
context.refresh()刷新容器完成時,Spring會發布ContextRefreshedEvent事件,
通過event事件監聽的方式,最後來到dubbo源碼定義的:com.alibaba.dubbo.config.spring.ServiceBean#onApplicationEvent
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.config.spring;

import com.alibaba.dubbo.config.ApplicationConfig;
import com.alibaba.dubbo.config.ModuleConfig;
import com.alibaba.dubbo.config.MonitorConfig;
import com.alibaba.dubbo.config.ProtocolConfig;
import com.alibaba.dubbo.config.ProviderConfig;
import com.alibaba.dubbo.config.RegistryConfig;
import com.alibaba.dubbo.config.ServiceConfig;
import com.alibaba.dubbo.config.annotation.Service;
import com.alibaba.dubbo.config.spring.context.event.ServiceBeanExportedEvent;
import com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory;

import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.alibaba.dubbo.config.spring.util.BeanFactoryUtils.addApplicationListener;

/**
 * ServiceFactoryBean.
 *
 * <p>A {@link ServiceConfig} that is wired into the Spring lifecycle: it fills in missing
 * provider / application / module / registry / monitor / protocol configuration from the
 * application context in {@link #afterPropertiesSet()}, and exports the service either there
 * or later when a {@link ContextRefreshedEvent} arrives (see {@link #isDelay()}).
 *
 * @export
 */
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
        ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware,
        ApplicationEventPublisherAware {

    private static final long serialVersionUID = 213195494150089726L;

    private final transient Service service;

    private transient ApplicationContext applicationContext;

    private transient String beanName;

    /** Result of {@code addApplicationListener(applicationContext, this)}; gates delayed export. */
    private transient boolean supportedApplicationListener;

    private ApplicationEventPublisher applicationEventPublisher;

    /** Creates a bean that has no associated {@link Service} annotation. */
    public ServiceBean() {
        super();
        this.service = null;
    }

    /**
     * Creates a bean from a {@link Service} annotation.
     *
     * @param service the annotation this bean is associated with
     */
    public ServiceBean(Service service) {
        super(service);
        this.service = service;
    }

    /**
     * Stores the context, hands it to {@link SpringExtensionFactory} and tries to add this bean
     * as an application listener on it.
     *
     * @param applicationContext the context this bean runs in
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
        SpringExtensionFactory.addApplicationContext(applicationContext);
        supportedApplicationListener = addApplicationListener(applicationContext, this);
    }

    @Override
    public void setBeanName(String name) {
        this.beanName = name;
    }

    /**
     * Gets associated {@link Service}
     *
     * @return associated {@link Service}
     */
    public Service getService() {
        return service;
    }

    /**
     * Exports the service when the context has been refreshed, provided the export was
     * deferred ({@link #isDelay()}) and the service is neither exported nor unexported yet.
     *
     * @param event the context-refreshed event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

    /**
     * Tells whether the export is deferred to {@link #onApplicationEvent(ContextRefreshedEvent)}.
     *
     * @return {@code true} when this bean was added as an application listener and the
     *         effective delay (own value, else the provider's) is {@code null} or {@code -1}
     */
    private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay == -1);
    }

    /**
     * Fills in configuration that was not set explicitly by looking up beans of the matching
     * config type in the application context, then exports immediately unless the export is
     * deferred.
     *
     * <p>The sections run in a fixed order (provider, application, module, registries, monitor,
     * protocols, path); later sections consult values that earlier sections may have set.
     *
     * @throws IllegalStateException if more than one default candidate exists for a
     *                               single-valued config (provider, application, module, monitor)
     */
    @Override
    @SuppressWarnings({"unchecked", "deprecation"})
    public void afterPropertiesSet() throws Exception {
        // ---- provider ----
        if (getProvider() == null) {
            Map<String, ProviderConfig> providerConfigMap = applicationContext == null ? null
                    : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProviderConfig.class, false, false);
            if (providerConfigMap != null && providerConfigMap.size() > 0) {
                Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null
                        : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
                if ((protocolConfigMap == null || protocolConfigMap.size() == 0)
                        && providerConfigMap.size() > 1) { // backward compatibility
                    // Only providers explicitly flagged as default are collected here.
                    List<ProviderConfig> providerConfigs = new ArrayList<ProviderConfig>();
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() != null && config.isDefault().booleanValue()) {
                            providerConfigs.add(config);
                        }
                    }
                    if (!providerConfigs.isEmpty()) {
                        setProviders(providerConfigs);
                    }
                } else {
                    // A provider with an unset default flag counts as a default candidate.
                    ProviderConfig providerConfig = null;
                    for (ProviderConfig config : providerConfigMap.values()) {
                        if (config.isDefault() == null || config.isDefault().booleanValue()) {
                            if (providerConfig != null) {
                                throw new IllegalStateException("Duplicate provider configs: " + providerConfig + " and " + config);
                            }
                            providerConfig = config;
                        }
                    }
                    if (providerConfig != null) {
                        setProvider(providerConfig);
                    }
                }
            }
        }

        // ---- application ----
        if (getApplication() == null
                && (getProvider() == null || getProvider().getApplication() == null)) {
            Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ? null
                    : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ApplicationConfig.class, false, false);
            if (applicationConfigMap != null && applicationConfigMap.size() > 0) {
                ApplicationConfig applicationConfig = null;
                for (ApplicationConfig config : applicationConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (applicationConfig != null) {
                            throw new IllegalStateException("Duplicate application configs: " + applicationConfig + " and " + config);
                        }
                        applicationConfig = config;
                    }
                }
                if (applicationConfig != null) {
                    setApplication(applicationConfig);
                }
            }
        }

        // ---- module ----
        if (getModule() == null
                && (getProvider() == null || getProvider().getModule() == null)) {
            Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ? null
                    : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ModuleConfig.class, false, false);
            if (moduleConfigMap != null && moduleConfigMap.size() > 0) {
                ModuleConfig moduleConfig = null;
                for (ModuleConfig config : moduleConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (moduleConfig != null) {
                            throw new IllegalStateException("Duplicate module configs: " + moduleConfig + " and " + config);
                        }
                        moduleConfig = config;
                    }
                }
                if (moduleConfig != null) {
                    setModule(moduleConfig);
                }
            }
        }

        // ---- registries (multi-valued: every default candidate is kept) ----
        if ((getRegistries() == null || getRegistries().isEmpty())
                && (getProvider() == null || getProvider().getRegistries() == null || getProvider().getRegistries().isEmpty())
                && (getApplication() == null || getApplication().getRegistries() == null || getApplication().getRegistries().isEmpty())) {
            Map<String, RegistryConfig> registryConfigMap = applicationContext == null ? null
                    : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, RegistryConfig.class, false, false);
            if (registryConfigMap != null && registryConfigMap.size() > 0) {
                List<RegistryConfig> registryConfigs = new ArrayList<RegistryConfig>();
                for (RegistryConfig config : registryConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        registryConfigs.add(config);
                    }
                }
                if (registryConfigs != null && !registryConfigs.isEmpty()) {
                    super.setRegistries(registryConfigs);
                }
            }
        }

        // ---- monitor ----
        if (getMonitor() == null
                && (getProvider() == null || getProvider().getMonitor() == null)
                && (getApplication() == null || getApplication().getMonitor() == null)) {
            Map<String, MonitorConfig> monitorConfigMap = applicationContext == null ? null
                    : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, MonitorConfig.class, false, false);
            if (monitorConfigMap != null && monitorConfigMap.size() > 0) {
                MonitorConfig monitorConfig = null;
                for (MonitorConfig config : monitorConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        if (monitorConfig != null) {
                            throw new IllegalStateException("Duplicate monitor configs: " + monitorConfig + " and " + config);
                        }
                        monitorConfig = config;
                    }
                }
                if (monitorConfig != null) {
                    setMonitor(monitorConfig);
                }
            }
        }

        // ---- protocols (multi-valued: every default candidate is kept) ----
        if ((getProtocols() == null || getProtocols().isEmpty())
                && (getProvider() == null || getProvider().getProtocols() == null || getProvider().getProtocols().isEmpty())) {
            Map<String, ProtocolConfig> protocolConfigMap = applicationContext == null ? null
                    : BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext, ProtocolConfig.class, false, false);
            if (protocolConfigMap != null && protocolConfigMap.size() > 0) {
                List<ProtocolConfig> protocolConfigs = new ArrayList<ProtocolConfig>();
                for (ProtocolConfig config : protocolConfigMap.values()) {
                    if (config.isDefault() == null || config.isDefault().booleanValue()) {
                        protocolConfigs.add(config);
                    }
                }
                if (protocolConfigs != null && !protocolConfigs.isEmpty()) {
                    super.setProtocols(protocolConfigs);
                }
            }
        }

        // ---- path: falls back to the bean name when it starts with the interface name ----
        if (getPath() == null || getPath().length() == 0) {
            if (beanName != null && beanName.length() > 0
                    && getInterface() != null && getInterface().length() > 0
                    && beanName.startsWith(getInterface())) {
                setPath(beanName);
            }
        }

        // Not deferred: export right away instead of waiting for ContextRefreshedEvent.
        if (!isDelay()) {
            export();
        }
    }

    /**
     * Get the name of {@link ServiceBean}
     *
     * @return {@link ServiceBean}'s name
     * @since 2.6.5
     */
    public String getBeanName() {
        return this.beanName;
    }

    /**
     * Exports the service through the superclass and then publishes a
     * {@link ServiceBeanExportedEvent}.
     *
     * @since 2.6.5
     */
    @Override
    public void export() {
        super.export();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

    /**
     * Publishes a {@link ServiceBeanExportedEvent} for this bean through the injected
     * {@link ApplicationEventPublisher}.
     *
     * @since 2.6.5
     */
    private void publishExportEvent() {
        ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this);
        applicationEventPublisher.publishEvent(exportEvent);
    }

    @Override
    public void destroy() throws Exception {
        // no need to call unexport() here, see
        // org.apache.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener
    }

    /**
     * Resolves the service class, unwrapping Spring AOP proxies to their target class.
     *
     * @param ref the service reference
     * @return the target class for an AOP proxy, otherwise whatever the superclass returns
     */
    // merged from dubbox
    @Override
    protected Class getServiceClass(T ref) {
        if (AopUtils.isAopProxy(ref)) {
            return AopUtils.getTargetClass(ref);
        }
        return super.getServiceClass(ref);
    }

    /**
     * @param applicationEventPublisher
     * @since 2.6.5
     */
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }
}
關注這裡重寫的onApplicationEvent方法,進一步跟蹤其中調用的export()方法:
先來看方法super.export() --> com/alibaba/dubbo/config/ServiceConfig.java
dubbo-registry/dubbo-registry-api/src/main/java/com/alibaba/dubbo/registry/integration/RegistryProtocol.java
Nacos的NameSpace(命名空間)用於進行租戶粒度的配置隔離。不同的命名空間下,可以存在相同的Group和DataId。NameSpace的常用場景之一是不同環境的配置的區分隔離,例如開發、測試環境和生產環境的資源(配置或者服務)的隔離。Dubbo源碼中通過調用NamingService發送HTTP請求,把服務注冊到注冊中心。
這個過程中有一個重要的類:com.alibaba.dubbo.registry.integration.RegistryProtocol。來看這個類的export方法:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); if (register) { register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
這裡面有一個重要的方法:getRegistry
/**
 * Obtains the {@code Registry} for the registry URL derived from the given invoker.
 *
 * @param originInvoker the invoker whose registry URL is resolved via {@code getRegistryUrl}
 * @return the registry returned by {@code registryFactory} for that URL
 */
private Registry getRegistry(final Invoker<?> originInvoker) {
    return registryFactory.getRegistry(getRegistryUrl(originInvoker));
}
這個地方很重要:使用NamingFactory 這個工廠創建出來一個NamingService(NacosNamingService)
/**
 * Reflectively instantiates {@code com.alibaba.nacos.client.naming.NacosNamingService}
 * through its {@code (Properties)} constructor.
 *
 * @param properties passed unchanged to the constructor
 * @return the newly created naming service
 * @throws NacosException with code {@code CLIENT_INVALID_PARAM}, wrapping any failure
 *                        raised while loading the class or invoking the constructor
 */
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Constructor<?> ctor = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService")
                .getConstructor(Properties.class);
        return (NamingService) ctor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
接下來,我們就需要關注:NamingService 創建的時候,裡面都初始化了哪些組件?
/**
 * Creates the naming service; all setup is delegated to {@code init(Properties)}.
 *
 * @param properties client configuration handed to {@code init}
 * @throws NacosException if initialisation fails
 */
public NacosNamingService(Properties properties) throws NacosException {
    this.init(properties);
}
private void init(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); // 獲取NameSpace this.namespace = InitUtils.initNamespaceForNaming(properties); InitUtils.initSerialization(); // 加載ServerList initServerAddr(properties); InitUtils.initWebRootContext(properties); // 初始化緩存地址 initCacheDir(); initLogName(properties); // serverProxy this.serverProxy = new NamingProxy(this.namespace, this.endpoint, this.serverList, properties); // 心跳:用於發送心跳的 this.beatReactor = new BeatReactor(this.serverProxy, initClientBeatThreadCount(properties)); // host refator:維護本地訂閱的服務注冊表信息 this.hostReactor = new HostReactor(this.serverProxy, beatReactor, this.cacheDir, isLoadCacheAtStart(properties), isPushEmptyProtect(properties), initPollingThreadCount(properties)); }
其次,關注注冊這個方法:
/**
 * Registers the provider URL with the registry identified by {@code registryUrl}.
 *
 * @param registryUrl         URL used to look up the registry through {@code registryFactory}
 * @param registedProviderUrl provider URL passed to {@code Registry.register}
 */
public void register(URL registryUrl, URL registedProviderUrl) {
    registryFactory.getRegistry(registryUrl).register(registedProviderUrl);
}
--> dubbo-registry/dubbo-registry-nacos/src/main/java/com/alibaba/dubbo/registry/nacos/NacosRegistry.java
dubbo中,所有的服務都被封裝成了URL,對應nacos中的服務實例Instance,
所以服務注冊時,只需要簡單的將URL轉換成Instance就可以注冊到nacos中。
/**
 * Converts the URL into a service name plus an {@code Instance} and registers that
 * instance through the {@code NamingService} supplied to the callback by {@code execute}.
 *
 * @param url the URL to register
 */
protected void doRegister(URL url) {
    final String name = getServiceName(url);
    final Instance nacosInstance = createInstance(url);
    NamingServiceCallback registration = new NamingServiceCallback() {
        public void callback(NamingService namingService) throws NacosException {
            namingService.registerInstance(name, nacosInstance);
        }
    };
    execute(registration);
}
二、Nacos
client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
/**
 * Registers an instance under {@code Constants.DEFAULT_CLUSTER_NAME} by delegating to
 * the four-argument overload.
 *
 * @param serviceName name of the service
 * @param ip          instance IP
 * @param port        instance port
 * @throws NacosException propagated from the delegated call
 */
@Override
public void registerInstance(String serviceName, String ip, int port) throws NacosException {
    this.registerInstance(serviceName, ip, port, Constants.DEFAULT_CLUSTER_NAME);
}
注冊實例的時候,會判斷是否為臨時實例,臨時實例會被加入beatReactor的心跳列表。Nacos把服務分為兩類:臨時服務和永久性服務(永久性服務如數據庫、緩存服務,客戶端不會也沒法發送心跳,通過TCP端口檢測來反向探活)。
groupedServiceName其實就是groupName@@serviceName
/**
 * Validates the instance, schedules heartbeats for it when it is ephemeral, and then
 * registers it with the server.
 *
 * @param serviceName name of the service
 * @param groupName   group of the service
 * @param instance    instance to register
 * @throws NacosException if validation or registration fails
 */
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    // Combined group + service name as produced by NamingUtils.getGroupedName.
    final String qualifiedName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        // Only ephemeral instances are added to the beat reactor.
        beatReactor.addBeatInfo(qualifiedName, beatReactor.buildBeatInfo(qualifiedName, instance));
    }
    serverProxy.registerService(qualifiedName, groupName, instance);
}
從addBeatInfo方法來看:Nacos通過任務的方式來保持心跳。而dom2Beat是定義的一個ConcurrentHashMap, 是一個存放需要心跳上報的臨時實例的map容器。
public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); // 通過任務來保持心跳 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
緊接著,來看serverProxy.registerService,這裡通過HTTP POST請求的方式,調用請求路徑:/v1/ns/instance
/**
 * Sends the instance's attributes as request parameters to
 * {@code UtilAndComs.nacosUrlInstance} with HTTP POST via {@code reqApi}.
 *
 * @param serviceName service name sent as {@code CommonParams.SERVICE_NAME}
 * @param groupName   group name sent as {@code CommonParams.GROUP_NAME}
 * @param instance    instance whose attributes are sent
 * @throws NacosException propagated from {@code reqApi}
 */
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
            instance);

    final Map<String, String> form = new HashMap<String, String>(16);

    // Identification of the service
    form.put(CommonParams.NAMESPACE_ID, namespaceId);
    form.put(CommonParams.SERVICE_NAME, serviceName);
    form.put(CommonParams.GROUP_NAME, groupName);
    form.put(CommonParams.CLUSTER_NAME, instance.getClusterName());

    // Attributes of the instance
    form.put("ip", instance.getIp());
    form.put("port", String.valueOf(instance.getPort()));
    form.put("weight", String.valueOf(instance.getWeight()));
    form.put("enable", String.valueOf(instance.isEnabled()));
    form.put("healthy", String.valueOf(instance.isHealthy()));
    form.put("ephemeral", String.valueOf(instance.isEphemeral()));
    form.put("metadata", JacksonUtils.toJson(instance.getMetadata()));

    reqApi(UtilAndComs.nacosUrlInstance, form, HttpMethod.POST);
}
如果有多台Nacos Server,該怎麼選擇呢?可以看到,是隨機選擇一台;如果請求異常了,就輪著換下一個server,有多少個server就最多重試多少次。
上述請求路徑/v1/ns/instance在服務端對應的處理入口,也就是nacos的InstanceController#register:
/**
 * Handles the POST request that registers an instance: reads the namespace id (falling back
 * to {@code Constants.DEFAULT_NAMESPACE_ID}) and the required service name, checks the
 * service name format, parses the instance from the request and passes all three to
 * {@code serviceManager.registerInstance}.
 *
 * @param request the incoming HTTP request
 * @return the literal string {@code "ok"}
 * @throws Exception if a required parameter is missing, validation fails or registration fails
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    final String ns = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String name = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(name);

    serviceManager.registerInstance(ns, name, parseInstance(request));
    return "ok";
}
以上過程簡單繪制如下圖:
本篇先到這裡,下一篇繼續探討這個注冊的過程。