nacos服務注冊之服務器端Distro


Distro是阿里自己創造的一致性協議算法,網上能找到的資料很少。Distro用於處理ephemeral(臨時)類型的數據。

  1. Distro協議算法看代碼大體流程是:
  • nacos啟動首先從其他遠程節點同步全部數據
  • nacos每個節點是平等的都可以處理寫入請求,同時把新數據同步到其他節點
  • 每個節點只負責部分數據,定時發送自己負責數據校驗值到其他節點來保持數據一致性
  2. Distro代碼分析:nacos負責數據一致性的服務是ConsistencyService接口,DistroConsistencyServiceImpl是EphemeralConsistencyService和ConsistencyService的實現類。

Distro協議算法的數據存儲實現比較簡單:DataStore是存儲實現類,臨時節點的數據都存儲在ConcurrentHashMap里面。

/**
 * In-memory holder for Distro datums, addressed by their string key.
 *
 * <p>Every entry lives in one {@link ConcurrentHashMap}; the accessors below
 * are thin delegations to that map.
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
public class DataStore {

    private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);

    /**
     * Stores {@code value} under {@code key}, replacing any datum already present.
     *
     * @param key   datum key
     * @param value datum to store
     */
    public void put(String key, Datum value) {
        dataMap.put(key, value);
    }

    /**
     * Removes the datum stored under {@code key}.
     *
     * @param key datum key
     * @return the removed datum, or {@code null} when nothing was stored under the key
     */
    public Datum remove(String key) {
        return dataMap.remove(key);
    }

    /**
     * @return the key set of the backing map (a view, not a copy)
     */
    public Set<String> keys() {
        return dataMap.keySet();
    }

    /**
     * @param key datum key
     * @return the datum stored under {@code key}, or {@code null} when absent
     */
    public Datum get(String key) {
        return dataMap.get(key);
    }

    /**
     * @param key datum key
     * @return {@code true} when a datum is stored under {@code key}
     */
    public boolean contains(String key) {
        return dataMap.containsKey(key);
    }

    /**
     * Looks up several keys at once; keys without a stored datum are left out of the result.
     *
     * @param keys keys to look up
     * @return a new map holding only the keys that were found
     */
    public Map<String, Datum> batchGet(List<String> keys) {
        Map<String, Datum> found = new HashMap<>(128);
        for (String key : keys) {
            Datum datum = dataMap.get(key);
            if (datum != null) {
                found.put(key, datum);
            }
        }
        return found;
    }

    /**
     * Sums the instance-list sizes of every stored datum whose value is an {@code Instances}.
     *
     * @return total number of instances counted
     */
    public int getInstanceCount() {
        int total = 0;
        for (Datum datum : dataMap.values()) {
            try {
                if (datum.value instanceof Instances) {
                    Instances instances = (Instances) datum.value;
                    total += instances.getInstanceList().size();
                }
            } catch (Exception ignore) {
                // Best effort: a datum that cannot be counted is skipped.
            }
        }
        return total;
    }

    /**
     * @return the backing map itself (not a copy)
     */
    public Map<String, Datum> getDataMap() {
        return dataMap;
    }
}

nacos啟動首先從其他節點同步全部數據
DistroConsistencyServiceImpl類構造完成后啟動同步線程;若本次未同步成功,則延遲一段時間后重試,直到首次同步成功。

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
    private volatile Notifier notifier = new Notifier();

    private LoadDataTask loadDataTask = new LoadDataTask();

    @PostConstruct
    public void init() {
        GlobalExecutor.submit(loadDataTask);
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

   /**
    * 從其它節點同步數據任務線程
    */
   private class LoadDataTask implements Runnable {

        @Override
        public void run() {
            try {
                load();
                if (!initialized) {
                    GlobalExecutor.submit(this, globalConfig.getLoadDataRetryDelayMillis());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("load data failed.", e);
            }
        }
    }

   /**
     * 初始化時從其它節點同步數據
     * @throws Exception
     */
    public void load() throws Exception {
        //如果單列模式則返回
        if (SystemUtils.STANDALONE_MODE) {
            initialized = true;
            return;
        }
        // size = 1 means only myself in the list, we need at least one another server alive:
        while (serverListManager.getHealthyServers().size() <= 1) {
            Thread.sleep(1000L);
            Loggers.DISTRO.info("waiting server list init...");
        }
        //只要有一個節點同步成功則返回
        for (Server server : serverListManager.getHealthyServers()) {
            if (NetUtils.localServer().equals(server.getKey())) {
                continue;
            }
            // try sync data from remote server:
            if (syncAllDataFromRemote(server)) {
                initialized = true;
                return;
            }
        }
    }

   /**
     * 從指定nacos節點同步數據
     * @param server 目標nacos節點
     * @return  是否同步成功
     */
    public boolean syncAllDataFromRemote(Server server) {
        try {
            byte[] data = NamingProxy.getAllData(server.getKey());
            processData(data);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

     /**
     * 處理同步報文信息
     * 更新數據存儲和通知監聽器
     * @param data
     * @param data
     * @throws Exception
     */
    public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                
                dataStore.put(entry.getKey(), entry.getValue());

                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        Loggers.DISTRO.info("creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. if failed, exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0).onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                    }
                }
            }

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    continue;
                }

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    continue;
                }

                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
    }
}

public class NamingProxy {
    /**
     * Fetches all data held by the given Nacos node via HTTP GET.
     *
     * @param server address of the target node; it is placed directly after
     *               {@code "http://"} when building the request URL
     * @return the response body as bytes
     * @throws IOException if the node answers with a status other than HTTP 200
     * @throws Exception   if the HTTP call itself fails
     */
    public static byte[] getAllData(String server) throws Exception {
        Map<String, String> params = new HashMap<>(8);
        // e.g. http://127.0.0.1:8848/nacos/v1/ns/distro/datums (author's example; depends on configuration)
        // Build the URL once so the request and the error message always name the same endpoint.
        String url = "http://" + server + RunningConfig.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + ALL_DATA_GET_URL;
        HttpClient.HttpResult result = HttpClient.httpGet(url, new ArrayList<>(), params);

        if (HttpURLConnection.HTTP_OK == result.code) {
            // NOTE(review): getBytes() uses the platform default charset — confirm it
            // matches what the deserializer on the receiving side expects.
            return result.content.getBytes();
        }

        throw new IOException("failed to req API: " + url + ". code: " + result.code + " msg: " + result.content);
    }
}

Distro有兩種數據同步方式:1. 定時同步;2. 本節點接收到服務變更后即時同步。
DataSyncer啟動定時同步線程和負責數據同步
TaskDispatcher負責接收服務變更,然后分發到DataSyncer即時同步


@Component
@DependsOn("serverListManager")
public class DataSyncer {

    /**
     * Starts the periodic checksum synchronisation once the bean is constructed.
     */
    @PostConstruct
    public void init() {
        startTimedSync();
    }

    /**
     * Hands a new {@link TimedSync} task to the global executor.
     *
     * <p>Author's note: the executor schedules it with a fixed delay of
     * PARTITION_DATA_TIMED_SYNC_INTERVAL = 5000 ms (that scheduling code is not shown here).
     */
    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    /**
     * Periodic task: collects the checksums of the datums this node is
     * responsible for and sends them to every other server.
     */
    public class TimedSync implements Runnable {

        @Override
        public void run() {

            try {
                // send local timestamps to other servers:
                Map<String, String> checksumsByKey = new HashMap<>(64);
                for (String key : dataStore.keys()) {
                    // Only data this node is responsible for is synced. Author's note: responsibility
                    // means distroHash(serviceName) % healthyList.size() equals this server's index
                    // in the healthy list (that logic lives in distroMapper and is not shown here).
                    if (distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        Datum datum = dataStore.get(key);
                        if (datum != null) {
                            checksumsByKey.put(key, datum.value.getChecksum());
                        }
                    }
                }

                if (!checksumsByKey.isEmpty()) {
                    for (Server peer : getServers()) {
                        // Skip this node itself.
                        if (!NetUtils.localServer().equals(peer.getKey())) {
                            // Request path (author's note): /nacos/v1/ns/distro/checksum
                            NamingProxy.syncCheckSums(checksumsByKey, peer.getKey());
                        }
                    }
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }

    }
}

TaskDispatcher負責接收服務變更,然后分發到DataSyncer即時同步

/**
 * Data sync task dispatcher
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    @PostConstruct
    public void init() {
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }
    
    /**
     * 接收服務變更后同步任務
     * @param key 服務唯一標識
     */
    public void addTask(String key) {
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {
            List<String> keys = new ArrayList<>();
            while (true) {
                try {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS);
                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    //批量同步; 1 數量達到batchSyncKeyCount=1000,2:距上次同步事件大於taskDispatchPeriod=2000
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Server member : dataSyncer.getServers()) {
                            if (NetUtils.localServer().equals(member.getKey())) {
                                continue;
                            }
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getKey());
                            //分發任務到dataSyncer同步數據
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}

@Component
@DependsOn("serverListManager")
public class DataSyncer {
    /**
     * Submits a data sync task to the global executor (abridged listing: the
     * dotted lines stand for code the author omitted).
     *
     * @param task  the sync task; its target server receives the serialized data
     * @param delay delay passed to the executor before the task runs
     */
    public void submit(SyncTask task, long delay) {
        // Various checks (omitted by the author)
        .................. 
        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
             .................. 
            // 2. get the datums by keys and check the datum is empty or not
             .................. 
      
            byte[] data = serializer.serialize(datumMap);
            long timestamp = System.currentTimeMillis();
            // DATA_ON_SYNC_URL = "/nacos/v1/ns/distro/datum"
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            ..............
            // Retry when the sync did not succeed (retry code omitted by the author)
        }, delay);
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM