Spring Cloud Eureka源碼分析---服務注冊


本篇我們着重分析Eureka服務端的邏輯實現,主要涉及到服務的注冊流程分析。

在Eureka的服務治理中,會涉及到下面一些概念:

服務注冊:Eureka Client會通過發送REST請求的方式向Eureka Server注冊自己的服務,提供自身的元數據,比如 IP 地址、端口、運行狀況指標的URL、主頁地址等信息。Eureka Server接收到注冊請求后,就會把這些元數據信息存儲在一個ConcurrentHashMap中。

服務續約:在服務注冊后,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處於可用狀態,防止被剔除。Eureka Client在默認的情況下會每隔30秒發送一次心跳來進行服務續約。

服務同步:Eureka Server之間會互相進行注冊,構建Eureka Server集群,不同Eureka Server之間會進行服務同步,用來保證服務信息的一致性。

獲取服務:服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲取上面注冊的服務清單,並且緩存在Eureka Client本地,默認緩存30秒。同時,為了性能考慮,Eureka Server也會維護一份只讀的服務清單緩存,該緩存每隔30秒更新一次。

服務調用:服務消費者在獲取到服務清單后,就可以根據清單中的服務列表信息,查找到其他服務的地址,從而進行遠程調用。Eureka有Region和Zone的概念,一個Region可以包含多個Zone,在進行服務調用時,優先訪問處於同一個Zone中的服務提供者。

服務下線:當Eureka Client需要關閉或重啟時,就不希望在這個時間段內再有請求進來,所以,就需要提前先發送REST請求給Eureka Server,告訴Eureka Server自己要下線了,Eureka Server在收到請求后,就會把該服務狀態置為下線(DOWN),並把該下線事件傳播出去。

服務剔除:有時候,服務實例可能會因為網絡故障等原因導致不能提供服務,而此時該實例也沒有發送請求給Eureka Server來進行服務下線,所以,還需要有服務剔除的機制。Eureka Server在啟動的時候會創建一個定時任務,每隔一段時間(默認60秒),從當前服務清單中把超時沒有續約(默認90秒)的服務剔除。

自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網絡一段時間內發生了異常,所有的服務都沒能夠進行續約,Eureka Server就把所有的服務都剔除了,這樣顯然不太合理。所以,就有了自我保護機制,當短時間內,統計續約失敗的比例,如果達到一定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常后,再退出自我保護機制。

1. 基本原理:

  1. Eureka Server 提供服務注冊服務,各個節點啟動后,會在Eureka Server中進行注冊,這樣Eureka Server中的服務注冊表中將會存儲所有可用服務節點的信息,服務節點的信息可以在界面中直觀的看到;
  2. Eureka Client 是一個Java 客戶端,用於簡化與Eureka Server的交互,客戶端同時也具備一個內置的、使用輪詢負載算法的負載均衡器;
  3. 在應用啟動后,將會向Eureka Server發送心跳(默認周期為30秒),如果Eureka Server在多個心跳周期沒有收到某個節點的心跳,Eureka Server 將會從服務注冊表中把這個服務節點移除(默認90秒);
  4. Eureka Server之間將會通過復制的方式完成數據的同步;
  5. Eureka Client具有緩存的機制,即使所有的Eureka Server 都掛掉的話,客戶端依然可以利用緩存中的信息消費其它服務的API;

上一篇中我們搭建了一個簡單的Eureka客戶端和服務端。如果你有啟動過觀看啟動日志不難發現:

這里有個EurekaServerBootstrap類,啟動日志中給出:Setting the eureka configuration..,Initialized server context。看起來這個應該是個啟動類,跟進去看一下,有個很顯眼的方法:

這個方法的調用先按住不表,我們先從啟動類上添加的 EnableEurekaServer注解着手,看看為什么添加了一個注解就能激活 Rureka。

從server啟動類上的EnableEurekaServer注解進入:

  1. 接下來引用了EurekaServerMarkerConfiguration,看到在這個注解上有個注釋:啟用這個注解的目的是為了激活:EurekaServerAutoConfiguration類;

  2. 進入EurekaServerAutoConfiguration看到在類頭部有一個注解:

    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    

    EurekaServerAutoConfiguration啟動的條件是EurekaServerMarkerConfiguration注解先加載。

上面這一張圖標識出了從啟動注解到預啟動類的流程,但是你會發現實際上 EurekaServerAutoConfiguration 也沒有做什么事情:配置初始化,啟動一些基本的過濾器。同樣在類頭部的引用上有一個Import注解:

@Import(EurekaServerInitializerConfiguration.class)

所以在 EurekaServerAutoConfiguration 初始化的時候,會引用到 EurekaServerInitializerConfiguration,激活它的初始化。EurekaServerInitializerConfiguration 實現了SmartLifecycle.start方法,在spring 初始化的時候會被啟動,激活 run 方法。可以看到在 run 方法中調用的就是:

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);

即我們上面截圖中的EurekaServerBootstrap.contextInitialized()方法。

整體的調用流程如下:

具體的初始化信息見下圖:

2. 服務注冊實現

2.1 server端啟動時同步別的server上的client

在上面講到Eureka server啟動過程中,啟動一個Eureka Client的時候,initEurekaServerContext()里面會進行服務同步和服務剔除,syncUp()方法所屬的類是:PeerAwareInstanceRegistry,即server端的服務注冊邏輯都在這里面。因為沒有使用AWS的服務器,所以默認實例化的實現類為:PeerAwareInstanceRegistryImpl。

PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
    registry = new AwsInstanceRegistry(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
    awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
    awsBinder.start();
} else {
    registry = new PeerAwareInstanceRegistryImpl(
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        eurekaClient
    );
}

PeerAwareInstanceRegistryImpl 繼承了一個抽象類 AbstractInstanceRegistry:

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
    
}

AbstractInstanceRegistry中的實現邏輯是真正的服務注冊存儲所在地:

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    private static final Logger logger = LoggerFactory.getLogger(AbstractInstanceRegistry.class);

    private static final String[] EMPTY_STR_ARRAY = new String[0];
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    protected Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry = new HashMap<String, RemoteRegionRegistry>();
    protected final ConcurrentMap<String, InstanceStatus> overriddenInstanceStatusMap = CacheBuilder
            .newBuilder().initialCapacity(500)
            .expireAfterAccess(1, TimeUnit.HOURS)
            .<String, InstanceStatus>build().asMap();

    
 ....
 ....
 ....
}

所有的服務實例信息都保存在 server 本地的map當中。所以在server端啟動的時候會去拉別的server上存儲的client實例,然后存儲到本地緩存。

2.2 client主動注冊

如果是某個client主動發出了注冊請求,那么是如何注冊到服務端呢?

還是查看日志:啟動服務端,然后再啟動客戶端,查看服務端日志:

這里能看到剛才啟動的客戶端已經在服務端注冊了,注冊邏輯走的類是:AbstractInstanceRegistry。

上面也提到 是服務注冊的邏輯實現類,完成保存客戶端信息的方法是:

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        ......
    }

代碼就不貼了,主要實現的邏輯是保存當前注冊的客戶端信息。我們知道客戶端是發送了一次http請求給服務端,那么真正的注冊邏輯應該是從一個http請求的接收處進來的。跟着使用了register方法的地方去找,PeerAwareInstanceRegistryImpl里面有調用:

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    //將新節點信息告訴別的服務端
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

這里沒有改寫父類的register邏輯,下面還多了一句:replicateToPeers,這里主要做的邏輯是:給兄弟 server節點發送register 請求,告訴他們有客戶端來注冊。

繼續看誰調用了這里,可以找到:ApplicationResource 的addInstance方法調用了:

@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
......
  registry.register(info, "true".equals(isReplication));
  return Response.status(204).build();  // 204 to be backwards compatible
}

而這里很顯然是一個接口,邏輯就很清晰了:

另外,我們查看addInstance方法被誰調用的過程中發現:PeerReplicationResource--->batchReplication 方法也調用了注冊的邏輯。

這個方法一看竟然解答了之前我的疑惑:服務端之間是如何發送心跳的。原來實現是在這里。通過dispatch方法來區分當前的調用是何種請求,

可以看到,服務注冊,心跳檢測,服務取消,服務下線,服務剔除的入口都在這里:

@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
    try {
        ReplicationListResponse batchResponse = new ReplicationListResponse();
        for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
            try {
                batchResponse.addResponse(dispatch(instanceInfo));
            } catch (Exception e) {
                batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
                logger.error("{} request processing failed for batch item {}/{}",
                             instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
            }
        }
        return Response.ok(batchResponse).build();
    } catch (Throwable e) {
        logger.error("Cannot execute batch Request", e);
        return Response.status(Status.INTERNAL_SERVER_ERROR).build();
    }
}


private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
        ApplicationResource applicationResource = createApplicationResource(instanceInfo);
        InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);

        String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
        String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
        String instanceStatus = toString(instanceInfo.getStatus());

        Builder singleResponseBuilder = new Builder();
        switch (instanceInfo.getAction()) {
            case Register:
                singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
                break;
            case Heartbeat:
                singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
                break;
            case Cancel:
                singleResponseBuilder = handleCancel(resource);
                break;
            case StatusUpdate:
                singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
                break;
            case DeleteStatusOverride:
                singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
                break;
        }
        return singleResponseBuilder.build();
    }

從這個入口進去,大家可以跟蹤一下感興趣的邏輯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM