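Note:
This article summarizes my own reading of the Nacos 2.0.1 source code. Because it reflects a personal understanding, it is not guaranteed to be fully correct. Corrections are welcome. Please credit the source when reposting.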
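What is a health check?
As I understand it, a health check is how Nacos manages the state of the various connections on the server side: for example, whether the connection between the server and the database is working, whether an HTTP connection is working, whether a TCP connection is working, and whether a registered service is available. Which connections are checked, and what exactly is checked, can be extended freely.
Overall design of the health check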

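- The task responsible for running health checks is NacosHealthCheckTask. It currently has two implementations, ClientBeatCheckTaskV2 and HealthCheckTaskV2; the former handles heartbeat-related state and the latter handles the state of the various connections.
- While running a health check, ClientBeatCheckTaskV2 passes the task that is about to run through the list of interceptors in InstanceBeatCheckTaskInterceptorChain. That task is InstanceBeatCheckTask, which maintains an internal list of Checkers so that additional checkers can be added.
- While running a health check, HealthCheckTaskV2 hands the task to HealthCheckProcessorV2Delegate for processing.
The health check code lives in the com.alibaba.nacos.naming.healthcheck package of the nacos-naming module. Looking at the classes in this package, four keywords stand out: Processor, Task, Interceptor, and Checker. Each plays a different role, and together they make up the complete health check feature. This section analyzes only the concrete interceptor implementations and does not cover how the interceptors are chained together; for that, see the article "Interceptor Mechanism".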
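Interceptors (Interceptor)
Interceptors in the Naming module intercept a task that is about to run and perform some validation on it. The inheritance relationships of the types covered in this section are:
- Interceptable: implemented by InstanceBeatCheckTask and, through NacosHealthCheckTask, by ClientBeatCheckTaskV2 and HealthCheckTaskV2
- NacosNamingInterceptor, then AbstractHealthCheckInterceptor, then HealthCheckResponsibleInterceptor and HealthCheckEnableInterceptor
- NacosNamingInterceptor, then AbstractBeatCheckInterceptor, then InstanceEnableBeatCheckInterceptor, ServiceEnableBeatCheckInterceptor, and InstanceBeatCheckResponsibleInterceptor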

Interceptable
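Every task that needs to be processed by interceptors must implement this interface. It defines what runs when the task is intercepted and what runs when it is not.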
/**
* Interceptable Interface.
*
* @author xiweng.yy
*/
public interface Interceptable {
/**
* Called when no {@link NacosNamingInterceptor} intercepts this object.
*/
void passIntercept();
/**
* Called when a {@link NacosNamingInterceptor} intercepts this object.
*/
void afterIntercept();
}
InstanceBeatCheckTask
Used to check the heartbeat state of an instance.
/**
* Instance beat check task.
* @author xiweng.yy
*/
public class InstanceBeatCheckTask implements Interceptable {
/**
* The list of checkers.
*/
private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();
private final IpPortBasedClient client;
private final Service service;
private final HealthCheckInstancePublishInfo instancePublishInfo;
static {
// Register the built-in checkers, then any checkers loaded through SPI
CHECKERS.add(new UnhealthyInstanceChecker());
CHECKERS.add(new ExpiredInstanceChecker());
CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
}
public InstanceBeatCheckTask(IpPortBasedClient client, Service service, HealthCheckInstancePublishInfo instancePublishInfo) {
this.client = client;
this.service = service;
this.instancePublishInfo = instancePublishInfo;
}
@Override
public void passIntercept() {
// Run every checker when the task was not intercepted
for (InstanceBeatChecker each : CHECKERS) {
each.doCheck(client, service, instancePublishInfo);
}
}
@Override
public void afterIntercept() {
}
public IpPortBasedClient getClient() {
return client;
}
public Service getService() {
return service;
}
public HealthCheckInstancePublishInfo getInstancePublishInfo() {
return instancePublishInfo;
}
}
NacosNamingInterceptor
The NacosNamingInterceptor interface restricts its implementations to handling Interceptable types. Its main job is to define the interception mechanism.
/**
* Nacos naming interceptor.
*
* @author xiweng.yy
*/
public interface NacosNamingInterceptor<T extends Interceptable> {
/**
* Judge whether the input type is intercepted by this Interceptor.
* <p>This method should only judge whether the object type needs to be intercepted, not perform the intercept logic.
*
* @param type type
* @return true if the input type is intercepted by this Interceptor, otherwise false
*/
boolean isInterceptType(Class<?> type);
/**
* Do intercept operation.
* <p>This method is the actual intercept operation.
*
* @param object need intercepted object
* @return true if object is intercepted, otherwise false
*/
boolean intercept(T object);
/**
* The order of interceptor. The lower the number, the earlier the execution (that is, the higher the priority).
* @return the order number of interceptor
*/
int order();
}
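To make the contract of these three methods concrete, here is a minimal sketch of how a chain could use them. It is my own assumption of the flow, not the Nacos chain itself (that is covered in the other article): every type name in it is a hypothetical stand-in, the chain sorts by order(), stops at the first interceptor that returns true and calls afterIntercept(), and otherwise calls passIntercept().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Interceptable.
interface SketchInterceptable {
    void passIntercept();
    void afterIntercept();
}

// Hypothetical stand-in for NacosNamingInterceptor.
interface SketchInterceptor<T extends SketchInterceptable> {
    boolean isInterceptType(Class<?> type);
    boolean intercept(T object);
    int order();
}

// Assumed chain behaviour: lowest order first, first "true" wins.
class SketchInterceptorChain<T extends SketchInterceptable> {
    private final List<SketchInterceptor<T>> interceptors = new ArrayList<>();

    void add(SketchInterceptor<T> interceptor) {
        interceptors.add(interceptor);
        interceptors.sort((a, b) -> Integer.compare(a.order(), b.order()));
    }

    void doInterceptor(T object) {
        for (SketchInterceptor<T> each : interceptors) {
            if (!each.isInterceptType(object.getClass())) {
                continue; // this interceptor does not handle the type
            }
            if (each.intercept(object)) {
                object.afterIntercept(); // intercepted: the task body is skipped
                return;
            }
        }
        object.passIntercept(); // nobody intercepted: run the task body
    }
}

// Records which callback was invoked.
class RecordingTask implements SketchInterceptable {
    String outcome = "none";
    @Override public void passIntercept() { outcome = "passed"; }
    @Override public void afterIntercept() { outcome = "intercepted"; }
}

// Interceptor with a fixed answer, for demonstration only.
class FixedInterceptor implements SketchInterceptor<RecordingTask> {
    private final boolean result;
    private final int order;

    FixedInterceptor(boolean result, int order) {
        this.result = result;
        this.order = order;
    }

    @Override public boolean isInterceptType(Class<?> type) {
        return RecordingTask.class.isAssignableFrom(type);
    }
    @Override public boolean intercept(RecordingTask object) { return result; }
    @Override public int order() { return order; }
}
```

With only FixedInterceptor(false, 1) registered, a RecordingTask ends up as "passed"; after adding FixedInterceptor(true, 0) it ends up as "intercepted".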
AbstractHealthCheckInterceptor
This abstract class restricts its subclasses to intercepting only tasks of type NacosHealthCheckTask.
/**
* Abstract health check interceptor.
*
* @author xiweng.yy
*/
public abstract class AbstractHealthCheckInterceptor implements NacosNamingInterceptor<NacosHealthCheckTask> {
@Override
public boolean isInterceptType(Class<?> type) {
return NacosHealthCheckTask.class.isAssignableFrom(type);
}
}

HealthCheckResponsibleInterceptor
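Intercepts tasks of type NacosHealthCheckTask and decides whether the current node is responsible for the task; if it is not, the task is intercepted. Its order is Integer.MIN_VALUE + 1, so it runs second, right after the highest-priority interceptor.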
/**
* Health check responsible interceptor.
* Decides whether the task should be intercepted (true when this node is not responsible for it).
* @author xiweng.yy
*/
public class HealthCheckResponsibleInterceptor extends AbstractHealthCheckInterceptor {
@Override
public boolean intercept(NacosHealthCheckTask object) {
return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getTaskId());
}
@Override
public int order() {
return Integer.MIN_VALUE + 1;
}
}
HealthCheckEnableInterceptor
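Intercepts tasks of type NacosHealthCheckTask and checks whether health checking is enabled on the current node. It has the highest priority (order Integer.MIN_VALUE).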
/**
* Health check enable interceptor.
* Checks whether health checking is enabled.
* @author xiweng.yy
*/
public class HealthCheckEnableInterceptor extends AbstractHealthCheckInterceptor {
@Override
public boolean intercept(NacosHealthCheckTask object) {
try {
return !ApplicationUtils.getBean(SwitchDomain.class).isHealthCheckEnabled() || !ApplicationUtils
.getBean(UpgradeJudgement.class).isUseGrpcFeatures();
} catch (Exception e) {
return true;
}
}
@Override
public int order() {
return Integer.MIN_VALUE;
}
}
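These two implementations show that subclasses of AbstractHealthCheckInterceptor decide whether an intercepted NacosHealthCheckTask should go on to the remaining interceptors. The highest-priority interceptor, HealthCheckEnableInterceptor, directly determines whether the task continues at all.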
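Because the two order() values sit at the very bottom of the int range, the way they are compared matters. The sketch below is an illustration under my own assumptions (the Entry holder and sortedNames helper are hypothetical, not Nacos code): it sorts by order with Integer.compare, which stays correct near Integer.MIN_VALUE, whereas a subtraction-based comparator could overflow.

```java
import java.util.ArrayList;
import java.util.List;

class OrderSketch {
    // Hypothetical holder: an interceptor name plus the value its order() returns.
    static final class Entry {
        final String name;
        final int order;

        Entry(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns the names sorted so that the lowest order (highest priority) comes first.
    static List<String> sortedNames(List<Entry> entries) {
        List<Entry> copy = new ArrayList<>(entries);
        // Integer.compare is overflow-safe; "a.order - b.order" is not near Integer.MIN_VALUE.
        copy.sort((a, b) -> Integer.compare(a.order, b.order));
        List<String> names = new ArrayList<>();
        for (Entry each : copy) {
            names.add(each.name);
        }
        return names;
    }
}
```

Feeding it HealthCheckResponsibleInterceptor (Integer.MIN_VALUE + 1) and HealthCheckEnableInterceptor (Integer.MIN_VALUE) yields the enable interceptor first.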
AbstractBeatCheckInterceptor
This abstract class restricts its subclasses to intercepting only tasks of type InstanceBeatCheckTask.
/**
* Abstract beat check interceptor.
* @author xiweng.yy
*/
public abstract class AbstractBeatCheckInterceptor implements NacosNamingInterceptor<InstanceBeatCheckTask> {
@Override
public boolean isInterceptType(Class<?> type) {
// Only InstanceBeatCheckTask is intercepted
return InstanceBeatCheckTask.class.isAssignableFrom(type);
}
}

InstanceEnableBeatCheckInterceptor
Intercepts tasks of type InstanceBeatCheckTask and checks whether the heartbeat check is enabled for the current Instance.
/**
* Instance enable beat check interceptor.
* Interceptor that checks whether the heartbeat check is enabled for the Instance.
* @author xiweng.yy
*/
public class InstanceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
@Override
public boolean intercept(InstanceBeatCheckTask object) {
NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
HealthCheckInstancePublishInfo instance = object.getInstancePublishInfo();
Optional<InstanceMetadata> metadata = metadataManager.getInstanceMetadata(object.getService(), instance.getMetadataId());
if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
return ConvertUtils.toBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
}
if (instance.getExtendDatum().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
return ConvertUtils.toBoolean(instance.getExtendDatum().get(UtilsAndCommons.ENABLE_CLIENT_BEAT).toString());
}
return false;
}
@Override
public int order() {
return Integer.MIN_VALUE + 1;
}
}
ServiceEnableBeatCheckInterceptor
Intercepts tasks of type InstanceBeatCheckTask and checks whether the heartbeat check is enabled for the current Service.
/**
* Service enable beat check interceptor.
* Interceptor that checks whether the heartbeat check is enabled for the Service.
* @author xiweng.yy
*/
public class ServiceEnableBeatCheckInterceptor extends AbstractBeatCheckInterceptor {
@Override
public boolean intercept(InstanceBeatCheckTask object) {
NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
Optional<ServiceMetadata> metadata = metadataManager.getServiceMetadata(object.getService());
if (metadata.isPresent() && metadata.get().getExtendData().containsKey(UtilsAndCommons.ENABLE_CLIENT_BEAT)) {
return Boolean.parseBoolean(metadata.get().getExtendData().get(UtilsAndCommons.ENABLE_CLIENT_BEAT));
}
return false;
}
@Override
public int order() {
return Integer.MIN_VALUE;
}
}
InstanceBeatCheckResponsibleInterceptor
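Intercepts tasks of type InstanceBeatCheckTask and checks whether the current node is responsible for this Instance's heartbeat check; if it is not, the task is intercepted and nothing further runs.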
/**
* Instance responsibility check interceptor.
* Interceptor that checks whether this node is responsible for the client.
* @author gengtuo.ygt
* on 2021/3/24
*/
public class InstanceBeatCheckResponsibleInterceptor extends AbstractBeatCheckInterceptor {
@Override
public boolean intercept(InstanceBeatCheckTask object) {
return !ApplicationUtils.getBean(DistroMapper.class).responsible(object.getClient().getResponsibleId());
}
@Override
public int order() {
return Integer.MIN_VALUE + 2;
}
}
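Note that InstanceEnableBeatCheckInterceptor and ServiceEnableBeatCheckInterceptor return false by default. Look at where they read their data: both read from metadata, and by default registering a service does not carry that much data. This raises a question: if the setting is not found, how does the heartbeat check run, and are these interceptors still useful? One reading that follows from the Interceptable contract shown earlier: returning false means the task is not intercepted, so by default it falls through to passIntercept() and the checkers run.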
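The lookup order used by InstanceEnableBeatCheckInterceptor can be sketched with plain maps. This is a simplified illustration, not the Nacos code: the key string is a hypothetical placeholder for UtilsAndCommons.ENABLE_CLIENT_BEAT, and Boolean.parseBoolean stands in for ConvertUtils.toBoolean, which may accept more spellings.

```java
import java.util.Map;
import java.util.Optional;

class EnableFlagSketch {
    // Hypothetical key; the real code uses the constant UtilsAndCommons.ENABLE_CLIENT_BEAT.
    static final String KEY = "enableClientBeat";

    // Mirrors the lookup order: metadata first, then the instance's own extend data, then false.
    static boolean intercept(Optional<Map<String, Object>> metadata, Map<String, Object> extendDatum) {
        if (metadata.isPresent() && metadata.get().containsKey(KEY)) {
            return Boolean.parseBoolean(String.valueOf(metadata.get().get(KEY)));
        }
        if (extendDatum.containsKey(KEY)) {
            return Boolean.parseBoolean(String.valueOf(extendDatum.get(KEY)));
        }
        // Nothing configured: not intercepted, so passIntercept() would run the checkers.
        return false;
    }
}
```

With no metadata and empty extend data the result is false, which is the default case discussed above.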
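Tasks (Task)
In the com.alibaba.nacos.naming.healthcheck package of the nacos-naming module there are four kinds of Task: NacosTask, HealthCheckTask, NacosHealthCheckTask, and BeatCheckTask. In Nacos nearly every operation is modeled as a Task, which I see as one effective way to achieve high performance.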
NacosTask
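NacosTask is the unified interface for tasks inside Nacos. Most system-level tasks are implemented through its subclasses. Its subtypes fall into two categories, AbstractExecuteTask and AbstractDelayTask, which represent tasks that run immediately and tasks that run after a delay, giving the task system a finer-grained division. The interface itself defines whether a task should be processed.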
/**
* Nacos task.
*
* @author xiweng.yy
*/
public interface NacosTask {
/**
* Judge Whether this nacos task should do.
*
* @return true means the nacos task should be done, otherwise false
*/
boolean shouldProcess();
}

AbstractExecuteTask
A task that must be executed immediately.
/**
* Abstract task which should be executed immediately.
*
* @author xiweng.yy
*/
public abstract class AbstractExecuteTask implements NacosTask, Runnable {
@Override
public boolean shouldProcess() {
return true;
}
}
ClientBeatCheckTaskV2
/**
* Client beat check task of service for version 2.x.
* @author nkorange
*/
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {
private final IpPortBasedClient client;
private final String taskId;
/**
* The interceptor chain.
*/
private final InstanceBeatCheckTaskInterceptorChain interceptorChain;
public ClientBeatCheckTaskV2(IpPortBasedClient client) {
this.client = client;
this.taskId = client.getResponsibleId();
this.interceptorChain = InstanceBeatCheckTaskInterceptorChain.getInstance();
}
public GlobalConfig getGlobalConfig() {
return ApplicationUtils.getBean(GlobalConfig.class);
}
@Override
public String taskKey() {
return KeyBuilder.buildServiceMetaKey(client.getClientId(), String.valueOf(client.isEphemeral()));
}
@Override
public String getTaskId() {
return taskId;
}
@Override
public void doHealthCheck() {
try {
// Get all services published by this client
Collection<Service> services = client.getAllPublishedService();
for (Service each : services) {
// Get the InstancePublishInfo for this service
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each);
// Create an InstanceBeatCheckTask and hand it to the interceptor chain
interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
@Override
public void run() {
doHealthCheck();
}
@Override
public void passIntercept() {
doHealthCheck();
}
@Override
public void afterIntercept() {
}
}
ClientBeatUpdateTask
The v2 task that refreshes the heartbeat time of every instance under a Client.
/**
* Client beat update task.
*
* @author xiweng.yy
*/
public class ClientBeatUpdateTask extends AbstractExecuteTask {
private final IpPortBasedClient client;
public ClientBeatUpdateTask(IpPortBasedClient client) {
this.client = client;
}
@Override
public void run() {
// Take the current time and refresh the last-active time of the Client and of every Instance under it
long currentTime = System.currentTimeMillis();
for (InstancePublishInfo each : client.getAllInstancePublishInfo()) {
((HealthCheckInstancePublishInfo) each).setLastHeartBeatTime(currentTime);
}
client.setLastUpdatedTime();
}
}
HealthCheckTaskV2
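The v2 health check task. It extends AbstractExecuteTask, so it runs immediately, and it implements NacosHealthCheckTask, so it can be processed by interceptors. As analyzed earlier, the interceptors for NacosHealthCheckTask only check whether health checking is enabled and whether the current node is responsible for the task.
Judging from the actual execution logic, the health check task runs in a loop. According to the comment in the class, it still uses the same logic as v1; it will be interesting to see what changes in later versions.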
/**
* Health check task for v2.x.
* <p>Current health check logic is same as v1.x. TODO refactor health check for v2.x.
*
* @author nacos
*/
public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {
/**
* The client object (a client that provides services for applications to call).
* This shows that a health check task is started per client.
*/
private final IpPortBasedClient client;
private final String taskId;
private final SwitchDomain switchDomain;
private final NamingMetadataManager metadataManager;
private long checkRtNormalized = -1;
/**
* Best check response time.
*/
private long checkRtBest = -1;
/**
* Worst check response time.
*/
private long checkRtWorst = -1;
/**
* Response time of the last check.
*/
private long checkRtLast = -1;
/**
* Response time of the check before the last one.
*/
private long checkRtLastLast = -1;
/**
* Start time.
*/
private long startTime;
/**
* Whether the task has been cancelled.
*/
private volatile boolean cancelled = false;
public HealthCheckTaskV2(IpPortBasedClient client) {
this.client = client;
this.taskId = client.getResponsibleId();
this.switchDomain = ApplicationUtils.getBean(SwitchDomain.class);
this.metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
// Initialize the check response times
initCheckRT();
}
/**
* Initialize the response time values.
*/
private void initCheckRT() {
// first check time delay
// 2000 + (a random number below 5000)
checkRtNormalized = 2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
// Best response time
checkRtBest = Long.MAX_VALUE;
// Worst response time starts at 0
checkRtWorst = 0L;
}
public IpPortBasedClient getClient() {
return client;
}
@Override
public String getTaskId() {
return taskId;
}
/**
* Run the health check task.
*/
@Override
public void doHealthCheck() {
try {
// Iterate over all services published by this client
for (Service each : client.getAllPublishedService()) {
// Only run when health checking is enabled for the service
if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
// Get the InstancePublishInfo for this service
InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
// Get the cluster metadata
ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
// Let the processor delegate handle the task
ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", client.getClientId());
}
}
}
} catch (Throwable e) {
Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}", client.getClientId(), e);
} finally {
// If the task has not been cancelled, schedule it again
if (!cancelled) {
HealthCheckReactor.scheduleCheck(this);
// worst == 0 means never checked
if (this.getCheckRtWorst() > 0) {
// TLog doesn't support float so we must convert it into long
long checkRtLastLast = getCheckRtLastLast();
this.setCheckRtLastLast(this.getCheckRtLast());
if (checkRtLastLast > 0) {
long diff = ((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / checkRtLastLast;
if (Loggers.CHECK_RT.isDebugEnabled()) {
Loggers.CHECK_RT.debug("{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
client.getClientId(), this.getCheckRtNormalized(), this.getCheckRtWorst(),
this.getCheckRtBest(), this.getCheckRtLast(), diff);
}
}
}
}
}
}
@Override
public void passIntercept() {
// Not intercepted: run the health check
doHealthCheck();
}
@Override
public void afterIntercept() {
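// After the interceptors have run, schedule the check again if the task has not been cancelled. The task therefore runs in a loop, and the loop relies on the interceptor call flow.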
if (!cancelled) {
HealthCheckReactor.scheduleCheck(this);
}
}
@Override
public void run() {
// Run the health check
doHealthCheck();
}
/**
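* Get the cluster metadata.
* @param service service info
* @param instancePublishInfo instance info (IP and so on) for the service
* @return the cluster metadata, or an empty ClusterMetadata if none is found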
*/
private ClusterMetadata getClusterMetadata(Service service, InstancePublishInfo instancePublishInfo) {
Optional<ServiceMetadata> serviceMetadata = metadataManager.getServiceMetadata(service);
if (!serviceMetadata.isPresent()) {
return new ClusterMetadata();
}
String cluster = instancePublishInfo.getCluster();
ClusterMetadata result = serviceMetadata.get().getClusters().get(cluster);
return null == result ? new ClusterMetadata() : result;
}
//... some getters/setters omitted
}
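The looping behaviour of HealthCheckTaskV2 (run once, then schedule itself again in finally unless cancelled) can be sketched with a plain ScheduledExecutorService. This is a simplified illustration under my own assumptions: the class name, the maxRuns stop condition, and the latch are hypothetical and only exist so that the example terminates; HealthCheckReactor is not used here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReschedulingCheckSketch implements Runnable {
    private final ScheduledExecutorService executor;
    private final long delayMillis;
    private final int maxRuns;
    private final AtomicInteger runs = new AtomicInteger();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean cancelled = false;

    ReschedulingCheckSketch(ScheduledExecutorService executor, long delayMillis, int maxRuns) {
        this.executor = executor;
        this.delayMillis = delayMillis;
        this.maxRuns = maxRuns;
    }

    @Override
    public void run() {
        try {
            // A real task would probe the instance here; the sketch only counts runs.
            if (runs.incrementAndGet() >= maxRuns) {
                cancelled = true; // hypothetical stop condition for the demo
            }
        } finally {
            if (!cancelled) {
                // Not cancelled: schedule the next round, as the finally block above does.
                executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
            } else {
                done.countDown();
            }
        }
    }

    // Runs the task until it cancels itself and returns how many rounds were executed.
    static int runUntilCancelled(int maxRuns) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ReschedulingCheckSketch task = new ReschedulingCheckSketch(executor, 10L, maxRuns);
        executor.schedule(task, 10L, TimeUnit.MILLISECONDS);
        try {
            task.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return task.runs.get();
    }
}
```

Rescheduling in finally means an exception in one round does not end the loop; only the cancelled flag does.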
AbstractDelayTask
A task whose execution can be delayed.
/**
* Abstract task which can delay and merge.
*
* @author huali
* @author xiweng.yy
*/
public abstract class AbstractDelayTask implements NacosTask {
/**
* Task time interval between twice processing, unit is millisecond.
*/
private long taskInterval;
/**
* The time which was processed at last time, unit is millisecond.
*/
private long lastProcessTime;
/**
* merge task.
* For how tasks are merged, see the subclass implementations.
* @param task task
*/
public abstract void merge(AbstractDelayTask task);
public void setTaskInterval(long interval) {
this.taskInterval = interval;
}
public long getTaskInterval() {
return this.taskInterval;
}
public void setLastProcessTime(long lastProcessTime) {
this.lastProcessTime = lastProcessTime;
}
public long getLastProcessTime() {
return this.lastProcessTime;
}
@Override
public boolean shouldProcess() {
return (System.currentTimeMillis() - this.lastProcessTime >= this.taskInterval);
}
}
Tip:
Delayed tasks are not covered further in this health check chapter; health checks are all immediately executed tasks.
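For readers who still want a feel for the shouldProcess() and merge() pair, here is a small self-contained sketch. It is not a Nacos class: the name, the key set, and the explicit now parameter (used instead of System.currentTimeMillis() so the behaviour is deterministic) are my own assumptions.

```java
import java.util.Set;
import java.util.TreeSet;

class DelayTaskSketch {
    private final long taskInterval;    // minimum gap between two runs, in milliseconds
    private final long lastProcessTime; // when the task last ran, in milliseconds
    private final Set<String> keys = new TreeSet<>();

    DelayTaskSketch(long taskInterval, long lastProcessTime, String key) {
        this.taskInterval = taskInterval;
        this.lastProcessTime = lastProcessTime;
        this.keys.add(key);
    }

    // Same rule as AbstractDelayTask.shouldProcess(), with the clock passed in.
    boolean shouldProcess(long now) {
        return now - lastProcessTime >= taskInterval;
    }

    // One possible merge strategy: absorb the other task's keys so a single run covers both.
    void merge(DelayTaskSketch other) {
        keys.addAll(other.keys);
    }

    Set<String> getKeys() {
        return keys;
    }
}
```

A task created with an interval of 1000 ms and a last run at 5000 is not due at 5999 but is due at 6000.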
NacosHealthCheckTask
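The Task used for health checks; it defines the basic health check methods. Extending Interceptable means it can be processed by interceptors. Extending Runnable means it can be submitted to and scheduled by a thread executor.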
/**
* Nacos health check task.
*
* @author xiweng.yy
*/
public interface NacosHealthCheckTask extends Interceptable, Runnable {
/**
* Get task id.
*
* @return task id.
*/
String getTaskId();
/**
* Do health check.
*/
void doHealthCheck();
}
ClientBeatCheckTaskV2
See the ClientBeatCheckTaskV2 subsection under NacosTask.
HealthCheckTaskV2
See the HealthCheckTaskV2 subsection under NacosTask.
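Functionally, ClientBeatCheckTaskV2 checks the state of heartbeat requests, while HealthCheckTaskV2 checks the state of other connections. Their internal logic also differs clearly: the former is handled through interceptors, the latter through processors.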
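Checkers (Checker)
As I understand it, a Checker complements the interceptors: when a task is not intercepted but some checks are still needed, a Checker can run them. The InstanceBeatCheckTask class illustrates this by calling its checkers inside passIntercept().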
InstanceBeatChecker
A checker is responsible for checking the instance passed to it.
/**
* Instance heart beat checker.
*
* @author xiweng.yy
*/
public interface InstanceBeatChecker {
/**
* Do check for input instance.
*
* @param client client
* @param service service of instance
* @param instance instance publish info
*/
void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance);
}
ExpiredInstanceChecker
The expired-instance checker. It checks whether an instance has expired and, if so, removes the service from the client's published list.
/**
* Instance beat checker for expired instance.
* <p>Delete the instance if has expired.
*
* @author xiweng.yy
*/
public class ExpiredInstanceChecker implements InstanceBeatChecker {
/**
* Run the check.
* @param client client
* @param service service of instance
* @param instance instance publish info
*/
@Override
public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
// Whether instances are allowed to expire
boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
// If expiry is enabled and the instance has expired
if (expireInstance && isExpireInstance(service, instance)) {
// Remove it from the client's list of published services
deleteIp(client, service, instance);
}
}
/**
* Check whether the instance has passed its delete timeout.
* @param service
* @param instance
* @return
*/
private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
long deleteTimeout = getTimeout(service, instance);
return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
}
/**
* Get the delete timeout.
* @param service
* @param instance
* @return
*/
private long getTimeout(Service service, InstancePublishInfo instance) {
Optional<Object> timeout = getTimeoutFromMetadata(service, instance);
if (!timeout.isPresent()) {
timeout = Optional.ofNullable(instance.getExtendDatum().get(PreservedMetadataKeys.IP_DELETE_TIMEOUT));
}
return timeout.map(ConvertUtils::toLong).orElse(Constants.DEFAULT_IP_DELETE_TIMEOUT);
}
/**
* Get the delete timeout from metadata.
* @param service
* @param instance
* @return
*/
private Optional<Object> getTimeoutFromMetadata(Service service, InstancePublishInfo instance) {
Optional<InstanceMetadata> instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class)
.getInstanceMetadata(service, instance.getMetadataId());
return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.IP_DELETE_TIMEOUT));
}
/**
* Remove the service and publish an event.
* @param client
* @param service
* @param instance
*/
private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.toString(), JacksonUtils.toJson(instance));
client.removeServiceInstance(service);
NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
}
}
UnhealthyInstanceChecker
Checks whether an instance is healthy; if it is not, updates its status and publishes events.
/**
* Instance beat checker for unhealthy instances.
* <p>Mark these instances healthy status {@code false} if beat time out.
*
* @author xiweng.yy
*/
public class UnhealthyInstanceChecker implements InstanceBeatChecker {
/**
* Run the check.
* @param client client
* @param service service of instance
* @param instance instance publish info
*/
@Override
public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
// If the instance is currently marked healthy but its heartbeat has timed out, change its status
if (instance.isHealthy() && isUnhealthy(service, instance)) {
changeHealthyStatus(client, service, instance);
}
}
/**
* Decide whether the heartbeat has timed out based on the instance's last heartbeat time.
* @param service
* @param instance
* @return
*/
private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
long beatTimeout = getTimeout(service, instance);
return System.currentTimeMillis() - instance.getLastHeartBeatTime() > beatTimeout;
}
/**
* Get the heartbeat timeout.
* @param service
* @param instance
* @return
*/
private long getTimeout(Service service, InstancePublishInfo instance) {
Optional<Object> timeout = getTimeoutFromMetadata(service, instance);
if (!timeout.isPresent()) {
timeout = Optional.ofNullable(instance.getExtendDatum().get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT));
}
return timeout.map(ConvertUtils::toLong).orElse(Constants.DEFAULT_HEART_BEAT_TIMEOUT);
}
/**
* Get the heartbeat timeout from metadata.
* @param service
* @param instance
* @return
*/
private Optional<Object> getTimeoutFromMetadata(Service service, InstancePublishInfo instance) {
Optional<InstanceMetadata> instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class)
.getInstanceMetadata(service, instance.getMetadataId());
return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT));
}
/**
* Update the health status.
* @param client
* @param service
* @param instance
*/
private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
// Mark the instance as unhealthy
instance.setHealthy(false);
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}", instance.getIp(),
instance.getPort(), instance.getCluster(), service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getLastHeartBeatTime());
// Publish the service-changed and client-changed events
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
}
}
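Note:
Unhealthy means the heartbeat has timed out, but not badly enough to remove the instance right away. The default heartbeat timeout is 15 seconds.
- The Unhealthy timeout can be read from metadata, key: PreservedMetadataKeys.HEART_BEAT_TIMEOUT
- Otherwise the system default is used: Constants.DEFAULT_HEART_BEAT_TIMEOUT
Expired means the instance has reached the maximum allowed timeout. Once this limit is reached, Nacos stops waiting for heartbeats and removes the instance. The default expiry time is 30 seconds.
- The Expired timeout can be read from metadata, key: PreservedMetadataKeys.IP_DELETE_TIMEOUT
- Otherwise the system default is used: Constants.DEFAULT_IP_DELETE_TIMEOUT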
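The two thresholds can be illustrated with a small classifier. This is a sketch under stated assumptions rather than the Nacos logic itself: the two checkers really run one after the other, while the sketch folds them into a single state; the class and method names are hypothetical; the defaults of 15 and 30 seconds are the ones quoted above; and Long.parseLong stands in for ConvertUtils.toLong.

```java
import java.util.Map;

class BeatTimeoutSketch {
    // Defaults as described in the note above (15 s and 30 s), in milliseconds.
    static final long DEFAULT_HEART_BEAT_TIMEOUT_MS = 15_000L;
    static final long DEFAULT_IP_DELETE_TIMEOUT_MS = 30_000L;

    enum State { HEALTHY, UNHEALTHY, EXPIRED }

    // Lookup order used by both checkers: metadata, then instance extend data, then the default.
    static long resolveTimeout(Map<String, Object> metadata, Map<String, Object> extendDatum,
            String key, long defaultValue) {
        Object value = metadata.get(key);
        if (value == null) {
            value = extendDatum.get(key);
        }
        return value == null ? defaultValue : Long.parseLong(value.toString());
    }

    // Classifies an instance by how long it has been silent.
    static State classify(long now, long lastHeartBeatTime, long beatTimeout, long deleteTimeout) {
        long silentFor = now - lastHeartBeatTime;
        if (silentFor > deleteTimeout) {
            return State.EXPIRED;   // would be removed by ExpiredInstanceChecker
        }
        if (silentFor > beatTimeout) {
            return State.UNHEALTHY; // would be marked unhealthy by UnhealthyInstanceChecker
        }
        return State.HEALTHY;
    }
}
```

With the defaults, an instance silent for 20 seconds is UNHEALTHY and one silent for 31 seconds is EXPIRED; both comparisons are strict, so exactly 15 seconds of silence still counts as HEALTHY.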
Task processors (Processor)
Used to process HealthCheckTaskV2.
BeatProcessor
Handles instance heartbeats received from clients.
/**
* Thread to update ephemeral instance triggered by client beat.
*
* @author xiweng.yy
*/
public interface BeatProcessor extends Runnable {
}
ClientBeatProcessorV2
The v2 heartbeat processor.
/**
* Thread to update ephemeral instance triggered by client beat for v2.x.
* @author nkorange
*/
public class ClientBeatProcessorV2 implements BeatProcessor {
private final String namespace;
private final RsInfo rsInfo;
/**
* The Client object: one processor is created per Client.
*/
private final IpPortBasedClient client;
public ClientBeatProcessorV2(String namespace, RsInfo rsInfo, IpPortBasedClient ipPortBasedClient) {
this.namespace = namespace;
this.rsInfo = rsInfo;
this.client = ipPortBasedClient;
}
@Override
public void run() {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
}
// Build the service from the heartbeat info and look up its instance
String ip = rsInfo.getIp();
int port = rsInfo.getPort();
String serviceName = NamingUtils.getServiceName(rsInfo.getServiceName());
String groupName = NamingUtils.getGroupName(rsInfo.getServiceName());
Service service = Service.newService(namespace, groupName, serviceName, rsInfo.isEphemeral());
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(service);
// Only handle the beat if it refers to the instance held by this processor's Client
if (instance.getIp().equals(ip) && instance.getPort() == port) {
if (Loggers.EVT_LOG.isDebugEnabled()) {
Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
}
// A beat was received: set the instance's last-active time to now
instance.setLastHeartBeatTime(System.currentTimeMillis());
// If the instance is not healthy, mark it healthy again. The earlier timeout may have been
// caused by network delay; once a beat arrives the instance should be considered healthy.
if (!instance.isHealthy()) {
instance.setHealthy(true);
Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
rsInfo.getServiceName(), ip, port, rsInfo.getCluster(), UtilsAndCommons.LOCALHOST_SITE);
// Publish the service-changed and client-changed events
NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
}
}
}
}
HealthCheckProcessorV2
The v2 health check processor. It can only process tasks of type HealthCheckTaskV2.
/**
* Health check processor for v2.x.
*
* @author nkorange
*/
public interface HealthCheckProcessorV2 {
/**
* Run check task for service.
*
* @param task health check task v2
* @param service service of current process
* @param metadata cluster metadata of current process
*/
void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata);
/**
* Get check task type, refer to enum HealthCheckType.
*
* @return check type
*/
String getType();
}
Note:
This section analyzes the v2 processors in the com.alibaba.nacos.naming.healthcheck.v2.processor package; do not confuse them with the v1 versions.
HealthCheckProcessorV2Delegate
Uses the delegate pattern to manage the different types of processors.
/**
* Delegate of health check v2.x.
* @author nacos
*/
@Component("healthCheckDelegateV2")
public class HealthCheckProcessorV2Delegate implements HealthCheckProcessorV2 {
/**
* The processors, keyed by type.
*/
private final Map<String, HealthCheckProcessorV2> healthCheckProcessorMap = new HashMap<>();
public HealthCheckProcessorV2Delegate(HealthCheckExtendProvider provider) {
// Initialize SPI extension loading to pick up user-defined processors and checkers
provider.init();
}
@Autowired
public void addProcessor(Collection<HealthCheckProcessorV2> processors) {
// Add the processors to the map, keyed by their type
healthCheckProcessorMap.putAll(processors.stream().filter(processor -> processor.getType() != null)
.collect(Collectors.toMap(HealthCheckProcessorV2::getType, processor -> processor)));
}
@Override
public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
// Read the check type from the metadata
String type = metadata.getHealthyCheckType();
// Look up the processor for that type
HealthCheckProcessorV2 processor = healthCheckProcessorMap.get(type);
// If none is found, fall back to the default processor, which does nothing
if (processor == null) {
processor = healthCheckProcessorMap.get(NoneHealthCheckProcessor.TYPE);
}
// Let the processor handle the task
processor.process(task, service, metadata);
}
@Override
public String getType() {
return null;
}
}
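The lookup-with-fallback idea in the delegate can be reduced to a few lines. The sketch below uses hypothetical types (ProcessorSketch, NamedProcessor, DelegateSketch) and plain strings instead of tasks and metadata; like the original, it assumes a processor for the NONE type is always registered, otherwise the fallback lookup would return null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified processor contract.
interface ProcessorSketch {
    String getType();
    String process(String target);
}

// Processor that just reports which type handled the target.
class NamedProcessor implements ProcessorSketch {
    private final String type;

    NamedProcessor(String type) {
        this.type = type;
    }

    @Override public String getType() { return type; }
    @Override public String process(String target) { return type + ":" + target; }
}

class DelegateSketch {
    static final String NONE = "NONE";
    private final Map<String, ProcessorSketch> processors = new HashMap<>();

    // Processors without a type are skipped, mirroring the null filter in addProcessor().
    void addProcessor(ProcessorSketch processor) {
        if (processor.getType() != null) {
            processors.put(processor.getType(), processor);
        }
    }

    // Dispatches by type and falls back to the NONE processor for unknown types.
    String process(String type, String target) {
        ProcessorSketch processor = processors.get(type);
        if (processor == null) {
            processor = processors.get(NONE); // assumes NONE is always registered
        }
        return processor.process(target);
    }
}
```

The null filter also explains why the real delegate can return null from getType(): it implements the same interface, and the filter keeps it out of its own map.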
HttpHealthCheckProcessor
The HTTP health check processor. Its logic is currently the same as in v1 and is due to be refactored.
/**
* TCP health check processor for v2.x.
* HTTP health check processor. (The original comment above says TCP; was it copied over?)
* <p>Current health check logic is same as v1.x. TODO refactor health check for v2.x.
*
* @author xiweng.yy
*/
@Component
public class HttpHealthCheckProcessor implements HealthCheckProcessorV2 {
/**
* The type handled by this processor.
*/
public static final String TYPE = HealthCheckType.HTTP.name();
/**
* REST template used to send the HTTP requests.
*/
private static final NacosAsyncRestTemplate ASYNC_REST_TEMPLATE = HttpClientManager.getProcessorNacosAsyncRestTemplate();
/**
* Shared health check helper methods.
*/
private final HealthCheckCommonV2 healthCheckCommon;
private final SwitchDomain switchDomain;
public HttpHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {
this.healthCheckCommon = healthCheckCommon;
this.switchDomain = switchDomain;
}
@Override
public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
// Get the InstancePublishInfo for the given service
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient().getInstancePublishInfo(service);
if (null == instance) {
return;
}
try {
// TODO handle marked(white list) logic like v1.x.
if (!instance.tryStartCheck()) {
SRV_LOG.warn("http check started before last one finished, service: {} : {} : {}:{}",
service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());
// The previous check has not finished: re-evaluate the check RT with twice the normalized value
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
return;
}
// Get the HTTP checker configuration
Http healthChecker = (Http) metadata.getHealthChecker();
// Build the network address of the instance
int ckPort = metadata.isUseInstancePortForCheck() ? instance.getPort() : metadata.getHealthyCheckPort();
URL host = new URL("http://" + instance.getIp() + ":" + ckPort);
URL target = new URL(host, healthChecker.getPath());
Map<String, String> customHeaders = healthChecker.getCustomHeaders();
Header header = Header.newInstance();
header.addAll(customHeaders);
// Send the HTTP request
ASYNC_REST_TEMPLATE.get(target.toString(), header, Query.EMPTY, String.class,
new HttpHealthCheckCallback(instance, task, service));
MetricsMonitor.getHttpHealthCheckMonitor().incrementAndGet();
} catch (Throwable e) {
instance.setCheckRt(switchDomain.getHttpHealthParams().getMax());
healthCheckCommon.checkFail(task, service, "http:error:" + e.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
}
}
@Override
public String getType() {
return TYPE;
}
/**
* Callback for the health check request.
*/
private class HttpHealthCheckCallback implements Callback<String> {
private final HealthCheckTaskV2 task;
private final Service service;
private final HealthCheckInstancePublishInfo instance;
private long startTime = System.currentTimeMillis();
public HttpHealthCheckCallback(HealthCheckInstancePublishInfo instance, HealthCheckTaskV2 task,
Service service) {
this.instance = instance;
this.task = task;
this.service = service;
}
@Override
public void onReceive(RestResult<String> result) {
// Record the response time of this check
instance.setCheckRt(System.currentTimeMillis() - startTime);
int httpCode = result.getCode();
// Evaluate the response code
if (HttpURLConnection.HTTP_OK == httpCode) {
healthCheckCommon.checkOk(task, service, "http:" + httpCode);
healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
switchDomain.getHttpHealthParams());
} else if (HttpURLConnection.HTTP_UNAVAILABLE == httpCode
|| HttpURLConnection.HTTP_MOVED_TEMP == httpCode) {
// server is busy, need verification later
healthCheckCommon.checkFail(task, service, "http:" + httpCode);
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getHttpHealthParams());
} else {
//probably means the state files has been removed by administrator
healthCheckCommon.checkFailNow(task, service, "http:" + httpCode);
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
}
}
@Override
public void onError(Throwable throwable) {
Throwable cause = throwable;
instance.setCheckRt(System.currentTimeMillis() - startTime);
int maxStackDepth = 50;
for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
if (HttpUtils.isTimeoutException(cause)) {
healthCheckCommon.checkFail(task, service, "http:" + cause.getMessage());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
switchDomain.getHttpHealthParams());
return;
}
cause = cause.getCause();
}
// connection error, probably not reachable
if (throwable instanceof ConnectException) {
healthCheckCommon.checkFailNow(task, service, "http:unable2connect:" + throwable.getMessage());
} else {
healthCheckCommon.checkFail(task, service, "http:error:" + throwable.getMessage());
}
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getHttpHealthParams());
}
@Override
public void onCancel() {
}
}
}
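The onError callback above walks the exception's cause chain, at most 50 levels deep, and treats the failure as a timeout if any cause is a timeout exception. A minimal sketch of that traversal follows. The class name TimeoutCauseSketch is hypothetical, and because the body of HttpUtils.isTimeoutException is not shown here, two JDK exception types stand in for it as an assumption.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeoutException;

final class TimeoutCauseSketch {

    /** Same depth limit as the callback above. */
    private static final int MAX_DEPTH = 50;

    /** Returns true if the throwable itself or one of its causes is a timeout. */
    static boolean causedByTimeout(Throwable throwable) {
        Throwable cause = throwable;
        for (int depth = 0; depth < MAX_DEPTH && cause != null; depth++) {
            // Assumption: stand-in for HttpUtils.isTimeoutException(cause)
            if (cause instanceof SocketTimeoutException || cause instanceof TimeoutException) {
                return true;
            }
            cause = cause.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("outer",
                new IllegalStateException("middle", new SocketTimeoutException("read timed out")));
        System.out.println(causedByTimeout(wrapped));                                    // true
        System.out.println(causedByTimeout(new RuntimeException("connection refused"))); // false
    }
}
```

The depth limit guards against very long or cyclic cause chains, which is presumably why the original code caps the loop as well.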
TcpHealthCheckProcessor
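A health check processor for TCP probes; it uses NIO internally for network communication.
The overview diagram below shows how this processor works.

- Blue parts are members of the TcpHealthCheckProcessor class itself.
- Orange parts show what each inner-class object does when it is executed.
- Dashed lines denote asynchronous execution.
The NIO model maps closely onto the health check model in this processor. The two lists below compare the role each class plays from each point of view.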
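From the task-processing point of view:
- TcpHealthCheckProcessor: starts the work and creates multiple TaskProcessor objects.
- TaskProcessor: handles one heartbeat task and creates one TimeOutTask.
- TimeOutTask: handles heartbeat check timeouts.
- PostProcessor: checks whether the heartbeat succeeded.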
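From the NIO connection point of view:
- TcpHealthCheckProcessor: plays the role of the NIO Selector, and it does in fact hold a Selector internally.
- TaskProcessor: corresponds to an NIO Channel. Each TaskProcessor has its own Channel; it only creates the Channel and starts the connection, and does not check the connection result.
- TimeOutTask: also corresponds to an NIO Channel. It is created by TaskProcessor and works on the same Channel (through the SelectionKey), checking whether the connection timed out.
- PostProcessor: also corresponds to an NIO Channel. It works on a Channel that is already ready, which is the one TaskProcessor created, and checks whether the connection succeeded.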
Role diagram:

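TcpHealthCheckProcessor is both a task processor, whose entry point is process(), and a Runnable, whose entry point is run(). As a processor it maintains an internal queue and implements a producer/consumer model: process() is called from outside and produces data, while run() consumes it. The constructor does not call run() directly; it submits the object to GlobalExecutor, which runs it on a thread.
The constructor opens a Selector and starts the consumer thread. When process() is called, it puts a Beat object on taskQueue (producer). In run(), the consumer takes Beats off taskQueue, wraps each one in a TaskProcessor, and invokes them as a batch once the queue is drained or the batch limit (NIO_THREAD_COUNT * 64) is reached. When a TaskProcessor executes, it creates its own Channel, registers it with the Selector, and schedules a TimeOutTask for delayed execution.
The TaskProcessors run on executor threads. After the batch has been invoked, run() asks the Selector which of the registered Channels are ready, creates a PostProcessor for each of them, and executes it asynchronously. A PostProcessor checks the connection state of its Channel and records the heartbeat result accordingly; a successful connection to the target server is what counts as a successful heartbeat check. Once its delay has elapsed, the TimeOutTask runs, checks the state of the same Channel again, and handles the timeout case based on the connection state.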
@Component
public class TcpHealthCheckProcessor implements HealthCheckProcessorV2, Runnable {
/**
* This processor handles tasks of type TCP.
*/
public static final String TYPE = HealthCheckType.TCP.name();
/**
* Connect timeout in milliseconds.
*/
public static final int CONNECT_TIMEOUT_MS = 500;
/**
* Number of NIO threads.
* this value has been carefully tuned, do not modify unless you're confident.
*/
private static final int NIO_THREAD_COUNT = EnvUtil.getAvailableProcessors(0.5);
/**
* because some hosts doesn't support keep-alive connections, disabled temporarily.
*/
private static final long TCP_KEEP_ALIVE_MILLIS = 0;
/**
* Common health check helpers for v2.
*/
private final HealthCheckCommonV2 healthCheckCommon;
private final SwitchDomain switchDomain;
private final Map<String, BeatKey> keyMap = new ConcurrentHashMap<>();
/**
* Blocking queue of TCP beat tasks; implements the producer/consumer model.
*/
private final BlockingQueue<Beat> taskQueue = new LinkedBlockingQueue<>();
/**
* NIO multiplexer: lets one thread watch many channels, here one per heartbeat connection being checked.
*/
private final Selector selector;
public TcpHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {
this.healthCheckCommon = healthCheckCommon;
this.switchDomain = switchDomain;
try {
// Open the Selector
selector = Selector.open();
// Submit this object to the executor so that it starts as the consumer; the loop inside run() keeps consuming beats
GlobalExecutor.submitTcpCheck(this);
} catch (Exception e) {
throw new IllegalStateException("Error while initializing SuperSense(TM).");
}
}
/**
* As a processor, process() handles a task by putting it on the queue (producer).
* As a thread, it acts as the consumer and keeps taking tasks from the queue to execute.
* @param task health check task v2
* @param service service of current process
* @param metadata cluster metadata of current process
*/
@Override
public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
// Get the health check info of the instance
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
.getInstancePublishInfo(service);
if (null == instance) {
return;
}
// TODO handle marked(white list) logic like v1.x.
if (!instance.tryStartCheck()) {
SRV_LOG.warn("tcp check started before last one finished, service: {} : {} : {}:{}",
service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
return;
}
// Producer side: every call to process() puts one Beat on the queue
taskQueue.add(new Beat(task, service, metadata, instance));
MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
}
@Override
public String getType() {
return TYPE;
}
/**
* Process TCP health check tasks.
* @throws Exception
*/
private void processTask() throws Exception {
// Collection of task processors; one TaskProcessor is created per Beat
Collection<Callable<Void>> tasks = new LinkedList<>();
/**
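* Keep pulling beats from the queue.
* The loop continues while both conditions hold:
* 1. the queue still has data
* 2. fewer than NIO_THREAD_COUNT * 64 tasks have been collected (e.g. with 8 cores: 8 * 0.5 * 64 = 256)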
*/
do {
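// Poll the queue, waiting at most 250 ms (CONNECT_TIMEOUT_MS / 2)
Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
// Nothing arrived within the timeout: leave processTask(); run() will call it again
if (beat == null) {
return;
}
// Collect the task so that the whole batch can be submitted at once
tasks.add(new TaskProcessor(beat));
} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
// Invoke all collected tasks as one batch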
for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
f.get();
}
}
@Override
public void run() {
while (true) {
try {
processTask();
// Non-blocking select: returns the number of channels that are ready for I/O
int readyCount = selector.selectNow();
if (readyCount <= 0) {
continue;
}
// Handle each ready SelectionKey
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
GlobalExecutor.executeTcpSuperSense(new PostProcessor(key));
}
} catch (Throwable e) {
SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);
}
}
}
}
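The consumer side of processTask() above boils down to "wait briefly for the first item, then keep draining while the queue is non-empty and the batch is under its limit". The sketch below isolates that loop. BatchDrainSketch, drainBatch and demo are hypothetical names, and unlike the original this version returns the collected batch to the caller instead of submitting it to an executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchDrainSketch {

    /**
     * Collects up to maxBatch items. Each poll waits at most waitMillis;
     * the loop stops when the queue is empty or the batch limit is reached.
     */
    static <T> List<T> drainBatch(BlockingQueue<T> queue, int maxBatch, long waitMillis) {
        List<T> batch = new ArrayList<>();
        try {
            do {
                T item = queue.poll(waitMillis, TimeUnit.MILLISECONDS);
                if (item == null) {
                    break; // nothing arrived within the timeout
                }
                batch.add(item);
            } while (!queue.isEmpty() && batch.size() < maxBatch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return batch;
    }

    /** Hypothetical demo: five queued beats and a batch limit of three. */
    static int[] demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 5; i++) {
            queue.add("beat-" + i);
        }
        int first = drainBatch(queue, 3, 50).size();  // limit reached
        int second = drainBatch(queue, 3, 50).size(); // queue drained
        int third = drainBatch(queue, 3, 50).size();  // poll timed out
        return new int[] {first, second, third};
    }
}
```

With these inputs demo() yields batch sizes 3, 2 and 0. The bounded poll is what keeps the consumer loop from spinning when no beats are queued.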
TaskProcessor(Inner Class)
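Each TaskProcessor carries one Beat, and its call() method runs when the batch is invoked. It creates a Channel for the Beat to connect to the server hosting the instance, and it also schedules a TimeOutTask that runs after a delay to check whether the connection timed out. A connection timeout counts as a heartbeat timeout.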
/**
* Task processor.
*/
private class TaskProcessor implements Callable<Void> {
/**
* Maximum wait time: 500 ms.
*/
private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
/**
* The beat to check.
*/
Beat beat;
public TaskProcessor(Beat beat) {
this.beat = beat;
}
@Override
public Void call() {
// How long this task has been waiting since the beat was created
long waited = System.currentTimeMillis() - beat.getStartTime();
// Log a warning if it waited longer than 500 ms
if (waited > MAX_WAIT_TIME_MILLISECONDS) {
Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");
}
SocketChannel channel = null;
try {
HealthCheckInstancePublishInfo instance = beat.getInstance();
BeatKey beatKey = keyMap.get(beat.toString());
if (beatKey != null && beatKey.key.isValid()) {
if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
instance.finishCheck();
return null;
}
beatKey.key.cancel();
beatKey.key.channel().close();
}
channel = SocketChannel.open();
channel.configureBlocking(false);
// only by setting this can we make the socket close event asynchronous
channel.socket().setSoLinger(false, -1);
channel.socket().setReuseAddress(true);
channel.socket().setKeepAlive(true);
channel.socket().setTcpNoDelay(true);
ClusterMetadata cluster = beat.getMetadata();
int port = cluster.isUseInstancePortForCheck() ? instance.getPort() : cluster.getHealthyCheckPort();
channel.connect(new InetSocketAddress(instance.getIp(), port));
// Register the channel with the Selector
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
key.attach(beat);
keyMap.put(beat.toString(), new BeatKey(key));
// Reset the start time of the beat
beat.setStartTime(System.currentTimeMillis());
// Schedule the timeout task. Passing the SelectionKey lets TimeOutTask see the connection state of this beat
GlobalExecutor
.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// Mark the check as failed
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
"tcp:error:" + e.getMessage());
if (channel != null) {
try {
// Close the channel
channel.close();
} catch (Exception ignore) {
}
}
}
return null;
}
}
PostProcessor(Inner Class)
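After TcpHealthCheckProcessor hands over a SelectionKey whose channel is ready, PostProcessor retrieves the attached Beat and records the heartbeat result according to the connection state.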
public class PostProcessor implements Runnable {
SelectionKey key;
public PostProcessor(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
Beat beat = (Beat) key.attachment();
SocketChannel channel = (SocketChannel) key.channel();
try {
// The beat is stale (it started more than 30 seconds ago): close its channel
if (!beat.isHealthy()) {
//invalid beat means this server is no longer responsible for the current service
key.cancel();
key.channel().close();
beat.finishCheck();
return;
}
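// Is the channel ready to complete its connection?
if (key.isValid() && key.isConnectable()) {
//connected
// Complete the connection; this throws if the connect failed
channel.finishConnect();
// Record the successful check
beat.finishCheck(true, false, System.currentTimeMillis() - beat.getTask().getStartTime(),
"tcp:ok+");
}
// Is the channel ready for reading?
if (key.isValid() && key.isReadable()) {
//disconnected
// Read from the channel into a buffer; -1 means the peer closed the connection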
ByteBuffer buffer = ByteBuffer.allocate(128);
if (channel.read(buffer) == -1) {
key.cancel();
key.channel().close();
} else {
// not terminate request, ignore
// The server sent some data: ignore it and keep the connection open
SRV_LOG.warn(
"Tcp check ok, but the connected server responses some msg. Connection won't be closed.");
}
}
} catch (ConnectException e) {
// unable to connect, possibly port not opened
beat.finishCheck(false, true, switchDomain.getTcpHealthParams().getMax(),
"tcp:unable2connect:" + e.getMessage());
} catch (Exception e) {
beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
"tcp:error:" + e.getMessage());
try {
// Close the connection after an exception
key.cancel();
key.channel().close();
} catch (Exception ignore) {
}
}
}
}
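Taken together, TaskProcessor and PostProcessor perform a non-blocking TCP connect: start the connection, register the channel for OP_CONNECT, and call finishConnect() once the selector reports the key as ready. The sketch below compresses those steps into a single synchronous method so that the sequence is easy to follow. NioConnectProbeSketch, probe and probeLoopbackListener are hypothetical names, and using one selector per probe is a simplification; the processor above shares a single Selector across all beats and handles the ready keys on other threads.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NioConnectProbeSketch {

    /** Returns true if a TCP connection to target is established within timeoutMillis (must be > 0). */
    static boolean probe(InetSocketAddress target, long timeoutMillis) {
        try (Selector selector = Selector.open(); SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            if (channel.connect(target)) {
                return true; // connected immediately, possible on loopback
            }
            // Ask the selector to report when the connect can be completed
            channel.register(selector, SelectionKey.OP_CONNECT);
            if (selector.select(timeoutMillis) == 0) {
                return false; // no key became ready: treated as a timeout in this sketch
            }
            // Throws ConnectException if the target refused the connection
            return channel.finishConnect();
        } catch (IOException e) {
            return false;
        }
    }

    /** Hypothetical self-check: probes a listener opened on an ephemeral loopback port. */
    static boolean probeLoopbackListener() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return probe((InetSocketAddress) server.getLocalAddress(), 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

As in the original, a completed connect is all that is required for the check to pass; no application data is exchanged.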
Beat(Inner Class)
/**
* Beat object.
*
* Note the HealthCheckTaskV2 task passed to the constructor:
* the subsequent processing operates on this original task.
*/
private class Beat {
private final HealthCheckTaskV2 task;
private final Service service;
private final ClusterMetadata metadata;
private final HealthCheckInstancePublishInfo instance;
long startTime = System.currentTimeMillis();
public Beat(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata,
HealthCheckInstancePublishInfo instance) {
this.task = task;
this.service = service;
this.metadata = metadata;
this.instance = instance;
}
public void setStartTime(long time) {
startTime = time;
}
public long getStartTime() {
return startTime;
}
public HealthCheckTaskV2 getTask() {
return task;
}
public Service getService() {
return service;
}
public ClusterMetadata getMetadata() {
return metadata;
}
public HealthCheckInstancePublishInfo getInstance() {
return instance;
}
public boolean isHealthy() {
return System.currentTimeMillis() - startTime < TimeUnit.SECONDS.toMillis(30L);
}
/**
* finish check only, no ip state will be changed.
*/
public void finishCheck() {
instance.finishCheck();
}
public void finishCheck(boolean success, boolean now, long rt, String msg) {
if (success) {
healthCheckCommon.checkOk(task, service, msg);
} else {
if (now) {
healthCheckCommon.checkFailNow(task, service, msg);
} else {
healthCheckCommon.checkFail(task, service, msg);
}
keyMap.remove(toString());
}
healthCheckCommon.reEvaluateCheckRT(rt, task, switchDomain.getTcpHealthParams());
}
@Override
public String toString() {
return service.getGroupedServiceName() + ":" + instance.getCluster() + ":" + instance.getIp() + ":"
+ instance.getPort();
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof Beat)) {
return false;
}
return this.toString().equals(obj.toString());
}
}
BeatKey(Inner Class)
Records when a connection was created.
private static class BeatKey {
public SelectionKey key;
public long birthTime;
public BeatKey(SelectionKey key) {
this.key = key;
this.birthTime = System.currentTimeMillis();
}
}
TimeOutTask(Inner Class)
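Handles connection timeouts. It is created by TaskProcessor, which also passes in the SelectionKey of the Channel it operates on; the two are in a one-to-one relationship.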
/**
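* Timeout task. The check has not necessarily timed out when this task is created;
* the task itself decides whether a timeout occurred and handles it.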
*/
private static class TimeOutTask implements Runnable {
SelectionKey key;
public TimeOutTask(SelectionKey key) {
this.key = key;
}
@Override
public void run() {
if (key != null && key.isValid()) {
// Get the channel of this beat
SocketChannel channel = (SocketChannel) key.channel();
// Get the Beat attached at registration time
Beat beat = (Beat) key.attachment();
// If the channel is connected there was no timeout, so nothing more needs to be done
if (channel.isConnected()) {
return;
}
try {
// Try to complete the connection
channel.finishConnect();
} catch (Exception ignore) {
}
try {
// Mark the check as failed, then cancel the key and close the channel so that the selector stops handling it
beat.finishCheck(false, false, beat.getTask().getCheckRtNormalized() * 2, "tcp:timeout");
key.cancel();
key.channel().close();
} catch (Exception ignore) {
}
}
}
}
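The TimeOutTask pattern is a delayed watchdog: it is scheduled at the moment the connection attempt starts and, when it fires, only acts if the attempt has not completed yet. The sketch below models this with a ScheduledExecutorService. TimeoutWatchdogSketch, Outcome and runCheck are hypothetical names; the simulated "connect" is just another delayed task, and the compareAndSet arbitration is an assumption of this sketch, whereas the original inspects channel.isConnected().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class TimeoutWatchdogSketch {

    enum Outcome { PENDING, OK, TIMEOUT }

    /** Simulates a connect that takes connectMillis, guarded by a watchdog that fires after timeoutMillis. */
    static Outcome runCheck(long connectMillis, long timeoutMillis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        AtomicReference<Outcome> outcome = new AtomicReference<>(Outcome.PENDING);
        CountDownLatch decided = new CountDownLatch(1);
        try {
            // Simulated connection completing after connectMillis
            scheduler.schedule(() -> {
                if (outcome.compareAndSet(Outcome.PENDING, Outcome.OK)) {
                    decided.countDown();
                }
            }, connectMillis, TimeUnit.MILLISECONDS);
            // Watchdog: only has an effect if the check is still pending when it fires
            scheduler.schedule(() -> {
                if (outcome.compareAndSet(Outcome.PENDING, Outcome.TIMEOUT)) {
                    decided.countDown();
                }
            }, timeoutMillis, TimeUnit.MILLISECONDS);
            decided.await(Math.max(connectMillis, timeoutMillis) + 1000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return outcome.get();
    }
}
```

For example, runCheck(10, 1000) ends as OK because the connection wins the race, while runCheck(1000, 50) ends as TIMEOUT.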
MysqlHealthCheckProcessor
/**
* TCP health check processor for v2.x.
* MySQL cluster health check.
* <p>Current health check logic is same as v1.x. TODO refactor health check for v2.x.
*
* @author xiweng.yy
*/
@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class MysqlHealthCheckProcessor implements HealthCheckProcessorV2 {
/**
* Type handled by this processor.
*/
public static final String TYPE = HealthCheckType.MYSQL.name();
/**
* Common health check helpers.
*/
private final HealthCheckCommonV2 healthCheckCommon;
private final SwitchDomain switchDomain;
/**
* Connect timeout in milliseconds.
*/
public static final int CONNECT_TIMEOUT_MS = 500;
/**
* SQL statement that queries the read_only variable; used to tell a master from a read-only slave.
*/
private static final String CHECK_MYSQL_MASTER_SQL = "show global variables where variable_name='read_only'";
/**
* read_only value reported by a MySQL slave.
*/
private static final String MYSQL_SLAVE_READONLY = "ON";
/**
* Database connection cache, keyed by service, cluster, IP and port.
*/
private static final ConcurrentMap<String, Connection> CONNECTION_POOL = new ConcurrentHashMap<String, Connection>();
public MysqlHealthCheckProcessor(HealthCheckCommonV2 healthCheckCommon, SwitchDomain switchDomain) {
this.healthCheckCommon = healthCheckCommon;
this.switchDomain = switchDomain;
}
@Override
public String getType() {
return TYPE;
}
@Override
public void process(HealthCheckTaskV2 task, Service service, ClusterMetadata metadata) {
// Get the instance this client published for the service
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) task.getClient()
.getInstancePublishInfo(service);
if (null == instance) {
return;
}
SRV_LOG.debug("mysql check, ip:" + instance);
try {
// TODO handle marked(white list) logic like v1.x.
if (!instance.tryStartCheck()) {
SRV_LOG.warn("mysql check started before last one finished, service: {} : {} : {}:{}",
service.getGroupedServiceName(), instance.getCluster(), instance.getIp(), instance.getPort());
healthCheckCommon
.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getMysqlHealthParams());
return;
}
// Create the MySQL check task and execute it
GlobalExecutor.executeMysqlCheckTask(new MysqlCheckTask(task, service, instance, metadata));
MetricsMonitor.getMysqlHealthCheckMonitor().incrementAndGet();
} catch (Exception e) {
instance.setCheckRt(switchDomain.getMysqlHealthParams().getMax());
healthCheckCommon.checkFail(task, service, "mysql:error:" + e.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task,
switchDomain.getMysqlHealthParams());
}
}
/**
* MySQL check task.
*/
private class MysqlCheckTask implements Runnable {
private final HealthCheckTaskV2 task;
private final Service service;
private final HealthCheckInstancePublishInfo instance;
private final ClusterMetadata metadata;
private long startTime = System.currentTimeMillis();
public MysqlCheckTask(HealthCheckTaskV2 task, Service service, HealthCheckInstancePublishInfo instance,
ClusterMetadata metadata) {
this.task = task;
this.service = service;
this.instance = instance;
this.metadata = metadata;
}
@Override
public void run() {
Statement statement = null;
ResultSet resultSet = null;
try {
String clusterName = instance.getCluster();
// Build the connection cache key
String key =
service.getGroupedServiceName() + ":" + clusterName + ":" + instance.getIp() + ":" + instance
.getPort();
// Look up a cached MySQL connection
Connection connection = CONNECTION_POOL.get(key);
// Get the health checker configuration
Mysql config = (Mysql) metadata.getHealthChecker();
// Create and cache a connection if none is usable
if (connection == null || connection.isClosed()) {
String url = "jdbc:mysql://" + instance.getIp() + ":" + instance.getPort() + "?connectTimeout="
+ CONNECT_TIMEOUT_MS + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;
connection = DriverManager.getConnection(url, config.getUser(), config.getPwd());
CONNECTION_POOL.put(key, connection);
}
statement = connection.createStatement();
statement.setQueryTimeout(1);
resultSet = statement.executeQuery(config.getCmd());
int resultColumnIndex = 2;
// Is the configured command the master check statement?
if (CHECK_MYSQL_MASTER_SQL.equals(config.getCmd())) {
resultSet.next();
/**
* Decide from the query result whether the node is a master or a slave.
* CHECK_MYSQL_MASTER_SQL returns one row: [read_only : ON/OFF]
* MYSQL_SLAVE_READONLY is "ON"; a value of ON means the node is a read-only slave, so the check fails
*/
if (MYSQL_SLAVE_READONLY.equals(resultSet.getString(resultColumnIndex))) {
throw new IllegalStateException("current node is slave!");
}
}
// Record the successful check
healthCheckCommon.checkOk(task, service, "mysql:+ok");
healthCheckCommon.reEvaluateCheckRT(System.currentTimeMillis() - startTime, task,
switchDomain.getMysqlHealthParams());
} catch (SQLException e) {
// fail immediately
healthCheckCommon.checkFailNow(task, service, "mysql:" + e.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getHttpHealthParams().getMax(), task,
switchDomain.getMysqlHealthParams());
} catch (Throwable t) {
// Walk the cause chain (at most 50 levels) to see whether the failure was caused by a timeout
Throwable cause = t;
int maxStackDepth = 50;
for (int deepth = 0; deepth < maxStackDepth && cause != null; deepth++) {
if (cause instanceof SocketTimeoutException || cause instanceof ConnectTimeoutException
|| cause instanceof TimeoutException || cause.getCause() instanceof TimeoutException) {
healthCheckCommon.checkFail(task, service, "mysql:timeout:" + cause.getMessage());
healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task,
switchDomain.getMysqlHealthParams());
return;
}
cause = cause.getCause();
}
// connection error, probably not reachable
healthCheckCommon.checkFail(task, service, "mysql:error:" + t.getMessage());
healthCheckCommon.reEvaluateCheckRT(switchDomain.getMysqlHealthParams().getMax(), task,
switchDomain.getMysqlHealthParams());
} finally {
instance.setCheckRt(System.currentTimeMillis() - startTime);
if (statement != null) {
try {
statement.close();
} catch (SQLException e) {
Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close statement:" + statement, e);
}
}
if (resultSet != null) {
try {
resultSet.close();
} catch (SQLException e) {
Loggers.SRV_LOG.error("[MYSQL-CHECK] failed to close resultSet:" + resultSet, e);
}
}
}
}
}
}
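Two details of MysqlCheckTask are easy to misread: how the cache key and JDBC URL are assembled, and how the read_only value is interpreted. The sketch below restates them as pure functions so that they can be examined without a database. MysqlCheckSketch and its method names are hypothetical; the string formats follow the listing above.

```java
final class MysqlCheckSketch {

    static final int CONNECT_TIMEOUT_MS = 500;

    /** Cache key: one cached connection per service, cluster, IP and port. */
    static String cacheKey(String groupedServiceName, String cluster, String ip, int port) {
        return groupedServiceName + ":" + cluster + ":" + ip + ":" + port;
    }

    /** JDBC URL with the same timeout parameters as the listing above. */
    static String jdbcUrl(String ip, int port) {
        return "jdbc:mysql://" + ip + ":" + port + "?connectTimeout=" + CONNECT_TIMEOUT_MS
                + "&socketTimeout=" + CONNECT_TIMEOUT_MS + "&loginTimeout=" + 1;
    }

    /** read_only = ON marks a read-only slave; any other value passes the master check. */
    static boolean passesMasterCheck(String readOnlyValue) {
        return !"ON".equals(readOnlyValue);
    }
}
```

Note that the master check only applies when the configured command equals CHECK_MYSQL_MASTER_SQL; for any other command, a query that executes without error is enough for the check to pass.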
NoneHealthCheckProcessor
Fallback processor; by default it does nothing with the task.
/**
* Health checker that does nothing.
*
* @author nkorange
* @since 1.0.0
*/
@Component
public class NoneHealthCheckProcessor implements HealthCheckProcessor {
public static final String TYPE = "NONE";
@Override
public void process(HealthCheckTask task) {
}
@Override
public String getType() {
return TYPE;
}
}
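Starting the check task when a service is registered
When a service is registered or synchronized, a Client object is created, and creating the Client also creates the matching client beat check task. The emphasis is on "check": the task examines the result of the heartbeats. Heartbeats update the state of a service, and this task inspects that state to infer, in reverse, whether heartbeats are arriving. From the point of view of a service, the client is the top-level unit, since it manages all of its services and instances, so starting a beat check task when the client is created is a natural fit.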
public IpPortBasedClient(String clientId, boolean ephemeral) {
this.ephemeral = ephemeral;
this.clientId = clientId;
this.responsibleId = getResponsibleTagFromId();
if (ephemeral) {
// Create the health check task
beatCheckTask = new ClientBeatCheckTaskV2(this);
// Hand it to the reactor for scheduling
HealthCheckReactor.scheduleCheck(beatCheckTask);
} else {
healthCheckTaskV2 = new HealthCheckTaskV2(this);
HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
}
}
/**
* Schedule client beat check task with a delay.
* The task first runs after 5 seconds and then repeats every 5 seconds.
* @param task client beat check task
*/
public static void scheduleCheck(BeatCheckTask task) {
Runnable wrapperTask = task instanceof NacosHealthCheckTask ? new HealthCheckTaskInterceptWrapper((NacosHealthCheckTask) task) : task;
futureMap.computeIfAbsent(task.taskKey(), k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}
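Taking the default ephemeral Client as an example: its constructor creates a ClientBeatCheckTaskV2 and schedules it through HealthCheckReactor, which caches the resulting future in futureMap. The first run is delayed by 5 seconds, and the check then repeats every 5 seconds.
Tip:
The wrapperTask declared in scheduleCheck(BeatCheckTask task) is never used; the original task is what gets scheduled. The wrapper also manages an interceptor chain internally, so interceptors could be added later to perform validation. This is another extension point.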
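The check method fetches every service registered under the current Client and checks each one. If the Client has no published services, running the check every 5 seconds would waste resources. This is where the cache populated at scheduling time becomes useful: the check task of a Client without services can be looked up in futureMap and cancelled, which frees the thread resources. Who performs the cancellation is analyzed later.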
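The role of futureMap can be sketched as follows: computeIfAbsent guarantees at most one scheduled future per task key, and keeping the future is what makes later cancellation possible. CheckSchedulerSketch and its methods are hypothetical names. The use of scheduleWithFixedDelay is an assumption, since the body of GlobalExecutor.scheduleNamingHealth is not shown in this article, and the cancel method illustrates the idea rather than reproducing the code Nacos uses.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class CheckSchedulerSketch {

    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    // Daemon thread so that the sketch never keeps the JVM alive
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread thread = new Thread(r, "check-scheduler-sketch");
        thread.setDaemon(true);
        return thread;
    });

    /** Schedules the task unless one with the same key is already scheduled. */
    void schedule(String taskKey, Runnable task, long periodMillis) {
        futureMap.computeIfAbsent(taskKey,
                k -> executor.scheduleWithFixedDelay(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
    }

    /** Cancels and forgets the task; returns false if the key was not scheduled. */
    boolean cancel(String taskKey) {
        ScheduledFuture<?> future = futureMap.remove(taskKey);
        if (future == null) {
            return false;
        }
        future.cancel(true);
        return true;
    }

    boolean isScheduled(String taskKey) {
        return futureMap.containsKey(taskKey);
    }

    /** Hypothetical demo: a duplicate schedule is ignored and a second cancel is a no-op. */
    static boolean[] demo() {
        CheckSchedulerSketch scheduler = new CheckSchedulerSketch();
        Runnable noop = () -> { };
        scheduler.schedule("client-1", noop, 5000);
        scheduler.schedule("client-1", noop, 5000);
        boolean scheduled = scheduler.isScheduled("client-1");
        boolean cancelled = scheduler.cancel("client-1");
        boolean cancelledAgain = scheduler.cancel("client-1");
        boolean stillScheduled = scheduler.isScheduled("client-1");
        scheduler.executor.shutdownNow();
        return new boolean[] {scheduled, cancelled, cancelledAgain, stillScheduled};
    }
}
```

Keying the map by task key rather than by task object means that a client which reconnects with the same key does not end up with two periodic checks.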
public void doHealthCheck() {
try {
// Get all services published by this client
Collection<Service> services = client.getAllPublishedService();
for (Service each : services) {
// Get the instance published for the service and cast it to HealthCheckInstancePublishInfo
HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each);
// Create an InstanceBeatCheckTask for each instance and pass it through the interceptor chain
interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
An InstanceBeatCheckTask is created for each instance obtained, so one ClientBeatCheckTaskV2 creates multiple InstanceBeatCheckTask objects. Three interceptors are loaded for this task; in priority order they are:
- ServiceEnableBeatCheckInterceptor
- InstanceEnableBeatCheckInterceptor
- InstanceBeatCheckResponsibleInterceptor
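InstanceBeatCheckTask uses Checkers to examine the state of the current instance. By default there is a checker for unhealthy instances and a checker for expired instances. The task only starts the checks; what is checked, and what is done with the result, is up to each checker.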
// InstanceBeatCheckTask.java
// Runs the checks when the task was not intercepted
public void passIntercept() {
// Iterate over all Checkers
for (InstanceBeatChecker each : CHECKERS) {
// Delegate the check to the Checker
each.doCheck(client, service, instancePublishInfo);
}
}
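The Checker calls in InstanceBeatCheckTask live inside passIntercept(), so they run only when no interceptor in the chain intercepts the task; if an interceptor does intercept it, afterIntercept() is called instead. For the detailed flow, see UnhealthyInstanceChecker and ExpiredInstanceChecker.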
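The relationship between the interceptor chain and the two Interceptable callbacks can be sketched as follows. This is a simplified model rather than the Nacos implementation: the Interceptor interface, the Chain class and RecordingTask are hypothetical, and only the passIntercept()/afterIntercept() contract comes from the Interceptable interface shown earlier.

```java
import java.util.ArrayList;
import java.util.List;

final class InterceptorChainSketch {

    interface Interceptable {
        void passIntercept();   // called when no interceptor intercepts the object
        void afterIntercept();  // called when one interceptor intercepts the object
    }

    /** Hypothetical interceptor contract: return true to intercept. */
    interface Interceptor<T extends Interceptable> {
        boolean intercept(T object);
        int order();
    }

    static final class Chain<T extends Interceptable> {
        private final List<Interceptor<T>> interceptors = new ArrayList<>();

        void add(Interceptor<T> interceptor) {
            interceptors.add(interceptor);
            interceptors.sort((a, b) -> Integer.compare(a.order(), b.order()));
        }

        void doInterceptor(T object) {
            for (Interceptor<T> each : interceptors) {
                if (each.intercept(object)) {
                    object.afterIntercept(); // the first interceptor to match stops the chain
                    return;
                }
            }
            object.passIntercept(); // nobody intercepted: run the real work
        }
    }

    /** Stand-in for InstanceBeatCheckTask that records which callback ran. */
    static final class RecordingTask implements Interceptable {
        String outcome = "pending";
        @Override public void passIntercept() { outcome = "checked"; }
        @Override public void afterIntercept() { outcome = "skipped"; }
    }

    static String run(boolean beatCheckEnabled) {
        Chain<RecordingTask> chain = new Chain<>();
        chain.add(new Interceptor<RecordingTask>() {
            @Override public boolean intercept(RecordingTask task) { return !beatCheckEnabled; }
            @Override public int order() { return 0; }
        });
        RecordingTask task = new RecordingTask();
        chain.doInterceptor(task);
        return task.outcome;
    }
}
```

Here run(true) returns "checked" and run(false) returns "skipped", which mirrors how an interceptor such as ServiceEnableBeatCheckInterceptor can switch off the checkers for a task.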
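Starting the automatic cleanup task
Nacos 2.x has a class dedicated to automatic cleanup, EmptyServiceAutoCleanerV2. It is scheduled when the Nacos server starts, with an initial delay of 30 seconds, and then runs every 60 seconds to remove empty services. It cleans the centrally managed Service and Client data (for ServiceManager, ClientServiceIndexesManager and ServiceStorage, see the article "Service Storage Management").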
/**
* Empty service auto cleaner for v2.x.
* Removes services that have no registered clients and have passed the expiry time.
* @author xiweng.yy
*/
@Component
public class EmptyServiceAutoCleanerV2 extends AbstractNamingCleaner {
private static final String EMPTY_SERVICE = "emptyService";
// Index manager for Clients and Services
private final ClientServiceIndexesManager clientServiceIndexesManager;
// Service storage
private final ServiceStorage serviceStorage;
public EmptyServiceAutoCleanerV2(ClientServiceIndexesManager clientServiceIndexesManager, ServiceStorage serviceStorage) {
this.clientServiceIndexesManager = clientServiceIndexesManager;
this.serviceStorage = serviceStorage;
// First run after 30 seconds, then repeat at the interval returned by GlobalConfig.getEmptyServiceCleanInterval()
GlobalExecutor.scheduleExpiredClientCleaner(this, TimeUnit.SECONDS.toMillis(30), GlobalConfig.getEmptyServiceCleanInterval(), TimeUnit.MILLISECONDS);
}
@Override
public String getType() {
return EMPTY_SERVICE;
}
@Override
public void doClean() {
// Get the ServiceManager
ServiceManager serviceManager = ServiceManager.getInstance();
// Threshold for switching to a parallel stream: used when a namespace holds more than 100 services
// Parallel flow opening threshold
int parallelSize = 100;
// Process the Services of every Namespace
for (String each : serviceManager.getAllNamespaces()) {
Set<Service> services = serviceManager.getSingletons(each);
// Choose a sequential or parallel stream based on the number of Services in this Namespace
Stream<Service> stream = services.size() > parallelSize ? services.parallelStream() : services.stream();
// Run cleanEmptyService on each Service
stream.forEach(this::cleanEmptyService);
}
}
private void cleanEmptyService(Service service) {
// Get the IDs of all clients registered under this Service
Collection<String> registeredService = clientServiceIndexesManager.getAllClientsRegisteredService(service);
// The service has no clients AND its last update is older than the configured expiry time
if (registeredService.isEmpty() && isTimeExpired(service)) {
Loggers.SRV_LOG.warn("namespace : {}, [{}] services are automatically cleaned", service.getNamespace(), service.getGroupedServiceName());
// Remove the index entries that link the Service to its Clients
clientServiceIndexesManager.removePublisherIndexesByEmptyService(service);
// Remove the Service from its Namespace
ServiceManager.getInstance().removeSingleton(service);
// Remove the detailed data of the Service
serviceStorage.removeData(service);
// Publish the Service expired event
NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, true));
}
}
private boolean isTimeExpired(Service service) {
long currentTimeMillis = System.currentTimeMillis();
return currentTimeMillis - service.getLastUpdatedTime() >= GlobalConfig.getEmptyServiceExpiredTime();
}
}
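The cleaner combines two small decisions: a service is removed only if it has no clients and has passed its expiry time, and a namespace is processed with a parallel stream only when it holds more services than a threshold. The sketch below isolates both. EmptyServiceCleanSketch and ServiceInfo are hypothetical stand-ins for the Nacos types, and the method returns the names selected for cleaning instead of deleting anything.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class EmptyServiceCleanSketch {

    /** Hypothetical stand-in for a Service plus its registered client count. */
    static final class ServiceInfo {
        final String name;
        final int clientCount;
        final long lastUpdatedTime;

        ServiceInfo(String name, int clientCount, long lastUpdatedTime) {
            this.name = name;
            this.clientCount = clientCount;
            this.lastUpdatedTime = lastUpdatedTime;
        }
    }

    /** Both conditions must hold: no clients and expired. */
    static boolean shouldClean(ServiceInfo service, long now, long expiredMillis) {
        return service.clientCount == 0 && now - service.lastUpdatedTime >= expiredMillis;
    }

    static List<String> selectForCleaning(Collection<ServiceInfo> services, long now, long expiredMillis,
            int parallelThreshold) {
        // Use a parallel stream only for large collections
        Stream<ServiceInfo> stream =
                services.size() > parallelThreshold ? services.parallelStream() : services.stream();
        return stream.filter(s -> shouldClean(s, now, expiredMillis))
                .map(s -> s.name)
                .sorted()
                .collect(Collectors.toList());
    }

    /** Hypothetical demo data: only "idle-old" is both empty and expired. */
    static List<String> demo() {
        long now = 100_000L;
        List<ServiceInfo> services = Arrays.asList(
                new ServiceInfo("busy", 2, 0L),
                new ServiceInfo("idle-new", 0, 90_000L),
                new ServiceInfo("idle-old", 0, 10_000L));
        return selectForCleaning(services, now, 60_000L, 100);
    }
}
```

Requiring both conditions protects a service that has just been created but has not yet received its first instance registration.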
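Besides the automatic removal of Services, there is also ExpiredMetadataCleaner, a task that removes expired ServiceMetaData; interested readers can study the relevant source code.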
