Nacos2.X源碼閱讀總結


前言

Nacos是一個Alibaba出品的高性能微服務時代產出的組件,集注冊和配置中心為一體。那么Nacos為什么這么高性能呢?總結以下幾點;

1:基於阿里自研的distro協議進行Nacos把不同節點的數據同步

2:大量使用線程池和異步的方式提高API的響應速度

3:2.X使用grpc長連接的方式取代了1.X需要一直發送心跳包導出服務器CPU占用較高的問題

同時2.X也對1.X做了重大的升級,無論是從架構層面還是代碼層面都做了重大的升級,有條件升級為2.X的同學建議客戶端可服務端一起升級,這樣才能更大限度的發揮出2.X架構的優勢。1.X和2.X的對比 如下:

1.X 2.X
連接方式 Http短連接 GRpc、Http短連接(兼容1.X)
推送方式 UDP GRpc
健康檢測方式 Http短連接定時心跳包 Grpc長連接(輕量級心跳包)

關於Nacos1.X和2.X的性能對比請參考:Nacos 2.0 升級前后性能對比壓測-阿里雲開發者社區 (aliyun.com)

這里借用一下阿里雲社區的Nacos架構圖:

5.png

下面我們就基於Nacos2.0.4的代碼層面分析一下為什么Nacos源碼,看之前最好有以下基礎,設計模式(模板,委托,代理,單例,工廠,策略)、異步編程方式,grpc。

啟動

首先我們先看一下Nacos的結構圖:Nacos通過Namespace(命名空間)進行環境的隔離,然后我們可以把根據服務之間的關聯性來把不同的服務划分到不同的組(Group)之間,每一個組之間可以有多個服務(Service),同時為了容災,我們可以把一個服務划分為不同的集群(Cluster)部署在不同的地區或機房,每一個具體的集群下就是我們一個個實例(Instance)了,也就是我們開發的微服務項目。

由於Nacos 中很多都是用異步方式來處理的,所以我們很多的時候不能直接采用流程的方式來閱讀代碼,閱讀的時候會來回的跳轉,異步事件的方式編程相對來說復雜了很多,首先我們先看一下Nacos的啟動過程,后續我貼代碼的時候只貼關鍵代碼,其他就略去了,后續不在重復。

下面看一下處理請求的事件和監聽的邏輯

該類com.alibaba.nacos.core.remote.RequestHandlerRegistry監聽了ContextRefreshedEvent事件,那么SpringBoot啟動之后就是自動執行我們需要處理的邏輯。

/**
 * RequestHandlerRegistry.
 * 當Spring初始化完成之后,加載com.alibaba.nacos.core.remote.RequestHandler,注冊為事件監聽器
 *
 * @author liuzunfei
 * @version $Id: RequestHandlerRegistry.java, v 0.1 2020年07月13日 8:24 PM liuzunfei Exp $
 */
@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {

    /**
     * 請求處理器集合
     */
    Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();

    @Autowired
    private TpsMonitorManager tpsMonitorManager;

    /**
     * Get Request Handler By request Type.
     *
     * @param requestType see definitions  of sub constants classes of RequestTypeConstants
     * @return request handler.
     */
    public RequestHandler getByRequestType(String requestType) {
        return registryHandlers.get(requestType);
    }

    /**
     * 此監聽器的主要作用就是加載com.alibaba.nacos.core.remote.RequestHandler的子類到registryHandlers,
     * 后續做請求處理使用,可以看做是策略模式的一個體現
     *
     * @param event event
     */
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
        Collection<RequestHandler> values = beansOfType.values();
        for (RequestHandler requestHandler : values) {

            Class<?> clazz = requestHandler.getClass();
            boolean skip = false;
            while (!clazz.getSuperclass().equals(RequestHandler.class)) {
                if (clazz.getSuperclass().equals(Object.class)) {
                    skip = true;
                    break;
                }
                clazz = clazz.getSuperclass();
            }
            if (skip) {
                continue;
            }

            try {
                Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
                //需要TPS監控的類加入到tpsMonitorManager集合中
                if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
                    TpsControl tpsControl = method.getAnnotation(TpsControl.class);
                    String pointName = tpsControl.pointName();
                    TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
                    tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
                }
            } catch (Exception e) {
                //ignore.
            }
            Class tClass = (Class) ((ParameterizedType) clazz.getGenericSuperclass()).getActualTypeArguments()[0];
            //添加處理器到集合中
            registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
        }
    }
}

我們可以看到com.alibaba.nacos.core.remote.RequestHandler的實現類有好多,我們大致可以從名稱就可以看出每一類的作用

com.alibaba.nacos.core.remote.RequestHandler子類名稱 作用
com.alibaba.nacos.config.server.remote.ConfigChangeBatchListenRequestHandler 節點之間配置互相同步的處理器
com.alibaba.nacos.config.server.remote.ConfigChangeBatchListenRequestHandler 配置改變監聽處理器
com.alibaba.nacos.config.server.remote.ConfigPublishRequestHandler 配置發布監聽處理器
com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler 配置查詢請求處理器
com.alibaba.nacos.config.server.remote.ConfigRemoveRequestHandler 配置移除請求處理器
com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler distro一致性服務處理器(節點點同步數據)
com.alibaba.nacos.core.remote.HealthCheckRequestHandler 健康檢查處理器
com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler 實例注冊,移除處理器
com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler 服務信息加載處理器
com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler 服務列表請求處理器
com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler 服務查詢處理器

Service的注冊流程

io.grpc.stub.StreamObserver#onNext中啟動了一個Acceptor,用來監聽客戶端的GRpc連接,當有客戶端連接的時候,就會通過connectionManager.register(connectionId, connection)注冊實例,然后會通過客戶端注冊連接器發布連接事件clientConnectionEventListenerRegistry.notifyClientConnected(connection);然后就會由監聽事件實現具體的建立連接的邏輯,建立完成之后才會進行注冊邏輯的執行。

com.alibaba.nacos.core.remote.ClientConnectionEventListenerRegistry:客戶端連接Naocs事件注冊器
    目前已知的注冊器都繼承了com.alibaba.nacos.core.remote.ClientConnectionEventListener
    //代碼目前為空,可能是為以后擴展使用
    com.alibaba.nacos.config.server.remote.ConfigConnectionEventListener
    //用來管理客戶端的連接,可以進行連接,斷開連接,驗證連接是否有效等操作,其內部有一個線程池定時清除無效的連接
    com.alibaba.nacos.naming.core.v2.client.manager.impl.ConnectionBasedClientManager
    //grpc回調初始化以及清理監聽器
    com.alibaba.nacos.core.remote.core.RpcAckCallbackInitorOrCleaner

下面我們就使用com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler服務注冊請求分析一下該類的處理流程。

//定義請求的處理流程
public abstract class RequestHandler<T extends Request, S extends Response> {
    @Autowired
    private RequestFilters requestFilters;

    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
            
        }
        return handle(request, meta);
    }
    
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    @Override
    @Secured(action = ActionTypes.WRITE, parser = NamingResourceParser.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }

    /**
     * 委托給com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler#clientOperationService來進行實例注冊
     *
     * @param service service
     * @param request request
     * @param meta    meta
     * @return com.alibaba.nacos.api.naming.remote.response.InstanceResponse
     */
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }

}

可以看到InstanceRequestHandler繼承了RequestHandler,父類在handleRequest定義好了請求的處理流程,最后具體的處理邏輯交給子類去實現,這就是一個典型模板設計模式的實現,可以看到子類根據request.getType()又把具體的處理成分為了注冊實例和取消注冊實例,然后又委托給了com.alibaba.nacos.naming.core.v2.service.impl.EphemeralClientOperationServiceImpl去處理具體的注冊實例和取消注冊實例的邏輯。

我們都知道Nacos的實例分為了Ephemeral和Persistent兩種實例,而默認的都是Ephemeral,這里直接注冊為EphemeralClientOperationServiceImpl的Bean而不是采用ClientOperationServiceProxy代理的方式,因為是Persistent的實例是的處理邏輯不在這里。

走了這么多步驟,終於到了注冊實例的真正流程了

    /**
     * 注冊實例
     *
     * @param service  service
     * @param instance instance
     * @param clientId connectionId
     */
    @Override
    public void registerInstance(Service service, Instance instance, String clientId) {
        //獲取服務,如果如果已存在的話,替換掉舊的Service(namespace,group,name)
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        if (!singleton.isEphemeral()) {
            throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                    String.format("Current service %s is persistent service, can't register ephemeral instance.",
                            singleton.getGroupedServiceName()));
        }
        //獲取client
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }                   
        //創建一個實例
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        //把Service和instanceInfo緩存到連接的客戶端里面,然后發布客戶端變更事件
        client.addServiceInstance(singleton, instanceInfo);
        //更新最后更新時間
        client.setLastUpdatedTime();
        //發布注冊服務事件
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        //發布元數據更新事件(matadataId=>ip:port:clusterName)
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }

可以看到注冊流程里面分別有獲取並替換舊服務,如果不存在的話就創建一個新的,然后根據ClientId獲取對應的Client,然后根據創建一個InstanceInfo,添加Service和InstanceInfo到ClientManager里面,最后發布了兩個事件。然后呢?這就完了?數據存到哪里了?注冊哪里去了?剛開始,我也是帶着這一系列的疑問,不知道數據存到哪里去了,后面通過根據控制台界面請求的接口/nacos/v1/ns/catalog/services發現該接口的數據都是從一個叫做ServiceStorage的里面讀過來的,然后通過答案找問題的思路找到了在發布事件之后的一系列操作之后存在執行引擎中進行了數據存儲操作。

Nacos數據存儲

ServiceStorage.java

/**
 * Service storage.
 *
 * @author xiweng.yy
 */
@Component
public class ServiceStorage {

    /**
     * 客戶單連接注冊服務索引關注
     */
    private final ClientServiceIndexesManager serviceIndexesManager;

    private final ClientManager clientManager;

    private final SwitchDomain switchDomain;

    private final NamingMetadataManager metadataManager;

    /**
     * 服務信息
     */
    private final ConcurrentMap<Service, ServiceInfo> serviceDataIndexes;

    /**
     * 集群索引管理    key:value=>Service:Set(ClusterName)
     */
    private final ConcurrentMap<Service, Set<String>> serviceClusterIndex;

    public ServiceStorage(ClientServiceIndexesManager serviceIndexesManager, ClientManagerDelegate clientManager,
                          SwitchDomain switchDomain, NamingMetadataManager metadataManager) {
        this.serviceIndexesManager = serviceIndexesManager;
        this.clientManager = clientManager;
        this.switchDomain = switchDomain;
        this.metadataManager = metadataManager;
        this.serviceDataIndexes = new ConcurrentHashMap<>();
        this.serviceClusterIndex = new ConcurrentHashMap<>();
    }

    /**
     * 獲取當前服務下的集群信息
     *
     * @param service service
     * @return java.util.Set
     */
    public Set<String> getClusters(Service service) {
        return serviceClusterIndex.getOrDefault(service, new HashSet<>());
    }

    /**
     * 獲取服務的數據信息
     *
     * @param service service
     * @return com.alibaba.nacos.api.naming.pojo.ServiceInfo
     */
    public ServiceInfo getData(Service service) {
        return serviceDataIndexes.containsKey(service) ? serviceDataIndexes.get(service) : getPushData(service);
    }

    /**
     * 若com.alibaba.nacos.naming.core.v2.ServiceManager不存在,則直接返回,已存在的話更新當前Service下的Cluster和Instance信息
     *
     * @param service service
     * @return com.alibaba.nacos.api.naming.pojo.ServiceInfo
     */
    public ServiceInfo getPushData(Service service) {
        ServiceInfo result = emptyServiceInfo(service);
        //ServiceManager不包含直接返回,否則更新Service
        if (!ServiceManager.getInstance().containSingleton(service)) {
            return result;
        }
        //更新Service下的集群新信息
        result.setHosts(getAllInstancesFromIndex(service));
        //更新服務下的實例信息
        serviceDataIndexes.put(service, result);
        return result;
    }

    public void removeData(Service service) {
        serviceDataIndexes.remove(service);
        serviceClusterIndex.remove(service);
    }

    private ServiceInfo emptyServiceInfo(Service service) {
        ServiceInfo result = new ServiceInfo();
        result.setName(service.getName());
        result.setGroupName(service.getGroup());
        result.setLastRefTime(System.currentTimeMillis());
        result.setCacheMillis(switchDomain.getDefaultPushCacheMillis());
        return result;
    }

    /**
     * 獲取當前Service下的所有的Instance信息,並更新當前Service下的集群信息
     *
     * @param service service
     * @return java.util.List
     */
    private List<Instance> getAllInstancesFromIndex(Service service) {
        Set<Instance> result = new HashSet<>();
        Set<String> clusters = new HashSet<>();
        for (String each : serviceIndexesManager.getAllClientsRegisteredService(service)) {
            Optional<InstancePublishInfo> instancePublishInfo = getInstanceInfo(each, service);
            if (instancePublishInfo.isPresent()) {
                //獲取實例並更新實例的元數據信息
                Instance instance = parseInstance(service, instancePublishInfo.get());
                result.add(instance);
                clusters.add(instance.getClusterName());
            }
        }
        // cache clusters of this service
        serviceClusterIndex.put(service, clusters);
        return new LinkedList<>(result);
    }

    private Optional<InstancePublishInfo> getInstanceInfo(String clientId, Service service) {
        Client client = clientManager.getClient(clientId);
        if (null == client) {
            return Optional.empty();
        }
        return Optional.ofNullable(client.getInstancePublishInfo(service));
    }

    private Instance parseInstance(Service service, InstancePublishInfo instanceInfo) {
        Instance result = InstanceUtil.parseToApiInstance(service, instanceInfo);
        Optional<InstanceMetadata> metadata = metadataManager
                .getInstanceMetadata(service, instanceInfo.getMetadataId());
        metadata.ifPresent(instanceMetadata -> InstanceUtil.updateInstanceMetadata(result, instanceMetadata));
        return result;
    }
}

可以看到,其中最重要的是getData方法和getPushData方法,而getData方法又是調用的getPushData方法,getPushData在更新服務下的Service方法的時候調用getAllInstancesFromIndex獲取並更新當前Service下的所有的集群信息,這樣Service下的所有信息都緩存到ServiceStorage里面了。

Nacos注冊相關事件解析

由於Nacos的事件分為了常規事件和慢事件,權限定類名分別為com.alibaba.nacos.common.notify.Event和com.alibaba.nacos.common.notify.SlowEvent,訂閱者和發布者也分為了多事件發布者(訂閱者)和單事件發布者(訂閱者),通知中心為com.alibaba.nacos.common.notify.NotifyCenter,這里不在詳細闡述.這里只是簡單的介紹一下根實例注冊有關的事件類型以及什么時候會觸發和誰監聽了這個事件,詳情見下表。

事件全稱 事件作用 觸發時機 監聽者
com.alibaba.nacos.naming.core. v2.event.client. ClientOperationEvent. ClientRegisterServiceEvent 客戶端注冊實例事件 1:客戶端主動發起請求注冊實例的時候 2:一致性協議主動通知更新客戶端狀態 com.alibaba.nacos.naming.core.v2.index. ClientServiceIndexesManager #handleClientOperation
com.alibaba.nacos.naming.core.v2. event.service.ServiceEvent. ServiceChangedEvent 實例變更事件 1:客戶端注冊實例的時候 2客戶端移除已注冊的實例的時候3:客戶端更新實例元數據的時候 4:客戶端心跳處理(只有當實例處於不健康狀態下才發布此事件)5:不健康實例檢測 1:com.alibaba.nacos.naming.core.v2. upgrade.doublewrite.delay. DoubleWriteEventListene r#onEvent;2:com.alibaba.nacos.naming.push. v2.NamingSubscriberServiceV2Impl#onEvent

Nacos執行引擎

前面的事件的出發之后,經過一系列的邏輯之后最終會走到執行引擎這里,執行引擎來執行任務,這里的執行引擎(雙寫和延遲推送任務)涉及到兩個分別是com.alibaba.nacos.naming.push.v2.task.PushDelayTaskExecuteEngine、com.alibaba.nacos.naming.core.v2.upgrade.doublewrite.delay.DoubleWriteDelayTaskEngine

先看一下執行引擎的類圖,可以發現這兩個執行引擎都是繼承了com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine他們的父類是一模一樣的,只是又重新定義了自己的執行邏輯。

雙寫執行引擎

@Component
public class DoubleWriteDelayTaskEngine extends NacosDelayTaskExecuteEngine {
    
    public DoubleWriteDelayTaskEngine() {
        //執行引擎名稱和日志打印器
        super(DoubleWriteDelayTaskEngine.class.getSimpleName(), Loggers.SRV_LOG);
        //添加v1版本的任務處理器
        addProcessor("v1", new ServiceChangeV1Task.ServiceChangeV1TaskProcessor());
        //添加v2版本的任務處理器
        addProcessor("v2", new ServiceChangeV2Task.ServiceChangeV2TaskProcessor());
    }
    
    @Override
    public NacosTaskProcessor getProcessor(Object key) {
        String actualKey = key.toString().split(":")[0];
        return super.getProcessor(actualKey);
    }
}

根據構造函數可以看出雙寫執行引擎分別添加了v1和v2兩個任務處理器,目的就是保證版本的平滑升級,當我們的集群已經升級且處於穩定狀態的時候就可以關閉雙寫了,這點在Nacos的升級文檔中也有提及(Nacos 2.0 升級文檔)。

public class PushDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine {
    /**
     * 客戶端管理
     */
    private final ClientManager clientManager;
    /**
     * 客戶端服務管理器
     */
    private final ClientServiceIndexesManager indexesManager;
    /**
     * 數據存儲
     */
    private final ServiceStorage serviceStorage;
    /**
     * 元數據管理
     */
    private final NamingMetadataManager metadataManager;
    /**
     * 執行器
     */
    private final PushExecutor pushExecutor;

    private final SwitchDomain switchDomain;

    public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
                                      ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
                                      PushExecutor pushExecutor, SwitchDomain switchDomain) {
        super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
        this.clientManager = clientManager;
        this.indexesManager = indexesManager;
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.pushExecutor = pushExecutor;
        this.switchDomain = switchDomain;
        //自定義默認的任務處理器
        setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
    }

    public ClientManager getClientManager() {
        return clientManager;
    }

    public ClientServiceIndexesManager getIndexesManager() {
        return indexesManager;
    }

    public ServiceStorage getServiceStorage() {
        return serviceStorage;
    }

    public NamingMetadataManager getMetadataManager() {
        return metadataManager;
    }

    public PushExecutor getPushExecutor() {
        return pushExecutor;
    }

    @Override
    protected void processTasks() {
        if (!switchDomain.isPushEnabled()) {
            return;
        }
        super.processTasks();
    }

    /**
     * 自定義默認的處理器
     */
    private static class PushDelayTaskProcessor implements NacosTaskProcessor {

        private final PushDelayTaskExecuteEngine executeEngine;

        public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
            this.executeEngine = executeEngine;
        }

        @Override
        public boolean process(NacosTask task) {
            PushDelayTask pushDelayTask = (PushDelayTask) task;
            Service service = pushDelayTask.getService();
            //任務分派
            NamingExecuteTaskDispatcher.getInstance()
                    .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
            return true;
        }
    }
}

Nacos任務dispatcher

public class NamingExecuteTaskDispatcher {

    private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();

    private final NacosExecuteTaskExecuteEngine executeEngine;

    private NamingExecuteTaskDispatcher() {
        //Nacos任務執行引擎
        executeEngine = new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);
    }

    public static NamingExecuteTaskDispatcher getInstance() {
        return INSTANCE;
    }

    /**
     * 執行引擎中添加任務
     *
     * @param dispatchTag 根據dispatchTag決定把任務分配給誰執行
     * @param task        任務
     */
    public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {
        executeEngine.addTask(dispatchTag, task);
    }

    public String workersStatus() {
        return executeEngine.workersStatus();
    }
}

可以看到這里又把任務添加到了Nacos的任務隊列中,統一交給了Nacos任務執行引擎來執行任務。

public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {
    
    private final TaskExecuteWorker[] executeWorkers;
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger) {
        this(name, logger, ThreadUtils.getSuitableThreadCount(1));
    }
    
    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }
    
    @Override
    public int size() {
        int result = 0;
        for (TaskExecuteWorker each : executeWorkers) {
            result += each.pendingTaskCount();
        }
        return result;
    }
    
    @Override
    public boolean isEmpty() {
        return 0 == size();
    }
    
    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        NacosTaskProcessor processor = getProcessor(tag);
        if (null != processor) {
            processor.process(task);
            return;
        }
        TaskExecuteWorker worker = getWorker(tag);
        worker.process(task);
    }
    
    private TaskExecuteWorker getWorker(Object tag) {
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }
    
    private int workersCount() {
        return executeWorkers.length;
    }
    
    @Override
    public AbstractExecuteTask removeTask(Object key) {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task");
    }
    
    @Override
    public Collection<Object> getAllTaskKeys() {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys");
    }
    
    @Override
    public void shutdown() throws NacosException {
        for (TaskExecuteWorker each : executeWorkers) {
            each.shutdown();
        }
    }
    
    /**
     * Get workers status.
     *
     * @return workers status string
     */
    public String workersStatus() {
        StringBuilder sb = new StringBuilder();
        for (TaskExecuteWorker worker : executeWorkers) {
            sb.append(worker.status()).append('\n');
        }
        return sb.toString();
    }
}

NacosExecuteTaskExecuteEngine.java

public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {

    private final TaskExecuteWorker[] executeWorkers;

    public NacosExecuteTaskExecuteEngine(String name, Logger logger) {
        //根據計算機的CPU合數計算線程數,大於等於CPU核數*threadMultiple的最小的pow(2,n)的正整數
        this(name, logger, ThreadUtils.getSuitableThreadCount(1));
    }

    public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {
        super(logger);
        executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];
        for (int mod = 0; mod < dispatchWorkerCount; ++mod) {
            executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());
        }
    }

    @Override
    public int size() {
        int result = 0;
        for (TaskExecuteWorker each : executeWorkers) {
            result += each.pendingTaskCount();
        }
        return result;
    }

    @Override
    public boolean isEmpty() {
        return 0 == size();
    }

    @Override
    public void addTask(Object tag, AbstractExecuteTask task) {
        NacosTaskProcessor processor = getProcessor(tag);
        //如果有自定義的處理器的話,則使用自定義處理器執行任務
        if (null != processor) {
            processor.process(task);
            return;
        }
        //獲取執行任務的worker
        TaskExecuteWorker worker = getWorker(tag);
        //執行任務
        worker.process(task);
    }

    /**
     * 根據tag判斷哪一個worker來執行任務
     *
     * @param tag tag
     * @return worker
     */
    private TaskExecuteWorker getWorker(Object tag) {
        //保證得到的idx為0~workersCount的數
        int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();
        return executeWorkers[idx];
    }

    private int workersCount() {
        return executeWorkers.length;
    }

    @Override
    public AbstractExecuteTask removeTask(Object key) {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support remove task");
    }

    @Override
    public Collection<Object> getAllTaskKeys() {
        throw new UnsupportedOperationException("ExecuteTaskEngine do not support get all task keys");
    }

    @Override
    public void shutdown() throws NacosException {
        for (TaskExecuteWorker each : executeWorkers) {
            each.shutdown();
        }
    }

    /**
     * Get workers status.
     *
     * @return workers status string
     */
    public String workersStatus() {
        StringBuilder sb = new StringBuilder();
        for (TaskExecuteWorker worker : executeWorkers) {
            sb.append(worker.status()).append('\n');
        }
        return sb.toString();
    }
}

可以看到該類的主要作用就是根據CPU的核數計算線程數,然后獲取對應的worker執行任務,在addTask方法中的獲取分派worker,最后執行任務worker.process(task);執行任務的時候我們自然而然可以想到里面是一個線程,然后死循環從阻塞隊列中獲取任務執行任務。

TaskExecuteWorker.java

public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {

    /**
     * Max task queue size 32768.
     */
    private static final int QUEUE_CAPACITY = 1 << 15;

    private final Logger log;

    private final String name;
    /**
     * 阻塞隊列
     */
    private final BlockingQueue<Runnable> queue;

    private final AtomicBoolean closed;

    public TaskExecuteWorker(final String name, final int mod, final int total) {
        this(name, mod, total, null);
    }

    public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {
        this.name = name + "_" + mod + "%" + total;
        this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);
        this.closed = new AtomicBoolean(false);
        this.log = null == logger ? LoggerFactory.getLogger(TaskExecuteWorker.class) : logger;
        //開啟線程
        new InnerWorker(name).start();
    }

    public String getName() {
        return name;
    }

    /**
     * 任務放入隊列
     *
     * @param task task
     * @return 是否放入阻塞隊列成功
     */
    @Override
    public boolean process(NacosTask task) {
        if (task instanceof AbstractExecuteTask) {
            putTask((Runnable) task);
        }
        return true;
    }

    private void putTask(Runnable task) {
        try {
            queue.put(task);
        } catch (InterruptedException ire) {
            log.error(ire.toString(), ire);
        }
    }

    public int pendingTaskCount() {
        return queue.size();
    }

    /**
     * Worker status.
     */
    public String status() {
        return name + ", pending tasks: " + pendingTaskCount();
    }

    @Override
    public void shutdown() throws NacosException {
        queue.clear();
        closed.compareAndSet(false, true);
    }

    /**
     * 任務執行器,一直循環從阻塞隊列獲取任務然后執行
     */
    private class InnerWorker extends Thread {

        InnerWorker(String name) {
            setDaemon(false);
            setName(name);
        }

        @Override
        public void run() {
            while (!closed.get()) {
                try {
                    Runnable task = queue.take();
                    long begin = System.currentTimeMillis();
                    task.run();
                    long duration = System.currentTimeMillis() - begin;
                    if (duration > 1000L) {
                        log.warn("task {} takes {}ms", task, duration);
                    }
                } catch (Throwable e) {
                    log.error("[TASK-FAILED] " + e.toString(), e);
                }
            }
        }
    }
}

總結

本篇文章描述了Nacos1.X和2.X的區別以及2.X相對於1.X的優勢,並且從源碼的方面解析了為什么Nacos2.X的優勢。通過本篇章可以學習到Nacos的設計,並且對我們自己碼代碼的時候也是很有幫助的,可以看到里面用到了很多的設計模式(模板,觀察者,委托,代理,單例,工廠,策略),其中也通過事件(觀察者模式)來解耦,使用異步編程方式來盡可能的提高程序的相應,相信仔細閱讀源碼之后對我們以后代碼的設計和多線程編程都會有很大的提升。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM