集群選舉問題:
Nacos支持集群模式,很顯然。而一旦涉及到集群,就涉及到主從,那么nacos是一種什么樣的機制來實現的集群呢?
Nacos的集群類似於zookeeper, 它分為leader角色和follower角色, 那么從這個角色的名字可以看出來,這個集群存在選舉的機制。 因為如果自己不具備選舉功能,角色的命名可能就是master/slave了.
選舉算法 :
Nacos集群采用 raft 算法來實現,它是相對zookeeper的選舉算法較為簡單的一種。選舉算法的核心在 RaftCore 中,包括數據的處理和數據同步。
raft 算法演示地址 :http://thesecretlivesofdata.com/raft/
在Raft中,節點有三種角色:
- Leader:負責接收客戶端的請求
- Candidate:用於選舉Leader的一種角色(競選狀態)
- Follower:負責響應來自Leader或者Candidate的請求
選舉分為兩個節點
- 服務啟動的時候
- leader掛了的時候
所有節點啟動的時候,都是follower狀態。 如果在一段時間內如果沒有收到leader的心跳(可能是沒有leader,也可能是leader掛了),那么follower會變成Candidate。然后發起選舉,選舉之前,會增加 term,這個 term 和 zookeeper 中的 epoch 的道理是一樣的。
follower會投自己一票,並且給其他節點發送票據vote,等到其他節點回復在這個過程中,可能出現幾種情況
- 收到過半的票數通過,則成為leader
- 被告知其他節點已經成為leader,則自己切換為follower
- 一段時間內沒有收到過半的投票,則重新發起選舉
約束條件在任一term中,單個節點最多只能投一票
選舉的幾種情況 :
-
第一種情況,贏得選舉之后,leader會給所有節點發送消息,避免其他節點觸發新的選舉
-
第二種情況,比如有三個節點A B C。A B同時發起選舉,而A的選舉消息先到達C,C給A投了一票,當B的消息到達C時,已經不能滿足上面提到的約束條件,即C不會給B投票,而A和B顯然都不會給對方投票。A勝出之后,會給B,C發心跳消息,節點B發現節點A的term不低於自己的term,知道有已經有Leader了,於是轉換成follower
-
第三種情況, 沒有任何節點獲得majority投票,可能是平票的情況。加入總共有四個節點(A/B/C/D),Node C、Node D同時成為了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,這就出現了平票 split vote的情況。這個時候大家都在等啊等,直到超時后重新發起選舉。如果出現平票的情況,那么就延長了系統不可用的時間,因此raft引入了 randomizedelection timeouts來盡量避免平票情況.
源碼分析 :
RaftCore 初始化 :Raft選舉算法,是在RaftCore這個類中實現的。
/** * Init raft core. * * @throws Exception any exception during init */ @PostConstruct public void init() throws Exception { Loggers.RAFT.info("initializing Raft sub-system"); //開啟一個notifier監聽,這個線程中會遍歷listeners,根據ApplyAction執行相應的邏輯
executor.submit(notifier); final long start = System.currentTimeMillis(); //遍歷/nacos/data/naming/data/文件件,也就是從磁盤中加載Datum到內存,用來做數據恢復。(數據同步采用2pc協議,leader收到請求會寫寫入到磁盤日志,然后再進行數據同步)
raftStore.loadDatums(notifier, datums); //從/nacos_home/data/naming/meta.properties文件中讀取term,term表示當前的時鍾周期。
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L)); Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm()); while (true) { if (notifier.tasks.size() <= 0) { break; } Thread.sleep(1000L); } initialized = true; Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start)); //開啟定時任務,每500ms執行一次,用來判斷是否需要發起leader選舉,每500ms發起一次心跳
GlobalExecutor.registerMasterElection(new MasterElection()); GlobalExecutor.registerHeartbeat(new HeartBeat()); Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}", GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS); }
這里我們重點關注 MasterElection 選舉:
public class MasterElection implements Runnable { @Override public void run() { try { //判斷本機是否具准備就緒
if (!peers.isReady()) { return; } //獲取本機的節點信息
RaftPeer local = peers.local(); //leader選舉觸發間隔時間,第一次進來,會生成(0~15000毫秒)之間的一個隨機數-500. // //后面由於500ms調度一次,所以每次該線程被調起,會將該leaderDueMs減去TICK_PERIOD_MS(500ms),直到小於0的時候會觸發選舉 //后面每次收到一次leader的心跳就會重置leaderDueMs = 15s+(隨機0-5s)
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS; //當間隔時間>0,直接返回,等到下一次500ms后再調用
if (local.leaderDueMs > 0) { return; } //重新設置本地的leaderDueMs // reset timeout
local.resetLeaderDue(); local.resetHeartbeatDue();//設置心跳間隔5s //發起投票
sendVote(); } catch (Exception e) { Loggers.RAFT.warn("[RAFT] error while master election {}", e); } } //發送票據數據
private void sendVote() { RaftPeer local = peers.get(NetUtils.localServer()); Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()), local.term); //重置peer
peers.reset(); //每一次投票,都累加一次term,表示當前投票的輪數
local.term.incrementAndGet(); //選自己,此時peers中有一個votefor就是自己
local.voteFor = local.ip; //本地server狀態設置為CANDIDATE
local.state = RaftPeer.State.CANDIDATE; Map<String, String> params = new HashMap<>(1); params.put("vote", JacksonUtils.toJson(local));//設置參數 //遍歷除了本機ip之外的其他節點,把自己的票據發送給所有節點
for (final String server : peers.allServersWithoutMySelf()) { final String url = buildUrl(server, API_VOTE);//API_VOTE接口路徑:/raft/vote
try { HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT .error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url); return 1; } //獲取其他server的響應
RaftPeer peer = JacksonUtils.toObj(response.getResponseBody(), RaftPeer.class); Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer)); //計算leader
peers.decideLeader(peer); return 0; } }); } catch (Exception e) { Loggers.RAFT.warn("error while sending vote to server: {}", server); } } } }
RaftController : 我們先來看一下,其他節點收到投票請求后,如何處理呢?在沒有看代碼之前,不難猜測到,他應該要做票據的判斷,到底是不是贊同你作為leader。
@PostMapping("/vote") public JsonNode vote(HttpServletRequest request, HttpServletResponse response) throws Exception { //接受到投票請求
RaftPeer peer = raftCore.receivedVote(JacksonUtils.toObj(WebUtils.required(request, "vote"), RaftPeer.class)); //返回結果
return JacksonUtils.transferToJsonNode(peer); }
處理邏輯非常簡單。
-
判斷收到的請求的term是不是過期的數據,如果是,則認為對方的這個票據無效,直接告訴發送這個票據的節點,你應該選擇當前收到請求的節點。
-
否則,當前收到請求的節點會自動接受對方的票據,並把自己設置成follower
public synchronized RaftPeer receivedVote(RaftPeer remote) { if (!peers.contains(remote)) { throw new IllegalStateException("can not find peer: " + remote.ip); } //得到本機節點信息
RaftPeer local = peers.get(NetUtils.localServer()); //判斷周期是否過期,如果收到的票據是過期狀態
if (remote.term.get() <= local.term.get()) { String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term; Loggers.RAFT.info(msg); //如果voteFor為空,表示在此之前沒有收到其他節點的票據。則把remote節點的票據設置到自己的節點上
if (StringUtils.isEmpty(local.voteFor)) { local.voteFor = local.ip; } return local; } //如果上面if不成立,說明remote機器率先發起的投票,那么就認同他的投票 //重置選舉間隔時間
local.resetLeaderDue(); //設置為follower
local.state = RaftPeer.State.FOLLOWER; local.voteFor = remote.ip; local.term.set(remote.term.get());//同步term
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term); return local; }
peers.decideLeader(peer) 表示用來決策誰能成為leader
public RaftPeer decideLeader(RaftPeer candidate) { //假設3個節點:A,B,C。local節點為A,假設A,B,C第一輪同時發起選舉請求 //第一輪:處理B,C節點返回結果:peers{"ip_a":"candidate_a","ip_b":"candidate_b","ip_C":"candidate_C"} peers.put(candidate.ip, candidate); SortedBag ips = new TreeBag(); int maxApproveCount = 0; String maxApprovePeer = null; /**第一輪投票結果: * 第一次for循環是a自己的投票: * maxApproveCount = 1,maxApprovePeer = A
* 第二次for循環是B服務器返回的投票,該投票投向B:數據同步addInstance
* 比如我們在注冊服務時,調用addInstance之后,最后會調用 consistencyService.put(key,instances); 這個方法,來實現數據一致性的同步。 * if (ips.getCount(peer.voteFor) > maxApproveCount) 條件不成立,maxApproveCount = 1,maxApprovePeer = A * * 第三次for循環是C服務器返回的投票,該投票投向C: * if (ips.getCount(peer.voteFor) > maxApproveCount) 條件不成立,maxApproveCount = 1,maxApprovePeer = A */
for (RaftPeer peer : peers.values()) { if (StringUtils.isEmpty(peer.voteFor)) { continue; } ips.add(peer.voteFor); if (ips.getCount(peer.voteFor) > maxApproveCount) { maxApproveCount = ips.getCount(peer.voteFor); maxApprovePeer = peer.voteFor; } } //majorityCount():2(假設3個節點) //第一輪:maxApproveCount = 1 if條件不成立,返回leader,此時leader為null,沒有選舉成功
if (maxApproveCount >= majorityCount()) { //找到得票最多的那個peer
RaftPeer peer = peers.get(maxApprovePeer); //設置這個peer為leader
peer.state = RaftPeer.State.LEADER; if (!Objects.equals(leader, peer)) { leader = peer; ApplicationUtils.publishEvent(new LeaderElectFinishedEvent(this, leader, local())); Loggers.RAFT.info("{} has become the LEADER", leader.ip); } } return leader; }
通過上面的分析想必大家應該很清楚 Nacos 的集群選舉實現了 。如果還有不明白的小伙伴可以對着 Raft 的演示地址進行理解。
數據同步:
在注冊服務時,調用addInstance之后,最后會調用 consistencyService.put(key,instances); 這個方法,來實現數據一致性的同步。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } }
調用 consistencyService.put 用來發布類容,也就是實現數據的一致性同步。
@Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); } private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
這里會走后面 persistentConsistencyService ,由於 public class RaftConsistencyServiceImpl implements PersistentConsistencyService 所以這里走 RaftConsistencyServiceImpl :
@Override public void put(String key, Record value) throws NacosException { try { raftCore.signalPublish(key, value); } catch (Exception e) { Loggers.RAFT.error("Raft put failed.", e); throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e); } } public void signalPublish(String key, Record value) throws Exception { //如果自己不是leader,則找到leader節點,把數據轉發到leader節點
if (!isLeader()) { ObjectNode params = JacksonUtils.createEmptyJsonNode(); params.put("key", key); params.replace("value", JacksonUtils.transferToJsonNode(value)); Map<String, String> parameters = new HashMap<>(1); parameters.put("key", key); final RaftPeer leader = getLeader(); //代理轉發
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters); return; } //如果自己是leader,則向所有節點發送onPublish請求。這個所有節點包含自己。
try { OPERATE_LOCK.lock();//先上個鎖
final long start = System.currentTimeMillis(); final Datum datum = new Datum(); datum.key = key; datum.value = value; if (getDatum(key) == null) { datum.timestamp.set(1L); } else { datum.timestamp.set(getDatum(key).timestamp.incrementAndGet()); } ObjectNode json = JacksonUtils.createEmptyJsonNode(); json.replace("datum", JacksonUtils.transferToJsonNode(datum)); json.replace("source", JacksonUtils.transferToJsonNode(peers.local())); //onPublish可以當做是一次心跳了,更新選舉檢查時間,然后一個重點就是term增加100了 //當然還是就是更新內容了,先寫文件,再更新內存緩存。(也就是先記錄本地日志)
onPublish(datum, peers.local());//發送數據到所有節點
final String content = json.toString(); final CountDownLatch latch = new CountDownLatch(peers.majorityCount()); //遍歷所有節點,發送事務提交請求,把記錄在本地日志中的數據進行提交
for (final String server : peers.allServersIncludeMyself()) { if (isLeader(server)) {//再次判斷是否leader
latch.countDown(); continue; }//構建發布同步接口地址
final String url = buildUrl(server, API_ON_PUB); HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() { @Override public Integer onCompleted(Response response) throws Exception { if (response.getStatusCode() != HttpURLConnection.HTTP_OK) { Loggers.RAFT .warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}", datum.key, server, response.getStatusCode()); return 1; } latch.countDown(); return 0; } @Override public STATE onContentWriteCompleted() { return STATE.CONTINUE; } }); } // .....
}
其中 onPublish(datum, peers.local()); 發送數據到所有節點:
public void onPublish(Datum datum, RaftPeer source) throws Exception { RaftPeer local = peers.local(); // 本地數據信息
if (datum.value == null) { //這個value就是服務注冊的服務,為空報錯
Loggers.RAFT.warn("received empty datum"); throw new IllegalStateException("received empty datum"); } // 判斷是否事leader 不是報錯
if (!peers.isLeader(source.ip)) { Loggers.RAFT .warn("peer {} tried to publish data but wasn't leader, leader: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(getLeader())); throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader"); } // 判斷 term
if (source.term.get() < local.term.get()) { Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}", JacksonUtils.toJson(source), JacksonUtils.toJson(local)); throw new IllegalStateException( "out of date publish, pub-term:" + source.term.get() + ", cur-term: " + local.term.get()); } //重置事件
local.resetLeaderDue(); // if data should be persisted, usually this is true:
if (KeyBuilder.matchPersistentKey(datum.key)) { raftStore.write(datum);// 發送持久化數據,完成數據同步
} datums.put(datum.key, datum); if (isLeader()) {//如果是leader ,則增加 term
local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } else { if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) { //set leader term:
getLeader().term.set(source.term.get()); local.term.set(getLeader().term.get()); } else { local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT); } } //更新/nacos_home/data/naming/meta.properties文件
raftStore.updateTerm(local.term.get()); notifier.addTask(datum.key, DataOperation.CHANGE); Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term); }
通過以上的操作就完成了數據的同步。