Overview
As shown in the diagram above, server1 and server2 pull each other's registries and register with each other. When a client registers with the cluster and the request lands on server1, server1 replicates the request to server2; cancels and heartbeats work the same way. Replication between cluster nodes is batched through a three-level queue task pipeline.
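The key to this protocol is the isReplication flag: a client-originated request (isReplication = false) is fanned out to all peers, while a replicated request (isReplication = true) is applied locally only, so it never bounces back. Below is a minimal, self-contained sketch of that idea (illustrative classes, not Eureka's code):

import java.util.*;

// Minimal sketch of peer replication with loop prevention; not Eureka's code.
class MiniServer {
    final String name;
    final Set<String> registry = new HashSet<>();
    final List<MiniServer> peers = new ArrayList<>();

    MiniServer(String name) { this.name = name; }

    void register(String instanceId, boolean isReplication) {
        registry.add(instanceId);                  // always apply locally
        if (isReplication) {
            return;                                // replicated call: do NOT fan out again
        }
        for (MiniServer peer : peers) {
            peer.register(instanceId, true);       // client call: replicate once to each peer
        }
    }

    public static void main(String[] args) {
        MiniServer s1 = new MiniServer("server1");
        MiniServer s2 = new MiniServer("server2");
        s1.peers.add(s2);
        s2.peers.add(s1);
        s1.register("order-service/i-001", false); // client registers with server1
        System.out.println(s2.registry);           // [order-service/i-001], replicated exactly once
    }
}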
Cluster Initialization
Cluster Startup
During the initialization of EurekaBootStrap, the first step initializes the eureka environment, and then the eureka context. Inside initEurekaServerContext you will find the following code:
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
        registry,
        eurekaServerConfig,
        eurekaClient.getEurekaClientConfig(),
        serverCodecs,
        applicationInfoManager
);

protected PeerEurekaNodes getPeerEurekaNodes(PeerAwareInstanceRegistry registry, EurekaServerConfig eurekaServerConfig, EurekaClientConfig eurekaClientConfig, ServerCodecs serverCodecs, ApplicationInfoManager applicationInfoManager) {
    PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes(
            registry,
            eurekaServerConfig,
            eurekaClientConfig,
            serverCodecs,
            applicationInfoManager
    );
    return peerEurekaNodes;
}
This returns a PeerEurekaNodes instance; its fields and constructor are shown below:
public class PeerEurekaNodes {
    private static final Logger logger = LoggerFactory.getLogger(PeerEurekaNodes.class);
    /**
     * Application instance registry
     */
    protected final PeerAwareInstanceRegistry registry;
    /**
     * Eureka-Server configuration
     */
    protected final EurekaServerConfig serverConfig;
    /**
     * Eureka-Client configuration
     */
    protected final EurekaClientConfig clientConfig;
    /**
     * Eureka-Server codecs
     */
    protected final ServerCodecs serverCodecs;
    /**
     * Application instance info manager
     */
    private final ApplicationInfoManager applicationInfoManager;
    /**
     * List of Eureka-Server cluster peer nodes
     */
    private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
    /**
     * Set of Eureka-Server peer service URLs
     */
    private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
    /**
     * Scheduled task executor
     */
    private ScheduledExecutorService taskExecutor;

    @Inject
    public PeerEurekaNodes(
            PeerAwareInstanceRegistry registry,
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            ApplicationInfoManager applicationInfoManager) {
        this.registry = registry;
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.applicationInfoManager = applicationInfoManager;
    }
}
All of this is the initialization of the eureka server cluster information, which ends with the PeerEurekaNodes.start() method being executed:
// Build and initialize the eureka-server context
serverContext = new DefaultEurekaServerContext(
        eurekaServerConfig,
        serverCodecs,
        registry,
        peerEurekaNodes,
        applicationInfoManager
);

// The initialization happens in the method below
@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}
@PostConstruct methods run after the DefaultEurekaServerContext constructor has completed. Combined with the flow above, this means the PeerEurekaNodes instance is constructed first, then passed into the DefaultEurekaServerContext constructor, and only then is initialize() invoked.
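As a minimal standalone illustration of that ordering (the class and field names are made up; in Eureka it is the DI container, Guice/Governator, that invokes @PostConstruct once injection completes, so main here just plays the container's role):

import javax.annotation.PostConstruct; // present in JDK 8; a separate dependency on newer JDKs

// Hypothetical demo class, not Eureka's code.
class LifecycleOrderDemo {
    private final String peerEurekaNodes;

    LifecycleOrderDemo(String peerEurekaNodes) {
        this.peerEurekaNodes = peerEurekaNodes;  // 1) constructor runs first, fields get assigned
        System.out.println("constructor: fields assigned");
    }

    @PostConstruct
    void initialize() {                          // 2) then the container invokes this
        System.out.println("initialize: safe to use " + peerEurekaNodes);
    }

    public static void main(String[] args) {
        LifecycleOrderDemo ctx = new LifecycleOrderDemo("peerEurekaNodes");
        ctx.initialize(); // what the container does for @PostConstruct after construction
    }
}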
Calling the PeerEurekaNodes#start() method starts the cluster nodes. It does two things:
- initializes the cluster peer node information
- schedules a task that refreshes the peer node information at a fixed interval (default: 10 minutes, configurable)
public void start() {
    // First create a single-threaded scheduler
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // Resolve and populate the peer list once up front
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }
            }
        };
        // Then schedule the same update every 10 minutes (by default)
        // to pick up added and removed server nodes
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: {}", node.getServiceUrl());
    }
}
This resolves the URLs of the other eureka servers from configuration and constructs one PeerEurekaNode per URL; a PeerEurekaNode represents one eureka server. A background thread then refreshes the eureka server list from the configured URLs on a fixed schedule, every 10 minutes by default.
Updating Cluster Information
Calling the #resolvePeerUrls() method returns the list of Eureka-Server cluster service URLs, excluding this node's own URL:
/**
 * Resolve peer URLs.
 *
 * @return peer URLs with node's own URL filtered out
 */
protected List<String> resolvePeerUrls() {
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
    List<String> replicaUrls = EndpointUtils
            .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
    int idx = 0;
    while (idx < replicaUrls.size()) {
        // Remove the URL if it is this node's own URL
        if (isThisMyUrl(replicaUrls.get(idx))) {
            replicaUrls.remove(idx);
        } else {
            idx++;
        }
    }
    return replicaUrls;
}

public boolean isThisMyUrl(String url) {
    final String myUrlConfigured = serverConfig.getMyUrl();
    if (myUrlConfigured != null) {
        return myUrlConfigured.equals(url);
    }
    return isInstanceURL(url, applicationInfoManager.getInfo());
}
Calling the #updatePeerEurekaNodes() method updates the cluster node information, completing two pieces of logic:
- adding newly appeared cluster nodes
- shutting down cluster nodes that have been removed
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }
    // Compute the peer node URLs to shut down
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    toShutdown.removeAll(newPeerUrls);
    // Compute the peer node URLs to add
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    toAdd.removeAll(peerEurekaNodeUrls);
    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }
    // Remove peers no longer available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    // Shut down the removed cluster nodes
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }
    // Add the new nodes
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }
    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
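To make the add/remove computation concrete, here is a standalone example (hypothetical URLs, not Eureka's code) of the same set-difference logic:

import java.util.*;

public class PeerDiffDemo {
    public static void main(String[] args) {
        Set<String> current = new HashSet<>(List.of("http://peer1/eureka/v2/", "http://peer2/eureka/v2/"));
        List<String> resolved = List.of("http://peer2/eureka/v2/", "http://peer3/eureka/v2/");

        Set<String> toShutdown = new HashSet<>(current); // peers that disappeared from config
        toShutdown.removeAll(resolved);                  // -> [http://peer1/eureka/v2/]

        Set<String> toAdd = new HashSet<>(resolved);     // peers that are newly configured
        toAdd.removeAll(current);                        // -> [http://peer3/eureka/v2/]

        System.out.println("shutdown: " + toShutdown + ", add: " + toAdd);
    }
}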
Fetching Registry Information
After initialization completes, execution continues in the bootstrap method and reaches int registryCount = registry.syncUp();
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    // By default, fetching the registry can be retried 5 times
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                // If no registry was obtained from the local eureka client on the first pass,
                // the local eureka client has not yet fetched a registry from any other
                // eureka server, so wait 30 seconds before retrying
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // A eureka server is itself a eureka client: during initialization it fetches
        // the registry from any other eureka server and adopts it locally as this
        // eureka server's own registry
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
If a fetch turns up nothing, it waits 30 seconds (by default) and then fetches again.
Cluster Registry Replication
- When a Eureka-Server receives a Register, Heartbeat, Cancel, StatusUpdate, or DeleteStatusOverride operation from a Eureka-Client, it replicates the operation to the other nodes in the cluster at a fixed interval (default: 500 ms, configurable). Replication is therefore near-real-time, not real-time.
ApplicationResource's addInstance() method handles registration. After completing the registration locally, it calls the replicateToPeers() method, which forwards the registration request to all the other eureka servers.
When a eureka client registers with a eureka server directly, isReplication is false. In that case the server replicates the registration to every other eureka server you configured, invoking their RESTful registration endpoints via Jersey.
When the eureka-core-jersey2 module's HttpReplicationClient forwards the registration to the other eureka servers, it sets isReplication to true. A eureka server receiving such a replicated request applies it locally only and does not replicate it to the other eureka servers again.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    // Replicate to the other Eureka-Server nodes
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If the peer list is empty, or this request is itself a replication, do not fan out
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
The PeerAwareInstanceRegistryImpl#replicateInstanceActionsToPeers(...) method looks like this:
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    } finally {
        CurrentRequestVersion.remove();
    }
}
Each action maps to a corresponding call on the peer node:
- Cancel: calls PeerEurekaNode#cancel(...)
- Heartbeat: calls PeerEurekaNode#heartbeat(...)
- Register: calls PeerEurekaNode#register(...)
- StatusUpdate: calls PeerEurekaNode#statusUpdate(...)
- DeleteStatusOverride: calls PeerEurekaNode#deleteStatusOverride(...)
Open any one of these methods, for example cancel:
public void cancel(final String appName, final String id) throws Exception {
    long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
    batchingDispatcher.process(
            taskId("cancel", appName, id),
            new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
                @Override
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.cancel(appName, id);
                }

                @Override
                public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
                    super.handleFailure(statusCode, responseEntity);
                    if (statusCode == 404) {
                        logger.warn("{}: missing entry.", getTaskName());
                    }
                }
            },
            expiryTime
    );
}

// The same replication operation on the same application instance gets the same task id
private static String taskId(String requestType, String appName, String id) {
    return requestType + '#' + appName + '/' + id;
}
Here the operation is wrapped into an InstanceReplicationTask and handed to the batchingDispatcher. As the comment on taskId notes, the same replication operation on the same instance always produces the same task id, which is what enables task coalescing later in the pipeline.
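A quick standalone illustration of the key format (a hypothetical demo class mirroring the taskId method above):

public class TaskIdDemo {
    // Mirrors the taskId(...) format shown above: requestType#appName/id
    static String taskId(String requestType, String appName, String id) {
        return requestType + '#' + appName + '/' + id;
    }

    public static void main(String[] args) {
        // Two heartbeats for the same instance produce the same key, so a later
        // task can replace an earlier one that has not been dispatched yet.
        System.out.println(taskId("heartbeat", "ORDER-SERVICE", "i-001")); // heartbeat#ORDER-SERVICE/i-001
        System.out.println(taskId("heartbeat", "ORDER-SERVICE", "i-001")); // same key
    }
}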
/* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
                                 HttpReplicationClient replicationClient, EurekaServerConfig config,
                                 int batchSize, long maxBatchingDelayMs,
                                 long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
    this.registry = registry;
    this.targetHost = targetHost;
    this.replicationClient = replicationClient;
    this.serviceUrl = serviceUrl;
    this.config = config;
    this.maxProcessingDelayMs = config.getMaxTimeForReplication();
    String batcherName = getBatcherName();
    ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
    this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
            batcherName,
            config.getMaxElementsInPeerReplicationPool(),
            batchSize,
            config.getMaxThreadsForPeerReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
    this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
            targetHost,
            config.getMaxElementsInStatusReplicationPool(),
            config.getMaxThreadsForStatusReplication(),
            maxBatchingDelayMs,
            serverUnavailableSleepTimeMs,
            retrySleepTimeMs,
            taskProcessor
    );
}
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor<T> taskProcessor) {
    final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher<ID, T>() {
        @Override
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        @Override
        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}
When createBatchingTaskDispatcher creates the dispatcher, it overrides process so that tasks are ultimately handled by the acceptorExecutor:
void process(ID id, T task, long expiryTime) {
    acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
    acceptedTasks++;
}
It puts the previously wrapped task onto the acceptorQueue. The AcceptorExecutor constructor starts a background thread, acceptorThread:
AcceptorExecutor(String id,
                 int maxBufferSize,
                 int maxBatchingSize,
                 long maxBatchingDelay,
                 long congestionRetryDelayMs,
                 long networkFailureRetryMs) {
    this.id = id;
    this.maxBufferSize = maxBufferSize;
    this.maxBatchingSize = maxBatchingSize;
    this.maxBatchingDelay = maxBatchingDelay;
    this.trafficShaper = new TrafficShaper(congestionRetryDelayMs, networkFailureRetryMs);
    ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
    this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
    this.acceptorThread.setDaemon(true);
    this.acceptorThread.start();
    final double[] percentiles = {50.0, 95.0, 99.0, 99.5};
    final StatsConfig statsConfig = new StatsConfig.Builder()
            .withSampleSize(1000)
            .withPercentiles(percentiles)
            .withPublishStdDev(true)
            .build();
    final MonitorConfig config = MonitorConfig.builder(METRIC_REPLICATION_PREFIX + "batchSize").build();
    this.batchSizeMetric = new StatsTimer(config, statsConfig);
    try {
        Monitors.registerObject(id, this);
    } catch (Throwable e) {
        logger.warn("Cannot register servo monitor for this object", e);
    }
}
Once started, the thread executes AcceptorRunner's run method:
class AcceptorRunner implements Runnable {
    @Override
    public void run() {
        long scheduleTime = 0;
        while (!isShutdown.get()) {
            try {
                drainInputQueues();
                int totalItems = processingOrder.size();
                long now = System.currentTimeMillis();
                if (scheduleTime < now) {
                    scheduleTime = now + trafficShaper.transmissionDelay();
                }
                if (scheduleTime <= now) {
                    assignBatchWork();
                    assignSingleItemWork();
                }
                // If no worker is requesting data or there is a delay injected by the traffic shaper,
                // sleep for some time to avoid tight loop.
                if (totalItems == processingOrder.size()) {
                    Thread.sleep(10);
                }
            } catch (InterruptedException ex) {
                // Ignore
            } catch (Throwable e) {
                // Safe-guard, so we never exit this loop in an uncontrolled way.
                logger.warn("Discovery AcceptorThread error", e);
            }
        }
    }

    private void drainInputQueues() throws InterruptedException {
        do {
            drainReprocessQueue();
            drainAcceptorQueue();
            if (isShutdown.get()) {
                break;
            }
            // If all queues are empty, block for a while on the acceptor queue
            if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
                TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
                if (taskHolder != null) {
                    appendTaskHolder(taskHolder);
                }
            }
        } while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty());
    }

    private void drainAcceptorQueue() {
        while (!acceptorQueue.isEmpty()) {
            appendTaskHolder(acceptorQueue.poll());
        }
    }
This runner background thread moves the tasks from the acceptorQueue into processingOrder, then packs the tasks in processingOrder into batches and places them onto the batchWorkQueue:
    void assignBatchWork() {
        if (hasEnoughTasksForNextBatch()) {
            if (batchWorkRequests.tryAcquire(1)) {
                long now = System.currentTimeMillis();
                int len = Math.min(maxBatchingSize, processingOrder.size());
                List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
                while (holders.size() < len && !processingOrder.isEmpty()) {
                    ID id = processingOrder.poll();
                    TaskHolder<ID, T> holder = pendingTasks.remove(id);
                    if (holder.getExpiryTime() > now) {
                        holders.add(holder);
                    } else {
                        expiredTasks++;
                    }
                }
                if (holders.isEmpty()) {
                    batchWorkRequests.release();
                } else {
                    batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
                    batchWorkQueue.add(holders);
                }
            }
        }
    }

    private boolean hasEnoughTasksForNextBatch() {
        if (processingOrder.isEmpty()) {
            return false;
        }
        if (pendingTasks.size() >= maxBufferSize) {
            return true;
        }
        TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
        long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
        return delay >= maxBatchingDelay;
    }
}
As hasEnoughTasksForNextBatch shows, a batch is cut as soon as the buffer is full or the oldest pending task has waited at least maxBatchingDelay. Finally, ReplicationTaskProcessor processes each batch by calling Jersey2ReplicationClient#submitBatchUpdates:
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
    ReplicationList list = createReplicationListOf(tasks);
    try {
        EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
        int statusCode = response.getStatusCode();
        if (!isSuccess(statusCode)) {
            if (statusCode == 503) {
                logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
                return ProcessingResult.Congestion;
            } else {
                // Unexpected error returned from the server. This should ideally never happen.
                logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
                return ProcessingResult.PermanentError;
            }
        } else {
            handleBatchResponse(tasks, response.getEntity().getResponseList());
        }
    } catch (Throwable e) {
        if (maybeReadTimeOut(e)) {
            logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
            // A read timeout is closer to Congestion than TransientError; return Congestion for a longer delay
            return ProcessingResult.Congestion;
        } else if (isNetworkConnectException(e)) {
            logNetworkErrorSample(null, e);
            return ProcessingResult.TransientError;
        } else {
            logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            return ProcessingResult.PermanentError;
        }
    }
    return ProcessingResult.Success;
}
@Override
public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
    Response response = null;
    try {
        response = jerseyClient.target(serviceUrl)
                .path(PeerEurekaNode.BATCH_URL_PATH)
                .request(MediaType.APPLICATION_JSON_TYPE)
                .post(Entity.json(replicationList));
        if (!isSuccess(response.getStatus())) {
            return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
        }
        ReplicationListResponse batchResponse = response.readEntity(ReplicationListResponse.class);
        return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}
This sends a POST to the peerreplication/batch/ endpoint, which maps to the PeerReplicationResource#batchReplication(...) method:
@Path("batch")
@POST
public Response batchReplication(ReplicationList replicationList) {
try {
ReplicationListResponse batchResponse = new ReplicationListResponse();
for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {
try {
// 逐個同步操作任務處理,並將處理結果( ReplicationInstanceResponse ) 合並到 ReplicationListResponse 。
batchResponse.addResponse(dispatch(instanceInfo));
} catch (Exception e) {
batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));
logger.error("{} request processing failed for batch item {}/{}",
instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);
}
}
return Response.ok(batchResponse).build();
} catch (Throwable e) {
logger.error("Cannot execute batch Request", e);
return Response.status(Status.INTERNAL_SERVER_ERROR).build();
}
}
private ReplicationInstanceResponse dispatch(ReplicationInstance instanceInfo) {
    ApplicationResource applicationResource = createApplicationResource(instanceInfo);
    InstanceResource resource = createInstanceResource(instanceInfo, applicationResource);
    String lastDirtyTimestamp = toString(instanceInfo.getLastDirtyTimestamp());
    String overriddenStatus = toString(instanceInfo.getOverriddenStatus());
    String instanceStatus = toString(instanceInfo.getStatus());
    Builder singleResponseBuilder = new Builder();
    switch (instanceInfo.getAction()) {
        case Register:
            singleResponseBuilder = handleRegister(instanceInfo, applicationResource);
            break;
        case Heartbeat:
            singleResponseBuilder = handleHeartbeat(serverConfig, resource, lastDirtyTimestamp, overriddenStatus, instanceStatus);
            break;
        case Cancel:
            singleResponseBuilder = handleCancel(resource);
            break;
        case StatusUpdate:
            singleResponseBuilder = handleStatusUpdate(instanceInfo, resource);
            break;
        case DeleteStatusOverride:
            singleResponseBuilder = handleDeleteStatusOverride(instanceInfo, resource);
            break;
    }
    return singleResponseBuilder.build();
}
1. Cluster synchronization mechanism (the highlight): a client can send its request to any server, and that server replicates the request to all the other servers; the other servers only apply it locally and never replicate it again.
2. Asynchronous batching of replicated data (the highlight): three queues. The first queue is for pure write-in; the second splits the work up by time and size; the third holds the batch tasks. Together they form the asynchronous batch-processing mechanism.
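To tie the pipeline together, here is a minimal self-contained sketch of that three-queue pattern (illustrative names and simplified logic, not Eureka's code): producers write to an acceptor queue; a drainer coalesces tasks by id into a pending map plus a processing order; and fixed-size batches are cut onto a batch work queue for workers:

import java.util.*;
import java.util.concurrent.*;

public class ThreeQueueBatcher<ID, T> {
    private final BlockingQueue<Map.Entry<ID, T>> acceptorQueue = new LinkedBlockingQueue<>(); // queue 1
    private final Map<ID, T> pendingTasks = new HashMap<>();                                   // queue 2 (with
    private final Deque<ID> processingOrder = new ArrayDeque<>();                              //  dedup by id)
    private final BlockingQueue<List<T>> batchWorkQueue = new LinkedBlockingQueue<>();         // queue 3
    private final int maxBatchSize;

    ThreeQueueBatcher(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }

    // Stage 1: producers only ever touch the acceptor queue (cheap, non-blocking).
    public void process(ID id, T task) {
        acceptorQueue.add(Map.entry(id, task));
    }

    // Stage 2: a single background thread drains and coalesces tasks by id.
    void drainAndBatch() {
        Map.Entry<ID, T> e;
        while ((e = acceptorQueue.poll()) != null) {
            // A newer task for the same id replaces the older, not-yet-dispatched one.
            if (pendingTasks.put(e.getKey(), e.getValue()) == null) {
                processingOrder.add(e.getKey());
            }
        }
        // Stage 3: cut a batch off the front of the pending order.
        List<T> batch = new ArrayList<>(maxBatchSize);
        while (batch.size() < maxBatchSize && !processingOrder.isEmpty()) {
            batch.add(pendingTasks.remove(processingOrder.poll()));
        }
        if (!batch.isEmpty()) {
            batchWorkQueue.add(batch);
        }
    }

    public static void main(String[] args) {
        ThreeQueueBatcher<String, String> b = new ThreeQueueBatcher<>(10);
        b.process("heartbeat#APP/i-1", "hb-1");
        b.process("heartbeat#APP/i-1", "hb-2"); // same id: coalesces with hb-1
        b.process("cancel#APP/i-2", "cancel");
        b.drainAndBatch();
        System.out.println(b.batchWorkQueue.poll()); // [hb-2, cancel]
    }
}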