【Nacos】Data Consistency


Reposted from: https://blog.csdn.net/liyanan21/article/details/89320872

 

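Contents

I. The Raft Algorithm

II. Raft Source Code in Nacos

init()

1. Obtaining the Raft cluster nodes

NamingProxy.getServers(): getting the cluster nodes

NamingProxy.refreshSrvIfNeed(): obtaining node information

NamingProxy.refreshServerListFromDisk(): reading cluster node information

2. Raft cluster data recovery

RaftStore.load()

3. Raft election

GlobalExecutor.register(new MasterElection()): registering the election timer task

MasterElection.sendVote(): sending vote requests

(1) RaftCommands.vote(): handling /v1/ns/raft/vote requests

(2) PeerSet.decideLeader(): deciding the leader

4. Raft heartbeat

GlobalExecutor.register(new HeartBeat()): registering the heartbeat timer task

HeartBeat.sendBeat(): sending heartbeat packets

(1) RaftCommands.beat(): handling /v1/ns/raft/beat requests

5. Publishing content through Raft

Registration entry point

Persisting instance information

(1) Service.put()

(2) RaftCore.signalPublish()

(3) The /raft/datum and /raft/datum/commit endpoints

Publishing entry point: RaftCommands.publish()

6. How Raft keeps content consistent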


I. The Raft Algorithm

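Raft reaches consensus through an elected leader. A server in a Raft cluster is either a leader or a follower, and it can be a candidate in the specific case of an election (when the leader is unavailable). The leader is responsible for replicating the log to the followers, and it regularly tells the followers that it is alive by sending heartbeat messages. Each follower has a timeout (typically between 150 and 300 ms) within which it expects the leader's heartbeat, and the timeout is reset whenever a heartbeat arrives. If no heartbeat arrives in time, the follower changes its state to candidate and starts a leader election.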
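The follower-to-candidate transition described above can be sketched as a small state machine. This is a minimal, single-node illustration under stated assumptions, not Nacos code: the class `RaftNodeSketch` and its methods are hypothetical, time is simulated by calling `tick()` with the elapsed milliseconds, and the 150 to 300 ms window is taken from the paragraph above.

```java
import java.util.Random;

// Hypothetical sketch of the follower -> candidate transition (not Nacos code).
final class RaftNodeSketch {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    private final Random random;
    Role role = Role.FOLLOWER;
    long term = 0;
    long timeoutLeftMs;

    RaftNodeSketch(long seed) {
        this.random = new Random(seed);
        resetTimeout();
    }

    // Draw a fresh election timeout between 150 and 300 ms (inclusive).
    void resetTimeout() {
        timeoutLeftMs = 150 + random.nextInt(151);
    }

    // A heartbeat from a leader whose term is not older than ours resets the timeout.
    void onHeartbeat(long leaderTerm) {
        if (leaderTerm >= term) {
            term = leaderTerm;
            role = Role.FOLLOWER;
            resetTimeout();
        }
    }

    // Advance the simulated clock; start an election if no heartbeat arrived in time.
    void tick(long elapsedMs) {
        if (role == Role.LEADER) {
            return;
        }
        timeoutLeftMs -= elapsedMs;
        if (timeoutLeftMs <= 0) {
            role = Role.CANDIDATE;
            term++;          // a new election always uses a new, higher term
            resetTimeout();
        }
    }
}
```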

For details, see: Raft algorithm

II. Raft Source Code in Nacos

When the Nacos server starts, RunningConfig.onApplicationEvent() calls RaftCore.init().

init()

public static void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    // Start the Notifier, which polls Datums and notifies RaftListeners
    executor.submit(notifier);
    // Get the Raft cluster nodes and add them to the PeerSet
    peers.add(NamingProxy.getServers());
    long start = System.currentTimeMillis();
    // Load Datum and term data from disk to recover state
    RaftStore.load();
    Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",
            peers.size(), datums.size(), peers.getTerm());
    // Wait until the Notifier has processed every task queued by RaftStore.load()
    while (true) {
        if (notifier.tasks.size() <= 0) {
            break;
        }
        Thread.sleep(1000L);
        System.out.println(notifier.tasks.size());
    }
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    GlobalExecutor.register(new MasterElection()); // leader election
    GlobalExecutor.register1(new HeartBeat()); // Raft heartbeat
    GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);
    if (peers.size() > 0) {
        if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
            initialized = true;
            lock.unlock();
        }
    } else {
        throw new Exception("peers is empty.");
    }
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

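The init() method mainly does the following:

  • 1. Get the Raft cluster nodes: peers.add(NamingProxy.getServers());
  • 2. Recover Raft cluster data: RaftStore.load();
  • 3. Raft election: GlobalExecutor.register(new MasterElection());
  • 4. Raft heartbeat: GlobalExecutor.register(new HeartBeat());
  • 5. Publishing content through Raft
  • 6. Keeping content consistent through Raft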

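1. Obtaining the Raft cluster nodes

NamingProxy.getServers(): getting the cluster nodes

  • Calls NamingProxy.refreshSrvIfNeed() to obtain the node information
  • Returns List<String> servers

NamingProxy.refreshSrvIfNeed(): obtaining node information

  • In standalone mode,

    this host's ip:port is the only Raft node;

    otherwise,

    NamingProxy.refreshServerListFromDisk() (below) is called to get the Raft cluster node information.

  • Once the Raft cluster node information (a list of ip:port) is obtained, NamingProxy's List<String> serverlistFromConfig and List<String> servers fields are updated.

NamingProxy.refreshServerListFromDisk(): reading cluster node information

Reads the Raft cluster node information, i.e. a list of ip:port, from disk or from a system environment variable.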
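A minimal sketch of how such an `ip:port` list could be resolved and parsed. The helper `ServerListSketch`, the accepted separators (newlines or commas) and the `#` comment syntax are assumptions for illustration; the actual Nacos file format and environment variable name are not shown in this article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating how a Raft node list could be resolved (not Nacos code).
final class ServerListSketch {

    // Standalone mode: the only Raft peer is this machine's own ip:port.
    static List<String> resolve(boolean standalone, String localAddress, String rawList) {
        if (standalone) {
            return Collections.singletonList(localAddress);
        }
        return parse(rawList);
    }

    // Accepts newline- or comma-separated "ip:port" entries; '#' starts a comment.
    static List<String> parse(String rawList) {
        List<String> servers = new ArrayList<>();
        if (rawList == null) {
            return servers;
        }
        for (String line : rawList.split("[\\r\\n,]+")) {
            int hash = line.indexOf('#');
            String entry = (hash >= 0 ? line.substring(0, hash) : line).trim();
            // Skip blanks and duplicates, keep the original order.
            if (!entry.isEmpty() && !servers.contains(entry)) {
                servers.add(entry);
            }
        }
        return servers;
    }
}
```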

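2. Raft cluster data recovery

When Nacos starts or restarts, it loads Datum and term data from disk to recover its state.

After the Nacos server starts: RaftCore.init() -> RaftStore.load().

RaftStore.load()

  • Load the Datum data from disk:

    put each Datum into RaftCore's ConcurrentMap<String, Datum> datums, keyed by the Datum's key;

    wrap the Datum and ApplyAction.CHANGE into a Pair and put it into the Notifier's tasks queue, so that the relevant RaftListeners are notified;

  • Read the term (a long value) from META_FILE_NAME: <user.home>\nacos\raft\meta.properties:

    call RaftSet.setTerm(long term) to update the term of every node in the Raft cluster.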
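The term part of the recovery can be sketched with `java.util.Properties`. The class `TermStoreSketch` and the property key `term` are assumptions; only the file name `meta.properties` comes from the text above, and the real `RaftStore` also restores the Datum cache files.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Hypothetical sketch: persist and reload the Raft term in a properties file.
final class TermStoreSketch {
    private static final String TERM_KEY = "term"; // assumed key name

    static void writeTerm(Path metaFile, long term) throws IOException {
        Properties meta = new Properties();
        meta.setProperty(TERM_KEY, String.valueOf(term));
        Files.createDirectories(metaFile.toAbsolutePath().getParent());
        try (OutputStream out = Files.newOutputStream(metaFile)) {
            meta.store(out, "raft meta");
        }
    }

    // Returns 0 when the file or the key does not exist yet (a fresh node).
    static long loadTerm(Path metaFile) throws IOException {
        if (!Files.exists(metaFile)) {
            return 0L;
        }
        Properties meta = new Properties();
        try (InputStream in = Files.newInputStream(metaFile)) {
            meta.load(in);
        }
        return Long.parseLong(meta.getProperty(TERM_KEY, "0"));
    }
}
```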

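3. Raft election

GlobalExecutor.register(new MasterElection()): registering the election timer task

Nacos carries out Raft elections in the MasterElection task, which:

  • counts down the local node's election timeout (leaderDueMs) on every tick and, once it expires, resets the election timeout and the heartbeat timeout;
  • calls MasterElection.sendVote() to start voting.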
public class MasterElection implements Runnable {
    @Override
    public void run() {
        try {
            if (!peers.isReady()) {
                return;
            }
            RaftPeer local = peers.local();
            local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.leaderDueMs > 0) {
                return;
            }
            // Reset the election timeout; it is also reset on every heartbeat and every received data packet
            local.resetLeaderDue();
            local.resetHeartbeatDue();
            // Start an election
            sendVote();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while master election {}", e);
        }
    }
}
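`MasterElection` does not keep a real timer per peer; it decrements `leaderDueMs` on every scheduler tick. The hypothetical `ElectionTimerSketch` below models that countdown, assuming a 500 ms tick and a reset value of 15 s plus up to 5 s of random jitter (these constants are assumptions, not values confirmed by this article). The jitter is what keeps the nodes from all timing out in the same tick.

```java
import java.util.Random;

// Hypothetical model of the leaderDueMs countdown in MasterElection.run().
final class ElectionTimerSketch {
    static final long TICK_PERIOD_MS = 500;       // assumed tick
    static final long LEADER_TIMEOUT_MS = 15_000; // assumed base timeout
    static final int RANDOM_MS = 5_000;           // assumed jitter range

    private final Random random;
    long leaderDueMs;
    int electionsStarted = 0;

    ElectionTimerSketch(long seed) {
        random = new Random(seed);
        resetLeaderDue();
    }

    // Base timeout plus jitter, so that peers do not all start elections together.
    void resetLeaderDue() {
        leaderDueMs = LEADER_TIMEOUT_MS + random.nextInt(RANDOM_MS);
    }

    // One scheduler tick: count down, and start an election once the countdown reaches zero.
    void onTick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return;
        }
        resetLeaderDue();
        electionsStarted++; // stands in for sendVote()
    }
}
```

With these assumed constants an election fires between the 30th and the 40th tick without a heartbeat, i.e. after 15 to 20 seconds.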

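MasterElection.sendVote(): sending vote requests

  • Reset the Raft cluster state:

leader is set to null; the voteFor field of every Raft node is set to null;

  • Update the candidate's own data:

increment its term by 1 (this gives the candidate a higher term than the other nodes, so that a leader can be elected instead of every node sharing the same term);

set the candidate's voteFor to itself;

set its state to CANDIDATE;

  • The candidate sends an HTTP POST request to /v1/ns/raft/vote on every Raft node other than itself:

the request parameter is vote: JSON.toJSONString(local)

  • Each node replies with its own RaftPeer data (including its voteFor), and the candidate passes it to PeerSet.decideLeader():

the RaftPeer that a majority of the nodes voted for (through voteFor) is made the leader.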

public void sendVote() {
    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JSON.toJSONString(getLeader()), local.term);
    // Reset the Raft cluster state
    peers.reset();
    // Update the candidate's own data
    local.term.incrementAndGet();
    local.voteFor = local.ip;
    local.state = RaftPeer.State.CANDIDATE;
    // Send an HTTP POST to /v1/ns/raft/vote on every other Raft node,
    // with the parameter vote: JSON.toJSONString(local)
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("vote", JSON.toJSONString(local));
    for (final String server : peers.allServersWithoutMySelf()) {
        final String url = buildURL(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
                        return 1;
                    }
                    RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                    Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
                    // Hand the voter's data to PeerSet.decideLeader()
                    peers.decideLeader(peer);
                    return 0;
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
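Leaving the HTTP calls aside, the state changes `sendVote()` makes can be sketched as a pure function. `CandidateSketch` and its `Peer` class are hypothetical stand-ins for `RaftPeerSet` and `RaftPeer`; the method returns the peers that would receive a `/v1/ns/raft/vote` request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the state changes sendVote() makes before sending requests.
final class CandidateSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER }

    static final class Peer {
        final String ip;
        long term;
        String voteFor;
        State state = State.FOLLOWER;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    // Mirrors peers.reset(), term++, voteFor = self and state = CANDIDATE,
    // then returns every other peer (the targets of the vote request).
    static List<String> becomeCandidate(Map<String, Peer> peers, String localIp) {
        for (Peer p : peers.values()) {
            p.voteFor = null;
        }
        Peer local = peers.get(localIp);
        local.term++;
        local.voteFor = local.ip;
        local.state = State.CANDIDATE;
        List<String> targets = new ArrayList<>();
        for (String ip : peers.keySet()) {
            if (!ip.equals(localIp)) {
                targets.add(ip);
            }
        }
        return targets;
    }
}
```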

(1) RaftCommands.vote(): handling /v1/ns/raft/vote requests

The HTTP endpoint for vote requests:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")
public class RaftController {
    ......
    @NeedAuth
    @RequestMapping(value = "/vote", method = RequestMethod.POST)
    public JSONObject vote(HttpServletRequest request, HttpServletResponse response) throws Exception {
        // Handle the vote request
        RaftPeer peer = raftCore.receivedVote(
                JSON.parseObject(WebUtils.required(request, "vote"), RaftPeer.class));
        return JSON.parseObject(JSON.toJSONString(peer));
    }
    ......
}

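This calls RaftCore.receivedVote():

If the candidate's term is not greater than the local node's term (smaller or equal):

        if the local node has not voted yet, it sets its voteFor to itself (meaning "I am a better fit for leader, so I vote for myself");

Otherwise:

        the node switches to FOLLOWER and resets its election timeout;

        it sets its voteFor to the candidate's ip (meaning "agreed, my vote goes to you");

        it sets its term to the candidate's term;

Finally, the local node is returned as the HTTP response.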

@Component
public class RaftCore {
    ......
    public RaftPeer receivedVote(RaftPeer remote) {
        if (!peers.contains(remote)) {
            throw new IllegalStateException("can not find peer: " + remote.ip);
        }
        // If the local term is greater than or equal to the candidate's term, refuse the vote
        // (and vote for ourselves if we have not voted yet)
        RaftPeer local = peers.get(NetUtils.localServer());
        if (remote.term.get() <= local.term.get()) {
            String msg = "received illegitimate vote" + ", voter-term:" + remote.term + ", votee-term:" + local.term;
            Loggers.RAFT.info(msg);
            if (StringUtils.isEmpty(local.voteFor)) {
                local.voteFor = local.ip;
            }
            return local;
        }
        local.resetLeaderDue();
        // If the local term is smaller than the candidate's term, vote for the candidate
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
        local.term.set(remote.term.get());
        Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
        return local;
    }
}
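The decision in `receivedVote()` comes down to one term comparison. The sketch below restates it without Nacos types; `VoteDecisionSketch`, its `Peer` class and the `follower` flag are hypothetical, and the election-timeout reset is omitted.

```java
// Hypothetical restatement of the term comparison in RaftCore.receivedVote().
final class VoteDecisionSketch {
    static final class Peer {
        final String ip;
        long term;
        String voteFor;
        boolean follower;

        Peer(String ip, long term) {
            this.ip = ip;
            this.term = term;
        }
    }

    // Returns the local peer, which is what the HTTP handler sends back to the candidate.
    static Peer receivedVote(Peer local, Peer remote) {
        if (remote.term <= local.term) {
            // Stale or equal term: refuse, voting for ourselves if we have not voted yet.
            if (local.voteFor == null || local.voteFor.isEmpty()) {
                local.voteFor = local.ip;
            }
            return local;
        }
        // Newer term: follow the candidate and adopt its term.
        local.follower = true;
        local.voteFor = remote.ip;
        local.term = remote.term;
        return local;
    }
}
```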

(2) PeerSet.decideLeader(): deciding the leader

@Component
@DependsOn("serverListManager")
public class RaftPeerSet implements ServerChangeListener {
    ......
    public RaftPeer decideLeader(RaftPeer candidate) {
        peers.put(candidate.ip, candidate);
        SortedBag ips = new TreeBag();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        // Walk all peers: add every non-empty voteFor to ips and track the most-voted peer and its count
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }
            ips.add(peer.voteFor);
            if (ips.getCount(peer.voteFor) > maxApproveCount) {
                maxApproveCount = ips.getCount(peer.voteFor);
                maxApprovePeer = peer.voteFor;
            }
        }
        // Make the elected peer the leader once it has a majority
        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            peer.state = RaftPeer.State.LEADER;
            if (!Objects.equals(leader, peer)) {
                leader = peer;
                Loggers.RAFT.info("{} has become the LEADER", leader.ip);
            }
        }
        return leader;
    }
}
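The counting in `decideLeader()` can be sketched with a plain `HashMap` in place of Commons Collections' `TreeBag`. `DecideLeaderSketch` is hypothetical, and the majority rule `n / 2 + 1` is an assumption about what `majorityCount()` returns. Unlike the original, which returns the previously known leader when no peer has a majority, this sketch returns `null`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical restatement of RaftPeerSet.decideLeader(): count voteFor values
// and elect the most-voted peer once it reaches a majority.
final class DecideLeaderSketch {

    // Assumed majority rule: more than half of the cluster.
    static int majorityCount(int clusterSize) {
        return clusterSize / 2 + 1;
    }

    // votes maps each peer ip to the ip it voted for (null = has not voted).
    static String decideLeader(Map<String, String> votes) {
        Map<String, Integer> counts = new HashMap<>();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (String voteFor : votes.values()) {
            if (voteFor == null || voteFor.isEmpty()) {
                continue;
            }
            int count = counts.merge(voteFor, 1, Integer::sum);
            if (count > maxApproveCount) {
                maxApproveCount = count;
                maxApprovePeer = voteFor;
            }
        }
        return maxApproveCount >= majorityCount(votes.size()) ? maxApprovePeer : null;
    }
}
```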

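4. Raft heartbeat

GlobalExecutor.register(new HeartBeat()): registering the heartbeat timer task

  • Count down the local node's heartbeat timeout on every tick and reset it once it expires;
  • Send a heartbeat packet with sendBeat()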
public class HeartBeat implements Runnable {
    @Override
    public void run() {
        try {
            if (!peers.isReady()) {
                return;
            }
            RaftPeer local = peers.local();
            // heartbeatDueMs defaults to 5 s and TICK_PERIOD_MS is 500 ms:
            // check every 500 ms, send a heartbeat every 5 s
            local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
            if (local.heartbeatDueMs > 0) {
                return;
            }
            // Reset heartbeatDueMs
            local.resetHeartbeatDue();
            // Send the heartbeat packet
            sendBeat();
        } catch (Exception e) {
            Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
        }
    }
}

HeartBeat.sendBeat(): sending heartbeat packets

  • Only the leader sends heartbeats; it first resets its own election timeout;
  • It sends an HTTP POST request (gzip-compressed) to /v1/ns/raft/beat on every other node, with the following content:

JSONObject packet = new JSONObject();
packet.put("peer", local);  // local is the leader's RaftPeer object
packet.put("datums", array); // array holds the key and timestamp of every Datum in RaftCore
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JSON.toJSONString(packet));

  • It takes each node's HTTP response (a RaftPeer object) and updates PeerSet's Map<String, RaftPeer> peers (keeping the cluster's node data consistent).
public void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    // Only the leader sends heartbeats
    if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
        return;
    }
    Loggers.RAFT.info("[RAFT] send beat with {} keys.", datums.size());
    // Reset the interval after which a missing heartbeat triggers a leader election
    local.resetLeaderDue();
    // Build the heartbeat packet; local (this Nacos node's info) goes under the key "peer"
    JSONObject packet = new JSONObject();
    packet.put("peer", local);
    JSONArray array = new JSONArray();
    // Send only the heartbeat, without data
    if (switchDomain.isSendBeatOnly()) {
        Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
    }
    // Send the relevant keys to the followers in the heartbeat packet
    if (!switchDomain.isSendBeatOnly()) {
        for (Datum datum : datums.values()) {
            JSONObject element = new JSONObject();
            // Put the key and its version into element, then add element to array
            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp);
            array.add(element);
        }
    } else {
        Loggers.RAFT.info("[RAFT] send beat only.");
    }
    // Put the array of all keys into the packet
    packet.put("datums", array);
    // Serialize the packet to a JSON string and put it into params
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JSON.toJSONString(packet));
    String content = JSON.toJSONString(params);
    // Compress with gzip
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes("UTF-8"));
    gzip.close();
    byte[] compressedBytes = out.toByteArray();
    // Decoded only for the size figure in the log line below
    String compressedContent = new String(compressedBytes, "UTF-8");
    Loggers.RAFT.info("raw beat data size: {}, size of compressed data: {}",
            content.length(), compressedContent.length());
    // Send the heartbeat packet to every follower
    for (final String server : peers.allServersWithoutMySelf()) {
        try {
            final String url = buildURL(server, API_BEAT);
            Loggers.RAFT.info("send beat to server " + server);
            HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                @Override
                public Integer onCompleted(Response response) throws Exception {
                    if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                        Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", response.getResponseBody(), server);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        return 1;
                    }
                    peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                    Loggers.RAFT.info("receive beat response from: {}", url);
                    return 0;
                }

                @Override
                public void onThrowable(Throwable t) {
                    Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
            MetricsMonitor.getLeaderSendBeatFailedException().increment();
        }
    }
}
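The packet the leader sends carries only each datum's key and timestamp, and it is gzip-compressed before sending. A minimal, dependency-free sketch of that payload and a compression round trip follows; `BeatPayloadSketch` is hypothetical and builds the JSON by hand instead of using fastjson (keys are assumed not to contain quotes). Note that the compressed bytes themselves are what is sent: decoding gzip output as UTF-8, as the `compressedContent` line does for logging, generally does not round-trip, so it only yields a rough size figure.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch of the beat payload: key + timestamp per datum, gzip-compressed.
final class BeatPayloadSketch {

    // Hand-built JSON array; keys are assumed not to need escaping.
    static String datumsJson(Map<String, Long> keyToTimestamp) {
        StringJoiner array = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, Long> e : keyToTimestamp.entrySet()) {
            array.add("{\"key\":\"" + e.getKey() + "\",\"timestamp\":" + e.getValue() + "}");
        }
        return array.toString();
    }

    static byte[] gzip(String content) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(content.getBytes(StandardCharsets.UTF_8));
        } // closing the stream writes the gzip trailer
        return out.toByteArray();
    }

    static String gunzip(byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```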

(1) RaftCommands.beat(): handling /v1/ns/raft/beat requests

The HTTP endpoint that receives heartbeat packets:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")
public class RaftController {
    ......
    @NeedAuth
    @RequestMapping(value = "/beat", method = RequestMethod.POST)
    public JSONObject beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
        String entity = new String(IoUtils.tryDecompress(request.getInputStream()), "UTF-8");
        String value = URLDecoder.decode(entity, "UTF-8");
        value = URLDecoder.decode(value, "UTF-8");
        // Parse the heartbeat packet
        JSONObject json = JSON.parseObject(value);
        JSONObject beat = JSON.parseObject(json.getString("beat"));
        // Process the heartbeat and return this node's info as the response
        RaftPeer peer = raftCore.receivedBeat(beat);
        return JSON.parseObject(JSON.toJSONString(peer));
    }
    ......
}
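The handler first calls `IoUtils.tryDecompress()`, whose body is not shown in this article. A plausible equivalent, assuming it detects gzip by the two-byte magic number `0x1f 0x8b` and otherwise returns the body unchanged, is sketched below; `TryDecompressSketch` is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// Hypothetical "try to decompress" helper: gunzip only if the body starts with
// the gzip magic number 0x1f 0x8b, otherwise return it unchanged.
final class TryDecompressSketch {

    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    static boolean isGzip(byte[] data) {
        return data.length >= 2 && (data[0] & 0xff) == 0x1f && (data[1] & 0xff) == 0x8b;
    }

    static byte[] tryDecompress(InputStream raw) throws IOException {
        byte[] body = readAll(raw);
        if (!isGzip(body)) {
            return body; // plain request body: pass it through
        }
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(body))) {
            return readAll(gz);
        }
    }
}
```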

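RaftCore.receivedBeat(): processing the heartbeat packet

  • Reject the beat if the sender is not in the LEADER state, or if its term is lower than the local term;
  • If the node receiving the heartbeat is not a follower, it becomes a follower and sets its voteFor to the leader's ip;
  • Reset the local node's heartbeat timeout and election timeout;
  • Call PeerSet.makeLeader() so that this node updates its leader (in other words, the leader uses heartbeats to tell the other nodes who the leader is);
  • Check the Datums:

Walk through the datums in the request; if the follower does not have a datumKey, or its local timestamp is older, collect that datumKey;

Whenever 50 datumKeys have been collected (or the last entry has been processed), send a request with these datumKeys to the leader's /v1/ns/raft/get path to fetch the corresponding latest Datum objects;

For each of these Datum objects, do roughly what RaftCore.onPublish() does:
              1. Call RaftStore#write to serialize the Datum as JSON into its cache file
              2. Put the Datum into RaftCore's datums map, keyed by the datum's key
              3. Reset the local node's election timeout
              4. Update the local node's term
              5. Persist the local node's term to the properties file
              6. Call notifier.addTask(datum, Notifier.ApplyAction.CHANGE);

to notify the corresponding RaftListeners.

Keys that exist locally but are missing from the beat are removed with RaftCore.deleteDatum(String key), which:
              removes the Datum for that key from the datums map;
              calls RaftStore.delete() to delete the Datum's file on disk;
              calls notifier.addTask(deleted, Notifier.ApplyAction.DELETE) to notify the corresponding RaftListeners of a DELETE event.

  • Return the local node's RaftPeer as the HTTP response.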
@Component
public class RaftCore {
    ......
    public RaftPeer receivedBeat(JSONObject beat) throws Exception {
        final RaftPeer local = peers.local();
        // Parse the sender's node info
        final RaftPeer remote = new RaftPeer();
        remote.ip = beat.getJSONObject("peer").getString("ip");
        remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
        remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
        remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
        remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
        remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
        // Reject heartbeats that were not sent by a leader
        if (remote.state != RaftPeer.State.LEADER) {
            Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
                    remote.state, JSON.toJSONString(remote));
            throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
        }
        // Ignore the heartbeat if the local term is greater than the heartbeat's term
        if (local.term.get() > remote.term.get()) {
            Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                    remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                    + ", beat-to-term: " + local.term.get());
        }
        // If this node is not a follower, turn it into one
        if (local.state != RaftPeer.State.FOLLOWER) {
            Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
            // mk follower
            local.state = RaftPeer.State.FOLLOWER;
            local.voteFor = remote.ip;
        }
        final JSONArray beatDatums = beat.getJSONArray("datums");
        // Reset the heartbeat interval and the no-heartbeat election interval
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        // Update leader info: make remote the new leader and refresh the old leader's node info
        peers.makeLeader(remote);
        // Put every local key into a map, all with value 0
        Map<String, Integer> receivedKeysMap = new HashMap<String, Integer>(datums.size());
        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            receivedKeysMap.put(entry.getKey(), 0);
        }
        // Check the received datum list
        List<String> batch = new ArrayList<String>();
        if (!switchDomain.isSendBeatOnly()) {
            int processedCount = 0;
            Loggers.RAFT.info("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
            for (Object object : beatDatums) {
                processedCount = processedCount + 1;
                JSONObject entry = (JSONObject) object;
                String key = entry.getString("key");
                final String datumKey;
                // Rebuild the datumKey (keys are sent with their prefix stripped)
                if (KeyBuilder.matchServiceMetaKey(key)) {
                    datumKey = KeyBuilder.detailServiceMetaKey(key);
                } else if (KeyBuilder.matchInstanceListKey(key)) {
                    datumKey = KeyBuilder.detailInstanceListkey(key);
                } else {
                    // ignore corrupted key:
                    continue;
                }
                // Version of the received key
                long timestamp = entry.getLong("timestamp");
                // Mark the received key with 1 in the local key map
                receivedKeysMap.put(datumKey, 1);
                try {
                    // Key exists locally, the local version is not older, and entries remain: skip
                    if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                            && processedCount < beatDatums.size()) {
                        continue;
                    }
                    // Key missing locally or local version older: add it to the batch to be fetched
                    if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                        batch.add(datumKey);
                    }
                    // Fetch only when the batch reaches 50 keys or every entry has been processed
                    if (batch.size() < 50 && processedCount < beatDatums.size()) {
                        continue;
                    }
                    String keys = StringUtils.join(batch, ",");
                    if (batch.size() <= 0) {
                        continue;
                    }
                    Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}",
                            getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
                    // Fetch the data for these keys
                    // update datum entry
                    String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
                    HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                return 1;
                            }
                            List<Datum> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<Datum>>() {
                            });
                            // Update local data
                            for (Datum datum : datumList) {
                                OPERATE_LOCK.lock();
                                try {
                                    Datum oldDatum = getDatum(datum.key);
                                    if (oldDatum != null && datum.timestamp.get() <= oldDatum.timestamp.get()) {
                                        Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                                datum.key, datum.timestamp, oldDatum.timestamp);
                                        continue;
                                    }
                                    raftStore.write(datum);
                                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                                        Datum<Service> serviceDatum = new Datum<>();
                                        serviceDatum.key = datum.key;
                                        serviceDatum.timestamp.set(datum.timestamp.get());
                                        serviceDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), Service.class);
                                        datum = serviceDatum;
                                    }
                                    if (KeyBuilder.matchInstanceListKey(datum.key)) {
                                        Datum<Instances> instancesDatum = new Datum<>();
                                        instancesDatum.key = datum.key;
                                        instancesDatum.timestamp.set(datum.timestamp.get());
                                        instancesDatum.value = JSON.parseObject(JSON.toJSONString(datum.value), Instances.class);
                                        datum = instancesDatum;
                                    }
                                    datums.put(datum.key, datum);
                                    notifier.addTask(datum.key, ApplyAction.CHANGE);
                                    local.resetLeaderDue();
                                    if (local.term.get() + 100 > remote.term.get()) {
                                        getLeader().term.set(remote.term.get());
                                        local.term.set(getLeader().term.get());
                                    } else {
                                        local.term.addAndGet(100);
                                    }
                                    raftStore.updateTerm(local.term.get());
                                    Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                            datum.key, datum.timestamp, JSON.toJSONString(remote), local.term);
                                } catch (Throwable e) {
                                    Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, key: {} {}", datum.key, e);
                                } finally {
                                    OPERATE_LOCK.unlock();
                                }
                            }
                            TimeUnit.MILLISECONDS.sleep(200);
                            return 0;
                        }
                    });
                    batch.clear();
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
                }
            }
            // A key that exists locally but is absent from the received list has been deleted
            // by the leader, so delete it locally as well
            List<String> deadKeys = new ArrayList<String>();
            for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
                if (entry.getValue() == 0) {
                    deadKeys.add(entry.getKey());
                }
            }
            for (String deadKey : deadKeys) {
                try {
                    deleteDatum(deadKey);
                } catch (Exception e) {
                    Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
                }
            }
        }
        return local;
    }
}
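Stripped of HTTP and locking, the key comparison in `receivedBeat()` is a diff between two `key -> timestamp` maps. The hypothetical `BeatDiffSketch` computes which keys a follower would fetch from the leader (in batches of 50, as above) and which local keys it would delete; it does not perform the fetch, the term update or the write to disk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, network-free restatement of the key comparison in receivedBeat().
final class BeatDiffSketch {
    static final int BATCH_SIZE = 50; // batch size used by the code above

    static final class Plan {
        final List<List<String>> fetchBatches = new ArrayList<>();
        final List<String> deadKeys = new ArrayList<>();
    }

    // local and leader both map datum key -> timestamp (the datum's version).
    static Plan plan(Map<String, Long> local, Map<String, Long> leader) {
        Plan plan = new Plan();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> e : leader.entrySet()) {
            Long mine = local.get(e.getKey());
            // Fetch when the key is missing locally or the local copy is older.
            if (mine == null || mine < e.getValue()) {
                batch.add(e.getKey());
                if (batch.size() == BATCH_SIZE) {
                    plan.fetchBatches.add(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {
            plan.fetchBatches.add(batch);
        }
        // Keys the leader no longer reports are treated as deleted.
        for (String key : local.keySet()) {
            if (!leader.containsKey(key)) {
                plan.deadKeys.add(key);
            }
        }
        return plan;
    }
}
```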

5. Publishing content through Raft

Registration entry point

The registration HTTP endpoint:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ......
    @CanDistro
    @RequestMapping(value = "", method = RequestMethod.POST)
    public String register(HttpServletRequest request) throws Exception {
        // Get the namespace and serviceName
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // Run the registration logic
        serviceManager.registerInstance(namespaceId, serviceName, parseInstance(request));
        return "ok";
    }
}

Registering an instance

@Component
@DependsOn("nacosApplicationContext")
public class ServiceManager implements RecordListener<Service> {
    ......
    private Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    ......

    // Register a new instance
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // Create an empty service. All services are stored in serviceMap, of type Map<String, Map<String, Service>>:
        // the outer map's key is the namespace, the inner map's key is the serviceName.
        // Each service maintains a clusterMap, which holds two sets used to store instances.
        if (ServerMode.AP.name().equals(switchDomain.getServerMode())) {
            createEmptyService(namespaceId, serviceName);
        }
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM,
                    "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // Check whether the instance already exists (compared by ip)
        if (service.allIPs().contains(instance)) {
            throw new NacosException(NacosException.INVALID_PARAM, "instance already exist: " + instance);
        }
        // Add the new instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

    // Create an empty service
    public void createEmptyService(String namespaceId, String serviceName) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(Constants.DEFAULT_GROUP);
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            service.validate();
            putService(service);
            service.init();
            // Listen on the service's keys so that data changes are synchronized
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
            consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        }
    }

    // Add instances to the cache and persist them
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        // Add the instances to the local cache
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Persist the instance information
        consistencyService.put(key, instances);
    }

    // Add instances to the cache
    public List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }

    // The actual logic that adds instances to the cache
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        Map<String, Instance> oldInstanceMap = new HashMap<>(16);
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> map = new ConcurrentHashMap<>(currentIPs.size());
        for (Instance instance : currentIPs) {
            map.put(instance.toIPAddr(), instance);
        }
        if (datum != null) {
            oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map);
        }
        // use HashMap for deep copy:
        HashMap<String, Instance> instanceMap = new HashMap<>(oldInstanceMap.size());
        instanceMap.putAll(oldInstanceMap);
        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName());
                cluster.setService(service);
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                        instance.getClusterName(), instance.toJSON());
            }
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName()
                    + ", ip list: " + JSON.toJSONString(instanceMap.values()));
        }
        return new ArrayList<>(instanceMap.values());
    }

    // Merge the old instance list with the current instances (copying health status and last beat time)
    private Map<String, Instance> setValid(List<Instance> oldInstances, Map<String, Instance> map) {
        Map<String, Instance> instanceMap = new HashMap<>(oldInstances.size());
        for (Instance instance : oldInstances) {
            Instance instance1 = map.get(instance.toIPAddr());
            if (instance1 != null) {
                instance.setHealthy(instance1.isHealthy());
                instance.setLastBeat(instance1.getLastBeat());
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }
        return instanceMap;
    }
    ......
}
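The registry layout and the copy-then-publish flow of `updateIpAddresses()` can be sketched with plain collections. In the hypothetical `RegistrySketch`, instances are reduced to `ip:port` strings, `computeNewList()` builds the new full list on a copy without touching the cache, and `onChange()` stands in for the listener callback that finally replaces the cached list. The real `Service`, `Cluster` and `Instance` types carry much more state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the registry: namespace -> serviceName -> instance keys.
final class RegistrySketch {
    final Map<String, Map<String, Set<String>>> serviceMap = new ConcurrentHashMap<>();

    // Like createEmptyService(): create the nested containers on first use.
    Set<String> instancesOf(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<String, Set<String>>())
                .computeIfAbsent(serviceName, s -> ConcurrentHashMap.<String>newKeySet());
    }

    // Like updateIpAddresses(): work on a sorted copy and return the new full list.
    List<String> computeNewList(String namespaceId, String serviceName, boolean remove, String... keys) {
        Set<String> copy = new TreeSet<>(instancesOf(namespaceId, serviceName));
        for (String key : keys) {
            if (remove) {
                copy.remove(key);
            } else {
                copy.add(key);
            }
        }
        if (copy.isEmpty() && !remove) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + serviceName);
        }
        return new ArrayList<>(copy);
    }

    // Stand-in for the listener callback that applies a published list to the cache.
    void onChange(String namespaceId, String serviceName, List<String> newList) {
        Set<String> cached = instancesOf(namespaceId, serviceName);
        cached.clear();
        cached.addAll(newList);
    }
}
```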

Persisting instance information

RaftConsistencyServiceImpl.put() persists the instance information; this is the consistencyService.put(key, instances) call above.

(1) Service.put()

@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
    ......
    @Override
    public void put(String key, Record value) throws NacosException {
        try {
            raftCore.signalPublish(key, value);
        } catch (Exception e) {
            Loggers.RAFT.error("Raft put failed.", e);
            throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value);
        }
    }
}

This eventually calls RaftCore's signalPublish() method:

(2) RaftCore.signalPublish()

@Component
public class RaftCore {
    ......
    public void signalPublish(String key, Record value) throws Exception {
        // If this node is not the leader, forward the request to the leader
        if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            // Call the /raft/datum endpoint
            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }
        // If this node is the leader, send the datum to all followers
        try {
            OPERATE_LOCK.lock();
            long start = System.currentTimeMillis();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            if (getDatum(key) == null) {
                datum.timestamp.set(1L);
            } else {
                datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
            }
            JSONObject json = new JSONObject();
            json.put("datum", datum);
            json.put("source", peers.local());
            // The local onPublish() handles persistence
            onPublish(datum, peers.local());
            final String content = JSON.toJSONString(json);
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            // Send the datum to every follower through the /raft/datum/commit endpoint
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                    datum.key, server, response.getStatusCode());
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });
            }
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                Loggers.RAFT.info("data publish failed, caused failed to notify majority, key={}", key);
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
            long end = System.currentTimeMillis();
            Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
        } finally {
            OPERATE_LOCK.unlock();
        }
    }
}
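The core of `signalPublish()` is a `CountDownLatch` sized to a majority: the leader counts itself, each follower that answers successfully counts down once, and the publish fails if the latch does not reach zero before the timeout. The hypothetical `MajorityAckSketch` reproduces that pattern with a thread pool and a `Predicate` standing in for the `/raft/datum/commit` call; `majorityCount()` is assumed to be `n / 2 + 1`.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical sketch of the "wait for a majority of acknowledgements" pattern.
final class MajorityAckSketch {

    // servers includes the leader; send.test(server) simulates one commit request.
    static boolean publish(List<String> servers, String leader, Predicate<String> send, long timeoutMs)
            throws InterruptedException {
        int majority = servers.size() / 2 + 1; // assumed majorityCount()
        CountDownLatch latch = new CountDownLatch(majority);
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, servers.size()));
        try {
            for (String server : servers) {
                if (server.equals(leader)) {
                    latch.countDown(); // the leader has already applied the datum locally
                    continue;
                }
                pool.execute(() -> {
                    if (send.test(server)) {
                        latch.countDown(); // only successful followers count
                    }
                });
            }
            // true if a majority acknowledged within the timeout
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Because only a majority is awaited, a slow or failed follower does not block the publish; it is left to catch up through the heartbeat.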

(3) The /raft/datum and /raft/datum/commit endpoints

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/raft")
public class RaftController {
    ......
    @NeedAuth
    @RequestMapping(value = "/datum", method = RequestMethod.POST)
    public String publish(HttpServletRequest request, HttpServletResponse response) throws Exception {
        response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Content-Encode", "gzip");
        String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
        String value = URLDecoder.decode(entity, "UTF-8");
        JSONObject json = JSON.parseObject(value);
        // This also calls RaftConsistencyServiceImpl.put(): here it converges with the
        // service registration path and ends up in signalPublish()
        String key = json.getString("key");
        if (KeyBuilder.matchInstanceListKey(key)) {
            raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Instances.class));
            return "ok";
        }
        if (KeyBuilder.matchSwitchKey(key)) {
            raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), SwitchDomain.class));
            return "ok";
        }
        if (KeyBuilder.matchServiceMetaKey(key)) {
            raftConsistencyService.put(key, JSON.parseObject(json.getString("value"), Service.class));
            return "ok";
        }
        throw new NacosException(NacosException.INVALID_PARAM, "unknown type publish key: " + key);
    }

    @NeedAuth
    @RequestMapping(value = "/datum/commit", method = RequestMethod.POST)
    public String onPublish(HttpServletRequest request, HttpServletResponse response) throws Exception {
        response.setHeader("Content-Type", "application/json; charset=" + getAcceptEncoding(request));
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Content-Encode", "gzip");
        String entity = IOUtils.toString(request.getInputStream(), "UTF-8");
        String value = URLDecoder.decode(entity, "UTF-8");
        JSONObject jsonObject = JSON.parseObject(value);
        String key = "key";
        RaftPeer source = JSON.parseObject(jsonObject.getString("source"), RaftPeer.class);
        JSONObject datumJson = jsonObject.getJSONObject("datum");
        Datum datum = null;
        if (KeyBuilder.matchInstanceListKey(datumJson.getString(key))) {
            datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<Instances>>() {
            });
        } else if (KeyBuilder.matchSwitchKey(datumJson.getString(key))) {
            datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<SwitchDomain>>() {
            });
        } else if (KeyBuilder.matchServiceMetaKey(datumJson.getString(key))) {
            datum = JSON.parseObject(jsonObject.getString("datum"), new TypeReference<Datum<Service>>() {
            });
        }
        // This eventually calls RaftCore.onPublish()
        raftConsistencyService.onPut(datum, source);
        return "ok";
    }
    ......
}

Publishing entry point: RaftCommands.publish()

@Component
public class RaftCore {
    ......
    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        RaftPeer local = peers.local();
        if (datum.value == null) {
            Loggers.RAFT.warn("received empty datum");
            throw new IllegalStateException("received empty datum");
        }
        // Reject the datum if it was not published by the leader
        if (!peers.isLeader(source.ip)) {
            Loggers.RAFT.warn("peer {} tried to publish data but wasn't leader, leader: {}",
                    JSON.toJSONString(source), JSON.toJSONString(getLeader()));
            throw new IllegalStateException("peer(" + source.ip + ") tried to publish " + "data but wasn't leader");
        }
        // Reject the datum if the source term is smaller than the local term
        if (source.term.get() < local.term.get()) {
            Loggers.RAFT.warn("out of date publish, pub-term: {}, cur-term: {}",
                    JSON.toJSONString(source), JSON.toJSONString(local));
            throw new IllegalStateException("out of date publish, pub-term:" + source.term.get()
                    + ", cur-term: " + local.term.get());
        }
        // Reset the election timeout
        local.resetLeaderDue();
        // Persist the datum
        // if data should be persistent, usually this is always true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            raftStore.write(datum);
        }
        // Add it to the cache
        datums.put(datum.key, datum);
        // Update the term
        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        raftStore.updateTerm(local.term.get());
        // Notify the application that the data has changed
        notifier.addTask(datum.key, ApplyAction.CHANGE);
        Loggers.RAFT.info("data added/updated, key={}, term={}", datum.key, local.term);
    }
}
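The term bookkeeping at the end of `onPublish()` (the same rule appears in `receivedBeat()` with the literal `100`) can be restated as a pure function. `PublishTermSketch` is hypothetical, and `PUBLISH_TERM_INCREASE_COUNT = 100` is an assumption based on that literal.

```java
// Hypothetical restatement of the term update at the end of onPublish().
final class PublishTermSketch {
    static final long PUBLISH_TERM_INCREASE_COUNT = 100; // assumed to equal the literal 100 in receivedBeat()

    // Returns the new local term after applying a datum published with sourceTerm.
    static long nextTerm(boolean isLeader, long localTerm, long sourceTerm) {
        if (isLeader) {
            // The leader advances its own term on every publish.
            return localTerm + PUBLISH_TERM_INCREASE_COUNT;
        }
        if (localTerm + PUBLISH_TERM_INCREASE_COUNT > sourceTerm) {
            // The follower is within one step of the leader: adopt the leader's term.
            return sourceTerm;
        }
        // The follower is further behind: advance by one step.
        return localTerm + PUBLISH_TERM_INCREASE_COUNT;
    }
}
```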

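6. How Raft keeps content consistent

When Nacos publishes content through Raft, the leader pushes it to the followers but only waits for a majority of them to acknowledge; a follower that missed an update is brought up to date later by the Raft heartbeat mechanism, which compares datum keys and timestamps.

During registration, addInstance() adds the instance to the local cache. When Raft synchronizes data from the leader to the followers, however, a follower that receives the packet only persists it in onPublish() and does not update its service cache directly; this is done through a listener instead:

At the end of onPublish() there is the line notifier.addTask(datum.key, ApplyAction.CHANGE);, which adds this change to the notification tasks. Here is how those tasks are processed: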

@Component
public class RaftCore {
    ......
    public class Notifier implements Runnable {
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

        // Add a change task to the tasks queue
        public void addTask(String datumKey, ApplyAction action) {
            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        // Task-processing thread
        @Override
        public void run() {
            Loggers.RAFT.info("raft notifier started");
            while (true) {
                try {
                    Pair pair = tasks.take();
                    if (pair == null) {
                        continue;
                    }
                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();
                    // Remove the key from the services map (so it can be queued again)
                    services.remove(datumKey);
                    int count = 0;
                    if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                        if (KeyBuilder.matchServiceMetaKey(datumKey) && !KeyBuilder.matchSwitchKey(datumKey)) {
                            for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
                                try {
                                    // Call a different callback depending on the change type to update the cache
                                    if (action == ApplyAction.CHANGE) {
                                        listener.onChange(datumKey, getDatum(datumKey).value);
                                    }
                                    if (action == ApplyAction.DELETE) {
                                        listener.onDelete(datumKey);
                                    }
                                } catch (Throwable e) {
                                    Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
                                }
                            }
                        }
                    }
                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }
                    for (RecordListener listener : listeners.get(datumKey)) {
                        count++;
                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, getDatum(datumKey).value);
                                continue;
                            }
                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {} {}", datumKey, e);
                        }
                    }
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datumKey, count);
                    }
                } catch (Throwable e) {
                    Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e);
                }
            }
        }
    }
}
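The key idea in `Notifier` is de-duplication: while a `CHANGE` for a key is still queued, further `CHANGE`s for the same key are dropped, because the listener reads the latest datum with `getDatum()` when the task is finally processed. The hypothetical, single-threaded `NotifierSketch` below models this and records deliveries as strings instead of calling listeners. It uses `putIfAbsent` so that the check and the insert are one atomic step, a deliberate difference from the original's separate `containsKey` and `put` calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of the Notifier's de-duplicating task queue.
final class NotifierSketch {
    enum ApplyAction { CHANGE, DELETE }

    private static final class Task {
        final String key;
        final ApplyAction action;

        Task(String key, ApplyAction action) {
            this.key = key;
            this.action = action;
        }
    }

    private final ConcurrentHashMap<String, Boolean> pendingChanges = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

    // A CHANGE for a key that is already queued is dropped; DELETEs are always queued.
    void addTask(String datumKey, ApplyAction action) {
        if (action == ApplyAction.CHANGE && pendingChanges.putIfAbsent(datumKey, Boolean.TRUE) != null) {
            return;
        }
        tasks.add(new Task(datumKey, action));
    }

    // Drains the queue the way run() does, returning "ACTION:key" entries.
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        Task task;
        while ((task = tasks.poll()) != null) {
            pendingChanges.remove(task.key); // the key may be queued again from now on
            delivered.add(task.action + ":" + task.key);
        }
        return delivered;
    }
}
```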

 

