ThingsBoard 二次開發之源碼分析 4-啟動分析 3


thingsboard聚集地

Thingsboard 話題討論區:https://forum.iotschool.com/topics/node8

歡迎大家加入thingsboard 二次開發討論群:121202538

thingsboard交流QQ群 121202538

ThingsBoard源碼分析4-啟動分析3

以下的分析環境基於默認配置

1. Actor 模型

首先搬用官網關於Actor模型的介紹:

actor架構圖

  • The brief description of each actor’s functionality is listed below:

  • App Actor - responsible for management of tenant actors(負責管理租戶Actor). An instance of this actor is always present in memory.

  • Tenant Actor - responsible for management of tenant device & rule chain actors(負責管理租戶設備,規則鏈Actor). An instance of this actor is always present in memory.

  • Device Actor - maintain state of the device: active sessions, subscriptions, pending RPC commands(處理設備狀態,激活session,訂閱,RPC請求), etc. Caches current device attributes in memory for performance reasons. An actor is created when the first message from the device is processed. The actor is stopped when there is no messages from devices for a certain time.

  • Rule Chain Actor - process incoming messages and dispatches them to rule node actors(處理消息,並將消息分發給node Actor). An instance of this actor is always present in memory.

  • Rule Node Actor - process incoming messages, and report results back to rule chain actor(處理消息,並將處理的消息返回給規則鏈Actor). An instance of this actor is always present in memory.

2. Actor啟動分析

DefaultActorService簡析

圖片看不清楚的,請點擊這里:

DefaultActorService@PostConstruct標記的initActorSystem()開啟了actor的啟動流程:

public void initActorSystem() {
    log.info("Initializing actor system.");
    actorContext.setActorService(this);
    TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
    system = new DefaultTbActorSystem(settings);
    //創建不同actor執行的線程池
    system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
    system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
    system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
    system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));

    actorContext.setActorSystem(system);
    //創建AppActor詳細分析見①createActor分析
    appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
    actorContext.setAppActor(appActor);
    //創建statsActor,暫時不關注
    TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
    actorContext.setStatsActor(statsActor);

    log.info("Actor system initialized.");
}

DefaultTbActorSystem類中方法createActor分析

private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
    //獲取initActorSystem時對應的線程池
    Dispatcher dispatcher = dispatchers.get(dispatcherId);
    if (dispatcher == null) {
        log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
        throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
    }
    //creator有不同的實現如AppActor,TenantActor,RuleChainActor, RuleNodeActor等
    // AppActor, 則使用固定的NULL_UUID,UUID.fromString("13814000-1dd2-11b2-8080-808080808080")作為Id
    // TenantActor 使用creator提供的tenantId作為Id
    // RuleChainActor 使用creator提供的ruleChain作為Id
    // RuleNodeActor  使用creator提供的ruleNodeId作為Id
    TbActorId actorId = creator.createActorId();
    //緩存使用,將已經創建的放入緩存
    TbActorMailbox actorMailbox = actors.get(actorId);
    if (actorMailbox != null) {
        log.debug("Actor with id [{}] is already registered!", actorId);
    } else {
        Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
        actorCreationLock.lock();
        try {
            actorMailbox = actors.get(actorId);
            if (actorMailbox == null) {
                log.debug("Creating actor with id [{}]!", actorId);
                //正式創建Actor,也就是調用構造函數
                TbActor actor = creator.createActor();
                //只允許AppActor的parent是空
                TbActorRef parentRef = null;
                if (parent != null) {
                    parentRef = getActor(parent);
                    if (parentRef == null) {
                        throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
                    }
                }
                //使用actor創建TbActorMailbox對象
                TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
                //放入緩存
                actors.put(actorId, mailbox);
                //用各自的線程池去進行初始化操作,具體的初始化操作由每種actor的init方法實現詳細分析如下②③④
                mailbox.initActor();
                actorMailbox = mailbox;
                if (parent != null) {
                    //緩存,父子關系緩存
                    parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
                }
            } else {
                log.debug("Actor with id [{}] is already registered!", actorId);
            }
        } finally {
            actorCreationLock.unlock();
            actorCreationLocks.remove(actorId);
        }
    }
    return actorMailbox;
}
  • TenantActorinit方法:

    如果此應用不是以某一個tennant啟動的話或者isolatedTbRuleEngine是false,調用父類RuleChainManagerActorinitRuleChains()方法查詢該租戶下的rule_chain表。開始創建RuleChainActor, 創建RuleChainActor的parent是當前TenantActor,創建的時候,又會調用到DefaultTbActorSystemcreateActor方法;

  • RuleChainActor實現了ComponentActorComponentActorinit方法調用了createProcessor(TbActorCtx ctx)(由子類實現,創建了RuleChainActorMessageProcessor實例)和initProcessor(TbActorCtx ctx)(調用processor.start也即RuleChainActorMessageProcessorstart方法,該方法查詢查詢relation表,條件為RULE_CHAIN和ruleChainId,再根據to_id去rule_node表獲取RuleNode;1. 根據ruleNodeList開始初創建RuleNodeActor2.initRoutes()查詢relation,rule_noderule_chain表,構造nodeRoutes的數據,並設置firstRuleNode);

  • RuleNodeActor實現了ComponentActorComponentActorinit方法調用了createProcessor(TbActorCtx ctx)(由子類實現,創建了RuleNodeActorMessageProcessor實例)和initProcessor(TbActorCtx ctx), (調用processor.start也即RuleNodeActorMessageProcessorstart方法,使用反射創建ruleNode的實例,並按照數據庫configuration的配置,調用init方法);

DefaultActorService@EventListener(ApplicationReadyEvent.class)標記的onApplicationEvent開始進一步初始化流程:

調用appActor.tellWithHighPriority(new AppInitMsg());AppInitMsg推入AppActor的消息隊列里,然后嘗試處理這一消息,最終會交給AppActordoProcess方法進行處理,由於還未ruleChainsInitialized從未進行復制,此時AppActor開始初始化TennatActor,查詢tenant表對所有tenant創建TenantActor;

總結

  1. initActorSystem創建AppActorStatsActor兩個Actor,並未創建其他的Actor;

  2. 在收到ApplicationReadyEvent的時候,由於之前沒有初始化,所以會初始化該應用的所有TenantActor, 根據②③④點分析,循環切遞歸的創建了所有的RuleChainActor, RuleNodeActor;

  3. 關於TbActorMailbox的理解:mailbox理解為一個信箱,里面有一些信件(即入隊列的消息),這些信件有一些是高優先級,有些是普通優先級的,每次取信件的時候,都先看有沒高優先級的,先處理高優先級的信件,再處理普通優先級的信件。那么信件處理人是誰呢?信件的處理人就是每一個Actor.實際上的信件處理方法都是每一個Actor的doProcess(TbActorMsg msg)方法

  4. 我們討論的每一個RuleNodeActor就是在前端RULE CHAINS界面看到的每一個節點。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM