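1. Introduction
1.1 How Nacos works
Image source: https://www.cnblogs.com/wuzhenzhao/p/13625491.html
1.2 Running locally
- Download the source: https://github.com/alibaba/nacos.git
After cloning, I personally switch to the master branch, where the code is more complete.
- Find the console module, select the Nacos class and run it. If startup fails, the local config folder may be missing an application.properties file; copy one over from the console module.
- When starting locally, enable standalone mode: -Dnacos.standalone=true
1.3 The registry
The registry is really just a Spring Boot project: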
@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {
public static void main(String[] args) {
SpringApplication.run(Nacos.class, args);
}
}
On startup it initializes the controller interfaces under the naming, console and config packages and stores them in methodsCache.
@Component
@EnableScheduling
@PropertySource("/application.properties")
public class ConsoleConfig {
@Autowired
private ControllerMethodsCache methodsCache;
@PostConstruct
public void init() {
methodsCache.initClassMethod("com.alibaba.nacos.naming.controllers");
methodsCache.initClassMethod("com.alibaba.nacos.console.controller");
methodsCache.initClassMethod("com.alibaba.nacos.config.server.controller");
}
}
1.4 Client startup classes
These are the classes injected through the META-INF\spring.factories file of the spring-cloud-starter-alibaba-nacos-discovery project:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
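To make the file format concrete, here is a minimal sketch of reading such an entry with plain java.util.Properties: the trailing backslashes are line continuations, so each key maps to one comma-separated string of class names. Spring Boot's own loader does more than this; the class SpringFactoriesSketch and the helper classNames are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class SpringFactoriesSketch {

    // Hypothetical helper: returns the class names listed under one key.
    public static List<String> classNames(String fileContent, String key) {
        Properties props = new Properties();
        try {
            // Properties.load treats a trailing backslash as a line continuation.
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String key = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";
        String content = key + "=\\\n"
                + "com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\\\n"
                + "com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration\n";
        for (String name : classNames(content, key)) {
            System.out.println(name);
        }
    }
}
```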
2. Client-side handling of service registration
First, a flow chart of service registration:
Image source: https://blog.csdn.net/wangwei19871103/article/details/105787403
2.1 Beans instantiated by NacosDiscoveryAutoConfiguration
public class NacosDiscoveryAutoConfiguration {
@Bean
public NacosServiceRegistry nacosServiceRegistry(
NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosServiceRegistry(nacosDiscoveryProperties);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosRegistration nacosRegistration(
NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
return new NacosRegistration(nacosDiscoveryProperties, context);
}
@Bean
@ConditionalOnBean(AutoServiceRegistrationProperties.class)
public NacosAutoServiceRegistration nacosAutoServiceRegistration(
NacosServiceRegistry registry, // instantiated by the method above
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) { // instantiated by the method above
return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
}
}
2.1.1 Instantiating NacosDiscoveryProperties
Its values are read from the configuration file:
@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties { ... }
2.1.2 Instantiating NacosServiceRegistry
public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}
The factory is then called:
namingService = NacosFactory.createNamingService(getNacosProperties());
getNacosProperties adds settings such as the secret key and clusterName;
the create method uses reflection.
2.1.3 Instantiating NacosRegistration
public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,
ApplicationContext context) {
this.nacosDiscoveryProperties = nacosDiscoveryProperties;
this.context = context;
}
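The constructor only stores these fields; the init method, however, fills in some management and heartbeat information.
2.1.4 Instantiating NacosAutoServiceRegistration
The AutoServiceRegistrationProperties parameter holds a few configuration settings: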
public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
AutoServiceRegistrationProperties autoServiceRegistrationProperties,
NacosRegistration registration) {
super(serviceRegistry, autoServiceRegistrationProperties);
this.registration = registration;
}
2.2 NacosDiscoveryClientAutoConfiguration
@Configuration
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class })
public class NacosDiscoveryClientAutoConfiguration {
@Bean // instantiated from the configuration file
@ConditionalOnMissingBean
public NacosDiscoveryProperties nacosProperties() {
return new NacosDiscoveryProperties();
}
@Bean
public DiscoveryClient nacosDiscoveryClient(
NacosDiscoveryProperties discoveryProperties) {
return new NacosDiscoveryClient(discoveryProperties); // the client for the current instance
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
return new NacosWatch(nacosDiscoveryProperties); // publishes a HeartbeatEvent on a fixed delay after startup
}
}
2.2.1 Instantiating NacosWatch
public NacosWatch(NacosDiscoveryProperties properties) {
this(properties, getTaskScheduler());
}
@Override
public void start() {
if (this.running.compareAndSet(false, true)) {
this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
this::nacosServicesWatch, this.properties.getWatchDelay()); // delay of 30000 ms
}
}
public void nacosServicesWatch() {
// nacos doesn't support watch now , publish an event every 30 seconds.
this.publisher.publishEvent(
new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
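The pattern above (start once, then publish an incrementing index on a fixed delay) can be sketched with the JDK alone. This is an illustration under assumptions, not the Spring Cloud class: WatchSketch and its LongConsumer publisher are stand-ins for NacosWatch and Spring's event publisher, and the sketch is not restartable after stop().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

public class WatchSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong watchIndex = new AtomicLong(0);
    private final LongConsumer publisher; // stand-in for Spring's event publisher
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public WatchSketch(LongConsumer publisher) {
        this.publisher = publisher;
    }

    // Starts the periodic task once; repeated calls are no-ops.
    public void start(long delayMillis) {
        if (running.compareAndSet(false, true)) {
            scheduler.scheduleWithFixedDelay(this::publishHeartbeat,
                    delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    // One tick: hand the next index to the publisher.
    public void publishHeartbeat() {
        publisher.accept(watchIndex.getAndIncrement());
    }

    public void stop() {
        running.set(false);
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        WatchSketch watch = new WatchSketch(i -> System.out.println("heartbeat " + i));
        watch.start(100); // the walkthrough mentions 30000 ms; shortened here
        Thread.sleep(350);
        watch.stop();
    }
}
```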
2.3 NacosNamingService
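The NamingService created in section 2.1.2 holds all the code related to service registration, heartbeats and so on; its constructor calls the init method.
2.3.1 The init() method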
private void init(Properties properties) {
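namespace = InitUtils.initNamespaceForNaming(properties); // resolve the namespace
initServerAddr(properties); // initialize the registry server addresses
InitUtils.initWebRootContext();
initCacheDir(); // initialize the cache directory, by default under /nacos/naming/public
initLogName(properties); // initialize the log file name, naming.log by default
eventDispatcher = new EventDispatcher(); // starts a background thread that creates NamingEvents
// The next few are the important ones: BeatReactor is the heartbeat reactor, HostReactor is the host reactor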
serverProxy = new NamingProxy(namespace, endpoint, serverList);
serverProxy.setProperties(properties);
beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}
2.3.2 EventDispatcher creates the NamingEvents
public EventDispatcher() {
executor = Executors.newSingleThreadExecutor(new ThreadFactory() { ... }); // thread factory body omitted
executor.execute(new Notifier());
}
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
// wait up to 5 minutes for a changed service (null check and exception handling omitted)
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
}
}
}
}
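The Notifier is a single consumer thread draining a blocking queue and fanning each change out to the listeners registered for that service. The sketch below shows that pattern with JDK types only. DispatcherSketch, its method names and the use of plain String keys instead of ServiceInfo are assumptions made for illustration; the null check after poll is written out here.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DispatcherSketch {
    private final BlockingQueue<String> changedServices = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<String>>> observers = new ConcurrentHashMap<>();
    private final ExecutorService executor;

    public DispatcherSketch() {
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "dispatcher-sketch");
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });
        executor.execute(this::notifyLoop);
    }

    public void addListener(String serviceKey, Consumer<String> listener) {
        observers.computeIfAbsent(serviceKey, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    public void serviceChanged(String serviceKey) {
        changedServices.add(serviceKey);
    }

    private void notifyLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String key = changedServices.poll(5, TimeUnit.MINUTES);
                if (key == null) {
                    continue; // timed out, nothing changed
                }
                for (Consumer<String> listener
                        : observers.getOrDefault(key, Collections.emptyList())) {
                    listener.accept(key);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Returns true if a listener is notified within five seconds of a change.
    public static boolean demo() {
        DispatcherSketch dispatcher = new DispatcherSketch();
        CountDownLatch notified = new CountDownLatch(1);
        dispatcher.addListener("DEFAULT_GROUP@@demo", key -> notified.countDown());
        dispatcher.serviceChanged("DEFAULT_GROUP@@demo");
        try {
            return notified.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("listener notified: " + demo());
    }
}
```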
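2.3.3 The InitUtils.initNamespaceForNaming() method
The Nacos data model is identified by a triple: Namespace, Group and dataId. The method below resolves the Namespace and returns public by default.
/* Initializes the namespace for naming. Config initialization differs, so it cannot be reused directly. */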
public static String initNamespaceForNaming(Properties properties) {
String tmpNamespace = null;
String isUseCloudNamespaceParsing =
properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));
if (Boolean.valueOf(isUseCloudNamespaceParsing)) {
tmpNamespace = TenantUtil.getUserTenantForAns();
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
return namespace;
}
});
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
return namespace;
}
});
}
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
return namespace;
}
});
if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
}
tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
@Override
public String call() {
return UtilAndComs.DEFAULT_NAMESPACE_ID; // returns "public" by default
}
});
return tmpNamespace;
}
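Stripped of the anonymous Callable boilerplate, the method above is a "first non-empty source wins" chain that ends in a default. A compact sketch of that idea follows. NamespaceFallbackSketch, firstNonEmpty, resolve and the property key "namespace" are illustrative assumptions, and the cloud-specific sources are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

public class NamespaceFallbackSketch {

    // Returns the first non-empty value produced by the sources, else the default.
    public static String firstNonEmpty(List<Supplier<String>> sources, String defaultValue) {
        for (Supplier<String> source : sources) {
            String value = source.get();
            if (value != null && !value.isEmpty()) {
                return value;
            }
        }
        return defaultValue;
    }

    // Hypothetical resolver: system property, then Properties entry, then "public".
    public static String resolve(Properties properties) {
        List<Supplier<String>> sources = new ArrayList<>();
        sources.add(() -> System.getProperty("namespace"));
        sources.add(() -> properties == null ? null : properties.getProperty("namespace"));
        return firstNonEmpty(sources, "public");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props));
        props.setProperty("namespace", "dev");
        System.out.println(resolve(props));
    }
}
```

Each source is only evaluated if every earlier one came back empty, which is the same short-circuit behaviour as the chained stringEmptyAndThenExecute calls.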
2.3.4 The NamingProxy constructor
public NamingProxy(String namespaceId, String endpoint, String serverList) {
this.namespaceId = namespaceId;
this.endpoint = endpoint;
if (StringUtils.isNotEmpty(serverList)) {
this.serverList = Arrays.asList(serverList.split(","));
if (this.serverList.size() == 1) {
this.nacosDomain = serverList;
}
}
initRefreshSrvIfNeed();
}
The internal method initRefreshSrvIfNeed:
private void initRefreshSrvIfNeed() {
// return early if endpoint is empty
if (StringUtils.isEmpty(endpoint)) {
return;
}
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.naming.serverlist.updater");
t.setDaemon(true);
return t;
}
});
// start a scheduled thread that refreshes the server list every 30 seconds
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
refreshSrvIfNeed();
}
}, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);
refreshSrvIfNeed();
}
2.4 NacosAutoServiceRegistration
2.4.1 NacosWatch is given its event publisher
2.4.2 Spring Boot calls finishRefresh on startup
@Override
protected void finishRefresh() {
super.finishRefresh();
WebServer webServer = startWebServer();
if (webServer != null) {
publishEvent(new ServletWebServerInitializedEvent(webServer, this));
}
}
2.4.3 NacosAutoServiceRegistration binds to the event above
public class NacosAutoServiceRegistration
extends AbstractAutoServiceRegistration<Registration> { ...}
Methods in the parent class:
@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
this.start();
}
start is called next:
public void start() {
if (!isEnabled()) {
return;
}
if (!this.running.get()) {
this.context.publishEvent(
new InstancePreRegisteredEvent(this, getRegistration())); // publish the instance pre-registered event
register(); // the registration method is called here
if (shouldRegisterManagement()) {
registerManagement();
}
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, getConfiguration())); // publish the instance registered event
this.running.compareAndSet(false, true);
}
}
2.5 NacosServiceRegistry
2.5.1 Registration in NacosServiceRegistry
@Override
public void register(Registration registration) {
// corresponds to the application.name of the current application
String serviceId = registration.getServiceId();
// the service instance information
Instance instance = getNacosInstanceFromRegistration(registration);
// register through the naming service
namingService.registerInstance(serviceId, instance);
}
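2.5.2 NacosNamingService starts registering the instance
It does two main things:
- If the instance being registered is an ephemeral node, it builds the heartbeat information and hands it to the beat reactor, which creates the heartbeat task
- It calls registerService to send the registration request
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
// for an ephemeral node, build the heartbeat information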
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
// start the scheduled heartbeat task by adding the heartbeat information
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
// register through the server proxy class
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
The addBeatInfo method schedules this task:
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
long result = serverProxy.sendBeat(beatInfo);
long nextTime = result > 0 ? result : beatInfo.getPeriod();
// schedule the next beat after the given delay
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
}
}
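BeatTask does not use a fixed-rate schedule: every run sends one beat and then re-arms itself with a one-shot delay, so the server's response can change the interval at any time. A self-contained sketch of that self-rescheduling pattern follows. BeatSketch, nextDelay and runBeats are hypothetical names, and the LongSupplier stands in for serverProxy.sendBeat.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class BeatSketch {

    // The server may return a suggested interval; otherwise use the local period.
    public static long nextDelay(long serverInterval, long localPeriod) {
        return serverInterval > 0 ? serverInterval : localPeriod;
    }

    // Sends `count` beats, each one scheduling its successor; true if all were sent in time.
    public static boolean runBeats(int count, LongSupplier sendBeat, long localPeriod) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(count);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                long delay = nextDelay(sendBeat.getAsLong(), localPeriod);
                remaining.countDown();
                if (remaining.getCount() > 0) {
                    // one-shot schedule: the task re-arms itself after every beat
                    executor.schedule(this, delay, TimeUnit.MILLISECONDS);
                }
            }
        };
        executor.schedule(task, 0, TimeUnit.MILLISECONDS);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Fake sendBeat that asks for a 20 ms interval.
        System.out.println(runBeats(3, () -> 20L, 50L));
    }
}
```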
The registerService method is called next (the logic is fairly simple):
public void registerService(String serviceName, String groupName, Instance instance) {
final Map<String, String> params = new HashMap<String, String>(9);
// fill params with namespaceId, serviceName, groupName, ip, port, ephemeral and so on (...omitted)
// call the API
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
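The reqAPI method: when a service registers, it rotates through the configured registry addresses.
reqAPI() starts from a randomly chosen server and walks the configured list; if those requests fail, it enters a retry loop against nacosDomain with 3 attempts (REQUEST_DOMAIN_RETRY_COUNT), and throws an exception if registration still fails.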
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
// pick a random server node to start from
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
// walk the server list
for (int i = 0; i < servers.size(); i++) {
String server = servers.get(index); // the server node at the current index
try {
return callServer(api, params, server, method); // call that server
} catch (NacosException e) { ... }
index = (index + 1) % servers.size(); // move on to the next one
}
}
// fall back to nacosDomain, retrying up to REQUEST_DOMAIN_RETRY_COUNT times
// (...exception handling and the final throw omitted)
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
return callServer(api, params, nacosDomain);
}
}
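The server loop in reqAPI is a random-start round robin: pick a starting index, try that server, and on failure step to the next index modulo the list size until every server has been tried once. The sketch below isolates that loop. FailoverSketch and requestWithFailover are hypothetical names, the call is injected as a Function so it can be faked, and the nacosDomain retry loop is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

public class FailoverSketch {

    // Tries every server once, starting at startIndex and wrapping around.
    public static String requestWithFailover(List<String> servers, int startIndex,
                                             Function<String, String> call) {
        RuntimeException last = null;
        int index = startIndex;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");
        int start = new Random().nextInt(servers.size());
        // Fake call: the second server is down, the others answer.
        String result = requestWithFailover(servers, start, server -> {
            if (server.startsWith("10.0.0.2")) {
                throw new IllegalStateException("connection refused: " + server);
            }
            return "ok from " + server;
        });
        System.out.println(result);
    }
}
```

Starting at a random index spreads clients across the cluster, while the modulo step guarantees that a single dead node never blocks registration as long as one other node answers.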
callServer(api, params, server, method) then issues the call through HttpURLConnection.
public String callServer(String api, Map<String, String> params, String curServer, String method) {
HttpClient.HttpResult result = HttpClient.request(url, headers, params, UtilAndComs.ENCODING, method);
}
When debugging you will see a request to http://xxxx:8848/nacos/v1/ns/instance ;
there is also a scheduled request to /nacos/v1/ns/instance/beat.
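For a feel of what goes over the wire, the sketch below form-encodes a parameter map of the kind registerService builds and prints the resulting request line. It sends nothing. RegisterRequestSketch and encodeParams are hypothetical, the host and the sample values are placeholders, and only the path and the parameter names come from the walkthrough above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

public class RegisterRequestSketch {

    // Encodes parameters as application/x-www-form-urlencoded, keeping insertion order.
    public static String encodeParams(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        try {
            for (Map.Entry<String, String> e : params.entrySet()) {
                if (sb.length() > 0) {
                    sb.append('&');
                }
                sb.append(URLEncoder.encode(e.getKey(), "UTF-8"))
                  .append('=')
                  .append(URLEncoder.encode(e.getValue(), "UTF-8"));
            }
        } catch (UnsupportedEncodingException ex) {
            throw new IllegalStateException(ex); // UTF-8 is always available
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", "public");
        params.put("serviceName", "DEFAULT_GROUP@@demo-service");
        params.put("ip", "192.168.0.10");
        params.put("port", "8080");
        params.put("ephemeral", "true");
        // Placeholder host; only the path comes from the walkthrough.
        System.out.println("POST http://localhost:8848/nacos/v1/ns/instance?"
                + encodeParams(params));
    }
}
```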
3. Server-side handling in the Nacos registry
3.1 The InstanceController.register() endpoint
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final Instance instance = parseInstance(request); // parse the instance from the request
// call ServiceManager to register the service
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
3.1.1 The registerInstance method
serviceManager is the class that manages the lifecycle of services:
public void registerInstance(String namespaceId, String serviceName, Instance instance) {
// create an empty service; in effect this initializes a ConcurrentHashMap entry
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
// look up the service object in serviceMap by namespaceId and serviceName
Service service = getService(namespaceId, serviceName);
// call addInstance to create the service instance and hand it to the consistency protocol
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
3.1.2 The createServiceIfAbsent method
createEmptyService, called above, delegates to this method.
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster){
Service service = getService(namespaceId, serviceName); // look up the service object in serviceMap
if (service == null) { // not found, so initialize it
service = new Service();
(...some code omitted)
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
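Nacos maintains services per namespace: each namespace contains groups, each group contains its Services, and the serviceName then identifies the service whose instances are looked up.
The first time through, the service is initialized, and putServiceAndInit is called once initialization is done: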
/** Map(namespace, Map(group::serviceName, Service)). */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
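The two-level layout of serviceMap can be tried out in isolation. In the sketch below a String stands in for the Service object, and the inner key joins group and service name with "@@", following the grouped names seen elsewhere in this walkthrough. ServiceMapSketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ServiceMapSketch {
    // namespace -> (group@@serviceName -> service); String stands in for Service
    private final Map<String, Map<String, String>> serviceMap = new ConcurrentHashMap<>();

    public static String groupedName(String group, String serviceName) {
        return group + "@@" + serviceName;
    }

    // Stores the service unless one is already present; returns the one kept in the map.
    public String putIfAbsent(String namespace, String group, String serviceName,
                              String service) {
        Map<String, String> services =
                serviceMap.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>());
        String existing = services.putIfAbsent(groupedName(group, serviceName), service);
        return existing != null ? existing : service;
    }

    public String get(String namespace, String group, String serviceName) {
        Map<String, String> services = serviceMap.get(namespace);
        return services == null ? null : services.get(groupedName(group, serviceName));
    }

    public static void main(String[] args) {
        ServiceMapSketch map = new ServiceMapSketch();
        map.putIfAbsent("public", "DEFAULT_GROUP", "nacos-config-client", "service-object");
        System.out.println(map.get("public", "DEFAULT_GROUP", "nacos-config-client"));
        System.out.println(map.get("dev", "DEFAULT_GROUP", "nacos-config-client"));
    }
}
```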
The putServiceAndInit method:
private void putServiceAndInit(Service service) throws NacosException {
putService(service); // store the service in serviceMap
service.init(); // set up the heartbeat check mechanism
// register consistency listeners; ephemeral=true means the Distro protocol, false means Raft
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
}
3.1.3 The addInstance method
Once the service has been obtained, the instance is added to its instance list, and the data is then synchronized through the consistency protocol:
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips){
// build the key, e.g. com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@nacos-config-client
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName); // fetch the service assembled above
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
consistencyService.put(key, instances); // put the instance list into the consistency service that the listeners were registered on in the previous step
}
}
The registering client then receives the success response (return "ok"), which ends the registration flow.
3.2 InstanceController.beat()接口
@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
ObjectNode result = JacksonUtils.createEmptyJsonNode();
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); // parse the beat parameter
RsInfo clientBeat = null;
if (StringUtils.isNotBlank(beat)) {
clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
}
// read namespaceId, serviceName, clusterName, ip, port and other parameters (...omitted)
// look up the instance
Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
// if no instance is found, create one
if (instance == null) {
instance = new Instance();
// fill in the instance properties such as ip and port (...omitted)
// register the instance
serviceManager.registerInstance(namespaceId, serviceName, instance);
}
Service service = serviceManager.getService(namespaceId, serviceName);
if (clientBeat == null) {
clientBeat = new RsInfo();
clientBeat.setIp(ip);
clientBeat.setPort(port);
clientBeat.setCluster(clusterName);
}
// processClientBeat, covered separately in the next chapter, mainly renews the client's lease
service.processClientBeat(clientBeat);
result.put(CommonParams.CODE, NamingResponseCode.OK);
if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
}
result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
return result; // return the heartbeat response to the client
}
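The two ideas in beat() are "register on first heartbeat if the instance is unknown" and "otherwise renew its lease". A toy model of both follows, using a timestamp table. BeatRegistrySketch, its method names and the timeout value are assumptions for illustration; the real server keeps this state on the Instance and Service objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class BeatRegistrySketch {
    // instance key ("ip:port") -> timestamp of the last heartbeat in milliseconds
    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();

    // Handles one heartbeat; returns true if the instance had to be registered first.
    public boolean beat(String ip, int port, long nowMillis) {
        return lastBeat.put(ip + ":" + port, nowMillis) == null;
    }

    // Lists instances whose last heartbeat is older than timeoutMillis.
    public List<String> expired(long nowMillis, long timeoutMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastBeat.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        BeatRegistrySketch registry = new BeatRegistrySketch();
        System.out.println("newly registered: " + registry.beat("192.168.0.10", 8080, 0L));
        System.out.println("newly registered: " + registry.beat("192.168.0.10", 8080, 5000L));
        // 15000 ms is an arbitrary timeout chosen for this sketch.
        System.out.println("expired at t=30000: " + registry.expired(30000L, 15000L));
    }
}
```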
References:
https://blog.csdn.net/V_zxw/article/details/108244306
https://www.cnblogs.com/wuzhenzhao/p/13625491.html