Nacos Source Code Analysis (3): How the Registry Server Handles Service Registration


Overview

The registry server's main jobs are to accept service registrations from clients, answer service-discovery queries, and handle service deregistration. Beyond this client-facing work, the server also has to deal with the AP and CP modes we so often hear about in distributed systems. As a cluster, Nacos implements both: AP via its home-grown Distro protocol, and CP via the Raft protocol. Heartbeats, leader election, and similar machinery are all involved, so it is genuinely complex.

This article covers only how the registry server handles a client's service registration; the other features are left for later articles.

The server-side endpoint that receives client registrations looks like this:

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
            Constants.DEFAULT_NAMESPACE_ID);

    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

This method lives in the com.alibaba.nacos.naming.controllers.InstanceController class and backs the POST /nacos/v1/ns/instance open API.
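
For context, this is the endpoint the Nacos Java client hits when you register an instance. A minimal client-side usage sketch (the server address, service name, and instance address below are made-up examples):

import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;

public class RegisterDemo {
    public static void main(String[] args) throws Exception {
        // Connect to a Nacos server (address is an example)
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        // Register an instance of the service; instances are ephemeral by default
        naming.registerInstance("demo.service", "192.168.0.10", 8080);
    }
}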

 

Service registration flow

I split the process into two parts. Part 1 updates the local cache: a registration is not written to every node in the cluster; the client registers against just one node (and the @CanDistro machinery forwards the request to the node responsible for that service if needed). Part 2 syncs the registered service data to the other cluster nodes. Let's walk through both parts in detail.

Part 1: Updating the local cache

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // Check whether the namespace exists in the local cache and create it if not;
    // then check whether the service exists under that namespace and create an
    // empty service if not. Note: this step does not touch the instance list yet.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Fetch the service from the local cache
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // The actual registration: this step binds the instance to the service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

Step into com.alibaba.nacos.naming.core.ServiceManager#createEmptyService, which delegates to createServiceIfAbsent:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // The service does not exist yet, so create an empty one
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Put the newly created empty service into the local cache and initialize it
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

Step into com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit:

private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into the local cache
    putService(service);
    // Start the scheduled health-check task for this service
    service.init();
    // Listen for changes to both the ephemeral and the persistent instance list of this service
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}

1. putService is straightforward; for reference, a sketch of what it does follows.
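
putService is essentially a double-checked insert into the two-level serviceMap (namespace -> service name -> Service). A lightly annotated copy, based on the Nacos 1.x sources (details may vary between versions):

public void putService(Service service) {
    // First level: make sure the map for this namespace exists (double-checked locking)
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    // Second level: register the service under its name
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}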

2. Step into com.alibaba.nacos.naming.core.Service#init

I will not dig into this here. Health checking deserves a dedicated article, because the Nacos authors use TCP connections driven by NIO for the checks, and there is quite a lot going on inside. For orientation, the method itself is shown below.
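
init itself is tiny: it schedules the client-beat check task and initializes each cluster of the service, which is where the TCP/HTTP health checks get set up. Roughly, from the Nacos 1.x sources:

public void init() {
    // Periodically check whether client heartbeats have expired
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    // Wire each cluster back to this service and start its health checks
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}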

3. The listener registration also involves a lot of code; it is built on the observer pattern. A sketch of the listener interface follows.
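
The two listen calls in putServiceAndInit register the Service itself as a listener: Service implements RecordListener, and its onChange callback is what eventually updates the in-memory instance list when the consistency layer applies a write. The interface looks roughly like this in Nacos 1.x:

public interface RecordListener<T extends Record> {

    // Whether this listener is interested in the given data key
    boolean interests(String key);

    // Whether this listener should be removed when the given key is unlistened
    boolean matchUnlistenKey(String key);

    // Fired when the record under the key is created or updated
    void onChange(String key, T value) throws Exception;

    // Fired when the record under the key is deleted
    void onDelete(String key) throws Exception;
}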

To sum up: that completes the walkthrough of Part 1. There are still many details worth analyzing carefully that I have not examined closely.

Part 2: Storing the service data and syncing it to the other nodes
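
Before diving in, recall how we get here from Part 1: addInstance, the last step of registerInstance, merges the new instance into the service's instance list and hands the result to the consistency service. A lightly annotated copy from ServiceManager (Nacos 1.x; details may vary between versions):

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    // Ephemeral and persistent instance lists live under different keys
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);

    Service service = getService(namespaceId, serviceName);

    synchronized (service) {
        // Merge the new instances into the service's current instance list
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);

        Instances instances = new Instances();
        instances.setInstanceList(instanceList);

        // Hand the updated list to the consistency service (this is the put we analyze next)
        consistencyService.put(key, instances);
    }
}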

Step into the com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put() method:

@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

Because the instance we registered is ephemeral, this put is routed to the Distro implementation. Why ephemeral? Look at the parameter parsing: com.alibaba.nacos.naming.controllers.InstanceController#parseInstance() internally calls getIPAddress(), and inside com.alibaba.nacos.naming.controllers.InstanceController#getIPAddress() you will find this code:

boolean ephemeral = BooleanUtils.toBoolean(WebUtils.optional(request, "ephemeral",
        String.valueOf(switchDomain.isDefaultInstanceEphemeral())));

Instance instance = new Instance();
instance.setPort(Integer.parseInt(port));
instance.setIp(ip);
instance.setWeight(Double.parseDouble(weight));
instance.setClusterName(cluster);
instance.setHealthy(healthy);
instance.setEnabled(enabled);
instance.setEphemeral(ephemeral);

This ephemeral flag is actually set by the client itself when it registers the service.

On the client side, in com.alibaba.nacos.client.naming.net.NamingProxy#registerService:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    // Whether the instance is ephemeral; defaults to true
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

So the put above resolves to com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put(). Step in:

@Override
public void put(String key, Record value) throws NacosException {
    // Store the service data in the local in-memory store and notify listeners
    onPut(key, value);
    // Add a task to sync the data to the other cluster nodes
    taskDispatcher.addTask(key);
}

1. The onPut method is fairly simple; a sketch follows.
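
In short, onPut wraps the value into a versioned Datum, stores it in the in-memory DataStore, and queues an asynchronous CHANGE notification for the listeners of the key, which is how Service#onChange eventually runs. Roughly, from the Nacos 1.x sources (the exact types differ slightly between versions):

public void onPut(String key, Record value) {

    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        // Wrap the instance list into a Datum and bump its version counter
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // Ephemeral data lives only in this in-memory store, never on disk
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }

    // Queue an asynchronous CHANGE notification for the listeners of this key
    notifier.addTask(key, ApplyAction.CHANGE);
}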

2. Step into taskDispatcher.addTask(key). The code is long, so both TaskDispatcher and the DataSyncer it drives are reproduced in full below.

package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.naming.misc.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Data sync task dispatcher
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    // Executed once when this bean is initialized
    @PostConstruct
    public void init() {
        // Create one TaskScheduler per CPU core and submit each one for execution
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // Pick a scheduler deterministically by hashing the key, so the same key
        // always lands on the same scheduler
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    // The worker that batches keys and dispatches sync tasks
    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {

                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Member member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getAddress())) {
                                continue;
                            }
                            // Create a sync task for the batched keys, targeting this member
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getAddress());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // Submit the task to the DataSyncer with no delay
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}
And the DataSyncer class that the dispatcher submits tasks to:

package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Data replicator
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
@DependsOn("ProtocolManager")
public class DataSyncer {

    @Autowired
    private DataStore dataStore;

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private Serializer serializer;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerMemberManager memberManager;

    private Map<String, String> taskMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        startTimedSync();
    }

    public void submit(SyncTask task, long delay) {

        // If it's a new task:
        if (task.getRetryCount() == 0) {
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    iterator.remove();
                }
            }
        }

        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }

        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }

            List<String> keys = task.getKeys();

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }
            // 2. get the datums by keys and check the datum is empty or not
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }

            byte[] data = serializer.serialize(datumMap);

            long timestamp = System.currentTimeMillis();
            // Push the serialized datums to the target node
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            if (!success) {
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    public void retrySync(SyncTask syncTask) {
        Member member = new Member();
        member.setIp(syncTask.getTargetServer().split(":")[0]);
        member.setPort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(member)) {
            // if server is no longer in healthy server list, ignore this task:
            //fix #1665 remove existing tasks
            if (syncTask.getKeys() != null) {
                for (String key : syncTask.getKeys()) {
                    taskMap.remove(buildKey(key, syncTask.getTargetServer()));
                }
            }
            return;
        }

        // TODO may choose other retry policy.
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    public class TimedSync implements Runnable {

        @Override
        public void run() {

            try {

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }

                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                for (String key : dataStore.keys()) {
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }

                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    keyChecksums.put(key, datum.value.getChecksum());
                }

                if (keyChecksums.isEmpty()) {
                    return;
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }

                for (Member member : getServers()) {
                    if (NetUtils.localServer().equals(member.getAddress())) {
                        continue;
                    }
                    NamingProxy.syncCheckSums(keyChecksums, member.getAddress());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }

    }

    public Collection<Member> getServers() {
        return memberManager.allMembers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}

The two classes above drive the replication. Besides this task-driven sync, DataSyncer also runs a timed sync (the TimedSync task started in startTimedSync), which acts as a supplement to the first mechanism.

The timed sync collects, for every service this node is responsible for, the service's data key and the checksum of its data into a map, and pushes that map to every other node asynchronously.
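
The receiving side is not covered in this article, but as a rough, simplified sketch (modeled on DistroConsistencyServiceImpl#onReceiveChecksums in Nacos 1.x; treat the details as illustrative): the receiver compares the incoming checksums against its local DataStore, collects the keys that are missing or stale, and then pulls fresh data for those keys from the sender:

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
    List<String> toUpdateKeys = new ArrayList<>();
    for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
        Datum datum = dataStore.get(entry.getKey());
        // Missing locally, or checksum differs: our copy is absent or stale
        if (datum == null || !entry.getValue().equals(datum.value.getChecksum())) {
            toUpdateKeys.add(entry.getKey());
        }
    }
    // (The real code also removes keys the sender no longer owns.)
    // Finally, pull fresh data for toUpdateKeys from the sender and apply it via onPut.
}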

 

Summary

There is still a lot here I have not fully figured out; this article only sorts out the overall flow, and many details deserve a closer look. If anything above is wrong, corrections from readers are very welcome.

 

