Nacos 服務註冊之服務器端 Raft 實現


Raft 模式下數據是持久化的,存儲在 \nacos\data\naming\data 目錄。
Nacos 啟動後首先從該數據存儲目錄加載數據。
Raft 協議中只有一個 LEADER 節點,且只有 LEADER 節點負責數據寫入;FOLLOWER 節點接收到寫入請求後會轉發給 LEADER 節點處理。
Raft 協議中 LEADER 節點接收寫入請求後,首先寫入本機,然後同步到集群中的其他節點,須有超過半數的節點返回成功,才認為寫入成功。
Raft 協議中 LEADER 節點定時發送心跳,心跳中攜帶全部數據的 key 與 timestamp(不含數據內容本身),用於同步 FOLLOWER。

Raft 存儲代碼分析:RaftStore 類負責數據的存儲,數據存儲在 \nacos\data\naming\data\{namespaceId}(例如 public)目錄下,每個 key 對應一個文件,例如:
com.alibaba.nacos.naming.domains.meta.public##@@nacos.test.5
com.alibaba.nacos.naming.iplist.public##@@nacos.test.5

@Component
public class RaftStore {

    /**
     * Persists a datum to disk as a JSON document.
     *
     * <p>The file is {@code cacheDir/<namespaceId>/<encodeFileName(key)>} and is truncated and
     * rewritten on every call. The namespace directory is created on demand, so the first write
     * for a new namespace does not fail with {@link java.io.FileNotFoundException}. The bytes are
     * forced to the storage device before the method returns.
     *
     * @param datum the datum to persist; its {@code key} determines namespace and file name
     * @throws IllegalStateException if the namespace directory cannot be created
     * @throws Exception if serialization or any file operation fails; disk failures are also
     *     counted in {@code MetricsMonitor.getDiskException()}
     */
    public synchronized void write(final Datum datum) throws Exception {
        String namespaceId = KeyBuilder.getNamespace(datum.key);
        File cacheFile = new File(cacheDir + File.separator + namespaceId + File.separator + encodeFileName(datum.key));

        // FileOutputStream does not create missing parent directories. Re-check isDirectory()
        // after a failed mkdirs() in case another thread/process created it concurrently.
        File parentDir = cacheFile.getParentFile();
        if (parentDir != null && !parentDir.isDirectory() && !parentDir.mkdirs() && !parentDir.isDirectory()) {
            MetricsMonitor.getDiskException().increment();
            throw new IllegalStateException("can not make cache dir: " + parentDir.getAbsolutePath());
        }

        ByteBuffer data = ByteBuffer.wrap(JSON.toJSONString(datum).getBytes(StandardCharsets.UTF_8));

        // try-with-resources closes the channel (and its FileOutputStream) on every path.
        try (FileChannel fc = new FileOutputStream(cacheFile, false).getChannel()) {
            // One write() call is not contractually required to drain the buffer; loop until empty.
            while (data.hasRemaining()) {
                fc.write(data);
            }
            fc.force(true);
        } catch (Exception e) {
            MetricsMonitor.getDiskException().increment();
            throw e;
        }
    }
}

Raft 服務註冊源碼分析:
只有 LEADER 節點負責數據寫入,FOLLOWER 節點接收到寫入請求後轉發給 LEADER 節點處理。
LEADER 節點接收寫入請求後首先寫入本機,然後同步到集群中的其他節點,須有超過半數的節點返回成功,才認為寫入成功。

@Component
public class RaftCore {

    /**
     * 服務注冊
     * @param key
     * @param value
     * @throws Exception
     */
    public void signalPublish(String key, Record value) throws Exception {
        //如果是FOLLOWER節點則轉發到LEADER節點處理
        if (!isLeader()) {
            JSONObject params = new JSONObject();
            params.put("key", key);
            params.put("value", value);
            Map<String, String> parameters = new HashMap<>(1);
            parameters.put("key", key);
            raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
            return;
        }
        // LEADER節點處理
        try {
            OPERATE_LOCK.lock();
            final Datum datum = new Datum();
            datum.key = key;
            datum.value = value;
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());

            JSONObject json = new JSONObject();
            json.put("datum", datum);
            json.put("source", peers.local());
            //數據注冊到本地節點
            onPublish(datum, peers.local());

            final String content = JSON.toJSONString(json);
            //只有大多數服務器(majorityCount=(peers.size() / 2 + 1))返回成功,我們才能認為這次更新成功
            final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
            //數據同步到集群中的所有節點
            for (final String server : peers.allServersIncludeMyself()) {
                if (isLeader(server)) {
                    latch.countDown();
                    continue;
                }
                // 數據同步地址:/nacos/v1/ns/raft/datum/commit"
                final String url = buildURL(server, API_ON_PUB);
                HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                            return 1;
                        }
                        latch.countDown();
                        return 0;
                    }

                    @Override
                    public STATE onContentWriteCompleted() {
                        return STATE.CONTINUE;
                    }
                });

            }
            //等待大多數服務器成功或超時(RAFT_PUBLISH_TIMEOUT=5000)
            if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
                // only majority servers return success can we consider this update success
                throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
            }
        } finally {
            OPERATE_LOCK.unlock();
        }
    }

    /**
     * 數據注冊到本地節點
     * @param datum 
     * @param source    
     * @throws Exception
     */
    public void onPublish(Datum datum, RaftPeer source) throws Exception {
        //驗證數據
        .....................
        RaftPeer local = peers.local();
        local.resetLeaderDue();

        // if data should be persisted, usually this is true:
        if (KeyBuilder.matchPersistentKey(datum.key)) {
            raftStore.write(datum);
        }
        // 存入內存(ConcurrentHashMap)
        datums.put(datum.key, datum);

        if (isLeader()) {
            local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
        } else {
            if (local.term.get() + PUBLISH_TERM_INCREASE_COUNT > source.term.get()) {
                //set leader term:
                getLeader().term.set(source.term.get());
                local.term.set(getLeader().term.get());
            } else {
                local.term.addAndGet(PUBLISH_TERM_INCREASE_COUNT);
            }
        }
        //更新任期
        raftStore.updateTerm(local.term.get());
        //通知其他類
        notifier.addTask(datum.key, ApplyAction.CHANGE);
    }
}

Raft 協議中 LEADER 定時(TICK_PERIOD_MS=500 毫秒)發送心跳,心跳中攜帶全部數據的 key 與 timestamp(不含數據內容本身),用於同步 FOLLOWER。

public class HeartBeat implements Runnable {
       /**
         * 發送心跳
         * @throws IOException
         * @throws InterruptedException
         */
        public void sendBeat() throws IOException, InterruptedException {
            RaftPeer local = peers.local();
            if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
                return;
            }
            local.resetLeaderDue();

            // 構造報文
            JSONObject packet = new JSONObject();
            packet.put("peer", local);
            JSONArray array = new JSONArray();
            if (!switchDomain.isSendBeatOnly()) {
                //遍歷所有服務
                for (Datum datum : datums.values()) {
                    JSONObject element = new JSONObject();
                    if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                        element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
                    } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                        element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
                    }
                    element.put("timestamp", datum.timestamp);
                    array.add(element);
                }
            }

            packet.put("datums", array);
            // broadcast
            Map<String, String> params = new HashMap<String, String>(1);
            params.put("beat", JSON.toJSONString(packet));
            // 壓縮數據
            String content = JSON.toJSONString(params);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
            gzip.close();

            byte[] compressedBytes = out.toByteArray();
            String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
            //發送集群中所有節點
            for (final String server : peers.allServersWithoutMySelf()) {
                try {
                    final String url = buildURL(server, API_BEAT);
                    HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
                                MetricsMonitor.getLeaderSendBeatFailedException().increment();
                                return 1;
                            }

                            peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }
            }

        }
}

Raft 協議選舉流程:
Nacos 啟動時啟動一個選舉定時任務:executorService.scheduleAtFixedRate(runnable, 0, TICK_PERIOD_MS, TimeUnit.MILLISECONDS),其中 TICK_PERIOD_MS = 500L。
Nacos 節點的定時任務若檢測到超過 15 秒沒有收到 LEADER 心跳,則發起選舉投票(投票給自己),將其發送到集群中的其他節點,並將自己的狀態置為 CANDIDATE。
Nacos 節點收到選舉投票時,如果 CANDIDATE 節點的 term 大於本地的 term,則同意該節點成為 LEADER(投票給它);否則拒絕該請求,保留原有投票(若尚未投票則投給自己)。
CANDIDATE 節點依次收到其他節點的投票回復並統計選票,只要某個節點獲得超過半數的選票,即確認其為 LEADER。
LEADER 節點通過心跳通知其他節點自己是新的 LEADER。


    public class MasterElection implements Runnable {
        @Override
        public void run() {
            try {
                if (!peers.isReady()) {
                    return;
                }

                RaftPeer local = peers.local();
                local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
                //是否超過15秒沒有收到LEADER心跳
                if (local.leaderDueMs > 0) {
                    return;
                }
                // reset timeout
                local.resetLeaderDue();
                local.resetHeartbeatDue();
                // 發送選舉
                sendVote();
            } catch (Exception e) {
                Loggers.RAFT.warn("[RAFT] error while master election {}", e);
            }

        }

        /**
         * 發送選舉流程
         */
        public void sendVote() {
            RaftPeer local = peers.get(NetUtils.localServer());
            peers.reset();
            local.term.incrementAndGet();
            local.voteFor = local.ip; //選自己
            local.state = RaftPeer.State.CANDIDATE;

            Map<String, String> params = new HashMap<>(1);
            params.put("vote", JSON.toJSONString(local));
            //發送集群其他節點: /nacos/v1/ns/raft/vote
            for (final String server : peers.allServersWithoutMySelf()) {
                final String url = buildURL(server, API_VOTE);
                try {
                    HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            //收到回復統計選票確定誰是LEADER
                            RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
                            peers.decideLeader(peer);
                            return 0;
                        }
                    });
                } catch (Exception e) {
                    Loggers.RAFT.warn("error while sending vote to server: {}", server);
                }
            }
        }
    }

收到選舉投票請求的處理

   /**
     * Handles a vote request from a candidate.
     *
     * <p>If the candidate's term is strictly greater than this node's term, this node:
     * <ul>
     *   <li>resets its leader-due timer,</li>
     *   <li>becomes a {@code FOLLOWER},</li>
     *   <li>votes for the candidate, and</li>
     *   <li>adopts the candidate's term.</li>
     * </ul>
     *
     * <p>Otherwise the request is rejected. This node keeps any vote it has already cast and
     * votes for itself only if it has not voted yet. In both cases the local peer state is
     * returned, so the candidate can read {@code voteFor} from it.
     *
     * @param remote the candidate requesting the vote
     * @return this node's peer state after processing the request
     */
    public synchronized RaftPeer receivedVote(RaftPeer remote) {
        final RaftPeer self = peers.get(NetUtils.localServer());

        if (remote.term.get() > self.term.get()) {
            // Newer term: step down and back the candidate.
            self.resetLeaderDue();
            self.state = RaftPeer.State.FOLLOWER;
            self.voteFor = remote.ip;
            self.term.set(remote.term.get());
        } else if (StringUtils.isEmpty(self.voteFor)) {
            // Stale or equal term: reject; if still undecided, vote for ourselves.
            self.voteFor = self.ip;
        }
        return self;
    }

收到投票回復統計選票確定誰是LEADER

    /**
     * Records one vote reply and promotes a leader once some peer holds a majority.
     *
     * <p>{@code candidate} is a peer's reported state, including whom it voted for. It replaces
     * that peer's entry in {@code peers}. The {@code voteFor} values of all known peers are then
     * tallied; on a tie, the address that first reached the highest count wins.
     *
     * <p>If that count is at least {@link #majorityCount()}, the winning peer is marked
     * {@code LEADER}. If it also differs from the current {@code leader}, it becomes
     * {@code leader} and a {@code LeaderElectFinishedEvent} is published. A winning address
     * that is absent from {@code peers} is ignored.
     *
     * @param candidate a vote reply from one peer
     * @return the value of {@code leader} after tallying (unchanged if no majority was reached)
     */
    public RaftPeer decideLeader(RaftPeer candidate) {
        // Put this peer's reply in the ballot box.
        peers.put(candidate.ip, candidate);

        // Typed vote tally: voted-for address -> number of votes.
        Map<String, Integer> votes = new HashMap<>();
        int maxApproveCount = 0;
        String maxApprovePeer = null;
        for (RaftPeer peer : peers.values()) {
            if (StringUtils.isEmpty(peer.voteFor)) {
                continue;
            }
            Integer previous = votes.get(peer.voteFor);
            int count = previous == null ? 1 : previous + 1;
            votes.put(peer.voteFor, count);
            if (count > maxApproveCount) {
                maxApproveCount = count;
                maxApprovePeer = peer.voteFor;
            }
        }

        if (maxApproveCount >= majorityCount()) {
            RaftPeer peer = peers.get(maxApprovePeer);
            // A vote may name an address missing from the local peer table; don't NPE on it.
            if (peer != null) {
                peer.state = RaftPeer.State.LEADER;
                if (!Objects.equals(leader, peer)) {
                    leader = peer;
                    applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
                }
            }
        }
        return leader;
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM