一、數據一致性服務執行流程
1.1 (臨時/永久客戶端注冊)流程圖

圖片來源: https://blog.csdn.net/wangwei19871103/article/details/105836960

1.2 數據一致性
nacos內部提供兩種數據同步方案AP和CP,而且是混用的,實例是臨時的默認用AP,如果是永久的要就用CP。兩個數據一致性服務的處理器類結構:

左邊的RaftConsistencyServiceImpl就是CP的實現類,右邊的DistroConsistencyServiceImpl就是AP的實現類;
注冊實例的時候通常是DelegateConsistencyServiceImpl判斷該用臨時的還是永久的服務,其實他內部就是代理這兩個:

DelegateConsistencyServiceImpl是一個一致性策略選擇的類,根據不同的策略觸發條件(在nacos中,CP與AP切換的條件是注冊的服務實例是否是臨時實例),選擇PersistentConsistencyService策略或者EphemeralConsistencyService策略,
而EphemeralConsistencyService對應的是DistroConsistencyServiceImpl,采用的協議是阿里自研的Distro;PersistentConsistencyService對應的是RaftConsistencyServiceImpl,其底層采用的是Raft協議;
這兩種一致性策略下的數據存儲互不影響,所以nacos實現了AP模式與CP模式在一個組件中同時存在 ;
1.3 永久實例和臨時實例區別
Nacos 在 1.0.0版本 instance級別增加了一個ephemeral字段,該字段表示注冊的實例是否是臨時實例還是持久化實例。
如果是臨時實例,則不會在 Nacos 服務端持久化存儲,需要通過上報心跳的方式進行包活,如果一段時間內沒有上報心跳,則會被 Nacos 服務端摘除。在被摘除后如果又開始上報心跳,則會重新將這個實例注冊。
持久化實例則會持久化被 Nacos 服務端,此時即使注冊實例的客戶端進程不在,這個實例也不會從服務端刪除,只會將健康狀態設為不健康。
二、永久實例數據同步(Raft協議)
客戶端注冊配置 ephemeral: false 客戶端(永久)實例時候,會調用到
InstanceController 的 register() ----->
ServiceManager 中consistencyService.put(key, instances);----->
RaftConsistencyServiceImpl.put() ;
2.1 RaftConsistencyServiceImpl的put方法
@Override
public void put(String key, Record value) throws NacosException {
checkIsStopWork();//檢查是否停止工作
raftCore.signalPublish(key, value); //發布信號
}
2.1.1 RaftCore.signalPublish()
public void signalPublish(String key, Record value) throws Exception {
//如果不是leader,raftProxy調用proxyPostLarge方法發送數據給leader
if (!isLeader()) {
final RaftPeer leader = getLeader();
// http調用 raft/datum 接口,發數據給leader
raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
return;
}
//leader進行則處理, 將包發送給所有的 follower
OPERATE_LOCK.lock();
try {
final long start = System.currentTimeMillis();
final Datum datum = new Datum();
//組裝datum和請求參數...(省略)
// leader 本地 onPublish 方法用來處理持久化邏輯,
onPublish(datum, peers.local());
final String content = json.toString();
// 計數器: majorityCount=peers.size() / 2 + 1
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
//循環包括leader的所有 參與者,進行更新信息
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown(); //減少鎖存器的計數,如果計數達到零,則釋放所有等待線程。
continue;
}
// 異步調用接口為 /raft/datum/commit , 也是調用到 raftCore.onPublish() 方法
final String url = buildUrl(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
if (!result.ok()) { log.warn(....)
return;
}
latch.countDown(); //響應成功的話,計數器-1
}
});
}
//計數器,等5秒中,直到為0或者被打斷
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
失敗拋異常...//只有多數節點成功才能算成功
}
} finally {
OPERATE_LOCK.unlock();
}
}
2.1.2 RaftProxy.proxyPostLarge()
代理post請求 攜帶大body;
public void proxyPostLarge(String server, String api, String content, Map<String, String> headers){
if (!server.contains(UtilsAndCommons.IP_PORT_SPLITER)) { // do proxy
server = server + UtilsAndCommons.IP_PORT_SPLITER + ApplicationUtils.getPort();
}
String url = "http://" + server + ApplicationUtils.getContextPath() + api;
RestResult<String> result = HttpClient.httpPostLarge(url, headers, content);
}
2.1.3 RaftCore.onPublish()
leader本地 onPublish 方法用來處理持久化邏輯 ;
public void onPublish(Datum datum, RaftPeer source) throws Exception {
RaftPeer local = peers.local();
if (datum.value == null) { 拋異常... }
if (!peers.isLeader(source.ip)) { 不是leader發布的拋異常 }
if (source.term.get() < local.term.get()) { / 來源 term 小於本地當前 term,拋異常 }
local.resetLeaderDue(); // 更新選舉超時時間
// 添加到緩存file
if (KeyBuilder.matchPersistentKey(datum.key)) {
raftStore.write(datum);
}
//添加到緩存.
datums.put(datum.key, datum);
//計算本地term或 leader term ...(省略).
raftStore.updateTerm(local.term.get()); //更新任期到file中
//發布 ValueChangeEvent 事件
NotifyCenter.publishEvent(ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
}
2.2 ValueChangeEvent事件
@Override
public void onEvent(ValueChangeEvent event) {
notify(event.getKey(), event.getAction(), find.apply(event.getKey()));
}
2.2.1PersistentNotifier.notify() 通知
public <T extends Record> void notify(final String key, final DataOperation action, final T value) {
//關鍵代碼是這里 監聽到 數據改變的事件
if (action == DataOperation.CHANGE) {
listener.onChange(key, value);
continue;
}
}
2.2.2 Service.onChange()
@Override
public void onChange(String key, Instances value) throws Exception {
//用到這里,..表示 實例改變了
//更新實例, 發布ServiceChangeEvent事件,通知PushService有數據改變
updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
//重新計算校驗和(用編碼字符集計算MD5十六進制字符串)
recalculateChecksum();
}
2.2.3Service.updateIPs()
這里面主要是,更新集群信息,通知 PushService有數據改變:
PushService.serviceChanged() 發布ServiceChangeEvent事件 ;
封裝UDP通知客戶端;
三、臨時實例數據同步(Distro協議)
客戶端注冊配置 ephemeral: true(默認) 客戶端(永久)實例時候,會調用到
InstanceController 的 register() ----->
ServiceManager 中 consistencyService.put(key, instances); ----->
DistroConsistencyServiceImpl.put() ;
3.1 DistroConsistencyServiceImpl.put()
public void put(String key, Record value) throws NacosException {
onPut(key, value); //放進一個隊列里,然后同步任務從隊列中獲取要同步的服務key
distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
globalConfig.getTaskDispatchPeriod() / 2);
}
3.1.1 DistroConsistencyServiceImpl.onPut() 保存數據
public void onPut(String key, Record value) {
//匹配臨時實例列表 key
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet(); //時間戳自增
dataStore.put(key, datum); //放入dataStore內部的並發map中 (不持久化)
}
notifier.addTask(key, DataOperation.CHANGE); //添加一個任務記錄 ,表示數據改變操作
}
3.1.2 Notifier.addTask()
public void addTask(String datumKey, DataOperation action) {
if (action == DataOperation.CHANGE) {
services.put(datumKey, StringUtils.EMPTY); //放入並發map中
}
tasks.offer(Pair.with(datumKey, action)); //將新的通知任務添加到隊列 , tasks是BlockingQueue
}
Notifier :
private volatileNotifier notifier = new Notifier(); //對所有線程可見
@PostConstruct
public void init() {
GlobalExecutor.submitDistroNotifyTask(notifier);
}
服務啟動后初始化,啟動notifier任務,執行run方法,
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
for (; ; ) {
//這里獲取不到任務會等待,take方法
Pair<String, DataOperation> pair = tasks.take();
handle(pair);
}
}
private void handle(Pair<String, DataOperation> pair) {
String datumKey = pair.getValue0();
DataOperation action = pair.getValue1();
services.remove(datumKey);
for (RecordListener listener : listeners.get(datumKey)) {
if (action == DataOperation.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
}
}
直接調用到了Service.Onchange()方法,前面已經介紹過了;
3.2 DistroProtocol.sync() 增量同步數據
onPut() 執行完后,還有一個方法,DistroProtocol節點之間同步, peer to peer ;

圖片來源: https://blog.csdn.net/u012050299/article/details/110946637
3.2.1 主要步驟
新增數據使用異步廣播同步:
- DistroProtocol 使用 sync() 方法接收增量數據;
- 調用 distroTaskEngineHolder 發布延遲任務;
- 調用 DistroDelayTaskProcessor.process() 方法進行任務投遞,將延遲任務轉換為異步變更任務;
- 執行變更任務 DistroSyncChangeTask.run() 方法向指定節點發送消息
- 調用 DistroHttpAgent.syncData() 方法發送數據,其內部調用 NamingProxy.syncData() 方法;
- 異常任務調用 handleFailedTask() 方法進行處理,如果失敗則會調用 DistroHttpCombinedKeyTaskFailedHandler 將失敗任務重新投遞成延遲任務;
3.2.2 sync() 將數據同步到所有遠程服務器
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) { //遍歷除了自己之外的集群其他節點
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); //添加延遲任務
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
}
}
3.2.3 NacosDelayTaskExecuteEngine 執行任務
NacosDelayTaskExecuteEngine在初始化時候,會啟動線程固定周期執行任務ProcessRunnable,
private class ProcessRunnable implements Runnable {
@Override
public void run() { processTasks(); }
}
后面調用到DistroDelayTaskProcessor.process()方法 ;
@Override
public boolean process(NacosTask task) {
if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) {
DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); //將延遲任務轉換為異步變更任務
distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);
return true;
}
return false;
}
3.2.4 DistroSyncChangeTask.run() 執行任務發送數據
@Override
public void run() {
//調用HTTP請求同步數據到目標服務器
boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer());
//沒有成功或者異常情況則重試
handleFailedTask(); // 重新投遞成延遲任務
}
3.2.5 DistroHttpAgent.syncData() 發送數據
@Override
public boolean syncData(DistroData data, String targetServer) {
byte[] dataContent = data.getContent(); //http調用
// 將datum同步到目標服務器 ,接口: xxx/distro/datum
return NamingProxy.syncData(dataContent, data.getDistroKey().getTargetServer());
}
四、Server節點啟動(Raft協議)
4.1 啟動加載數據
Nacos server在啟動時,會調用RaftCore.init()方法。
4.1.1 RaftCore.init()
@PostConstruct //bean加載完后會被執行
public void init() throws Exception {
final long start = System.currentTimeMillis();
// 從磁盤加載Datum進行數據恢復
raftStore.loadDatums(notifier, datums);
// 從緩存文件加載元數據。並獲取term 設置期限
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
initialized = true;
// 注冊選舉定時任務和心跳任務 (用於復制數據和探活等)
masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
NotifyCenter.registerSubscriber(notifier);
}
4.1.2 RaftStore.loadDatums()
public synchronized void loadDatums(PersistentNotifier notifier, Map<String, Datum> datums) {
Datum datum;
//listCaches 表示存在磁盤中文件,注冊的永久客戶端實例信息,比如: {nacos}\data\naming\data\public
for (File cache : listCaches()) {
for (File datumFile : cache.listFiles()) {
datum = readDatum(datumFile, cache.getName()); //讀取客戶端實例信息
if (datum != null) {
datums.put(datum.key, datum);
if (notifier != null) { // 發布一個 數據改變的事件,也就是說需要通知,這些實例發生改變了
NotifyCenter.publishEvent(
ValueChangeEvent.builder().key(datum.key).action(DataOperation.CHANGE).build());
}
}
}
continue;
}
datum = readDatum(cache, StringUtils.EMPTY);
if (datum != null) {
datums.put(datum.key, datum);
}
}
//到這里就完成所有datums加載
}
ValueChangeEvent事件監聽者會調用onChange()方法;
4.1.3 RaftStore.setTerm()
優先讀取 data\naming\meta.properties的配置, 如果沒有則為0 ,term,越大說明狀態越新 ;
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
4.1.4 MasterElection選舉任務
raft選舉流程,后面章節單獨會分析。
4.2 HeartBeat心跳任務
RaftCore.init()方法除了上面的選舉操作之外,緊跟着進行了集群心跳機制的邏輯;同樣調用了一個定時任務,每個5s執行一個發送心跳的操作---new HeartBeat():
4.2.1 大致的過程
首先判斷本機節點是否是Leader節點,如果不是則直接返回;
如果是Leader節點,則將RaftPeer和時間戳等信息封裝並通過httpClient遠程發送到其他nacos集群follower節點中;
請求會發送到RaftController.beat()方法;
beat方法中調用了RaftCore.receivedBeat()方法;
並將遠程nacos節點RaftPeer返回到本機節點中;
然后更新RaftPeerSet集合信息,保持nacos集群數據節點的一致性。
4.2.2 HeartBeat.run()
該方法中,首先會獲取本機節點的RaftPeer信息,並重置心跳信息;同時調用sendBeat()方法發送心跳:
@Override
public void run() {
if (stopWork) { return; }
if (!peers.isReady()) {return; } // 程序是否已准備完畢
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) { return; } // 心跳周期判斷
local.resetHeartbeatDue(); // 重置心跳發送周期
sendBeat(); // 發送心跳信息
}
4.2.2 HeartBeat.sendBeat()
private void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
if (ApplicationUtils.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
return; // 如果自己不是Leader節點或者處於單機模式下,則直接返回
}
local.resetLeaderDue(); // 重置Leader任期時間
// build data
ObjectNode packet = JacksonUtils.createEmptyJsonNode();
packet.replace("peer", JacksonUtils.transferToJsonNode(local));
ArrayNode array = JacksonUtils.createEmptyArrayNode();
// 如果開啟了在心跳包中攜帶Leader存儲的數據進行發送,則對數據進行打包操作
if (!switchDomain.isSendBeatOnly()) {
for (Datum datum : datums.values()) { //遍歷datums中的數據
ObjectNode element = JacksonUtils.createEmptyJsonNode();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp.get());
array.add(element);
}
}
packet.replace("datums", array);
// 廣播
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JacksonUtils.toJson(packet));
// 將參數信息進行 Gzip算法壓縮,降低網絡消耗
String content = JacksonUtils.toJson(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
// 遍歷所有的Follower節點進行發送心跳數據包
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildUrl(server, API_BEAT); // "/raft/beat" 接口
// 采用異步HTTP請求進行心跳數據發送
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
//不成功則異常
// 成功后接收Follower節點的心跳回復(Follower節點的當前信息)進行節點更新到peers
peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
}
});
}
}
4.2.2 RaftCore.receivedBeat()-Follower節點接收心跳
nacos中則是通過心跳來保證datum的一致性的。
邏輯功能: 比較哪些datum是需要更新的,如果需要更新,就異步發請求到leader,獲取最新的datum並更新本地datums。
比較方法: 通過比較timestamp,之前也提到了,publish的時候,timestamp是會進行+1操作的,作為邏輯時鍾,timestamp很好的區分了datum的版本。判斷時只需要找到比傳參來的時間戳小的datum就可以了。
代碼還是比較長的,這里簡化:
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();//遠程參與者
JsonNode peer = beat.get("peer");
//如果不是leader發出心跳包 拋出異常(...省略))
//任期小於本地任期 拋出異常(...省略))
//本地服務狀態不是follower 則設置為follower,選票設置為leader的ip
if (local.state != RaftPeer.State.FOLLOWER) {
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
//獲取心跳的數據
final JsonNode beatDatums = beat.get("datums");
local.resetLeaderDue(); // 重置Leader任期時間
local.resetHeartbeatDue(); // 重置心跳時間
peers.makeLeader(remote); //更新leader以及peers信息
//如果不僅僅只是發送心跳 (可能在心跳的時候也會攜帶需要更新的數據過來)
if (!switchDomain.isSendBeatOnly()) {
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
// now check datums
List<String> batch = new ArrayList<>();
int processedCount = 0;
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JsonNode entry = (JsonNode) object;
String key = entry.get("key").asText();
final String datumKey;
//判斷是元數據信息還是服務實例數據
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
continue; //其他的忽略
}
long timestamp = entry.get("timestamp").asLong(); // 邏輯時間鍾
receivedKeysMap.put(datumKey, 1);
//如果本地存在一樣的鍵的數據,時間戳本地的大於遠程更新的 說明數據過期,直接忽略
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
&& processedCount < beatDatums.size()) {
continue;
}
//如果本地沒有的鍵的數據或者時間戳大於本地的數據的時間戳,添加到list等待批量更新
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
//盡量攢夠50一起更新
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
// 構建http請求根據數據的key獲取leader中的數據來更新數據
String url = buildUrl(remote.ip, API_GET);
Map<String, String> queryParam = new HashMap<>(1);
queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
//失敗直接return
//解析http結果數據
List<JsonNode> datumList = JacksonUtils
.toObj(result.getData(), new TypeReference<List<JsonNode>>() { });
for (JsonNode datumJson : datumList) {
Datum newDatum = null;
OPERATE_LOCK.lock();
Datum oldDatum = getDatum(datumJson.get("key").asText());
//舊數據不為空,而且遠程的時間戳小於等於本地的時間戳,直接忽略
if (oldDatum != null && datumJson.get("timestamp").asLong() <= oldDatum.timestamp
.get()) { continue; }
//根據不同的數據類型進行處理
if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.get("key").asText();
serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
serviceDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Service.class);
newDatum = serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.get("key").asText();
instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
instancesDatum.value = JacksonUtils
.toObj(datumJson.get("value").toString(), Instances.class);
newDatum = instancesDatum;
}
raftStore.write(newDatum); //本地磁盤寫入新數據
datums.put(newDatum.key, newDatum); //新數據放入本地內存中
//添加數據修改的事件
notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
local.resetLeaderDue();
//更新本地leader和本機的term,數據每次變動,term+100
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
raftStore.updateTerm(local.term.get()); //更新磁盤的term
} finally {
OPERATE_LOCK.unlock();
}
}
return;
}
});
}
//如果是遠程沒有的key 那么證明本地的key是臟數據 需要刪除
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
deleteDatum(deadKey);
}
}
return local;
}
五、Server節點啟動(Distro協議)
初始數據全量同步,Distro協議節點啟動時會從其他節點全量同步數據;
5.1 啟動加載數據流程(全量同步)

圖片來源: https://blog.csdn.net/u012050299/article/details/110946637
5.1.1 主要步驟
- 啟動一個定時任務線程DistroLoadDataTask加載數據,調用load()方法加載數據;
- 調用loadAllDataSnapshotFromRemote()方法從遠程機器同步所有的數據;
- 從namingProxy代理獲取所有的數據data,從獲取的結果result中獲取數據bytes;
- 處理數據processData從data反序列化出datumMap;
- 把數據存儲到dataStore,也就是本地緩存dataMap
- 如果監聽器不包括key,就創建一個空的service,並且綁定監聽器
- 監聽器listener執行成功后,就更新dataStore
5.2 代碼流程
5.2.1 DistroProtocol構造函數啟動任務
public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,
DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) {
this.memberManager = memberManager;
this.distroComponentHolder = distroComponentHolder;
this.distroTaskEngineHolder = distroTaskEngineHolder;
this.distroConfig = distroConfig;
startDistroTask();
}
private void startDistroTask() {
if (ApplicationUtils.getStandaloneMode()) { //單機啟動的返回.
isInitialized = true;
return;
}
startVerifyTask(); //驗證任務,調用http接口, "/distro/checksum";
startLoadTask();//開啟啟動任務
}
5.2.2 DistroLoadDataTask.run() 執行任務
@Override
public void run() {
load(); //加載
// 加載沒完成 再提交一下任務, 出現異常則設置初始化標識失敗 ...
}
private void load() throws Exception {
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
5.2.3 loadAllDataSnapshotFromRemote()從遠程機器同步所有的數據
//從遠程加載所有數據快照
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
//遍歷集群成員節點,不包括自己
for (Member each : memberManager.allMembersWithoutSelf()) {
//從遠程節點加載數據,返回只要有一個返回成功,
//調用http請求接口: distro/datums;
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
boolean result = dataProcessor.processSnapshot(distroData); //處理數據
if (result) { return true; }
}
return false;
}
5.2.4 DistroHttpAgent.getDatumSnapshot()請求獲取數據
@Override
public DistroData getDatumSnapshot(String targetServer) {
//從namingProxy代理獲取所有的數據data,從獲取的結果result中獲取數據bytes
byte[] allDatum = NamingProxy.getAllData(targetServer);
return new DistroData(new DistroKey("snapshot", KeyBuilder.INSTANCE_LIST_KEY_PREFIX), allDatum);
}
5.2.5 DistroHttpAgent.processSnapshot()解析數據
@Override
public boolean processSnapshot(DistroData distroData) {
return processData(distroData.getContent());
}
private boolean processData(byte[] data) throws Exception {
//從data反序列化出datumMap
Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class);
// 把數據存儲到dataStore,也就是本地緩存dataMap
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
//監聽器不包括key,就創建一個空的service,並且綁定監聽器
if (!listeners.containsKey(entry.getKey())) {
// pretty sure the service not exist:
if (switchDomain.isDefaultInstanceEphemeral()) {
Service service = new Service();
... 省略部分代碼
// 與鍵值對應的監聽器不能為空,這里的監聽器類型是 ServiceManager
RecordListener listener = listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).peek();
// service 綁定監聽器 , 調用 ServiceManager.onChange方法 ,
listener.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
// 監聽器listener執行成功后,就更新dataStore
dataStore.put(entry.getKey(), entry.getValue());
}
}
return true;
}
六、Distro協議總結
6.1 簡單介紹
這個為臨時數據的一致性協議: 不需要把數據存儲到磁盤或者數據庫;因為臨時數據通常和服務器保持一個session會話;這個會話只要存在,數據就不會丟失。
寫必須永遠是成功的,即使可能會發生網絡分區。當網絡恢復時,每個數據分片都合並到一個set中,所以集群就是恢復成一致性的狀態。
6.2 關鍵點
- distro協議是為了注冊中心而創造出的協議;
- 客戶端與服務端有兩個重要的交互,服務注冊與心跳發送;
- 客戶端以服務為維度向服務端注冊,注冊后每隔一段時間向服務端發送一次心跳,心跳包需要帶上注冊服務的全部信息,在客戶端看來,服務端節點對等,所以請求的節點是隨機的;
- 客戶端請求失敗則換一個節點重新發送請求;
- 服務端節點都存儲所有數據,但每個節點只負責其中一部分服務,在接收到客戶端的"寫"(注冊、心跳、下線等)請求后,服務端節點判斷請求的服務是否為自己負責,如果是,則處理,否則交由負責的節點處理;
- 每個服務端節點主動發送健康檢查到其他節點,響應的節點被該節點視為健康節點;
- 服務端在接收到客戶端的服務心跳后,如果該服務不存在,則將該心跳請求當做注冊請求來處理;
- 服務端如果長時間未收到客戶端心跳,則下線該服務;
- 負責的節點在接收到服務注冊、服務心跳等寫請求后將數據寫入后即返回,后台異步地將數據同步給其他節點;
- 節點在收到讀請求后直接從本機獲取后返回,無論數據是否為最新。
七、Raft協議總結
7.1 簡單介紹
Raft通過當選的領導者達成共識。集群中的服務器是leader或folower,並且在選舉的精確情況下可以是候選者(領導者不可用)。
leader負責將日志復制到關注者。它通過發送心跳消息定期通知folower它的存在。
每個folower都有一個超時(通常在150到300毫秒之間),接收心跳時重置超時。
如果沒有收到心跳,則關注者將其狀態更改為候選人並開始leader選舉。
7.2 關鍵點
7.2.1 Raft 的選舉過程
Raft 協議在集群初始狀態下是沒有 Leader 的, 集群中所有成員均是 Follower,在選舉開始期間所有 Follower 均可參與選舉,這時所有 Follower 的角色均轉變為 Condidate, Leader 由集群中所有的 Condidate 投票選出,最后獲得投票最多的 Condidate 獲勝,其角色轉變為 Leader 並開始其任期,其余落敗的 Condidate 角色轉變為 Follower 開始服從 Leader 領導。
這里有一種意外的情況會選不出 Leader 就是所有 Condidate 均投票給自己,這樣無法決出票數多的一方,Raft 算法 選不出 Leader 不罷休,直到選出為止,一輪選不出 Leader,便令所有 Condidate 隨機時間 sleap(Raft 論文稱為 timeout)一段時間,然后馬上開始新一輪的選舉,這里的隨機 sleep 就起了很關鍵的因素,第一個從 sleap 狀態恢復過來的 Condidate 會向所有 Condidate 發出投票給我的申請,這時還沒有蘇醒的 Condidate 就只能投票給已經蘇醒的 Condidate ,因此可以有效解決 Condiadte 均投票給自己的故障,便可快速的決出 Leader。
選舉出 Leader 后 Leader 會定期向所有 Follower 發送 heartbeat 來維護其 Leader 地位,如果 Follower 一段時間后未收到 Leader 的心跳則認為 Leader 已經掛掉,便轉變自身角色為 Condidate,同時發起新一輪的選舉,產生新的 Leader。
7.2.2 Raft 的數據一致性策略
Raft 協議強依賴 Leader 節點來確保集群數據一致性。即 client 發送過來的數據均先到達 Leader 節點,Leader 接收到數據后,先將數據標記為 uncommitted 狀態,隨后 Leader 開始向所有 Follower 復制數據並等待響應,在獲得集群中大於 N/2 個 Follower 的已成功接收數據完畢的響應后,Leader 將數據的狀態標記為 committed,隨后向 client 發送數據已接收確認,在向 client 發送出已數據接收后,再向所有 Follower 節點發送通知表明該數據狀態為committed。
7.2.3 Raft 如何處理 Leader 意外
1. client 發送數據到達 Leader 之前 Leader 就掛了,因為數據還沒有到達集群內部,所以對集群內部數據的一致性沒有影響,Leader 掛了之后,集群會進行新的選舉產生新的 Leader,之前掛掉的 Leader 重啟后作為 Follower 加入集群,並同步 Leader 上的數據。這里最好要求 client 有重試機制在一定時間沒有收到 Leader 的數據已接收確認后進行一定次數的重試,並再次向新的 Leader 發送數據來確保業務的流暢性。
2. client 發送數據到 Leader,數據到達 Leader 后,Leader 還沒有開始向 Folloers 復制數據,Leader就掛了,此時數據仍被標記為 uncommited 狀態,這時集群會進行新的選舉產生新的 Leader,之前掛掉的 Leader 重啟后作為 Follower 加入集群,並同步 Leader 上的數據,來保證數據一致性,之前接收到 client 的數據由於是 uncommited 狀態所以可能會被丟棄。這里同樣最好要求 client 有重試機制通過在一定時間在沒有收到 Leader 的數據已接收確認后進行一定次數的重試,再次向新的 Leader 發送數據來確保業務的流暢性。
3. client 發送數據到 Leader, Leader 接收數據完畢后標記為 uncommited,開始向 Follower復制數據,在復制完畢一小部分 Follower 后 Leader 掛了,此時數據在所有已接收到數據的 Follower 上仍被標記為 uncommitted,此時集群將進行新的選舉,而擁有最新數據的 Follower 變換角色為 Condidate,也就意味着 Leader 將在擁有最新數據的 Follower 中產生,新的 Leader 產生后所有節點開始從新 Leader 上同步數據確保數據的一致性,包括之前掛掉后恢復了狀態的 老Leader,這時也以 Follower 的身份同步新 Leader 上的數據。
4. client 發送數據到 Leader,Leader 接收數據完畢后標記為 uncommitted,開始向 Follower 復制數據,在復制完畢所有 Follower 節點或者大部分節點(大於 N/2),並接收到大部分節點接收完畢的響應后,Leader 節點將數據標記為 committed,這時 Leader 掛了,此時已接收到數據的所有 Follower 節點上的數據狀態由於還沒有接收到 Leader 的 commited 通知,均處於 uncommited 狀態。這時集群進行了新的選舉,新的 Leader 將在擁有最新數據的節點中產生,新的 Leader 產生后,由於 client 端因老 Leader 掛掉前沒有通知其數據已接收,所以會向新的 Leader 發送重試請求,而新的 Leader 上已經存在了這個之前從老 Leader 上同步過來的數據,因此 Raft 集群要求各節點自身實現去重的機制,保證數據的一致性。
5. 集群腦裂的一致性處理,多發於雙機房的跨機房模式的集群。(網絡分區導致兩邊選了2個leader), 假設一個 5 節點的 Raft 集群,其中三個節點在 A 機房,Leader 節點也在 A 機房,兩個節點在 B 機房。突然 A、B 兩個機房之間因其他故障無法通訊,那么此時 B 機房中的 2 個Follower 因為失去與 Leader 的聯系,均轉變自身角色為 Condidate。根據 Leader 選舉機制,B 機房中產生了一個新的 Leader,這就發生了腦裂即存在 A 機房中的老 Leader 的集群與B機房新 Leader 的集群。Raft 針對這種情況的處理方式是老的 Leader 集群雖然剩下三個節點,但是 Leader 對數據的處理過程還是在按原來 5 個節點進行處理,所以老的 Leader 接收到的數據,在向其他 4 個節點復制數據,由於無法獲取超過 N/2 個 Follower 節點的復制完畢數據響應(因為無法連接到 B 機房中的 2個節點),所以 client 在向老 Leader 發送的數據請求均無法成功寫入,而 client 向B機房新 Leader 發送的數據,因為是新成立的集群,所以可以成功寫入數據,在A、B兩個機房恢復網絡通訊后,A 機房中的所有節點包括老 Leader 再以 Follower 角色接入這個集群,並同步新 Leader 中的數據,完成數據一致性處理。
(兩邊的節點合並時候會判斷任期)
參考:
https://www.liaochuntao.cn/2019/06/01/java-web-41 ,
https://blog.csdn.net/u012050299/article/details/110946637,
https://blog.csdn.net/kuaipao19950507/article/details/105980892 ,
https://blog.csdn.net/liyanan21/article/details/89320872 ,
https://blog.csdn.net/liaohonghb/article/details/105683239,
https://blog.csdn.net/wangwei19871103/article/details/105836960,
https://blog.csdn.net/xiaohanzuofengzhou/article/details/102544721
