首先,一個Elasticsearch集群(下面簡稱ES集群)是由許多節點(Node)構成的,Node可以有不同的類型,通過以下配置,可以產生四種不同類型的Node:
conf/elasticsearch.yml: node.master: true/false node.data: true/false
四種不同類型的Node是一個node.master和node.data的true/false的兩兩組合。當然還有其他類型的Node,比如IngestNode(用於數據預處理等),不在本文討論范圍內。
當node.master為true時,其表示這個node是一個master的候選節點,可以參與選舉,在ES的文檔中常被稱作master-eligible node,類似於MasterCandidate。ES正常運行時只能有一個master(即leader),多於1個時會發生腦裂。
當node.data為true時,這個節點作為一個數據節點,會存儲分配在該node上的shard的數據並負責這些shard的寫入、查詢等。
此外,任何一個集群內的node都可以執行任何請求,其會負責將請求轉發給對應的node進行處理,所以當node.master和node.data都為false時,這個節點可以作為一個類似proxy的節點,接受請求並進行轉發、結果聚合等。
上圖是一個ES集群的示意圖,其中NodeA是當前集群的Master,NodeB和NodeC是Master的候選節點,其中NodeA和NodeB同時也是數據節點(DataNode),此外,NodeD是一個單純的數據節點,Node_E是一個proxy節點。每個Node會跟其他所有Node建立連接。
節點發現
Node啟動后,首先要通過節點發現功能加入集群。ZenDiscovery是ES自己實現的一套用於節點發現和選主等功能的模塊,沒有依賴Zookeeper等工具,官方文檔:
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery-zen.html
簡單來說,節點發現依賴以下配置:
這個配置可以看作是,在本節點到每個hosts中的節點建立一條邊,當整個集群所有的node形成一個聯通圖時,所有節點都可以知道集群中有哪些節點,不會形成孤島。
Master選舉
上面提到,集群中可能會有多個master-eligible node,此時就要進行master選舉,保證只有一個當選master。如果有多個node當選為master,則集群會出現腦裂,腦裂會破壞數據的一致性,導致集群行為不可控,產生各種非預期的影響。
為了避免產生腦裂,ES采用了常見的分布式系統思路,保證選舉出的master被多數派(quorum)的master-eligible node認可,以此來保證只有一個master。這個quorum通過以下配置進行配置:
conf/elasticsearch.yml:
discovery.zen.minimum_master_nodes: 2
這個配置對於整個集群非常重要。
1、master選舉誰發起,什么時候發起?
master選舉當然是由master-eligible節點發起,當一個master-eligible節點發現滿足以下條件時發起選舉:
- 該master-eligible節點的當前狀態不是master。
- 該master-eligible節點通過ZenDiscovery模塊的ping操作詢問其已知的集群其他節點,沒有任何節點連接到master。
- 包括本節點在內,當前已有超過minimum_master_nodes個節點沒有連接到master。
總結一句話,即當一個節點發現包括自己在內的多數派的master-eligible節點認為集群沒有master時,就可以發起master選舉。
2、當需要選舉master時,選舉誰?
首先是選舉誰的問題,如下面源碼所示,選舉的是排序后的第一個MasterCandidate(即master-eligible node,候選節點)。
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) { assert hasEnoughCandidates(candidates); List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates); sortedCandidates.sort(MasterCandidate::compare); return sortedCandidates.get(0);
那么是按照什么排序的?
public static int compare(MasterCandidate c1, MasterCandidate c2) { // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted // list, so if c2 has a higher cluster state version, it needs to come first. int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); if (ret == 0) { ret = compareNodes(c1.getNode(), c2.getNode()); } return ret; }
如上面源碼所示,先根據節點的clusterStateVersion比較,clusterStateVersion越大,優先級越高。clusterStateVersion相同時,進入compareNodes,其內部按照節點的Id比較(Id為節點第一次啟動時隨機生成)。
總結一下:
- 當clusterStateVersion越大,優先級越高。這是為了保證新Master擁有最新的clusterState(即集群的meta),避免已經commit的meta變更丟失。因為Master當選后,就會以這個版本的clusterState為基礎進行更新。(一個例外是集群全部重啟,所有節點都沒有meta,需要先選出一個master,然后master再通過持久化的數據進行meta恢復,再進行meta同步)。
- 當clusterStateVersion相同時,節點的Id越小,優先級越高。即總是傾向於選擇Id小的Node,這個Id是節點第一次啟動時生成的一個隨機字符串。之所以這么設計,應該是為了讓選舉結果盡可能穩定,不要出現都想當master而選不出來的情況。
3 什么時候選舉成功?
當一個master-eligible node(我們假設為Node_A)發起一次選舉時,它會按照上述排序策略選出一個它認為的master。
假設Node_A選Node_B當Master:
Node_A會向Node_B發送join請求,那么此時:
(1) 如果Node_B已經成為Master,Node_B就會把Node_A加入到集群中,然后發布最新的cluster_state, 最新的cluster_state就會包含Node_A的信息。相當於一次正常情況的新節點加入。對於Node_A,等新的cluster_state發布到Node_A的時候,Node_A也就完成join了。
(2) 如果Node_B在競選Master,那么Node_B會把這次join當作一張選票。對於這種情況,Node_A會等待一段時間,看Node_B是否能成為真正的Master,直到超時或者有別的Master選成功。
(3) 如果Node_B認為自己不是Master(現在不是,將來也選不上),那么Node_B會拒絕這次join。對於這種情況,Node_A會開啟下一輪選舉。
假設Node_A選自己當Master:
此時NodeA會等別的node來join,即等待別的node的選票,當收集到超過半數的選票時,認為自己成為master,然后變更cluster_state中的master node為自己,並向集群發布這一消息。
有興趣的同學可以看看下面這段源碼:
if (transportService.getLocalNode().equals(masterNode)) { final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins); nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout, new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { synchronized (stateMutex) { joinThreadControl.markThreadAsDone(currentThread); } } @Override public void onFailure(Throwable t) { logger.trace("failed while waiting for nodes to join, rejoining", t); synchronized (stateMutex) { joinThreadControl.markThreadAsDoneAndStartNew(currentThread); } } } ); } else { // process any incoming joins (they will fail because we are not the master) nodeJoinController.stopElectionContext(masterNode + " elected"); // send join request final boolean success = joinElectedMaster(masterNode); synchronized (stateMutex) { if (success) { DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode(); if (currentMasterNode == null) { // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have // a valid master. logger.debug("no master node is set, despite of join request completing. retrying pings."); joinThreadControl.markThreadAsDoneAndStartNew(currentThread); } else if (currentMasterNode.equals(masterNode) == false) { // update cluster state joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join"); } joinThreadControl.markThreadAsDone(currentThread); } else { // failed to join. Try again... joinThreadControl.markThreadAsDoneAndStartNew(currentThread); } } }
按照上述流程,我們描述一個簡單的場景來幫助大家理解:
假如集群中有3個master-eligible node,分別為Node_A、 Node_B、 Node_C, 選舉優先級也分別為Node_A、Node_B、Node_C。三個node都認為當前沒有master,於是都各自發起選舉,選舉結果都為Node_A(因為選舉時按照優先級排序,如上文所述)。於是Node_A開始等join(選票),Node_B、Node_C都向Node_A發送join,當Node_A接收到一次join時,加上它自己的一票,就獲得了兩票了(超過半數),於是Node_A成為Master。此時cluster_state(集群狀態)中包含兩個節點,當Node_A再收到另一個節點的join時,cluster_state包含全部三個節點。
4、選舉怎么保證不腦裂?
基本原則還是多數派的策略,如果必須得到多數派的認可才能成為Master,那么顯然不可能有兩個Master都得到多數派的認可。
上述流程中,master候選人需要等待多數派節點進行join后才能真正成為master,就是為了保證這個master得到了多數派的認可。但是我這里想說的是,上述流程在絕大部份場景下沒問題,聽上去也非常合理,但是卻是有bug的。
因為上述流程並沒有限制在選舉過程中,一個Node只能投一票,那么什么場景下會投兩票呢?比如NodeB投NodeA一票,但是NodeA遲遲不成為Master,NodeB等不及了發起了下一輪選主,這時候發現集群里多了個Node0,Node0優先級比NodeA還高,那NodeB肯定就改投Node0了。假設Node0和NodeA都處在等選票的環節,那顯然這時候NodeB其實發揮了兩票的作用,而且投給了不同的人。
那么這種問題應該怎么解決呢,比如raft算法中就引入了選舉周期(term)的概念,保證了每個選舉周期中每個成員只能投一票,如果需要再投就會進入下一個選舉周期,term+1。假如最后出現兩個節點都認為自己是master,那么肯定有一個term要大於另一個的term,而且因為兩個term都收集到了多數派的選票,所以多數節點的term是較大的那個,保證了term小的master不可能commit任何狀態變更(commit需要多數派節點先持久化日志成功,由於有term檢測,不可能達到多數派持久化條件)。這就保證了集群的狀態變更總是一致的。
而ES目前(6.2版本)並沒有解決這個問題,構造類似場景的測試case可以看到會選出兩個master,兩個node都認為自己是master,向全集群發布狀態變更,這個發布也是兩階段的,先保證多數派節點“接受”這次變更,然后再要求全部節點commit這次變更。很不幸,目前兩個master可能都完成第一個階段,進入commit階段,導致節點間狀態出現不一致,而在raft中這是不可能的。那么為什么都能完成第一個階段呢,因為第一個階段ES只是將新的cluster_state做簡單的檢查后放入內存隊列,如果當前cluster_state的master為空,不會對新的clusterstate中的master做檢查,即在接受了NodeA成為master的cluster_state后(還未commit),還可以繼續接受NodeB成為master的cluster_state。這就使NodeA和NodeB都能達到commit條件,發起commit命令,從而將集群狀態引向不一致。當然,這種腦裂很快會自動恢復,因為不一致發生后某個master再次發布cluster_state時就會發現無法達到多數派條件,或者是發現它的follower並不構成多數派而自動降級為candidate等。
這里要表達的是,ES的ZenDiscovery模塊與成熟的一致性方案相比,在某些特殊場景下存在缺陷,下一篇文章講ES的meta變更流程時也會分析其他的ES無法滿足一致性的場景。
錯誤檢測
1. MasterFaultDetection與NodesFaultDetection
這里的錯誤檢測可以理解為類似心跳的機制,有兩類錯誤檢測,一類是Master定期檢測集群內其他的Node,另一類是集群內其他的Node定期檢測當前集群的Master。檢查的方法就是定期執行ping請求。ES文檔:
There are two fault detection processes running. The first is by the master, to ping all the other nodes in the cluster and verify that they are alive. And on the other end, each node pings to master to verify if its still alive or an election process needs to be initiated.
如果Master檢測到某個Node連不上了,會執行removeNode的操作,將節點從cluste_state中移除,並發布新的cluster_state。當各個模塊apply新的cluster_state時,就會執行一些恢復操作,比如選擇新的primaryShard或者replica,執行數據復制等。
如果某個Node發現Master連不上了,會清空pending在內存中還未commit的new cluster_state,然后發起rejoin,重新加入集群(如果達到選舉條件則觸發新master選舉)。
rejoin
除了上述兩種情況,還有一種情況是Master發現自己已經不滿足多數派條件(>=minimumMasterNodes)了,需要主動退出master狀態(退出master狀態並執行rejoin)以避免腦裂的發生,那么master如何發現自己需要rejoin呢?
上面提到,當有節點連不上時,會執行removeNode。在執行removeNode時判斷剩余的Node是否滿足多數派條件,如果不滿足,則執行rejoin。
if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) { final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes()); rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])", masterNodes, electMasterService.minimumMasterNodes())); return resultBuilder.build(currentState); } else { return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks))); }
在publish新的cluster_state時,分為send階段和commit階段,send階段要求多數派必須成功,然后再進行commit。如果在send階段沒有實現多數派返回成功,那么可能是有了新的master或者是無法連接到多數派個節點等,則master需要執行rejoin。
try { publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener); } catch (FailedToCommitClusterStateException t) { // cluster service logs a WARN message logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])", newState.version(), electMaster.minimumMasterNodes()); synchronized (stateMutex) { pendingStatesQueue.failAllStatesAndClear( new ElasticsearchException("failed to publish cluster state")); rejoin("zen-disco-failed-to-publish"); } throw t; }
在對其他節點進行定期的ping時,發現有其他節點也是master,此時會比較本節點與另一個master節點的cluster_state的version,誰的version大誰成為master,version小的執行rejoin。
if (otherClusterStateVersion > localClusterState.version()) { rejoin("zen-disco-discovered another master with a new cluster_state [" + otherMaster + "][" + reason + "]"); } else { // TODO: do this outside mutex logger.warn("discovered [{}] which is also master but with an older cluster_state, telling [{}] to rejoin the cluster ([{}])", otherMaster, otherMaster, reason); try { // make sure we're connected to this node (connect to node does nothing if we're already connected) // since the network connections are asymmetric, it may be that we received a state but have disconnected from the node // in the past (after a master failure, for example) transportService.connectToNode(otherMaster); transportService.sendRequest(otherMaster, DISCOVERY_REJOIN_ACTION_NAME, new RejoinClusterRequest(localClusterState.nodes().getLocalNodeId()), new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { @Override public void handleException(TransportException exp) { logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), exp); } }); } catch (Exception e) { logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to send rejoin request to [{}]", otherMaster), e); } }
總結:
參考博客:https://blog.csdn.net/ailiandeziwei/article/details/87856210