Elasticsearch-04-master選舉


3.2 master選舉機制

3.2.1 選舉算法

1)bully算法

核心思想

  • 假定所有的節點都具有一個可以比較的ID,通過比較這個ID來選舉master

流程說明

  1. 節點向所有比自己ID大的節點發送選舉信息(election),告訴他們我選你
  2. 如果收到了回復消息(alive),這說明有人比自己“資歷”更老,要讓他去做老大,他只能乖乖等着老大選舉
    1. 等待老大成功選舉的消息(victory)
    2. 如果超時之后還沒有成功選舉消息,那么重新發送選舉信息
  3. 如果沒有收到任何回復消息(alive),那么就自己當老大,同時向其他節點發送當選信息(victory)

示例

  1. 首先,我們有6個節點的集群,所有節點都互聯,P6是master

    img

  2. P6掛了

    img

  3. P3發現P6掛了,於是向所有比自己ID大的節點發送選舉消息(election)

    • 要給P6發的原因是P6有可能恢復了,所以P6也要發

    img

  4. P4和P5都收到了消息,並表示他們會接手,你就不用管了(bully P3)

    img

  5. P4開始接管選主流程,它開始向P5和P6發送選舉信息

    img

  6. 只有P5相應了,P5從這里開始接管選舉(bully p4)

    img

  7. P5發送選舉信息

    img

  8. 沒有人能響應P5的選舉信息,於是P5當選master,同時告訴別人他是master

    img

優缺點

  • 優點
    • 簡單粗暴,只要我比你大,我就來組織選舉
  • 缺點
    • master假死會使得集群狀態不穩定。假定P6在P5發布當選信息后重新上線,P5檢測到P6的話,又會重新開啟選舉,因為P6的id比P5大
    • 腦裂問題,當出現網絡分區的時候,一個集群可能會選舉出兩個master(因為網絡通信受限)
2)raft算法

raft算法首先將系統中角色定義為三種:leader、follower、candidate。同時將系統一致性拆分為Leader選舉(Leader election)、日志同步(Log replication)、安全性(Safety)、日志壓縮(Log compaction)、成員變更(Membership change)等多個子問題。這里,我們只討論Leader election

核心思想

  • 每個leader都有一個任期(term),在它的任期內,他是老大;只要發現有人的任期比自己大,他就會無條件的加入

選主流程

  1. follower在一段時間內沒有收到leader發送來的確認信息之后會轉變為candidate
  2. candidate等待投票請求
    • 收到投票請求,投票,然后等待選舉結果
    • 超時,給自己投票,發送投票請求
  3. 收到足夠投票請求后,成功當選leader,開始維護集群
raft選舉流程

參考資料

3.2.2 選舉實現

1)es6.8
  • 邏輯流程圖

    es6.8 選主流程
  • 源代碼

    • 集群初始化

      /**
       * the main function of a join thread. This function is guaranteed to join the cluster
       * or spawn a new join thread upon failure to do so.
       */
      private void innerJoinCluster() {
          DiscoveryNode masterNode = null;
          final Thread currentThread = Thread.currentThread();
          nodeJoinController.startElectionContext();
          while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
              masterNode = findMaster();
          }
      
          if (joinThreadControl.joinThreadActive(currentThread) == false) {
              logger.trace("thread is no longer in currentJoinThread. Stopping.");
              return;
          }
      
          // 如果當前節點是被選出來的master,那么他就成功當選master,開始接受其他節點的連接請求
          // 如果沒有成功當選master,那么就去加入master
          // 這里也解釋了為什么在判斷存活master的時候不能把自己算進去。因為把自己算進去的話,所有節點都會認為自己是master,
          if (transportService.getLocalNode().equals(masterNode)) {
              final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
              logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
              nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                      new NodeJoinController.ElectionCallback() {
                          @Override
                          public void onElectedAsMaster(ClusterState state) {
                              synchronized (stateMutex) {
                                  joinThreadControl.markThreadAsDone(currentThread);
                              }
                          }
      
                          @Override
                          public void onFailure(Throwable t) {
                              logger.trace("failed while waiting for nodes to join, rejoining", t);
                              synchronized (stateMutex) {
                                  joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                              }
                          }
                      }
      
              );
          } else {
              // process any incoming joins (they will fail because we are not the master)
              nodeJoinController.stopElectionContext(masterNode + " elected");
      
              // send join request
              final boolean success = joinElectedMaster(masterNode);
      
              synchronized (stateMutex) {
                  if (success) {
                      DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                      if (currentMasterNode == null) {
                          // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                          // a valid master.
                          logger.debug("no master node is set, despite of join request completing. retrying pings.");
                          joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                      } else if (currentMasterNode.equals(masterNode) == false) {
                          // update cluster state
                          joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                      }
      
                      joinThreadControl.markThreadAsDone(currentThread);
                  } else {
                      // failed to join. Try again...
                      joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                  }
              }
          }
      }
      
    • 選主邏輯

      private DiscoveryNode findMaster() {
          logger.trace("starting to ping");
          List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
          if (fullPingResponses == null) {
              logger.trace("No full ping responses");
              return null;
          }
          if (logger.isTraceEnabled()) {
              StringBuilder sb = new StringBuilder();
              if (fullPingResponses.size() == 0) {
                  sb.append(" {none}");
              } else {
                  for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                      sb.append("\n\t--> ").append(pingResponse);
                  }
              }
              logger.trace("full ping responses:{}", sb);
          }
      
          final DiscoveryNode localNode = transportService.getLocalNode();
      
          // add our selves
          assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
              .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
      
          fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
      
          // filter responses
          final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
      
          List<DiscoveryNode> activeMasters = new ArrayList<>();
          for (ZenPing.PingResponse pingResponse : pingResponses) {
              // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
              // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
              if (pingResponse.master() != null && localNode.equals(pingResponse.master()) == false) {
                  activeMasters.add(pingResponse.master());
              }
          }
      
          // nodes discovered during pinging
          List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
          for (ZenPing.PingResponse pingResponse : pingResponses) {
              if (pingResponse.node().isMasterNode()) {
                  masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
              }
          }
      
          // activeMasters為空的時候有兩種情況:1.當前節點能看到的所有節點都選出了一個共同的master,且那個節點就是本地節點;2.沒有master
          // 1 --> 需要發布選主信息,告訴別人,master是誰
          // 2 --> 既然大家都沒有master,那么就來嘗試選舉master
          // activeMasters不為空時,表示其他節點已經選出了一個master,當前節點要做的事情就是加入這個master
          if (activeMasters.isEmpty()) {
              if (electMaster.hasEnoughCandidates(masterCandidates)) {
                  final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                  logger.trace("candidate {} won election", winner);
                  return winner.getNode();
              } else {
                  // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                  logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                              masterCandidates, electMaster.minimumMasterNodes());
                  return null;
              }
          } else {
              assert activeMasters.contains(localNode) == false :
                  "local node should never be elected as master when other nodes indicate an active master";
              // lets tie break between discovered nodes
              return electMaster.tieBreakActiveMasters(activeMasters);
          }
      }
      
    • 法定人數判斷邏輯

      // 變量 minimumMasterNodes 就是配置項 discovery.zen.minimum_master_nodes
      public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
          if (candidates.isEmpty()) {
              return false;
          }
          if (minimumMasterNodes < 1) {
              return true;
          }
          assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
              "duplicates ahead: " + candidates;
          return candidates.size() >= minimumMasterNodes;
      }
      
    • 節點比較邏輯

      /**
       * compares two candidates to indicate which the a better master.
       * A higher cluster state version is better
       *
       * @return -1 if c1 is a batter candidate, 1 if c2.
       */
      public static int compare(MasterCandidate c1, MasterCandidate c2) {
          // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
          // list, so if c2 has a higher cluster state version, it needs to come first.
          int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
          if (ret == 0) {
              ret = compareNodes(c1.getNode(), c2.getNode());
          }
          return ret;
      }
      
  • 分析

    • 什么時候開始選主投票?

      • 集群剛啟動時
      • master檢測到其他節點離開時
      • 其他節點檢測到master離開時
    • 在選主進行的時候,有新的節點加怎么辦?

      • ES會暫時擱置這些加入請求,直到選主結束之后再來處理。如果本地節點成功當選,就接收這些連接請求;如果沒有成功當選,則丟棄這些請求
      • 這些新發現的節點不會被計算到候選者中
    • 每個節點選舉出來的master可能不一樣,是怎么做到不腦裂的?

      • ping過程中發現的候選者數量要大於等於設置項 discovery.zen.minimum_master_nodes
    • 為什么會出現腦裂,不是已經有 discovery.zen.minimum_master_nodes 配置了嗎?

      • 假設一開始集群規模為3,那么配置為2是沒有任何問題的。但是,一旦集群規模擴大到7,那么合理的配置因為為4。於是,新節點的配置為4,而老節點的配置為2。如果沒有及時更新老節點的配置,就會存在腦裂的風險(試想一下,在主節點掛掉時,2個舊節點又恰好和4個新節點產生了網絡分區,而由於節點配置項不統一,就會導致腦裂)
2)es7.13

https://www.easyice.cn/archives/332

流程圖

es7選主
  • 和標准raft算法的不同之處
    • 在集群啟動時,節點默認為candidate
    • candidate不做任何投票限制,這有可能導致產生多個leader,ES選擇的是最新的leader(term最大的)
    • candidate在投票的時候,最后才會給自己投票,防止出現同票現象
    • 有預選環節,同預選環節來防止不必要的選舉。(比如只有一個節點認為leader離線)
  • 和舊版不同之處
    • cluster.initial_master_nodes配置。實際上,這個配置只在集群啟動時發揮作用。這個配置建議配置上所有具有master資格的節點,且所有節點配置相同。實際上,在集群啟動時,es沒有足夠的信息來判斷是否具有足夠的法定人數,所以只能依靠這個來進行判斷。所有節點配置一致可以保證所有節點認為的法定人數一致,而不會形成多個集群

為什么說新版本杜絕了腦裂問題? 動態法定人數和新舊兩種集群狀態

  • 因為新版本中的法定投票人數不再由設置決定,而是變成了一個動態更新的值。由ES在依據存活節點數量來判斷是否有足夠的參與人數。當存活人數為偶數時,es會多排除一個,使得存活人數始終是奇數。(為什么?)

    public boolean hasQuorum(Collection<String> votes) {
        final HashSet<String> intersection = new HashSet<>(nodeIds);
        intersection.retainAll(votes);
        return intersection.size() * 2 > nodeIds.size();
    }
    
  • 當我們分析是否存在法定人數的調用鏈時,會看到有這么一段判斷,為什么會進行兩次是否法定的判斷?

    /**
     * Whether there is an election quorum from the point of view of the given local node under the provided voting configurations
     */
    // satisfiesAdditionalQuorumConstraints由具體實現,這里始終返回true
    public final boolean isElectionQuorum(DiscoveryNode localNode, long localCurrentTerm, long localAcceptedTerm, long localAcceptedVersion,
                                          VotingConfiguration lastCommittedConfiguration, VotingConfiguration lastAcceptedConfiguration,
                                          VoteCollection joinVotes) {
        return joinVotes.isQuorum(lastCommittedConfiguration) &&
            joinVotes.isQuorum(lastAcceptedConfiguration) &&
            satisfiesAdditionalQuorumConstraints(localNode, localCurrentTerm, localAcceptedTerm, localAcceptedVersion,
                lastCommittedConfiguration, lastAcceptedConfiguration, joinVotes);
    }
    

    因為集群狀態中保留了新舊兩種投票配置,這兩個配置在集群穩定時是相同的,但在集群擴張或縮小時會不同。由於在集群變化時,總有那么一個時刻會同時存在新舊兩種集群狀態,部分節點為新狀態,部分節點為舊狀態。如果我們只根據其中一個狀態來判斷法定人數的話,這就會導致腦裂。所以要判斷兩次,防止出現腦裂現象。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM