zenDiscovery and Master Election


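The previous post walked through the ElectMasterService source and covered most of how master election works: master-eligible nodes are sorted by ID so that every node makes the same choice, and a minimum number of visible master-eligible nodes is required to avoid split brain. Sorting alone only guarantees local consistency. A node that has received a wrong cluster state will still elect the wrong master, so further safeguards are needed. This is the second point from the previous post: a node becomes master only when enough nodes have elected it and it has also elected itself. That rule is implemented in zenDiscovery, and this post uses the node discovery process to explain the rest of the election mechanism.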
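The two rules recapped above (sort the candidates by ID, and require a minimum number of visible master-eligible nodes) can be illustrated with a minimal sketch. This is not the ElectMasterService source: `CandidateNode` and `ElectionSketch` are hypothetical names, and the only assumption taken from the text is that the candidate with the smallest ID wins once the minimum count is met.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a discovery node: an ID and a master-eligible flag.
final class CandidateNode {
    final String id;
    final boolean masterEligible;

    CandidateNode(String id, boolean masterEligible) {
        this.id = id;
        this.masterEligible = masterEligible;
    }
}

// Hypothetical sketch of the two election rules described in the text.
final class ElectionSketch {
    private final int minimumMasterNodes;

    ElectionSketch(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    // Rule 1: enough master-eligible nodes must be visible before electing.
    boolean hasEnoughMasterNodes(List<CandidateNode> nodes) {
        int count = 0;
        for (CandidateNode node : nodes) {
            if (node.masterEligible) {
                count++;
            }
        }
        return count > 0 && count >= minimumMasterNodes;
    }

    // Rule 2: sort the master-eligible nodes by ID and pick the first one,
    // so every node with the same view reaches the same result.
    CandidateNode electMaster(List<CandidateNode> nodes) {
        List<CandidateNode> candidates = new ArrayList<>();
        for (CandidateNode node : nodes) {
            if (node.masterEligible) {
                candidates.add(node);
            }
        }
        if (candidates.isEmpty()) {
            return null;
        }
        candidates.sort(Comparator.comparing((CandidateNode n) -> n.id));
        return candidates.get(0);
    }
}
```

Two nodes that see the same candidates in a different order still pick the same master, while a node that sees too few candidates refuses to elect at all.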

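When a node starts, it first starts a join thread, which looks for the cluster's master node. If the cluster is already up and healthy, the node tries to connect to that master and join the cluster. Otherwise (the cluster is still starting) a master is elected. If the local node is elected, it submits a cluster state update task so that the new state reaches the other nodes; if another node is the master, the local node tries to join it.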

The join code is shown below:

private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    // Block until a master node is found. When the cluster has just started or has lost
    // its master, this blocking keeps the cluster consistent.
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    // The local node may itself have been elected master
    // (cluster startup, or an election was in progress while joining).
    if (clusterService.localNode().equals(masterNode)) {
        // The local node is the master, so it must send a cluster state update to all other nodes.
        clusterService.submitStateUpdateTask("zen-disco-join (elected_as_master)", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) {
                // The election was wrong: the previous master is still healthy,
                // so do not update the state and keep using the existing one.
                if (currentState.nodes().masterNode() != null) {
                    return currentState;
                }

                DiscoveryNodes.Builder builder = new DiscoveryNodes.Builder(currentState.nodes()).masterNodeId(currentState.nodes().localNode().id());
                // update the fact that we are the master...
                ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()).removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
                currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();

                // eagerly run reroute to remove dead nodes from routing table
                RoutingAllocation.Result result = allocationService.reroute(currentState);
                return ClusterState.builder(currentState).routingResult(result).build();
            }

            @Override
            public void onFailure(String source, Throwable t) {
                logger.error("unexpected failure during [{}]", t, source);
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                if (newState.nodes().localNodeMaster()) {
                    // we only starts nodesFD if we are master (it may be that we received a cluster state while pinging)
                    joinThreadControl.markThreadAsDone(currentThread);
                    nodesFD.updateNodesAndPing(newState); // start the nodes FD
                } else {
                    // if we're not a master it means another node published a cluster state while we were pinging
                    // make sure we go through another pinging round and actively join it
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                }
                sendInitialStateEventIfNeeded();
                long count = clusterJoinsCounter.incrementAndGet();
                logger.trace("cluster joins counter set to [{}] (elected as master)", count);
            }
        });
    } else {
        // The master that was found is not the local node: try to connect to it.
        final boolean success = joinElectedMaster(masterNode);

        // finalize join through the cluster state update thread
        final DiscoveryNode finalMasterNode = masterNode;
        clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateNonMasterUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                if (!success) {
                    // failed to join. Try again...
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return currentState;
                }

                if (currentState.getNodes().masterNode() == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    return currentState;
                }

                if (!currentState.getNodes().masterNode().equals(finalMasterNode)) {
                    return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
                }

                // Note: we do not have to start master fault detection here because it's set at {@link #handleNewClusterStateFromMaster }
                // when the first cluster state arrives.
                joinThreadControl.markThreadAsDone(currentThread);
                return currentState;
            }

            @Override
            public void onFailure(String source, @Nullable Throwable t) {
                logger.error("unexpected error while trying to finalize cluster join", t);
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        });
    }
}

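That is the join process. zenDiscovery starts a join thread at startup, and that thread calls this method. The thread is also restarted, running the same join method again, when a node leaves, the master is lost, and in similar situations. Next comes findMaster, the method that embodies the master election mechanism. The code is as follows: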

private DiscoveryNode findMaster() {
    // Ping the nodes in the cluster.
    ZenPing.PingResponse[] fullPingResponses = pingService.pingAndWait(pingTimeout);
    if (fullPingResponses == null) {
        return null;
    }

    // Filter the ping responses: drop client nodes and pure data nodes.
    List<ZenPing.PingResponse> pingResponses = Lists.newArrayList();
    for (ZenPing.PingResponse pingResponse : fullPingResponses) {
        DiscoveryNode node = pingResponse.node();
        if (masterElectionFilterClientNodes && (node.clientNode() || (!node.masterNode() && !node.dataNode()))) {
            // filter out the client node, which is a client node, or also one that is not data and not master (effectively, client)
        } else if (masterElectionFilterDataNodes && (!node.masterNode() && node.dataNode())) {
            // filter out data node that is not also master
        } else {
            pingResponses.add(pingResponse);
        }
    }

    final DiscoveryNode localNode = clusterService.localNode();
    List<DiscoveryNode> pingMasters = newArrayList();
    // Collect the master reported by each ping response, skipping it when it is the local node.
    // pingMasters ends up either empty (no response names a master other than the local node)
    // or filled with one and the same node. If different nodes show up, the cluster has a
    // problem, but that is fine because an election follows.
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.master() != null) {
            if (!localNode.equals(pingResponse.master())) {
                pingMasters.add(pingResponse.master());
            }
        }
    }

    // nodes discovered during pinging
    Set<DiscoveryNode> activeNodes = Sets.newHashSet();
    // nodes discovered who has previously been part of the cluster and do not ping for the very first time
    Set<DiscoveryNode> joinedOnceActiveNodes = Sets.newHashSet();

    Version minimumPingVersion = localNode.version();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        activeNodes.add(pingResponse.node());
        minimumPingVersion = Version.smallest(pingResponse.node().version(), minimumPingVersion);
        if (pingResponse.hasJoinedOnce() != null && pingResponse.hasJoinedOnce()) {
            joinedOnceActiveNodes.add(pingResponse.node());
        }
    }

    // If the local node is master-eligible, it also joins the candidates for the election.
    if (localNode.masterNode()) {
        activeNodes.add(localNode);
        long joinsCounter = clusterJoinsCounter.get();
        if (joinsCounter > 0) {
            logger.trace("adding local node to the list of active nodes who has previously joined the cluster (joins counter is [{}})", joinsCounter);
            joinedOnceActiveNodes.add(localNode);
        }
    }

    // pingMasters is empty: no other node is reported as master, so the local node may be the master.
    if (pingMasters.isEmpty()) {
        if (electMaster.hasEnoughMasterNodes(activeNodes)) {
            // The election quorum is met, so enough nodes are available to elect the local node.
            // That alone is not enough: the local node runs the election once more, and only if
            // the result is still itself can it become master. This is the second election rule.
            DiscoveryNode master = electMaster.electMaster(joinedOnceActiveNodes);
            if (master != null) {
                return master;
            }
            return electMaster.electMaster(activeNodes);
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.trace("not enough master nodes [{}]", activeNodes);
            return null;
        }
    } else {
        // pingMasters is not empty (its entries should all be the same node): the local node
        // was not elected master, so accept the earlier election.
        return electMaster.electMaster(pingMasters);
    }
}
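The decision made in findMaster can be condensed into a small self-contained sketch. It is a simplification under stated assumptions, not the zenDiscovery code: nodes are plain String IDs, every node is treated as master-eligible, "smallest ID wins" stands in for electMaster, and `PingReply` and `FindMasterSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;

// Hypothetical ping response: who answered, which master it follows, and
// whether it has joined a cluster before.
final class PingReply {
    final String nodeId;
    final String masterId; // null when the responder knows no master
    final boolean joinedOnce;

    PingReply(String nodeId, String masterId, boolean joinedOnce) {
        this.nodeId = nodeId;
        this.masterId = masterId;
        this.joinedOnce = joinedOnce;
    }
}

final class FindMasterSketch {
    // Returns the ID of the master to join, or null when no election is possible yet.
    static String findMaster(String localId, boolean localJoinedOnce,
                             List<PingReply> replies, int minimumMasterNodes) {
        List<String> pingMasters = new ArrayList<>();
        TreeSet<String> activeNodes = new TreeSet<>();
        TreeSet<String> joinedOnceActiveNodes = new TreeSet<>();

        for (PingReply reply : replies) {
            // Masters reported by others, ignoring reports that name the local node.
            if (reply.masterId != null && !reply.masterId.equals(localId)) {
                pingMasters.add(reply.masterId);
            }
            activeNodes.add(reply.nodeId);
            if (reply.joinedOnce) {
                joinedOnceActiveNodes.add(reply.nodeId);
            }
        }
        // The local node is a candidate too.
        activeNodes.add(localId);
        if (localJoinedOnce) {
            joinedOnceActiveNodes.add(localId);
        }

        if (!pingMasters.isEmpty()) {
            // Another master is already known: accept the earlier election.
            return Collections.min(pingMasters);
        }
        if (activeNodes.size() < minimumMasterNodes) {
            return null; // not enough candidates, keep pinging
        }
        // Prefer nodes that have been part of a cluster before.
        if (!joinedOnceActiveNodes.isEmpty()) {
            return joinedOnceActiveNodes.first();
        }
        return activeNodes.first();
    }
}
```

In a fresh three-node cluster where nobody reports a master, every node computes the same smallest ID; a node that hears of an existing master simply adopts it instead of electing.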

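The key parts are annotated above, so they are not analyzed further. Besides findMaster, one more method reflects master election: handleMasterGone. Below is part of its code, the section that submits the master-lost task: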

clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateNonMasterUpdateTask() {
    @Override
    public ClusterState execute(ClusterState currentState) {
        // Take all nodes in the current cluster state.
        DiscoveryNodes discoveryNodes = DiscoveryNodes.builder(currentState.nodes())
                // make sure the old master node, which has failed, is not part of the nodes we publish
                .remove(masterNode.id())
                .masterNodeId(null).build();

        // The rejoin path simply repeats the findMaster process.
        if (rejoin) {
            return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "master left (reason = " + reason + ")");
        }

        // The election quorum cannot be met: go through the findMaster process.
        if (!electMaster.hasEnoughMasterNodes(discoveryNodes)) {
            return rejoin(ClusterState.builder(currentState).nodes(discoveryNodes).build(), "not enough master nodes after master left (reason = " + reason + ")");
        }

        // With the current cluster state, if the number of candidates reaches the expected count,
        // every node elects the same node, because all nodes see the same cluster state.
        final DiscoveryNode electedMaster = electMaster.electMaster(discoveryNodes); // elect master
        final DiscoveryNode localNode = currentState.nodes().localNode();
        ....
}
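The visible part of this task (drop the failed master, check the quorum, re-elect) can be sketched as follows. It covers only the steps shown in the excerpt and says nothing about the elided remainder. `MasterGoneSketch` is a hypothetical name, nodes are plain String IDs assumed to be master-eligible, and "smallest ID wins" again stands in for electMaster.

```java
import java.util.Set;
import java.util.TreeSet;

final class MasterGoneSketch {
    // Returns the newly elected master ID, or null when the caller
    // would have to rejoin (i.e. run the findMaster process again).
    static String onMasterGone(Set<String> masterEligibleIds, String failedMasterId,
                               int minimumMasterNodes) {
        // Copy the known nodes and make sure the failed master is not among them.
        TreeSet<String> remaining = new TreeSet<>(masterEligibleIds);
        remaining.remove(failedMasterId);

        // Without enough candidates no election takes place.
        if (remaining.isEmpty() || remaining.size() < minimumMasterNodes) {
            return null;
        }
        // All nodes share the same cluster state, so all pick the same smallest ID.
        return remaining.first();
    }
}
```

Because the input is the shared cluster state rather than each node's own ping results, every surviving node reaches the same answer without exchanging votes.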

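The code above shows where master election is applied. Both findMaster and handleMasterGone guarantee a consistent election: the number of nodes taking part must reach the required minimum, otherwise the election is not considered successful and the node goes back to waiting. If the current node has been elected master by other nodes, it still runs the election once more itself. Enforcing the election quorum and sorting the candidates together keep the election consistent.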

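Discovering and joining the cluster is zenDiscovery's main job. It also has other duties, such as handling nodes that leave (handleLeaveRequest) and handling cluster states sent by the master (handleNewClusterStateFromMaster). They are not covered one by one here; see the relevant source if you are interested.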

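To summarize: this post used zenDiscovery to cover the remaining part of master election. zenDiscovery bundles the node's cluster discovery functions. Its main job is to discover (elect) the cluster's master node and try to join the cluster. If the local node is the master, it also handles nodes leaving or being lost; if it is not, it handles state updates coming from the master.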

 

