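motan is a service governance framework open-sourced by Sina Weibo. For an introduction, see: http://tech.sina.com.cn/i/2016-05-10/doc-ifxryhhh1869879.shtml.
This series of articles analyzes its underlying source code, based on version 0.1.2. This first article starts with publishing and registering a service, using zookeeper as the registry. Source code: https://github.com/weibocom/motan
Main classes and interfaces covered in this article: MotanApiExportDemo, MotanDemoService, MotanDemoServiceImpl, ServiceConfig, RegistryConfig, ProtocolConfig, DefaultProvider, ZookeeperRegistryFactory, ZookeeperRegistry, SimpleConfigHandler, ProtocolFilterDecorator, and others.
1. First, the demo source: MotanApiExportDemo
The demo creates a ServiceConfig, a RegistryConfig and a ProtocolConfig in turn. ServiceConfig holds the configuration of the service we provide (one configuration per service, for example one per service interface; the concrete service in this article is MotanDemoServiceImpl). RegistryConfig holds the configuration of the registry. ProtocolConfig holds the configuration of the application protocol (on the client side it is also responsible for cluster-related configuration).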
    ServiceConfig<MotanDemoService> motanDemoService = new ServiceConfig<MotanDemoService>();

    // Set the interface and its implementation class
    // Service interface: during an RPC call the client passes the interface name in the protocol,
    // which is how the call is mapped to the concrete implementation class
    motanDemoService.setInterface(MotanDemoService.class);
    // Implementation class, i.e. the actual business code
    motanDemoService.setRef(new MotanDemoServiceImpl());

    // Configure the service group and version
    motanDemoService.setGroup("motan-demo-rpc"); // group the service belongs to
    motanDemoService.setVersion("1.0");

    // Configure the ZooKeeper registry
    RegistryConfig zookeeperRegistry = new RegistryConfig();
    zookeeperRegistry.setRegProtocol("zookeeper"); // use zookeeper as the registry
    zookeeperRegistry.setAddress("127.0.0.1:2181"); // zookeeper connection address
    motanDemoService.setRegistry(zookeeperRegistry);

    // Configure the RPC protocol
    ProtocolConfig protocol = new ProtocolConfig();
    protocol.setId("motan"); // use the motan application protocol
    protocol.setName("motan");
    motanDemoService.setProtocol(protocol);

    motanDemoService.setExport("motan:8010"); // this service listens on port 8010
    motanDemoService.export(); // publish the service and register it in zookeeper
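2. The code above shows that ServiceConfig is the core class for publishing and registering a service, and that the work happens in motanDemoService.export(). Let's look at how this method is implemented: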
    public synchronized void export() {
        if (exported.get()) {
            LoggerUtil.warn(String.format("%s has already been expoted, so ignore the export request!",
                    interfaceClass.getName()));
            return;
        }

        checkInterfaceAndMethods(interfaceClass, methods);

        // Load the registry URLs; multiple registries are supported
        List<URL> registryUrls = loadRegistryUrls();
        if (registryUrls == null || registryUrls.size() == 0) {
            throw new IllegalStateException("Should set registry config for service:" + interfaceClass.getName());
        }

        Map<String, Integer> protocolPorts = getProtocolAndPort();
        for (ProtocolConfig protocolConfig : protocols) {
            Integer port = protocolPorts.get(protocolConfig.getId());
            if (port == null) {
                throw new MotanServiceException(String.format("Unknow port in service:%s, protocol:%s",
                        interfaceClass.getName(), protocolConfig.getId()));
            }
            doExport(protocolConfig, port, registryUrls); // publish the service
        }

        afterExport();
    }
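To make the protocolPorts lookup in export() more concrete, here is a minimal sketch of how an export string such as "motan:8010" could be turned into a protocol id to port map. The class ExportSpecSketch and its methods are hypothetical names for illustration, and the comma-separated multi-protocol form is an assumption; this is not motan's own parsing code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of motan: models the id -> port lookup used by export().
class ExportSpecSketch {

    // Parses "id:port[,id:port...]" into an id -> port map (the list form is an assumption).
    static Map<String, Integer> parse(String export) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        for (String entry : export.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected id:port but got: " + entry);
            }
            ports.put(parts[0], Integer.valueOf(parts[1]));
        }
        return ports;
    }

    // Mirrors the null check in export(): every configured protocol id needs a port.
    static int portFor(Map<String, Integer> ports, String protocolId) {
        Integer port = ports.get(protocolId);
        if (port == null) {
            throw new IllegalStateException("Unknown port for protocol: " + protocolId);
        }
        return port;
    }

    public static void main(String[] args) {
        Map<String, Integer> ports = parse("motan:8010");
        System.out.println(portFor(ports, "motan")); // prints 8010
    }
}
```

The key of the map is the protocol id set with protocol.setId("motan") in the demo, which is why the id and the prefix of the export string have to match.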
The method calls doExport and afterExport:
    private void doExport(ProtocolConfig protocolConfig, int port, List<URL> registryURLs) {
        String protocolName = protocolConfig.getName(); // protocol name, "motan" in this article
        if (protocolName == null || protocolName.length() == 0) {
            protocolName = URLParamType.protocol.getValue();
        }

        String hostAddress = host; // local host address
        if (StringUtils.isBlank(hostAddress) && basicServiceConfig != null) {
            hostAddress = basicServiceConfig.getHost();
        }
        if (NetUtils.isInvalidLocalHost(hostAddress)) {
            hostAddress = getLocalHostAddress(registryURLs);
        }

        Map<String, String> map = new HashMap<String, String>();
        map.put(URLParamType.nodeType.getName(), "service");
        map.put(URLParamType.refreshTimestamp.getName(), String.valueOf(System.currentTimeMillis()));
        collectConfigParams(map, new AbstractConfig[] { protocolConfig, basicServiceConfig, extConfig, this });
        collectMethodConfigParams(map, getMethods());

        // Assemble the service URL
        URL serviceUrl = new URL(protocolName, hostAddress, port, interfaceClass.getName(), map);

        if (serviceExists(serviceUrl)) { // has this service already been exported?
            LoggerUtil.warn(String.format("%s configService is malformed, for same service (%s) already exists ",
                    interfaceClass.getName(), serviceUrl.getIdentity()));
            // Reject a second service with the same identity
            throw new MotanFrameworkException(String.format(
                    "%s configService is malformed, for same service (%s) already exists ",
                    interfaceClass.getName(), serviceUrl.getIdentity()), MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
        }

        List<URL> urls = new ArrayList<URL>();
        if ("injvm".equals(protocolConfig.getId())) {
            URL localRegistryUrl = null;
            for (URL ru : registryURLs) {
                if ("local".equals(ru.getProtocol())) {
                    localRegistryUrl = ru.createCopy();
                    break;
                }
            }
            if (localRegistryUrl == null) {
                localRegistryUrl = new URL("local", hostAddress, 0, RegistryService.class.getName());
            }
            urls.add(localRegistryUrl);
        } else {
            for (URL ru : registryURLs) {
                urls.add(ru.createCopy());
            }
        }

        for (URL u : urls) {
            u.addParameter(URLParamType.embed.getName(), StringTools.urlEncode(serviceUrl.toFullStr()));
            registereUrls.add(u.createCopy());
        }

        // Load SimpleConfigHandler through the SPI mechanism
        ConfigHandler configHandler = ExtensionLoader.getExtensionLoader(ConfigHandler.class).getExtension("default");
        // Call SimpleConfigHandler.export
        exporters.add(configHandler.export(interfaceClass, ref, urls));

        initLocalAppInfo(serviceUrl);
    }

    private void afterExport() {
        exported.set(true);
        for (Exporter ep : exporters) {
            existingServices.add(ep.getProvider().getUrl().getIdentity());
        }
    }
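The interplay between serviceExists in doExport and existingServices in afterExport can be sketched as a set of service identities. The class below is a hypothetical stand-in, and the identity format it builds is an assumption for illustration only; the real identity comes from URL.getIdentity(), which is not shown in this article. The sketch also collapses the check and the bookkeeping into one method, whereas motan records identities in afterExport.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the duplicate-service check; not motan's implementation.
class DuplicateServiceSketch {

    // Identities of services that were already exported (plays the role of existingServices).
    private final Set<String> existingServices = new HashSet<>();

    // Assumed identity format, for illustration only.
    static String identity(String protocol, String host, int port,
                           String group, String iface, String version) {
        return protocol + "://" + host + ":" + port + "/" + group + "/" + iface + "/" + version;
    }

    // Fails if a service with the same identity was exported before, otherwise records it.
    void export(String identity) {
        if (existingServices.contains(identity)) {
            throw new IllegalStateException("same service (" + identity + ") already exists");
        }
        existingServices.add(identity);
    }

    public static void main(String[] args) {
        DuplicateServiceSketch sketch = new DuplicateServiceSketch();
        String id = identity("motan", "127.0.0.1", 8010, "motan-demo-rpc", "MotanDemoService", "1.0");
        sketch.export(id);
        try {
            sketch.export(id); // second export of the same identity is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```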
Next, let's look at the export method of SimpleConfigHandler:
    public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {
        String serviceStr = StringTools.urlDecode(registryUrls.get(0).getParameter(URLParamType.embed.getName()));
        URL serviceUrl = URL.valueOf(serviceStr);

        // export service
        // Use the protocol decorator to add filter support
        String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());
        // Wrap the Protocol object so that filters are applied
        Protocol protocol = new ProtocolFilterDecorator(
                ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName));
        // Proxy provider of the service; it wraps ref
        Provider<T> provider = new DefaultProvider<T>(ref, serviceUrl, interfaceClass);
        // Publish the service: associate the provider proxy with the concrete serviceUrl
        Exporter<T> exporter = protocol.export(provider, serviceUrl);

        // register service
        register(registryUrls, serviceUrl);

        return exporter;
    }
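The embed parameter is what carries the service URL from doExport to SimpleConfigHandler: the full service URL is URL-encoded, attached to each registry URL, and decoded again in export. The sketch below shows that round trip with the JDK's URLEncoder and URLDecoder. EmbedParamSketch and the example URLs are hypothetical; motan does this through its own URL and StringTools classes.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;

// Hypothetical illustration of the "embed" round trip; not motan's URL class.
class EmbedParamSketch {

    // Appends the URL-encoded service URL to the registry URL as the "embed" parameter.
    static String embed(String registryUrl, String serviceUrl) {
        try {
            String separator = registryUrl.contains("?") ? "&" : "?";
            return registryUrl + separator + "embed=" + URLEncoder.encode(serviceUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Reads the "embed" parameter back and decodes it; returns null if it is absent.
    static String extract(String registryUrl) {
        int queryStart = registryUrl.indexOf('?');
        if (queryStart < 0) {
            return null;
        }
        try {
            for (String pair : registryUrl.substring(queryStart + 1).split("&")) {
                if (pair.startsWith("embed=")) {
                    return URLDecoder.decode(pair.substring("embed=".length()), "UTF-8");
                }
            }
            return null;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String serviceUrl = "motan://127.0.0.1:8010/MotanDemoService?group=motan-demo-rpc&version=1.0";
        String registryUrl = embed("zookeeper://127.0.0.1:2181/registry", serviceUrl);
        System.out.println(registryUrl);
        System.out.println(serviceUrl.equals(extract(registryUrl))); // prints true
    }
}
```

Encoding matters because the service URL has its own "?" and "&" characters; without it they would be mistaken for parameters of the registry URL.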
3. Now let's look at how motan decorates the provider with filters
    // Implements the Protocol interface; as seen above, this class wraps the actual Protocol
    public class ProtocolFilterDecorator implements Protocol {

        private Protocol protocol;

        public ProtocolFilterDecorator(Protocol protocol) {
            if (protocol == null) {
                throw new MotanFrameworkException("Protocol is null when construct ProtocolFilterDecorator",
                        MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
            }
            this.protocol = protocol; // keep a reference to the actual Protocol
        }

        @Override
        public <T> Exporter<T> export(Provider<T> provider, URL url) {
            // When publishing the service, apply the filter decoration first
            return protocol.export(decorateWithFilter(provider, url), url);
        }

        private <T> Provider<T> decorateWithFilter(Provider<T> provider, URL url) {
            // Collect the filters that have to be applied
            List<Filter> filters = getFilters(url, MotanConstants.NODE_TYPE_SERVICE);
            if (filters == null || filters.size() == 0) {
                return provider;
            }
            Provider<T> lastProvider = provider;
            // Wrap the provider proxy into a chain of providers and return the last one
            for (Filter filter : filters) {
                final Filter f = filter;
                final Provider<T> lp = lastProvider;
                lastProvider = new Provider<T>() {
                    @Override
                    public Response call(Request request) {
                        // When call() is invoked later, the outermost filter runs first
                        // and the actual provider's call() runs last
                        return f.filter(lp, request);
                    }

                    @Override
                    public String desc() {
                        return lp.desc();
                    }

                    @Override
                    public void destroy() {
                        lp.destroy();
                    }

                    @Override
                    public Class<T> getInterface() {
                        return lp.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return lp.getUrl();
                    }

                    @Override
                    public void init() {
                        lp.init();
                    }

                    @Override
                    public boolean isAvailable() {
                        return lp.isAvailable();
                    }
                };
            }
            return lastProvider;
        }

        private List<Filter> getFilters(URL url, String key) {
            // load default filters
            List<Filter> filters = new ArrayList<Filter>();
            // Instantiate the filter objects through the SPI mechanism
            List<Filter> defaultFilters = ExtensionLoader.getExtensionLoader(Filter.class).getExtensions(key);
            if (defaultFilters != null && defaultFilters.size() > 0) {
                filters.addAll(defaultFilters);
            }

            // add filters via "filter" config
            String filterStr = url.getParameter(URLParamType.filter.getName());
            if (StringUtils.isNotBlank(filterStr)) {
                String[] filterNames = MotanConstants.COMMA_SPLIT_PATTERN.split(filterStr);
                for (String fn : filterNames) {
                    addIfAbsent(filters, fn);
                }
            }

            // add filter via other configs, like accessLog and so on
            boolean accessLog = url.getBooleanParameter(URLParamType.accessLog.getName(),
                    URLParamType.accessLog.getBooleanValue());
            if (accessLog) {
                addIfAbsent(filters, AccessLogFilter.class.getAnnotation(SpiMeta.class).name());
            }

            // sort the filters
            Collections.sort(filters, new ActivationComparator<Filter>());
            Collections.reverse(filters);
            return filters;
        }
    }
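The wrapping loop in decorateWithFilter is easier to follow in a stripped-down form. The sketch below keeps only the chaining logic, with simplified Provider and Filter interfaces that are assumptions made for this example (the real motan interfaces take Request and Response objects and have more methods). It records the order in which filters are entered and left.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of the provider chain; the interfaces here are not motan's.
class FilterChainSketch {

    interface Provider {
        String call(String request);
    }

    interface Filter {
        String filter(Provider next, String request);
    }

    // Same loop shape as decorateWithFilter: each filter wraps the provider built so far.
    static Provider decorate(Provider provider, List<Filter> filters) {
        Provider last = provider;
        for (Filter filter : filters) {
            final Filter f = filter;
            final Provider next = last;
            last = request -> f.filter(next, request);
        }
        return last;
    }

    // A filter that records when it is entered and left, then delegates to the next provider.
    static Filter tracing(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name + ":before");
            String response = next.call(request);
            trace.add(name + ":after");
            return response;
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        Provider business = request -> "echo:" + request; // the actual business call
        Provider chain = decorate(business, Arrays.asList(tracing("A", trace), tracing("B", trace)));
        System.out.println(chain.call("hi")); // prints echo:hi
        System.out.println(trace);            // prints [B:before, A:before, A:after, B:after]
    }
}
```

The last filter in the list ends up outermost, so it sees the request first and the response last. That is why the order produced by the sort and reverse at the end of getFilters determines the order of execution.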
4. Once the service has been published, it has to be registered with the registry
    private void register(List<URL> registryUrls, URL serviceUrl) {
        for (URL url : registryUrls) { // iterate over all configured registries
            // Depending on the check parameter, a failed register may throw; the caller should be aware of this
            // zookeeper is the registry used in this article
            RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class)
                    .getExtension(url.getProtocol());
            if (registryFactory == null) {
                throw new MotanFrameworkException(new MotanErrorMsg(500, MotanErrorMsgConstant.FRAMEWORK_REGISTER_ERROR_CODE,
                        "register error! Could not find extension for registry protocol:" + url.getProtocol()
                                + ", make sure registry module for " + url.getProtocol() + " is in classpath!"));
            }
            Registry registry = registryFactory.getRegistry(url); // obtain the registry
            registry.register(serviceUrl); // register the service, i.e. write the node information into zookeeper
        }
    }
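The loop in register picks a factory by the protocol of each registry URL and fails fast when none is found. A minimal sketch of that dispatch is shown below, with a plain map standing in for the ExtensionLoader lookup. All types in it (MultiRegistrySketch, InMemoryRegistry, the string-based URLs) are hypothetical and only model the control flow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "one service, several registries"; not motan's classes.
class MultiRegistrySketch {

    interface Registry {
        void register(String serviceUrl);
    }

    interface RegistryFactory {
        Registry getRegistry(String registryUrl);
    }

    // Toy registry that only remembers what was registered.
    static class InMemoryRegistry implements Registry {
        final List<String> registered = new ArrayList<>();

        @Override
        public void register(String serviceUrl) {
            registered.add(serviceUrl);
        }
    }

    // Protocol name -> factory; stands in for the SPI lookup by name.
    private final Map<String, RegistryFactory> factories = new HashMap<>();

    void addFactory(String protocol, RegistryFactory factory) {
        factories.put(protocol, factory);
    }

    // Registers the service in every registry, failing if a protocol has no factory.
    void register(List<String> registryUrls, String serviceUrl) {
        for (String url : registryUrls) {
            int schemeEnd = url.indexOf("://");
            if (schemeEnd < 0) {
                throw new IllegalArgumentException("Registry URL has no protocol: " + url);
            }
            String protocol = url.substring(0, schemeEnd);
            RegistryFactory factory = factories.get(protocol);
            if (factory == null) {
                throw new IllegalStateException("Could not find extension for registry protocol: " + protocol);
            }
            factory.getRegistry(url).register(serviceUrl);
        }
    }

    public static void main(String[] args) {
        InMemoryRegistry zk = new InMemoryRegistry();
        MultiRegistrySketch sketch = new MultiRegistrySketch();
        sketch.addFactory("zookeeper", url -> zk);
        List<String> registries = new ArrayList<>();
        registries.add("zookeeper://127.0.0.1:2181");
        sketch.register(registries, "motan://127.0.0.1:8010/MotanDemoService");
        System.out.println(zk.registered);
    }
}
```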
Let's look at the factory class of the zookeeper registry. Each Registry maintains its own ZkClient connection to zookeeper:
    @SpiMeta(name = "zookeeper")
    public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

        @Override
        protected Registry createRegistry(URL registryUrl) {
            try {
                int timeout = registryUrl.getIntParameter(URLParamType.connectTimeout.getName(),
                        URLParamType.connectTimeout.getIntValue());
                int sessionTimeout = registryUrl.getIntParameter(URLParamType.registrySessionTimeout.getName(),
                        URLParamType.registrySessionTimeout.getIntValue());
                // Create the zookeeper client
                ZkClient zkClient = new ZkClient(registryUrl.getParameter("address"), sessionTimeout, timeout);
                return new ZookeeperRegistry(registryUrl, zkClient); // create the actual Registry
            } catch (ZkException e) {
                LoggerUtil.error("[ZookeeperRegistry] fail to connect zookeeper, cause: " + e.getMessage());
                throw e;
            }
        }
    }
Now let's go through the code in ZookeeperRegistry:
    public ZookeeperRegistry(URL url, ZkClient client) {
        super(url);
        this.zkClient = client;
        IZkStateListener zkStateListener = new IZkStateListener() {
            @Override
            public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
                // do nothing
            }

            @Override
            public void handleNewSession() throws Exception { // react to the new-session event of zkClient
                LoggerUtil.info("zkRegistry get new session notify.");
                reconnectService(); // register the services again
                reconnectClient();
            }
        };
        zkClient.subscribeStateChanges(zkStateListener);
    }

    private void reconnectService() {
        Collection<URL> allRegisteredServices = getRegisteredServiceUrls();
        if (allRegisteredServices != null && !allRegisteredServices.isEmpty()) {
            try {
                serverLock.lock();
                for (URL url : getRegisteredServiceUrls()) {
                    doRegister(url); // register
                }
                LoggerUtil.info("[{}] reconnect: register services {}", registryClassName, allRegisteredServices);

                for (URL url : availableServices) {
                    if (!getRegisteredServiceUrls().contains(url)) {
                        LoggerUtil.warn("reconnect url not register. url:{}", url);
                        continue;
                    }
                    doAvailable(url); // mark the service as available
                }
                LoggerUtil.info("[{}] reconnect: available services {}", registryClassName, availableServices);
            } finally {
                serverLock.unlock();
            }
        }
    }

    protected void doRegister(URL url) {
        try {
            serverLock.lock();
            // Guard against old nodes that were not removed properly
            removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        } catch (Throwable e) {
            throw new MotanFrameworkException(String.format("Failed to register %s to zookeeper(%s), cause: %s",
                    url, getUrl(), e.getMessage()), e);
        } finally {
            serverLock.unlock();
        }
    }

    protected void doAvailable(URL url) {
        try {
            serverLock.lock();
            if (url == null) {
                availableServices.addAll(getRegisteredServiceUrls());
                for (URL u : getRegisteredServiceUrls()) {
                    removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                    removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                    createNode(u, ZkNodeType.AVAILABLE_SERVER);
                }
            } else {
                availableServices.add(url);
                removeNode(url, ZkNodeType.AVAILABLE_SERVER);
                removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
                createNode(url, ZkNodeType.AVAILABLE_SERVER);
            }
        } finally {
            serverLock.unlock();
        }
    }
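Read together, doRegister and doAvailable form a small state machine: a service is first written as an unavailable server and later moved to the available side, and both steps clear any leftover node before creating the new one. The sketch below models this with in-memory sets instead of zookeeper nodes. RegistrationStateSketch is a hypothetical class; only the two node type names are taken from the code above.

```java
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory model of the register/available transitions; no zookeeper involved.
class RegistrationStateSketch {

    enum NodeType { AVAILABLE_SERVER, UNAVAILABLE_SERVER }

    // One set of service URLs per node type, standing in for the zookeeper directories.
    private final Map<NodeType, Set<String>> nodes = new EnumMap<>(NodeType.class);

    RegistrationStateSketch() {
        for (NodeType type : NodeType.values()) {
            nodes.put(type, new HashSet<String>());
        }
    }

    // Removes leftovers of both types, like the two removeNode calls.
    private void removeFromAll(String url) {
        for (Set<String> urls : nodes.values()) {
            urls.remove(url);
        }
    }

    // A freshly registered service starts out unavailable.
    void doRegister(String url) {
        removeFromAll(url);
        nodes.get(NodeType.UNAVAILABLE_SERVER).add(url);
    }

    // Making a service available moves it to the available side.
    void doAvailable(String url) {
        removeFromAll(url);
        nodes.get(NodeType.AVAILABLE_SERVER).add(url);
    }

    // Returns the node type the URL is currently stored under, or null if it is unknown.
    NodeType stateOf(String url) {
        for (Map.Entry<NodeType, Set<String>> entry : nodes.entrySet()) {
            if (entry.getValue().contains(url)) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RegistrationStateSketch registry = new RegistrationStateSketch();
        String url = "motan://127.0.0.1:8010/MotanDemoService";
        registry.doRegister(url);
        System.out.println(registry.stateOf(url)); // prints UNAVAILABLE_SERVER
        registry.doAvailable(url);
        System.out.println(registry.stateOf(url)); // prints AVAILABLE_SERVER
    }
}
```

Because each step removes both node types first, repeating it after a new zookeeper session (as reconnectService does) leaves exactly one node per service.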
    private void createNode(URL url, ZkNodeType nodeType) {
        String nodeTypePath = ZkUtils.toNodeTypePath(url, nodeType);
        if (!zkClient.exists(nodeTypePath)) {
            // The node that identifies the service is created as a persistent node
            zkClient.createPersistent(nodeTypePath, true);
        }
        // The ip and port of the server are stored in an ephemeral node; when the server goes down,
        // zookeeper removes it automatically
        zkClient.createEphemeral(ZkUtils.toNodePath(url, nodeType), url.toFullStr());
    }
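The split between a persistent parent and an ephemeral child is what lets zookeeper drop a dead server on its own. The toy model below shows the effect without a zookeeper connection: ephemeral entries disappear when the session is closed, persistent ones stay. EphemeralNodeSketch is a hypothetical class, and the paths used in main are an assumed layout for illustration, since ZkUtils is not shown in this article.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model of persistent vs. ephemeral nodes; it does not talk to zookeeper.
class EphemeralNodeSketch {

    private final Set<String> persistent = new TreeSet<>();
    private final Map<String, String> ephemeral = new TreeMap<>();

    void createPersistent(String path) {
        persistent.add(path);
    }

    // An ephemeral node needs an existing parent and lives only as long as the session.
    void createEphemeral(String path, String data) {
        String parent = path.substring(0, path.lastIndexOf('/'));
        if (!persistent.contains(parent)) {
            throw new IllegalStateException("Parent node does not exist: " + parent);
        }
        ephemeral.put(path, data);
    }

    boolean exists(String path) {
        return persistent.contains(path) || ephemeral.containsKey(path);
    }

    // Models a lost session: every ephemeral node created by it is removed.
    void closeSession() {
        ephemeral.clear();
    }

    public static void main(String[] args) {
        // Assumed path layout, for illustration only.
        String typePath = "/motan/motan-demo-rpc/MotanDemoService/server";
        String nodePath = typePath + "/127.0.0.1:8010";

        EphemeralNodeSketch zk = new EphemeralNodeSketch();
        zk.createPersistent(typePath);
        zk.createEphemeral(nodePath, "motan://127.0.0.1:8010/MotanDemoService");
        System.out.println(zk.exists(nodePath)); // prints true

        zk.closeSession(); // the server went away
        System.out.println(zk.exists(nodePath)); // prints false
        System.out.println(zk.exists(typePath)); // prints true
    }
}
```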
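This article walked through the source code behind publishing a motan service and registering it in zookeeper. The main points:
1. Configuration objects are used to store and pass information around.
2. A provider wraps the concrete business class and acts as its proxy.
3. A filter chain wraps the actual provider; once all filters have run, the actual business class is invoked. The principle is similar to AOP.
4. The code makes heavy use of an SPI mechanism, in the style of the JDK's standard SPI, to load classes through ExtensionLoader.
5. Multiple registries are supported, so the same service can be registered in different registries; each registry has its own zkclient.
6. Ephemeral zookeeper nodes are used to maintain the host and port information of the servers.
7. Multiple services can be published on the same port. This article does not cover the netty-related code; it will be analyzed in a later article.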