Eureka 系列(05)消息廣播(上):消息廣播原理分析


Eureka 系列(05)消息廣播(上):消息廣播原理分析

0. Spring Cloud 系列目錄 - Eureka 篇

首先回顧一下客戶端服務發現的流程,在上一篇 Eureka 系列(04)客戶端源碼分析 中對 Eureka Client 的源碼進行了分析,DiscoverClient 負載服務發現,會將 Eureka Server 的服務全量同步到客戶端。客戶端同步的方式有兩種:一是全量同步,二是增量同步,如果增量同步失敗,則回滾到全量同步。

Eureka Client 服務發現的具體方式是啟動了幾個定時任務:

  1. CacheRefreshThread 本地緩存更新線程,采用輪詢的方式,默認每 30s 從服務器同步注冊服務信息。
  2. HeartbeatThread 心跳檢測線程,默認每 30s 發送一次心跳到服務端。
  3. InstanceInfoReplicator 線程,默認每 30s 檢測一次實例信息是否發生變更,如果發生變化就重新注冊一次。這個好像是 Eureka 獨有的吧!

接下來,我們分析一下服務器消息廣播機制,如何保障數據的最終一致性?相關的核心實現在 com.netflix.eureka.cluster 內。

Eureka 消息廣播主要分三部分講解:

  1. 服務器列表管理:PeerEurekaNodes 管理了所有的 PeerEurekaNode 節點。
  2. 消息廣播機制分析:PeerAwareInstanceRegistryImpl 收到客戶端的消息后,第一步:先更新本地注冊信息;第二步:遍歷所有的 PeerEurekaNode,轉發給其它節點。
  3. TaskDispacher 消息處理: Acceptor - Worker 模式分析。

本文重點分析前兩部分的消息廣播原理,下一章則分析 TaskDispacher 的 Acceptor - Worker 模式。

1. 服務器列表管理

Eureka 中負責服務器列表管理的是 PeerEurekaNodes,在 Nacos Naming 中也有一個類似功能的類 ServerListManager。這個類還是要關注一下,涉及到 Eureka 的動態擴容。

PeerEurekaNodes 構建時會初始化 "Eureka-PeerNodesUpdater" 定時器,默認每 10min 調用 updatePeerEurekaNodes(resolvePeerUrls()) 方法更新一次服務列表。

圖1:Eureka 服務器列表更新
sequenceDiagram participant Scheduler participant PeerEurekaNodes participant EndpointUtils participant PeerEurekaNode Scheduler ->> PeerEurekaNodes : updatePeerEurekaNodes PeerEurekaNodes ->> PeerEurekaNodes : 1. 查找最新的服務器列表:resolvePeerUrls PeerEurekaNodes ->> EndpointUtils : getDiscoveryServiceUrls PeerEurekaNodes ->> PeerEurekaNode : 2.1 廢棄的Server: shutDown PeerEurekaNodes ->> PeerEurekaNode : 2.2 新增的Server: createPeerEurekaNode

總結: EndpointUtils.getDiscoveryServiceUrls 默認調用 getServiceUrlsFromConfig,即讀取配置文件的 serviceUrl 配置。當服務器列表發生變化時會將廢棄的 PeerEurekaNode 節點關閉,同時將新增的節點添加到 List<PeerEurekaNode> peerEurekaNodes 服務器列表中。

注意:peerEurekaNodes 服務器列表中並不包含當前 Server 的服務器,在 resolvePeerUrls 時會將當前服務器排除。

1.1 創建 PeerEurekaNode

protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
    HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
    String targetHost = hostFromUrl(peerEurekaNodeUrl);
    if (targetHost == null) {
        targetHost = "host";
    }
    return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}

總結: PeerEurekaNode 代表一個 Eureka Server 節點,包含節點的 url 和配置信息 serverConfig,其中最重要的兩個屬性是 registry 和 replicationClient:

  • targetHost/serverConfig 當前 Eureka Server 的 url 信息。
  • registry 管理所有的注冊信息。
  • replicationClient HTTP Client,用於網絡傳輸。

注意: Discovery Client 默認是 JerseyApplicationClient,這兩者的區別是 JerseyReplicationClient 的請求頭是 PeerEurekaNode.HEADER_REPLICATION=true,而 JerseyApplicationClient 請求頭的默認參數為 false。isReplication 這個參數的意思是是否是其它服務器轉發的請求。

為什么要有這個參數呢?大家想一下,EurekaA 向 EurekaB 轉發請求,如果 EurekaB 又向 EurekaA 轉發請求,這樣就會造成死循環,所以就在請求頭中加上這個參數 isReplication=true。當然如果是客戶端發起的請求,則需要同步給其它服務器,所以客戶端 isReplication=false。

2. 消息廣播分析

Eureka Server 接收客戶端的請求后,會將請求轉發給 PeerAwareInstanceRegistryImpl 處理。這個 registry 會做兩件事:一是本地注冊信息更新(同步);二是將消息廣播給其它服務器(異步)。

由此也可以看出 Eureka 是 AP 模型的,優先保障了可用性,事實上大多數注冊中心的實現方案都是 AP 模型,只有 ZK 是 CP 模型。事實上,ZK 是分布式協調服務,並不是專門用來進行服務治理的。

本文重點關注第二步:消息廣播機制。

2.1 Eureka 消息廣播流程

PeerAwareInstanceRegistryImpl 處理完本地注冊信息更新后,會將請求轉發給 PeerEurekaNode 處理,這個過程是異步的。也就是說本地注冊信息更新后請求就返回了,而消息廣播都是由 TaskDispatcher 異步處理,當然數據也就可能會短時間內不一致。

圖2:Eureka 消息廣播流程
sequenceDiagram participant PeerAwareInstanceRegistryImpl participant AbstractInstanceRegistry participant PeerEurekaNodes participant PeerEurekaNode note over PeerAwareInstanceRegistryImpl,PeerEurekaNode : 接收EurekaClient請求:<br/>register/cancel/heartbeat/statusUpdate/deleteStatusOverride PeerAwareInstanceRegistryImpl ->> AbstractInstanceRegistry : 1. 更新本地注冊信息:register/cancel/heartbeat/... loop 2. 消息廣播給其它Server PeerAwareInstanceRegistryImpl ->>+ PeerAwareInstanceRegistryImpl : replicateToPeers PeerAwareInstanceRegistryImpl ->> PeerEurekaNodes : getPeerEurekaNodes PeerAwareInstanceRegistryImpl ->> PeerEurekaNodes : continue: isThisMyUrl PeerAwareInstanceRegistryImpl ->> PeerAwareInstanceRegistryImpl : replicateInstanceActionsToPeers loop 消息廣播 PeerAwareInstanceRegistryImpl ->>- PeerEurekaNode : register/cancel/heartbeat/... end end

總結: PeerAwareInstanceRegistryImpl 是 Eureka 的核心類,服務的注冊、下線、心跳檢測都是由這個類完成的,服務的本地注冊信息都是由這個其父類 AbstractInstanceRegistry 進行維護的。

  1. 本地注冊信息更新(同步):首先由 AbstractInstanceRegistry 完成本地緩存的服務信息更新。

  2. 消息廣播(異步):replicateToPeers 方法先從 PeerEurekaNodes 獲取所有的服務器節點,通過 isThisMyUrl 排除自身后,給其余的所有服務器進行消息廣播。消息廣播的處理是由 PeerEurekaNode 類完成的,這個類的處理都是異步的。

    注意:即使 Eureka Server 宕機,也會進行消息廣播,直到任務過期為至。這中間可能會出現數據不同步,但一旦網絡恢復后,接收到其它服務器廣播的心跳信息,此時會進行數據同步。

最終所有的消息廣播都由 PeerEurekaNode 處理,代碼如下:

// 消息廣播給 PeerEurekaNode 處理
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
    }
}

總結: 這個代碼就不細說了,接下來就要重點分析 PeerEurekaNode 是如何進行消息轉發的。

2.2 PeerEurekaNode 消息處理

2.2.1 消息處理整體流程分析

圖3:Eureka 消息批處理時序圖
sequenceDiagram participant PeerEurekaNode participant batchingTaskDispatcher participant BatchWorkerRunnable participant AcceptorExecutor participant ReplicationTaskProcessor participant JerseyReplicationClient PeerEurekaNode ->> batchingTaskDispatcher : process -> (register/cancel/heartbeat/...) batchingTaskDispatcher ->> AcceptorExecutor : process batchingTaskDispatcher ->>+ BatchWorkerRunnable : run BatchWorkerRunnable ->> AcceptorExecutor : requestWorkItems BatchWorkerRunnable ->>- ReplicationTaskProcessor : process(List<ReplicationTask> tasks) ReplicationTaskProcessor ->> JerseyReplicationClient : submitBatchUpdates -> `POST: peerreplication/batch` opt 處理失敗 ReplicationTaskProcessor -->> AcceptorExecutor : reprocess end

總結: PeerEurekaNode 收到請求后,將請求轉發給 TaskDispatcher,TaskDispatcher 內部維護一個阻塞隊列。即然是阻塞隊列那就肯定有消費線程了,這個線程就是 WorkerRunnable。WorkerRunnable 不斷輪詢,只要有任務是調用 ReplicationTaskProcessor 進行數據同步。如果同步失敗進行重試,直到任務失效。這樣再配合周期性的心跳檢測,就能保證數據的最終一致性了。

nonBatchingDispatcher 和 batchingTaskDispatcher 類似,就不多介紹了。

思考: 如果同時有大量的數據需要同步給其它服務器,此時會發起多個網絡請求,有什么好辦法?

Eureka 考慮到了這個問題,具體措施就是將多個請求合並成一個請求進行處理,這就是 batchingTaskDispatcher 和 nonBatchingDispatcher 的區別。

消息廣播核心類功能分析

PeerEurekaNode 接收消息廣播任務后,統一由 TaskDispatcher 進行異步處理。TaskDispatcher 將任務的接收和處理分別交由不同的線程完成,即典型的 Acceptor - Worker 模式。WorkerRunnable 通過 AcceptorExecutor#requestWorkItems 獲取即將執行的任務后,調用 ReplicationTaskProcessor 執行消息廣播任務。

  • 數據同步(PeerEurekaNode):接收消息廣播任務。
  • 任務分發(TaskDispatcher):統一調度 PeerEurekaNode 接收的消息廣播任務。實際接收消息廣播由線程 AcceptorExecutor 處理,執行由 WorkerRunnable 處理。
  • 任務管理(AcceptorExecutor):統一管理所有的任務。
  • 執行線程(WorkerRunnable):消息廣播任務執行線程。
  • 任務處理(ReplicationTaskProcessor):執行數據同步。

2.2.2 初始化

PeerEurekaNode 內部有兩個重要的變量:一是 batchingDispatcher 批處理;二是 nonBatchingDispatcher 單獨處理器。這二個任務派發器都是異步處理的。

PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, 
               String serviceUrl, HttpReplicationClient replicationClient, 	
               EurekaServerConfig config, int batchSize, long maxBatchingDelayMs,
               long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    this.targetHost = targetHost;
    this.replicationClient = replicationClient; // HTTP客戶端

    this.serviceUrl = serviceUrl;
    this.config = config;
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();

    // 任務處理器,真正進行消息轉發
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    // 批處理
    String batcherName = getBatcherName();
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
        batcherName,
        config.getMaxElementsInPeerReplicationPool(),
        batchSize,
        config.getMaxThreadsForPeerReplication(),
        maxBatchingDelayMs,
        serverUnavailableSleepTimeMs,
        retrySleepTimeMs,
        taskProcessor
    );
    // 單獨處理
    this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
        targetHost,
        config.getMaxElementsInStatusReplicationPool(),
        config.getMaxThreadsForStatusReplication(),
        maxBatchingDelayMs,
        serverUnavailableSleepTimeMs,
        retrySleepTimeMs,
        taskProcessor
    );
}

總結: PeerEurekaNode 所有的消息都是異步處理的,分為 batchingDispatcher 和 nonBatchingDispatcher 兩種情況。為什么會有批處理了呢?很顯然,如何有大量的消息需要轉發給另一台服務器,如何一條條發送會浪費網絡,這時可以將多個消息合並成一個消息進行發送,這就是 batchingDispatcher 的功能。

2.2.3 任務接收

我們看一下 PeerEurekaNode 接收任務,以注冊為例:

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // 任務id、任務內容task、任務過期時間expiryTime
    batchingDispatcher.process(
        taskId("register", info),
        new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
            public EurekaHttpResponse<Void> execute() {
                return replicationClient.register(info);
            }
        }, expiryTime);
}

總結: PeerEurekaNode 收到消息廣播任務后,會由 TaskDispatcher 完成任務的調度。TaskDispatcher 將任務的接收實際委托給了 AcceptorExecutor 線程完成。TaskDispatcher 將任務的接收和處理分別交由不同的線程完成,這是一種典型的 Acceptor - Worker 模式。相關原理會在第三小節進行詳細的分析。

2.2.4 任務處理

TaskDispatcher 是一種典型的 Acceptor - Worker 模式。batchingDispatcher 通過 AcceptorExecutor 線程接收任務后,處理就交給 BatchWorkerRunnable 線程。

(1) TaskDispatcher 任務調度

消息處理是在 TaskDispatcher 中完成的,下面以 BatchWorkerRunnable 為例,分析批處理的原理。

public void run() {
    try {
        while (!isShutdown.get()) {
            // 1. 獲取要轉發的消息,TaskHolder 持有的都是 InstanceReplicationTask
            List<TaskHolder<ID, T>> holders = getWork();
            metrics.registerExpiryTimes(holders);

            List<T> tasks = getTasksOf(holders);
            // 2. 請求轉發
            ProcessingResult result = processor.process(tasks);
            // 3. 結果處理,網絡IO失敗會調用reprocess重試,其它未知異常則取消任務
            switch (result) {
                case Success:
                    break;
                case Congestion:	// 服務器忙,服務器有競爭
                case TransientError:// 網絡異常,IOException
                    taskDispatcher.reprocess(holders, result);
                    break;
                case PermanentError:// 其它未知異常
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
    } catch (Throwable e) {
    }
}

總結: TaskDispatcher#BatchWorkerRunnable 負責調度任務,請求的處理還是由 ReplicationTaskProcessor 完成的。需要關注一下 Eureka 異常的處理:

  1. 對方服務器忙或網絡IO異常,則會調用 reprocess 進行重試。
  2. 其它未知異常,則統一取消任務。

(2) ReplicationTaskProcessor 任務處理

public ProcessingResult process(List<ReplicationTask> tasks) {
    // 1. 合並請求
    ReplicationList list = createReplicationListOf(tasks);
    try {
        // 2. 發送請求: POST /peerreplication/batch
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
        	// 3.1 服務器忙,重試
            if (statusCode == 503) {
                return ProcessingResult.Congestion;
            } else { // 其它異常,取消任務
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        // 3.2 讀超時,重試
        if (maybeReadTimeOut(e)) {
            return ProcessingResult.Congestion;
        // 3.3 網絡IO異常,重試
        } else if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else { // 其它異常,取消任務
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}

總結: 異常可以和上面對照一下,再看一下批處理到底是如何實現的。批處理實際是將多個消息任務 ReplicationTask 合並成一個任務 ReplicationList,而且轉發的路徑也變成 POST /peerreplication/batch

// 任務合並:List<ReplicationTask> -> ReplicationList
private ReplicationList createReplicationListOf(List<ReplicationTask> tasks) {
    ReplicationList list = new ReplicationList();
    for (ReplicationTask task : tasks) {
        list.addReplicationInstance(
            createReplicationInstanceOf((InstanceReplicationTask) task));
    }
    return list;
}

3. 附錄

附錄1:EurekaServerConfigBean 主要參數
參數 功能 默認值
peerEurekaNodesUpdateIntervalMs 定時刷新服務列表的時間 10min

每天用心記錄一點點。內容也許不重要,但習慣很重要!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM