四、eureka服務端同步注冊操作


所有文章

https://www.cnblogs.com/lay2017/p/11908715.html

 

正文

eureka服務端注冊服務一文中,我們提到register方法做了兩件事

1)注冊服務實例信息到當前節點

2)復制服務實例信息到其它節點

本文關注第二點,復制服務實例信息到其它節點。為此,我們先簡單看一下register方法的代碼

打開PeerAwareInstanceRegistryImpl類的register方法

public void register(final InstanceInfo info, final boolean isReplication) {
    // ...
    super.register(info, leaseDuration, isReplication);
    // 復制到其它節點
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

 

replicateToPeers擔負了復制的功能,跟進它

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // 如果本次register操作本身就是復制,就不再復制到其它節點了
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        // 遍歷左右節點
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // 如果是當前節點,直接跳過
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 復制操作觸發
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

這里其實就是向所有其它節點發送注冊請求

 

跟進replicateInstanceActionsToPeers,可以看到除了register還有一些其它操作一樣是需要同步到其它節點的。這里我們只關注register操作

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register: node.register(info); break; case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        // ... 
    }
}

 

跟進register方法,復制實例信息被構造成了一個任務丟給了batchingDispatcher去異步執行,如果失敗將會重試。

public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    // 異步執行任務
    batchingDispatcher.process(
            taskId("register", info),
            // 構造了一個復制實例信息的任務
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}

 

InstanceReplicationTask的主要邏輯就是調用了replicationClient的register方法,跟進它

這里以Jersey的實現為例

public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        // ...
    }
}

可以看到,其實就是發送了http請求到其它eureka Server端

 

總結

eureka Server會將register、cancel、heartbeat等操作從一個節點同步發送到其它節點,從而實現了復制的功能。eureka和zookeeper不一樣,它是遵循ap的,所以采用了最終一致性,並沒有像zookeeper一樣選擇強一致。eureka Server之間的維持最終一致性的細節點還是很多的,比如失敗重試、超時、心跳、實例的版本號、同一個節點的鎖控制等等。有興趣的話可以詳細了解它。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM