Nacos 2.0源碼分析-攔截器機制


溫馨提示:
本文內容基於個人學習Nacos 2.0.1版本代碼總結而來,因個人理解差異,不保證完全正確。如有理解錯誤之處歡迎各位拍磚指正,相互學習;轉載請注明出處。

Nacos服務端在處理健康檢查和心跳檢查任務的時候它是使用攔截器鏈來執行的。攔截器鏈內部有多個攔截器,通過獲取不同的攔截器鏈實例,在實例內部指定具體的攔截器類型來組成一組攔截器。這里使用了攔截器模式和模板模式來組織代碼。攔截器模式體現在整體攔截機制的實現;模板模式主要體現在對攔截器鏈的抽象實現上。

攔截器模式有三個要素

  • 攔截器
  • 調度者
  • 業務邏輯

攔截器

定義一個攔截器的基本功能,同時限定了傳入的攔截對象類型必須為Interceptable。這里只定義了基本的功能和基本的限定攔截對象。這里將其描述為基本的功能,那就意味着它的實現將會有更高級的功能。

/**
 * Nacos naming interceptor.
 * 攔截器對象
 * @author xiweng.yy
 */
public interface NacosNamingInterceptor<T extends Interceptable> {
    
    /**
     * Judge whether the input type is intercepted by this Interceptor.
     * 此攔截器的實例將會判斷傳入的對象是否是他需要處理的類型,此方法可以實現不同攔截器處理不同對象的隔離操作
     * <p>This method only should judge the object type whether need be do intercept. Not the intercept logic.
     * @param type type
     * @return true if the input type is intercepted by this Interceptor, otherwise false
     */
    boolean isInterceptType(Class<?> type);
    
    /**
     * Do intercept operation.
     * 執行攔截操作
     * <p>This method is the actual intercept operation.
     * @param object need intercepted object
     * @return true if object is intercepted, otherwise false
     */
    boolean intercept(T object);
    
    /**
     * The order of interceptor. The lower the number, the earlier the execution.
     * 攔截器排序,數字越低,優先級越高
     * @return the order number of interceptor
     */
    int order();
}

被攔截的對象

Interceptable 定義了對攔截操作相關的執行方法,passIntercept()在未被攔截的時候需要執行,afterIntercept()在被攔截之后需要執行。被攔截對象的業務邏輯需要由攔截器負責調度。

/**
 * Interceptable Interface.
 *
 * @author xiweng.yy
 */
public interface Interceptable {
    
    /**
     * If no {@link NacosNamingInterceptor} intercept this object, this method will be called to execute.
     */
    void passIntercept();
    
    /**
     * If one {@link NacosNamingInterceptor} intercept this object, this method will be called.
     */
    void afterIntercept();
}

調度者

調度者主要是用來管理攔截器的組織方式,觸發攔截器的攔截操作。下圖展示了Naming模塊的攔截器鏈的繼承關系。

整體的構成由NacosNamingInterceptorChain定義基本框架,AbstractNamingInterceptorChain實現通用邏輯,HealthCheckInterceptorChainInstanceBeatCheckTaskInterceptorChain則分別服務於健康檢查和心跳檢查。

NacosNamingInterceptorChain

定義了攔截器鏈對象應該具有的基本行為:添加攔截器、執行攔截器。

/**
 * Nacos naming interceptor chain.
 * Nacos Naming模塊的攔截器鏈接口,攔截器鏈用於存儲並管理多個攔截器
 * @author xiweng.yy
 */
public interface NacosNamingInterceptorChain<T extends Interceptable> {
	
    /**
     * Add interceptor.
     * 添加指定類型的攔截器對象
     * @param interceptor interceptor
     */
    void addInterceptor(NacosNamingInterceptor<T> interceptor);
    
    /**
     * Do intercept by added interceptors.
     * 執行攔截的業務操作
     * @param object be interceptor object
     */
    void doInterceptor(T object);
}

AbstractNamingInterceptorChain

AbstractNamingInterceptorChain實現了NacosNamingInterceptorChain所定義的對NacosNamingInterceptor的操作。在構造方法中提供了具體的攔截器實現類的加載,它這里使用了SPI方式加載。默認可以加載的攔截器必須是NacosNamingInterceptor的實例。在攔截器的執行方法doInterceptor()中會按優先級調用每一個攔截器,首先判斷被攔截的對象是否是此攔截器處理,接着調用攔截器的intercept()方法,成功后調用被攔截對象的afterIntercept()方法。若未攔截成功則調用被攔截對象的passIntercept()方法。因此在攔截器中的intercept()方法中可以定義攔截器對被攔截對象的處理邏輯,而被攔截對象則可以在afterIntercept()和passIntercept()方法中定義自身的處理邏輯。從而實現在攔截器中被處理和自身處理任務依賴於攔截器來觸發。

/**
 * Abstract Naming Interceptor Chain.
 * 抽象的命名服務攔截器鏈,用於定義攔截器鏈的工作流程
 * @author xiweng.yy
 */
public abstract class AbstractNamingInterceptorChain<T extends Interceptable> implements NacosNamingInterceptorChain<T> {
    
    // 存儲多個攔截器
    private final List<NacosNamingInterceptor<T>> interceptors;
    
	// 限制使用范圍為當前包或者其子類
    protected AbstractNamingInterceptorChain(Class<? extends NacosNamingInterceptor<T>> clazz) {
        this.interceptors = new LinkedList<>();
		// 使用SPI模式加載指定的攔截器類型
		// 而且NacosNamingInterceptor內部有判斷它需要攔截對象的類型,因此非常靈活
        interceptors.addAll(NacosServiceLoader.load(clazz));
		// 對攔截器的順序進行排序
        interceptors.sort(Comparator.comparingInt(NacosNamingInterceptor::order));
    }
    
    /**
     * Get all interceptors.
     *
     * @return interceptors list
     */
    protected List<NacosNamingInterceptor<T>> getInterceptors() {
        return interceptors;
    }
    
    @Override
    public void addInterceptor(NacosNamingInterceptor<T> interceptor) {
		// 若手動添加,則需要再次進行排序
        interceptors.add(interceptor);
        interceptors.sort(Comparator.comparingInt(NacosNamingInterceptor::order));
    }
    
    @Override
    public void doInterceptor(T object) {
		// 因為內部的攔截器已經排序過了,所以直接遍歷
		for (NacosNamingInterceptor<T> each : interceptors) {
			// 若當前攔截的對象不是當前攔截器所要處理的類型則調過
            if (!each.isInterceptType(object.getClass())) {
                continue;
            }
			// 執行攔截操作成功之后,繼續執行攔截后操作
            if (each.intercept(object)) {
                object.afterIntercept();
                return;
            }
        }
		// 未攔截的操作
        object.passIntercept();
    }
}

doInterceptor() 方法中使用當前攔截器鏈內部的所有攔截器對被攔截對象進行處理,並且組織了被攔截對象被攔截之后的方法調用流程。即:攔截之后執行被攔截對象的afterIntercept()方法,未攔截時執行passIntercept()方法。

HealthCheckInterceptorChain

健康檢查攔截器鏈負責加載AbstractHealthCheckInterceptor類型的攔截器。

/**
 * Health check interceptor chain.
 * @author xiweng.yy
 */
public class HealthCheckInterceptorChain extends AbstractNamingInterceptorChain<NacosHealthCheckTask> {

    private static final HealthCheckInterceptorChain INSTANCE = new HealthCheckInterceptorChain();

    private HealthCheckInterceptorChain() {
        super(AbstractHealthCheckInterceptor.class);
    }

    public static HealthCheckInterceptorChain getInstance() {
        return INSTANCE;
    }
}

InstanceBeatCheckTaskInterceptorChain

實例心跳檢查器鏈負責加載AbstractBeatCheckInterceptor類型的攔截器。

/**
 * Instance beat check interceptor chain.
 *
 * @author xiweng.yy
 */
public class InstanceBeatCheckTaskInterceptorChain extends AbstractNamingInterceptorChain<InstanceBeatCheckTask> {
    
    private static final InstanceBeatCheckTaskInterceptorChain INSTANCE = new InstanceBeatCheckTaskInterceptorChain();
    
    private InstanceBeatCheckTaskInterceptorChain() {
        super(AbstractBeatCheckInterceptor.class);
    }
    
    public static InstanceBeatCheckTaskInterceptorChain getInstance() {
        return INSTANCE;
    }
}

小結
通過模板模式來實現攔截器機制。

  • AbstractNamingInterceptorChain 抽象出連接器鏈對攔截器加載的通用方法,定義了攔截器對被攔截對象的通用處理流程。
  • AbstractHealthCheckInterceptor 定義了健康檢查攔截器被攔截的對象類型
  • AbstractBeatCheckInterceptor 定義了心跳檢查攔截器被攔截的對象類型

通過對攔截器鏈的組織方式梳理可以看到有明顯的兩條路線,一個是健康檢查,一個是心跳檢查。分析后續具體的攔截器,以及他們所要處理的任務就很清晰了。

業務邏輯

業務邏輯是被攔截器攔截之后需要進行的操作。

健康檢查類的被攔截對象

健康檢查的抽象攔截器AbstractHealthCheckInterceptor定義了它的子類將要處理的任務類型為NacosHealthCheckTask

HealthCheckTaskV2

/**
 * Health check task for v2.x.
 * v2版本的健康檢查
 * <p>Current health check logic is same as v1.x. TODO refactor health check for v2.x.
 *
 * @author nacos
 */
public class HealthCheckTaskV2 extends AbstractExecuteTask implements NacosHealthCheckTask {

    /**
     * 一個客戶端對象(此客戶端代表提供服務用於被應用訪問的客戶端)
     * 從這里可以看出,啟動一個健康檢查任務是以客戶端為維度的
     */
    private final IpPortBasedClient client;

    private final String taskId;

    private final SwitchDomain switchDomain;

    private final NamingMetadataManager metadataManager;

    private long checkRtNormalized = -1;
    /**
     * 檢查最佳響應時間
     */
    private long checkRtBest = -1;

    /**
     * 檢查最差響應時間
     */
    private long checkRtWorst = -1;

    /**
     * 檢查上次響應時間
     */
    private long checkRtLast = -1;

    /**
     * 檢查上上次響應時間
     */
    private long checkRtLastLast = -1;

    /**
     * 開始時間
     */
    private long startTime;

    /**
     * 任務是否取消
     */
    private volatile boolean cancelled = false;

    public HealthCheckTaskV2(IpPortBasedClient client) {
        this.client = client;
        this.taskId = client.getResponsibleId();
        this.switchDomain = ApplicationUtils.getBean(SwitchDomain.class);
        this.metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        // 初始化響應時間檢查
        initCheckRT();
    }

    /**
     * 初始化響應時間值
     */
    private void initCheckRT() {
        // first check time delay
        // 2000 + (在5000以內的隨機數)
        checkRtNormalized =
                2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
        // 最佳響應時間
        checkRtBest = Long.MAX_VALUE;
        // 最差響應時間為0
        checkRtWorst = 0L;
    }

    public IpPortBasedClient getClient() {
        return client;
    }

    @Override
    public String getTaskId() {
        return taskId;
    }

    /**
     * 開始執行健康檢查任務
     */
    @Override
    public void doHealthCheck() {
        try {
            // 獲取當前傳入的Client所發布的所有Service
            for (Service each : client.getAllPublishedService()) {
                // 只有當Service開啟了健康檢查才執行
                if (switchDomain.isHealthCheckEnabled(each.getGroupedServiceName())) {
                    // 獲取Service對應的InstancePublishInfo
                    InstancePublishInfo instancePublishInfo = client.getInstancePublishInfo(each);
                    // 獲取集群元數據
                    ClusterMetadata metadata = getClusterMetadata(each, instancePublishInfo);
                    // 使用Processor代理對象對任務進行處理
                    ApplicationUtils.getBean(HealthCheckProcessorV2Delegate.class).process(this, each, metadata);
                    if (Loggers.EVT_LOG.isDebugEnabled()) {
                        Loggers.EVT_LOG.debug("[HEALTH-CHECK] schedule health check task: {}", client.getClientId());
                    }
                }
            }
        } catch (Throwable e) {
            Loggers.SRV_LOG.error("[HEALTH-CHECK] error while process health check for {}", client.getClientId(), e);
        } finally {
            // 若任務執行狀態為已取消,則再次啟動
            if (!cancelled) {
                HealthCheckReactor.scheduleCheck(this);
                // worst == 0 means never checked
                if (this.getCheckRtWorst() > 0) {
                    // TLog doesn't support float so we must convert it into long
                    long checkRtLastLast = getCheckRtLastLast();
                    this.setCheckRtLastLast(this.getCheckRtLast());
                    if (checkRtLastLast > 0) {
                        long diff = ((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / checkRtLastLast;
                        if (Loggers.CHECK_RT.isDebugEnabled()) {
                            Loggers.CHECK_RT.debug("{}->normalized: {}, worst: {}, best: {}, last: {}, diff: {}",
                                    client.getClientId(), this.getCheckRtNormalized(), this.getCheckRtWorst(),
                                    this.getCheckRtBest(), this.getCheckRtLast(), diff);
                        }
                    }
                }
            }
        }
    }

    @Override
    public void passIntercept() {
        doHealthCheck();
    }

    @Override
    public void afterIntercept() {
        // 若任務執行狀態為已取消,則再次啟動
        if (!cancelled) {
            HealthCheckReactor.scheduleCheck(this);
        }
    }

    @Override
    public void run() {
        // 調用健康檢查
        doHealthCheck();
    }

    /**
     * 獲取集群元數據
     * @param service               服務信息
     * @param instancePublishInfo   服務對應的ip等信息
     * @return
     */
    private ClusterMetadata getClusterMetadata(Service service, InstancePublishInfo instancePublishInfo) {
        Optional<ServiceMetadata> serviceMetadata = metadataManager.getServiceMetadata(service);
        if (!serviceMetadata.isPresent()) {
            return new ClusterMetadata();
        }
        String cluster = instancePublishInfo.getCluster();
        ClusterMetadata result = serviceMetadata.get().getClusters().get(cluster);
        return null == result ? new ClusterMetadata() : result;
    }

    public long getCheckRtNormalized() {
        return checkRtNormalized;
    }

    public long getCheckRtBest() {
        return checkRtBest;
    }

    public long getCheckRtWorst() {
        return checkRtWorst;
    }

    public void setCheckRtWorst(long checkRtWorst) {
        this.checkRtWorst = checkRtWorst;
    }

    public void setCheckRtBest(long checkRtBest) {
        this.checkRtBest = checkRtBest;
    }

    public void setCheckRtNormalized(long checkRtNormalized) {
        this.checkRtNormalized = checkRtNormalized;
    }

    public boolean isCancelled() {
        return cancelled;
    }

    public void setCancelled(boolean cancelled) {
        this.cancelled = cancelled;
    }

    public long getStartTime() {
        return startTime;
    }

    public void setStartTime(long startTime) {
        this.startTime = startTime;
    }

    public long getCheckRtLast() {
        return checkRtLast;
    }

    public void setCheckRtLast(long checkRtLast) {
        this.checkRtLast = checkRtLast;
    }

    public long getCheckRtLastLast() {
        return checkRtLastLast;
    }

    public void setCheckRtLastLast(long checkRtLastLast) {
        this.checkRtLastLast = checkRtLastLast;
    }
}

心跳檢查類的被攔截對象

ClientBeatCheckTaskV2

雖然它繼承了NacosHealthCheckTask,但內部只使用了InstanceBeatCheckTaskInterceptorChain,沒有使用HealthCheckInterceptorChain, 按理說應該划分到"心跳檢查類的被攔截對象" 這個類別的。不知道為何這樣設計,已提issues。

/**
 * Client beat check task of service for version 2.x.
 * @author nkorange
 */
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {

    private final IpPortBasedClient client;
 
    private final String taskId;
	
    /**
     * 使用攔截器鏈
     */
    private final InstanceBeatCheckTaskInterceptorChain interceptorChain;

    public ClientBeatCheckTaskV2(IpPortBasedClient client) {
        this.client = client;
        this.taskId = client.getResponsibleId();
        this.interceptorChain = InstanceBeatCheckTaskInterceptorChain.getInstance();
    }

    public GlobalConfig getGlobalConfig() {
        return ApplicationUtils.getBean(GlobalConfig.class);
    }

    @Override
    public String taskKey() {
        return KeyBuilder.buildServiceMetaKey(client.getClientId(), String.valueOf(client.isEphemeral()));
    }

    @Override
    public String getTaskId() {
        return taskId;
    }

    @Override
    public void doHealthCheck() {

        try {
            // 獲取所有的Service
            Collection<Service> services = client.getAllPublishedService();
            for (Service each : services) {
                logger.info("開始對Service進行攔截操作,{}", each.getName());
                // 獲取Service對應的InstancePublishInfo
                HealthCheckInstancePublishInfo instance = (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(each);
                // 創建一個InstanceBeatCheckTask,並交由攔截器鏈處理
                interceptorChain.doInterceptor(new InstanceBeatCheckTask(client, each, instance));
            }
            
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }

    @Override
    public void run() {
        doHealthCheck();
    }

    @Override
    public void passIntercept() {
        doHealthCheck();
    }

    @Override
    public void afterIntercept() {
    }
}

InstanceBeatCheckTask

/**
 * Instance beat check task.
 * Instance心跳檢查任務,此處它作為一個可被攔截器攔截的對象使用。
 * @author xiweng.yy
 */
public class InstanceBeatCheckTask implements Interceptable {
    
    // 心跳檢查者列表
    private static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();
    
    // 客戶端對象(因為實例就代表的是客戶端)
    private final IpPortBasedClient client;
    
    // 服務對象
    private final Service service;
    
    // 健康檢查信息
    private final HealthCheckInstancePublishInfo instancePublishInfo;
    
    static {
		// 添加不健康實例檢查器
        CHECKERS.add(new UnhealthyInstanceChecker());
		// 添加過期實例檢查器
        CHECKERS.add(new ExpiredInstanceChecker());
		// 添加用戶自定義的心跳檢查器
        CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
    }
    
    public InstanceBeatCheckTask(IpPortBasedClient client, Service service, HealthCheckInstancePublishInfo instancePublishInfo) {
        this.client = client;
        this.service = service;
        this.instancePublishInfo = instancePublishInfo;
    }
    
    @Override
    public void passIntercept() {
		// 未被攔截的時候執行自身邏輯
        for (InstanceBeatChecker each : CHECKERS) {
            each.doCheck(client, service, instancePublishInfo);
        }
    }
    
    @Override
    public void afterIntercept() {
    }
    
    public IpPortBasedClient getClient() {
        return client;
    }
    
    public Service getService() {
        return service;
    }
    
    public HealthCheckInstancePublishInfo getInstancePublishInfo() {
        return instancePublishInfo;
    }
}

總結

  • 攔截器鏈確定了要加載的攔截器類型
  • 攔截器確定了要攔截的對象類型
  • 被攔截的對象又建立了自己的檢查策略


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM