所有文章
https://www.cnblogs.com/lay2017/p/11908715.html
正文
在eureka服務端注冊服務一文中,我們提到register方法做了兩件事
1)注冊服務實例信息到當前節點
2)復制服務實例信息到其它節點
本文關注第二點,復制服務實例信息到其它節點。為此,我們先簡單看一下register方法的代碼
打開PeerAwareInstanceRegistryImpl類的register方法
public void register(final InstanceInfo info, final boolean isReplication) { // ... super.register(info, leaseDuration, isReplication); // 復制到其它節點 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
replicateToPeers擔負了復制的功能,跟進它
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // 如果本次register操作本身就是復制,就不再復制到其它節點了 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } // 遍歷左右節點 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // 如果是當前節點,直接跳過 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } // 復制操作觸發 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
這里其實就是向所有其它節點發送注冊請求
跟進replicateInstanceActionsToPeers,可以看到除了register還有一些其它操作一樣是需要同步到其它節點的。這里我們只關注register操作
private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { // ... } }
跟進register方法,復制實例信息被構造成了一個任務丟給了batchingDispatcher去異步執行,如果失敗將會重試。
public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); // 異步執行任務 batchingDispatcher.process( taskId("register", info), // 構造了一個復制實例信息的任務 new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }
InstanceReplicationTask的主要邏輯就是調用了replicationClient的register方法,跟進它
這里以Jersey的實現為例
public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { // ... } }
可以看到,其實就是發送了http請求到其它eureka Server端
總結
eureka Server會將register、cancel、heartbeat等操作從一個節點同步發送到其它節點,從而實現了復制的功能。eureka和zookeeper不一樣,它是遵循ap的,所以采用了最終一致性,並沒有像zookeeper一樣選擇強一致。eureka Server之間的維持最終一致性的細節點還是很多的,比如失敗重試、超時、心跳、實例的版本號、同一個節點的鎖控制等等。有興趣的話可以詳細了解它。