ES選主策略


ES版本5.6.3

1、整個流程的開始,實在node啟動后觸發的,Node.java中start()方法,通過調用ZenDiscovery.java中的doStart()方法,之后會調用startInitialJoin方法開始進行加入現有的cluster或者選主。

public void startInitialJoin() {
        // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
        clusterService.submitStateUpdateTask("initial_join", new LocalClusterUpdateTask() {

            @Override
            public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
                // do the join on a different thread, the DiscoveryService waits for 30s anyhow till it is discovered
                joinThreadControl.startNewThreadIfNotRunning();
                return unchanged();
            }

            @Override
            public void onFailure(String source, @org.elasticsearch.common.Nullable Exception e) {
                logger.warn("failed to start initial join process", e);
            }
        });
 }

2、ZenDiscovery類中startNewThreadIfNotRunning方法中innerJoinCluster()為實質性進行選主操作,其中findMaster()選擇master節點。 

private void innerJoinCluster() {
        DiscoveryNode masterNode = null;
        final Thread currentThread = Thread.currentThread();
        nodeJoinController.startElectionContext();
        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
            masterNode = findMaster();
        }

       ......
    }

 

3、在findMaster()中。通過pingAndWait()方法獲取當前可以ping通的節點,並獲取PingResponse,此信息中包含節點信息以及該節點當前的master節點信息。之后,根據獲取的節點開始進行選主。

  此處有一參數需要注意:discovery.zen.master_election.ignore_non_master_pings,默認值為false,表明數據節點(node.master: false    node.data: true)是否參與選主,一般我們集群節點數較少時,不用修改此配置,如果集群規模很大,可以考慮只允許主節點參與選主操作。

  然后,根據獲取的pingResponses來判斷當前是否有master節點存在,存儲在activeMasters中,對於master的候選節點存儲在masterCandidates中。

  如果activeMasters為空,表明當前並未有master節點存在,則進行選主操作,即步驟4。這里需要注意的是discovery.zen.minimum_master_nodes,候選節點數必選大於等於該參數,選主才能繼續,否則是無法選主的。該參數一般配置為(N/2)+1,防止集群出現腦裂。

  如果activeMasters不為空,則當前的master節點,即為步驟5中的找到的master節點。

 private DiscoveryNode findMaster() {
        logger.trace("starting to ping");
        List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
        if (fullPingResponses == null) {
            logger.trace("No full ping responses");
            return null;
        }
        if (logger.isTraceEnabled()) {
            StringBuilder sb = new StringBuilder();
            if (fullPingResponses.size() == 0) {
                sb.append(" {none}");
            } else {
                for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                    sb.append("\n\t--> ").append(pingResponse);
                }
            }
            logger.trace("full ping responses:{}", sb);
        }

        final DiscoveryNode localNode = clusterService.localNode();

        // add our selves
        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));

        // filter responses
        final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

        List<DiscoveryNode> activeMasters = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
                activeMasters.add(pingResponse.master());
            }
        }

        // nodes discovered during pinging
        List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
        for (ZenPing.PingResponse pingResponse : pingResponses) {
            if (pingResponse.node().isMasterNode()) {
                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
            }
        }

        if (activeMasters.isEmpty()) {
            if (electMaster.hasEnoughCandidates(masterCandidates)) {
                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                logger.trace("candidate {} won election", winner);
                return winner.getNode();
            } else {
                // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                            masterCandidates, electMaster.minimumMasterNodes());
                return null;
            }
        } else {
            assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
            // lets tie break between discovered nodes
            return electMaster.tieBreakActiveMasters(activeMasters);
        }
    }

4、ElectMasterService.java中的electMaster()方法為選主的具體實現,邏輯十分簡單根據當前的候選節點進行排序,排在第一個的即為master節點。

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
        assert hasEnoughCandidates(candidates);
        List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
        sortedCandidates.sort(MasterCandidate::compare);
        return sortedCandidates.get(0);
    }

 

 5、當存在master節點時,則加入現有的集群中,如果是多個master節點,則會選擇排在第一個的master節點作為需要加入的集群。

public DiscoveryNode tieBreakActiveMasters(Collection<DiscoveryNode> activeMasters) {
        return activeMasters.stream().min(ElectMasterService::compareNodes).get();
    }

 

 

6、在選擇完master節點后,需要進行集群組建了。如果當前選擇出的master節點為本節點,則本節點需要等待其他節點來加入。這個邏輯不太確定,是異步實現的。大體如下:在每個node啟動時,均會注冊internal:discovery/zen/join請求,待其為maser后,其他節點通過該請求與之通信加入到master所在集群。在MembershipListener.java中注冊監聽,當有節點加入時,通過異步函數將信息存儲在NodeJoinController下的內部類ElectionContext中,具體判斷是否獲取了足夠的節點,判斷當次選舉是否成功。--------這塊邏輯比較繞,不確定理解是否正確,如哪位大神比較了解,望不吝賜教啊!!!!

7、如果選擇的master節點不是本節點,則選擇加入該集群ZenDiscovery.java中joinElectedMaster(),嘗試次數為discovery.zen.join_retry_attempts由控制,默認為3次,每次的超時時間:discovery.zen.join_timeout控制,默認值為discovery.zen.ping_timeout*20也就是60ms。所以這個參數不宜配置過長,否則在選舉失敗的超時時間就會比較長。如果加入master失敗或者超時,則會進行新的一輪選主,直到選則出滿足條件的master節點。

private boolean joinElectedMaster(DiscoveryNode masterNode) {
        try {
            // first, make sure we can connect to the master
            transportService.connectToNode(masterNode);
        } catch (Exception e) {
            logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
            return false;
        }
        int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
        while (true) {
            try {
                logger.trace("joining master {}", masterNode);
                membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
                return true;
            } catch (Exception e) {
                final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
                if (unwrap instanceof NotMasterException) {
                    if (++joinAttempt == this.joinRetryAttempts) {
                        logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                        return false;
                    } else {
                        logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                    }
                } else {
                    if (logger.isTraceEnabled()) {
                        logger.trace((Supplier<?>) () -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
                    } else {
                        logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
                    }
                    return false;
                }
            }

            try {
                Thread.sleep(this.joinRetryDelay.millis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

 

 

至此,master就已經選擇完成了。大概邏輯就是這樣,可能中間一些細節有待進一步深究。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM