Preface
Recap
In the previous installment we looked at the Eureka registry's self-preservation mechanism, along with the bug it involves.
Ha, it's 2020 in the blink of an eye. This series has been running since December 17, and it hasn't been easy: studying every day and publishing posts continuously, I've genuinely learned a lot over this stretch.
Today I gave my team at work a walkthrough of the Eureka source code. The response was decent, so I feel I got something out of it too. Next up I'll continue with the Feign, Ribbon, and Hystrix source code, still published as serialized articles.
Contents of This Installment
This installment covers data synchronization in Eureka Server cluster mode. The outline is as follows:
- Eureka Server cluster mechanism
- Registry synchronization for register, cancel, and renew
- The three-level queue mechanism behind registry synchronization, in detail
Technical highlight:
- A three-level queue mechanism that batches registry synchronization requests
Notes
Original content takes effort; if you repost, please credit the source!
Blog: 一枝花算不算浪漫
WeChat official account: 壹枝花算不算浪漫
Source Code Analysis
Eureka Server cluster mechanism
Eureka Server synchronizes data whenever an instance registers, cancels, or renews, propagating the change to the other Eureka Server nodes.
As you might guess, this is certainly not real-time synchronization. Read on to see how the registry sync actually works.
Registry synchronization for register, cancel, and renew
Taking Eureka Client registration as the example, let's see how Eureka Server propagates it to its peers.
PeerAwareInstanceRegistryImpl.java:
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication); // register into the local registry first
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); // then fan out to peer nodes
}
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
- After the local registration completes, register() calls replicateToPeers(). Note the isReplication parameter: true means the request came from another Eureka Server replicating data; false means it came from an Eureka Client registering directly.
- Inside replicateToPeers(), if isReplication is true the method returns immediately. In other words, only instances registered directly by clients are fanned out to the other nodes; a request that is itself a replication is not propagated again, which is exactly what the "poison replication" comment guards against.
- peerEurekaNodes.getPeerEurekaNodes() returns all the peer Eureka Server nodes; the loop replicates the data to each of them (skipping the node's own URL) by calling replicateInstanceActionsToPeers().
- replicateInstanceActionsToPeers() switches on the action (register, cancel, heartbeat, status update, and so on) and handles each accordingly.
Next comes the code that actually performs the synchronization. Eureka uses three levels of queues to batch the replication requests, packaging them into batches and then sending each batch to a peer Eureka Server in a single HTTP call.
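Before diving into the source, here is a minimal, self-contained sketch of the idea (my own simplification for illustration, not Eureka code): producers drop tasks onto an acceptor queue; a background thread drains it into an ordered buffer; batches are cut from that buffer onto a work queue that HTTP worker threads consume:
import java.util.*;
import java.util.concurrent.*;
// A toy model of the three-level queue batching (illustration only).
class ToyBatcher<T> {
    private final BlockingQueue<T> acceptorQueue = new LinkedBlockingQueue<>();        // level 1: raw tasks
    private final Deque<T> processingOrder = new ArrayDeque<>();                       // level 2: ordered buffer
    private final BlockingQueue<List<T>> batchWorkQueue = new LinkedBlockingQueue<>(); // level 3: batches
    private final int maxBatchSize;
    ToyBatcher(int maxBatchSize) { this.maxBatchSize = maxBatchSize; }
    // Producers (e.g. the register flow) only ever touch level 1.
    void process(T task) { acceptorQueue.add(task); }
    // One iteration of the background "acceptor" loop.
    void drainAndBatch() {
        acceptorQueue.drainTo(processingOrder);            // level 1 -> level 2
        List<T> batch = new ArrayList<>(maxBatchSize);
        while (batch.size() < maxBatchSize && !processingOrder.isEmpty()) {
            batch.add(processingOrder.poll());             // level 2 -> current batch
        }
        if (!batch.isEmpty()) {
            batchWorkQueue.add(batch);                     // level 3: ready for an HTTP worker
        }
    }
    // Worker threads block here and ship each batch in one HTTP request.
    List<T> takeBatch() throws InterruptedException {
        return batchWorkQueue.take();
    }
    public static void main(String[] args) throws InterruptedException {
        ToyBatcher<String> batcher = new ToyBatcher<>(250);
        batcher.process("register order-service/instance-1");
        batcher.drainAndBatch();
        System.out.println(batcher.takeBatch()); // [register order-service/instance-1]
    }
}
The real implementation layers task expiry, deduplication by task ID, and traffic shaping on top of this skeleton, as we'll see next.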
The three-level queue mechanism for registry synchronization
Here we enter the actual synchronization logic. Sticking with the registration flow above, let's continue from the previous code.
PeerEurekaNode.java:
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); // the task is discarded if it cannot be replicated before this deadline
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
This calls batchingDispatcher.process(). Stepping into it leads to TaskDispatchers.createBatchingTaskDispatcher(); look at the process() method of the anonymous inner class defined there:
void process(ID id, T task, long expiryTime) {
// put every incoming request into the acceptorQueue
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
The task to be replicated is simply placed on the acceptorQueue.
Back in createBatchingTaskDispatcher(), look at AcceptorExecutor: its constructor starts a background thread:
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
this.acceptorThread = new Thread(threadGroup, new AcceptorRunner(), "TaskAcceptor-" + id);
Let's follow AcceptorRunner (an inner class of AcceptorExecutor):
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
// drain the data sitting in the acceptorQueue
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay(); // traffic shaper: injects a delay after recent congestion or network failures
}
if (scheduleTime <= now) {
// cut processingOrder into batches and hand them to the workers
assignBatchWork();
assignSingleItemWork();
}
// If no worker is requesting data or there is a delay injected by the traffic shaper,
// sleep for some time to avoid tight loop.
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
// Safe-guard, so we never exit this loop in an uncontrolled way.
logger.warn("Discovery AcceptorThread error", e);
}
}
}
private void drainInputQueues() throws InterruptedException {
do {
drainAcceptorQueue();
if (!isShutdown.get()) {
// If all queues are empty, block for a while on the acceptor queue
if (reprocessQueue.isEmpty() && acceptorQueue.isEmpty() && pendingTasks.isEmpty()) {
TaskHolder<ID, T> taskHolder = acceptorQueue.poll(10, TimeUnit.MILLISECONDS);
if (taskHolder != null) {
appendTaskHolder(taskHolder);
}
}
}
} while (!reprocessQueue.isEmpty() || !acceptorQueue.isEmpty() || pendingTasks.isEmpty()); // keep draining until both input queues are empty and at least one task is pending
}
private void drainAcceptorQueue() {
while (!acceptorQueue.isEmpty()) {
// move tasks from the acceptor queue into processingOrder so they can later be cut into batches
appendTaskHolder(acceptorQueue.poll());
}
}
private void appendTaskHolder(TaskHolder<ID, T> taskHolder) {
if (isFull()) {
pendingTasks.remove(processingOrder.poll());
queueOverflows++;
}
TaskHolder<ID, T> previousTask = pendingTasks.put(taskHolder.getId(), taskHolder);
if (previousTask == null) {
processingOrder.add(taskHolder.getId());
} else {
overriddenTasks++;
}
}
}
Following this code closely, you can see that tasks flow from the acceptorQueue above into processingOrder, which is itself a queue, while the task bodies are kept in the pendingTasks map keyed by task ID. Note the deduplication in appendTaskHolder(): if a newer task arrives for an ID that is already pending (say, two heartbeats for the same instance), it simply overwrites the previous one (overriddenTasks++) and the ID is not enqueued twice.
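A tiny self-contained demo of that conflation trick (my own illustration mirroring pendingTasks plus processingOrder, not the Eureka classes themselves):
import java.util.*;
// A map keyed by task ID conflates duplicates; a separate queue keeps arrival order.
public class ConflationDemo {
    public static void main(String[] args) {
        Map<String, String> pendingTasks = new HashMap<>();
        Deque<String> processingOrder = new ArrayDeque<>();
        accept(pendingTasks, processingOrder, "instance-A", "heartbeat#1");
        accept(pendingTasks, processingOrder, "instance-B", "register");
        accept(pendingTasks, processingOrder, "instance-A", "heartbeat#2"); // overrides heartbeat#1
        System.out.println(processingOrder);                // [instance-A, instance-B]
        System.out.println(pendingTasks.get("instance-A")); // heartbeat#2
    }
    static void accept(Map<String, String> pendingTasks, Deque<String> order,
                       String id, String task) {
        if (pendingTasks.put(id, task) == null) {
            order.add(id); // first time this ID is seen; otherwise the old task was overridden
        }
    }
}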
The run() method of AcceptorRunner also calls assignBatchWork(), which is where processingOrder is packaged into batches. On to the code:
void assignBatchWork() {
if (hasEnoughTasksForNextBatch()) {
if (batchWorkRequests.tryAcquire(1)) {
long now = System.currentTimeMillis();
int len = Math.min(maxBatchingSize, processingOrder.size());
List<TaskHolder<ID, T>> holders = new ArrayList<>(len);
while (holders.size() < len && !processingOrder.isEmpty()) {
ID id = processingOrder.poll();
TaskHolder<ID, T> holder = pendingTasks.remove(id);
if (holder.getExpiryTime() > now) {
holders.add(holder);
} else {
expiredTasks++;
}
}
if (holders.isEmpty()) {
batchWorkRequests.release();
} else {
batchSizeMetric.record(holders.size(), TimeUnit.MILLISECONDS);
// hand the batch over to the batchWorkQueue
batchWorkQueue.add(holders);
}
}
}
}
private boolean hasEnoughTasksForNextBatch() {
if (processingOrder.isEmpty()) {
return false;
}
// buffer full: maxBufferSize comes from maxElementsInPeerReplicationPool (10000 by default)
if (pendingTasks.size() >= maxBufferSize) {
return true;
}
TaskHolder<ID, T> nextHolder = pendingTasks.get(processingOrder.peek());
// maxBatchingDelay defaults to 500ms
long delay = System.currentTimeMillis() - nextHolder.getSubmitTimestamp();
return delay >= maxBatchingDelay;
}
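Condensed into a single predicate, the cut conditions look like this (my paraphrase of hasEnoughTasksForNextBatch() with the defaults just mentioned, not the actual source):
// Paraphrase of the batch-cut decision (illustration only).
static boolean shouldCutBatch(int pendingCount, long oldestSubmitMs, long nowMs) {
    final int maxBufferSize = 10000;     // buffer full -> cut a batch immediately
    final long maxBatchingDelayMs = 500; // oldest task has waited long enough -> cut
    if (pendingCount == 0) {
        return false;                    // nothing to send
    }
    if (pendingCount >= maxBufferSize) {
        return true;
    }
    return nowMs - oldestSubmitMs >= maxBatchingDelayMs;
}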
So the batching rules are: a batch is cut as soon as the pending buffer fills up (maxBufferSize, i.e. maxElementsInPeerReplicationPool, 10000 by default) or once the oldest pending task has waited maxBatchingDelay (500ms by default), and each batch holds at most maxBatchingSize tasks (250 by default). Once cut, batches are sent to the peer server. As for how they are sent, let's go back to PeerEurekaNode.java: the register() call we started from is PeerEurekaNode.register(), so let's look at its constructor:
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
The constructor instantiates a ReplicationTaskProcessor. Stepping into it, we find it implements TaskProcessor, so its process() method is what the dispatcher's worker threads execute:
public ProcessingResult process(List<ReplicationTask> tasks) {
ReplicationList list = createReplicationListOf(tasks);
try {
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
// Unexpected error returned from the server. This should ideally never happen.
logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
return ProcessingResult.PermanentError;
}
} else {
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
return ProcessingResult.TransientError;
} else {
logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
Here the List<ReplicationTask> tasks is sent to the peer server in a single request via submitBatchUpdates().
On the receiving side, PeerReplicationResource.batchReplication() handles the batch; for register actions it effectively loops around to ApplicationResource.addInstance(), the very method where registration started.
That wraps up the Eureka Server replication logic. The part that is easy to get lost in is the three-level queue structure (acceptorQueue → pendingTasks + processingOrder → batchWorkQueue), which exists purely so that data can be replicated in batches rather than one request at a time.
One more important point. When a client registers, it calls addInstance(), and on the server side PeerAwareInstanceRegistryImpl then replicates to the other Eureka Servers.
But server-to-server replication also arrives through addInstance(): doesn't that produce an infinite loop of calls? Of course not. addInstance() also carries the isReplication parameter, and the final server-side call is registry.register(info, "true".equals(isReplication)). When an Eureka Client registers, isReplication is passed as empty, so this evaluates to false; when a peer server replicates, the call goes through:
PeerReplicationResource:
private static Builder handleRegister(ReplicationInstance instanceInfo, ApplicationResource applicationResource) {
applicationResource.addInstance(instanceInfo.getInstanceInfo(), REPLICATION);
return new Builder().setStatusCode(Status.OK.getStatusCode());
}
Here REPLICATION is the string constant "true", so the peer's addInstance() call is flagged as a replication.
Additionally, when AbstractJersey2EurekaHttpClient sends the register request, there is an addExtraHeaders() hook, as shown in the figure below.
If the request is sent through Jersey2ReplicationClient, that hook sets the x-netflix-discovery-replication header to true, and the addInstance() method that later performs the registration on the peer reads exactly this header.
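To make the loop prevention concrete, here is a tiny self-contained simulation (plain Java, my own illustration; in the real flow the header is set in the Jersey replication client and read back as addInstance()'s isReplication header parameter):
import java.util.HashMap;
import java.util.Map;
// Simulates how the x-netflix-discovery-replication header stops replication loops.
public class ReplicationLoopDemo {
    static final String HEADER_REPLICATION = "x-netflix-discovery-replication";
    public static void main(String[] args) {
        // 1. A plain client registration carries no replication header.
        handleRegister(new HashMap<>());
        // 2. A peer-to-peer replication request sets the header to "true".
        Map<String, String> headers = new HashMap<>();
        headers.put(HEADER_REPLICATION, "true");
        handleRegister(headers);
    }
    // Mirrors addInstance() -> registry.register(info, "true".equals(isReplication))
    static void handleRegister(Map<String, String> headers) {
        boolean isReplication = "true".equals(headers.get(HEADER_REPLICATION));
        System.out.println("register locally; isReplication=" + isReplication);
        if (!isReplication) {
            System.out.println("  -> fan out to peers (replicateToPeers)");
        } else {
            System.out.println("  -> replicated request, do NOT fan out again");
        }
    }
}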
Summary
As usual, one diagram says it all; everything analyzed in this article is captured in the figure below:
Declaration
This article was first published on my blog: https://www.cnblogs.com/wang-meng and on my WeChat official account: 壹枝花算不算浪漫. If you repost, please credit the source!
If you're interested, follow my WeChat official account: 壹枝花算不算浪漫