Nacos 實現 AP+CP原理[Raft 算法 NO]


來源於網絡

一、什么是 Raft算法


Raft 適用於一個管理日志一致性的協議,相比於 Paxos 協議 Raft 更易於理解和去實現它。為了提高理解性,Raft 將一致性算法分為了幾個部分,包括領導選取(leader selection)、日志復制(log replication)、安全(safety),並且使用了更強的一致性來減少了必須需要考慮的狀態。

Raft算法將 Server划分為3種狀態,或者也可以稱作角色:
【1】Leader:負責 Client交互和 log復制,同一時刻系統中最多存在1個。
【2】Follower:被動響應請求RPC,從不主動發起請求RPC。
【3】Candidate:一種臨時的角色,只存在於 Leader的選舉階段,某個節點想要變成 Leader,那么就發起投票請求,同時自己變成 Candidate。如果選舉成功,則變為 Candidate,否則退回為 Follower

狀態或者說角色的流轉如下:

在 Raft中,問題分解為:領導選取日志復制安全成員變化

復制狀態機通過復制日志來實現

日志:每台機器保存一份日志,日志來自於客戶端的請求,包含一系列的命令
狀態機:狀態機會按順序執行這些命令
一致性模型:分布式環境下,保證多機的日志是一致的,這樣回放到狀態機中的狀態是一致的

Raft算法選主流程


Raft中有 Term的概念,Term類比中國歷史上的朝代更替,Raft 算法將時間划分成為任意不同長度的任期(term)。

選舉流程

1、Follower增加當前的term,轉變為 Candidate。
2、Candidate投票給自己,並發送RequestVote RPC給集群中的其他服務器。
3、收到 RequestVote的服務器,在同一 term中只會按照先到先得投票給至多一個Candidate。且只會投票給 log至少和自身一樣新的Candidate

關於Raft更詳細的描述,可以查看這里,從分布式一致性到共識機制(二)Raft算法

二、Nacos中的 CP一致性


Spring Cloud Alibaba Nacos 在 1.0.0 正式支持 AP 和 CP 兩種一致性協議,其中 CP一致性協議實現,是基於簡化的 Raft 的 CP 一致性。

如何實現 Raft算法

Nacos server在啟動時,會通過 RunningConfig.onApplicationEvent()方法調用 RaftCore.init()方法。

啟動選舉

 1 public static void init() throws Exception {
 2  
 3     Loggers.RAFT.info("initializing Raft sub-system");
 4  
 5     // 啟動Notifier,輪詢Datums,通知RaftListener
 6     executor.submit(notifier);
 7      
 8     // 獲取Raft集群節點,更新到PeerSet中
 9     peers.add(NamingProxy.getServers());
10  
11     long start = System.currentTimeMillis();
12  
13     // 從磁盤加載Datum和term數據進行數據恢復
14     RaftStore.load();
15  
16     Loggers.RAFT.info("cache loaded, peer count: {}, datum count: {}, current term: {}",
17         peers.size(), datums.size(), peers.getTerm());
18  
19     while (true) {
20         if (notifier.tasks.size() <= 0) {
21             break;
22         }
23         Thread.sleep(1000L);
24         System.out.println(notifier.tasks.size());
25     }
26  
27     Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
28  
29     GlobalExecutor.register(new MasterElection()); // Leader選舉
30     GlobalExecutor.register1(new HeartBeat()); // Raft心跳
31     GlobalExecutor.register(new AddressServerUpdater(), GlobalExecutor.ADDRESS_SERVER_UPDATE_INTERVAL_MS);
32  
33     if (peers.size() > 0) {
34         if (lock.tryLock(INIT_LOCK_TIME_SECONDS, TimeUnit.SECONDS)) {
35             initialized = true;
36             lock.unlock();
37         }
38     } else {
39         throw new Exception("peers is empty.");
40     }
41  
42     Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
43         GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
44 }

在 init方法主要做了如下幾件事:

  1. 獲取 Raft集群節點 peers.add(NamingProxy.getServers());
  2. Raft集群數據恢復 RaftStore.load();
  3. Raft選舉 GlobalExecutor.register(new MasterElection());
  4. Raft心跳 GlobalExecutor.register(new HeartBeat());
  5. Raft發布內容
  6. Raft保證內容一致性

選舉流程

其中,raft集群內部節點間是通過暴露的 Restful接口,代碼在 RaftController 中。RaftController控制器是 Raft集群內部節點間通信使用的,具體的信息如下

 1 POST HTTP://{ip:port}/v1/ns/raft/vote : 進行投票請求
 2 
 3 POST HTTP://{ip:port}/v1/ns/raft/beat : Leader向Follower發送心跳信息
 4 
 5 GET HTTP://{ip:port}/v1/ns/raft/peer : 獲取該節點的RaftPeer信息
 6 
 7 PUT HTTP://{ip:port}/v1/ns/raft/datum/reload : 重新加載某日志信息
 8 
 9 POST HTTP://{ip:port}/v1/ns/raft/datum : Leader接收傳來的數據並存入
10 
11 DELETE HTTP://{ip:port}/v1/ns/raft/datum : Leader接收傳來的數據刪除操作
12 
13 GET HTTP://{ip:port}/v1/ns/raft/datum : 獲取該節點存儲的數據信息
14 
15 GET HTTP://{ip:port}/v1/ns/raft/state : 獲取該節點的狀態信息{UP or DOWN}
16 
17 POST HTTP://{ip:port}/v1/ns/raft/datum/commit : Follower節點接收Leader傳來得到數據存入操作
18 
19 DELETE HTTP://{ip:port}/v1/ns/raft/datum : Follower節點接收Leader傳來的數據刪除操作
20 
21 GET HTTP://{ip:port}/v1/ns/raft/leader : 獲取當前集群的Leader節點信息
22 
23 GET HTTP://{ip:port}/v1/ns/raft/listeners : 獲取當前Raft集群的所有事件監聽者
24 RaftPeerSet

心跳機制

Raft中使用心跳機制來觸發 Leader選舉。心跳定時任務是在 GlobalExecutor 中,通過 GlobalExecutor.register(new HeartBeat())注冊心跳定時任務,具體操作包括:

  • 重置 Leader節點的heart timeout、election timeout;
  • sendBeat()發送心跳包
 1  public class HeartBeat implements Runnable {
 2         @Override
 3         public void run() {
 4             try {
 5 
 6                 if (!peers.isReady()) {
 7                     return;
 8                 }
 9 
10                 RaftPeer local = peers.local();
11                 local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
12                 if (local.heartbeatDueMs > 0) {
13                     return;
14                 }
15 
16                 local.resetHeartbeatDue();
17 
18                 sendBeat();
19             } catch (Exception e) {
20                 Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
21             }
22 
23         }
24 }

簡單說明了下Nacos中的Raft一致性實現,更詳細的流程,可以下載源碼,查看 RaftCore 進行了解。源碼可以通過以下地址檢出:鏈接

 

三、Nacos AP 實現


AP協議:Distro協議。Distro是阿里巴巴的私有協議,目前流行的 Nacos服務管理框架就采用了 Distro協議。Distro 協議被定位為 臨時數據的一致性協議 :該類型協議, 不需要把數據存儲到磁盤或者數據庫 ,因為臨時數據通常和服務器保持一個session會話, 該會話只要存在,數據就不會丟失 

Distro 協議保證寫必須永遠是成功的,即使可能會發生網絡分區。當網絡恢復時,把各數據分片的數據進行合並。

Distro 協議具有以下特點:

  • 專門為了注冊中心而創造出的協議;

  • 客戶端與服務端有兩個重要的交互,服務注冊與心跳發送;

  • 客戶端以服務為維度向服務端注冊,注冊后每隔一段時間向服務端發送一次心跳,心跳包需要帶上注冊服務的全部信息,在客戶端看來,服務端節點對等,所以請求的節點是隨機的;

  • 客戶端請求失敗則換一個節點重新發送請求;

  • 服務端節點都存儲所有數據,但每個節點只負責其中一部分服務,在接收到客戶端的“寫”(注冊、心跳、下線等)請求后,服務端節點判斷請求的服務是否為自己負責,如果是,則處理,否則交由負責的節點處理;

  • 每個服務端節點主動發送健康檢查到其他節點,響應的節點被該節點視為健康節點;

  • 服務端在接收到客戶端的服務心跳后,如果該服務不存在,則將該心跳請求當做注冊請求來處理;

  • 服務端如果長時間未收到客戶端心跳,則下線該服務;

  • 負責的節點在接收到服務注冊、服務心跳等寫請求后將數據寫入后即返回,后台異步地將數據同步給其他節點;

  • 節點在收到讀請求后直接從本機獲取后返回,無論數據是否為最新。

Distro協議服務端節點發現使用尋址機制來實現服務端節點的管理。在 Nacos中,尋址模式有三種:

單機模式:StandaloneMemberLookup

文件模式:FileConfigMemberLookup -- 利用監控 cluster.conf文件的變動實現節點的管理。核心代碼如下:

 

 服務器模式:AddressServerMemberLookup – 使用地址服務器存儲節點信息,服務端節點定時拉取信息進行管理

核心代碼:

初始全量同步

Distro協議節點啟動時會從其他節點全量同步數據。在 Nacos中,整體流程如下:

  • 啟動一個定時任務線程 DistroLoadDataTask加載數據,調用 load()方法加載數據

  • 調用 loadAllDataSnapshotFromRemote()方法從遠程機器同步所有的數據

  • 從 namingProxy代理獲取所有的數據data

    • 構造 http請求,調用 httpGet方法從指定的 server獲取數據

    • 從獲取的結果 result中獲取數據 bytes

  • 處理數據 processData

    • 從data反序列化出 datumMap

    • 把數據存儲到 dataStore,也就是本地緩存 dataMap

    • 監聽器不包括 key,就創建一個空的 service,並且綁定監聽器

  • 監聽器 listener執行成功后,就更新 data store

核心代碼如下:

 

 

增量同步

       新增數據使用異步廣播同步:

  • DistroProtocol 使用 sync() 方法接收增量數據

  • 向其他節點發布廣播任務

    • 調用 distroTaskEngineHolder 發布延遲任務

  • 調用 DistroDelayTaskProcessor.process() 方法進行任務投遞:將延遲任務轉換為異步變更任務

  • 執行變更任務 DistroSyncChangeTask.run() 方法:向指定節點發送消息

    • 調用 DistroHttpAgent.syncData() 方法發送數據

    • 調用 NamingProxy.syncData() 方法發送數據

  • 異常任務調用 handleFailedTask() 方法進行處理

    • 調用 DistroFailedTaskHandler 處理失敗任務

    • 調用 DistroHttpCombinedKeyTaskFailedHandler 將失敗任務重新投遞成延遲任務。

核心代碼如下:

 

 

Distro協議是阿里的私有協議,但是對外開源框架只有Nacos。所有我們只能從Nacos中一窺Distro協議。Distro協議是一個比較簡單的最終一致性協議。整體由節點尋址、數據全量同步、異步增量同步、定時上報client所有信息、心跳探活其他節點等組成。

       本文中的Nacos源碼版本為Nacos 1.3.2 ,屬於優化過的源碼,抽象出一致性協議抽象接口,和JRaft共用節點尋址模式。

  -     總結    -

       Distro協議是阿里的私有協議,但是對外開源框架只有Nacos。所有我們只能從Nacos中一窺Distro協議。Distro協議是一個比較簡單的最終一致性協議。整體由節點尋址、數據全量同步、異步增量同步、定時上報client所有信息、心跳探活其他節點等組成。

       本文中的Nacos源碼版本為Nacos 1.3.2 ,屬於優化過的源碼,抽象出一致性協議抽象接口,和JRaft共用節點尋址模式。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM