Nacos 集群選舉原理


集群選舉問題:

  Nacos支持集群模式,很顯然。而一旦涉及到集群,就涉及到主從,那么nacos是一種什么樣的機制來實現的集群呢?

  Nacos的集群類似於zookeeper, 它分為leader角色和follower角色, 那么從這個角色的名字可以看出來,這個集群存在選舉的機制。 因為如果自己不具備選舉功能,角色的命名可能就是master/slave了.

選舉算法 :

  Nacos集群采用 raft 算法來實現,它是相對zookeeper的選舉算法較為簡單的一種。選舉算法的核心在 RaftCore 中,包括數據的處理和數據同步。

  raft 算法演示地址 :http://thesecretlivesofdata.com/raft/

  在Raft中,節點有三種角色:

  1. Leader:負責接收客戶端的請求
  2. Candidate:用於選舉Leader的一種角色(競選狀態)
  3. Follower:負責響應來自Leader或者Candidate的請求

  選舉分為兩個節點

  1. 服務啟動的時候
  2. leader掛了的時候

  所有節點啟動的時候,都是follower狀態。 如果在一段時間內如果沒有收到leader的心跳(可能是沒有leader,也可能是leader掛了),那么follower會變成Candidate。然后發起選舉,選舉之前,會增加 term,這個 term 和 zookeeper 中的 epoch 的道理是一樣的。

  follower會投自己一票,並且給其他節點發送票據vote,等到其他節點回復在這個過程中,可能出現幾種情況

  1. 收到過半的票數通過,則成為leader
  2. 被告知其他節點已經成為leader,則自己切換為follower
  3. 一段時間內沒有收到過半的投票,則重新發起選舉

  約束條件在任一term中,單個節點最多只能投一票

選舉的幾種情況 :

  1. 第一種情況,贏得選舉之后,leader會給所有節點發送消息,避免其他節點觸發新的選舉

  2. 第二種情況,比如有三個節點A B C。A B同時發起選舉,而A的選舉消息先到達C,C給A投了一票,當B的消息到達C時,已經不能滿足上面提到的約束條件,即C不會給B投票,而A和B顯然都不會給對方投票。A勝出之后,會給B,C發心跳消息,節點B發現節點A的term不低於自己的term,知道有已經有Leader了,於是轉換成follower

  3. 第三種情況, 沒有任何節點獲得majority投票,可能是平票的情況。加入總共有四個節點(A/B/C/D),Node C、Node D同時成為了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,這就出現了平票 split vote的情況。這個時候大家都在等啊等,直到超時后重新發起選舉。如果出現平票的情況,那么就延長了系統不可用的時間,因此raft引入了  randomizedelection timeouts來盡量避免平票情況.

源碼分析 :

  RaftCore 初始化 :Raft選舉算法,是在RaftCore這個類中實現的。

/** * Init raft core. * * @throws Exception any exception during init */ @PostConstruct public void init() throws Exception { Loggers.RAFT.info("initializing Raft sub-system");      //開啟一個notifier監聽,這個線程中會遍歷listeners,根據ApplyAction執行相應的邏輯
 executor.submit(notifier); final long start = System.currentTimeMillis();      //遍歷/nacos/data/naming/data/文件件,也就是從磁盤中加載Datum到內存,用來做數據恢復。(數據同步采用2pc協議,leader收到請求會寫寫入到磁盤日志,然后再進行數據同步)
 raftStore.loadDatums(notifier, datums);      //從/nacos_home/data/naming/meta.properties文件中讀取term,term表示當前的時鍾周期。
        setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); while (true) { if (notifier.tasks.size() <= 0) { break; } Thread.sleep(1000L); } initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); //開啟定時任務,每500ms執行一次,用來判斷是否需要發起leader選舉,每500ms發起一次心跳
        GlobalExecutor.registerMasterElection(new MasterElection()); GlobalExecutor.registerHeartbeat(new HeartBeat()); Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); }

  這里我們重點關注 MasterElection 選舉:

public class MasterElection implements Runnable { @Override public void run() { try { //判斷本機是否具准備就緒
                if (!peers.isReady()) { return; } //獲取本機的節點信息
                RaftPeer local = peers.local();           //leader選舉觸發間隔時間,第一次進來,會生成(0~15000毫秒)之間的一個隨機數-500.           // //后面由於500ms調度一次,所以每次該線程被調起,會將該leaderDueMs減去TICK_PERIOD_MS(500ms),直到小於0的時候會觸發選舉           //后面每次收到一次leader的心跳就會重置leaderDueMs = 15s+(隨機0-5s)
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; //當間隔時間>0,直接返回,等到下一次500ms后再調用
                if (local.leaderDueMs > 0) { return; } //重新設置本地的leaderDueMs // reset timeout
 local.resetLeaderDue(); local.resetHeartbeatDue();//設置心跳間隔5s //發起投票
 sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } //發送票據數據
        private void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term); //重置peer
 peers.reset(); //每一次投票,都累加一次term,表示當前投票的輪數
 local.term.incrementAndGet();        //選自己,此時peers中有一個votefor就是自己
            local.voteFor = local.ip;        //本地server狀態設置為CANDIDATE
            local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JacksonUtils.toJson(local));//設置參數        //遍歷除了本機ip之外的其他節點,把自己的票據發送給所有節點

            for (final String server : peers.allServersWithoutMySelf()) { final String url = buildUrl(server, API_VOTE);//API_VOTE接口路徑:/raft/vote
                try { HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT .error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url); return 1; } //獲取其他server的響應
                            RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer)); //計算leader
 peers.decideLeader(peer); return 0; } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } }

  RaftController : 我們先來看一下,其他節點收到投票請求后,如何處理呢?在沒有看代碼之前,不難猜測到,他應該要做票據的判斷,到底是不是贊同你作為leader。

@PostMapping("/vote") public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception {   //接受到投票請求
  RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class));   //返回結果
  return JacksonUtils.transferToJsonNode(peer); }

  處理邏輯非常簡單。

  • 判斷收到的請求的term是不是過期的數據,如果是,則認為對方的這個票據無效,直接告訴發送這個票據的節點,你應該選擇當前收到請求的節點。

  • 否則,當前收到請求的節點會自動接受對方的票據,並把自己設置成follower

public synchronized RaftPeer receivedVote(RaftPeer remote) { if (!peers.contains(remote)) { throw new IllegalStateException("can not find peer: " + remote.ip); } //得到本機節點信息
        RaftPeer local = peers.get(NetUtils.localServer());     //判斷周期是否過期,如果收到的票據是過期狀態
    if (remote.term.get() <= local.term.get()) { String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg);       //如果voteFor為空,表示在此之前沒有收到其他節點的票據。則把remote節點的票據設置到自己的節點上
            if (StringUtils.isEmpty(local.voteFor)) { local.voteFor = local.ip; } return local; } //如果上面if不成立,說明remote機器率先發起的投票,那么就認同他的投票      //重置選舉間隔時間
 local.resetLeaderDue(); //設置為follower
        local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get());//同步term
 Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term); return local; }

  peers.decideLeader(peer) 表示用來決策誰能成為leader

public RaftPeer decideLeader(RaftPeer candidate) {     //假設3個節點:A,B,C。local節點為A,假設A,B,C第一輪同時發起選舉請求     //第一輪:處理B,C節點返回結果:peers{"ip_a":"candidate_a","ip_b":"candidate_b","ip_C":"candidate_C"} peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null;      /**第一輪投票結果:     * 第一次for循環是a自己的投票:      * maxApproveCount = 1,maxApprovePeer = A
    * 第二次for循環是B服務器返回的投票,該投票投向B:數據同步addInstance
     * 比如我們在注冊服務時,調用addInstance之后,最后會調用 consistencyService.put(key,instances); 這個方法,來實現數據一致性的同步。      * if (ips.getCount(peer.voteFor) > maxApproveCount) 條件不成立,maxApproveCount = 1,maxApprovePeer = A     *      * 第三次for循環是C服務器返回的投票,該投票投向C:      * if (ips.getCount(peer.voteFor) > maxApproveCount) 條件不成立,maxApproveCount = 1,maxApprovePeer = A     
*/     for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } }     //majorityCount():2(假設3個節點)     //第一輪:maxApproveCount = 1 if條件不成立,返回leader,此時leader為null,沒有選舉成功     if (maxApproveCount >= majorityCount()) { //找到得票最多的那個peer RaftPeer peer = peers.get(maxApprovePeer); //設置這個peer為leader peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; }

 

   通過上面的分析想必大家應該很清楚 Nacos 的集群選舉實現了 。如果還有不明白的小伙伴可以對着 Raft 的演示地址進行理解。

數據同步:

  在注冊服務時,調用addInstance之后,最后會調用 consistencyService.put(key,instances); 這個方法,來實現數據一致性的同步。

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } }

   調用 consistencyService.put 用來發布類容,也就是實現數據的一致性同步。

@Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); } private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }

   這里會走后面 persistentConsistencyService ,由於  public class RaftConsistencyServiceImpl implements PersistentConsistencyService 所以這里走 RaftConsistencyServiceImpl  :

@Override public void put(String key, Record value) throws NacosException { try { raftCore.signalPublish(key, value); } catch (Exception e) { Loggers.RAFT.error("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } } public void signalPublish(String key, Record value) throws Exception { //如果自己不是leader,則找到leader節點,把數據轉發到leader節點
    if (!isLeader()) { ObjectNode params = JacksonUtils.createEmptyJsonNode(); params.put("key", key); params.replace("value", JacksonUtils.transferToJsonNode(value)); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); final RaftPeer leader = getLeader(); //代理轉發
 raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); return; } //如果自己是leader,則向所有節點發送onPublish請求。這個所有節點包含自己。
    try { OPERATE_LOCK.lock();//先上個鎖
        final long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } ObjectNode json = JacksonUtils.createEmptyJsonNode(); json.replace("datum", JacksonUtils.transferToJsonNode(datum)); json.replace("source", JacksonUtils.transferToJsonNode(peers.local())); //onPublish可以當做是一次心跳了,更新選舉檢查時間,然后一個重點就是term增加100了 //當然還是就是更新內容了,先寫文件,再更新內存緩存。(也就是先記錄本地日志)
        onPublish(datum, peers.local());//發送數據到所有節點
        
        final String content = json.toString(); final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); //遍歷所有節點,發送事務提交請求,把記錄在本地日志中的數據進行提交
        for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) {//再次判斷是否leader
 latch.countDown(); continue; }//構建發布同步接口地址
            final String url = buildUrl(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, response.getStatusCode()); return 1; } latch.countDown(); return 0; } @Override public STATE onContentWriteCompleted() { return STATE.CONTINUE; } }); } // .....
}

  其中  onPublish(datum, peers.local()); 發送數據到所有節點:

public void onPublish(Datum datum, RaftPeer source) throws Exception { RaftPeer local = peers.local(); // 本地數據信息
        if (datum.value == null) { //這個value就是服務注冊的服務,為空報錯
            Loggers.RAFT.warn("received empty datum"); throw new IllegalStateException("received empty datum"); } // 判斷是否事leader 不是報錯
        if (!peers.isLeader(source.ip)) { Loggers.RAFT .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(getLeader())); throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader"); } // 判斷 term
        if (source.term.get() < local.term.get()) { Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(local)); throw new IllegalStateException( "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get()); } //重置事件
 local.resetLeaderDue(); // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) { raftStore.write(datum);// 發送持久化數據,完成數據同步
 } datums.put(datum.key, datum); if (isLeader()) {//如果是leader ,則增加 term
 local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } else { if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) { //set leader term:
 getLeader().term.set(source.term.get()); local.term.set(getLeader().term.get()); } else { local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } }      //更新/nacos_home/data/naming/meta.properties文件
 raftStore.updateTerm(local.term.get()); notifier.addTask(datum.key, DataOperation.CHANGE); Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term); }

  通過以上的操作就完成了數據的同步。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM