thingsboard聚集地
Thingsboard 話題討論區:https://forum.iotschool.com/topics/node8
歡迎大家加入thingsboard 二次開發討論群:121202538
ThingsBoard源碼分析4-啟動分析3
以下的分析環境基於默認配置
1. Actor 模型
首先搬用官網關於Actor模型的介紹:
-
The brief description of each actor’s functionality is listed below:
-
App Actor - responsible for management of tenant actors(負責管理租戶Actor). An instance of this actor is always present in memory.
-
Tenant Actor - responsible for management of tenant device & rule chain actors(負責管理租戶設備,規則鏈Actor). An instance of this actor is always present in memory.
-
Device Actor - maintain state of the device: active sessions, subscriptions, pending RPC commands(處理設備狀態,激活session,訂閱,RPC請求), etc. Caches current device attributes in memory for performance reasons. An actor is created when the first message from the device is processed. The actor is stopped when there is no messages from devices for a certain time.
-
Rule Chain Actor - process incoming messages and dispatches them to rule node actors(處理消息,並將消息分發給node Actor). An instance of this actor is always present in memory.
-
Rule Node Actor - process incoming messages, and report results back to rule chain actor(處理消息,並將處理的消息返回給規則鏈Actor). An instance of this actor is always present in memory.
2. Actor啟動分析
圖片看不清楚的,請點擊這里:
- 標准:https://cdn.iotschool.com/photo/2020/633f6162-71be-409e-925e-762d51e29b19.png?x-oss-process=image/resize,w_1920
- 高清:https://pic.downk.cc/item/5f50c6b1160a154a67353d2e.png
DefaultActorService
中@PostConstruct
標記的initActorSystem()
開啟了actor的啟動流程:
public void initActorSystem() {
log.info("Initializing actor system.");
actorContext.setActorService(this);
TbActorSystemSettings settings = new TbActorSystemSettings(actorThroughput, schedulerPoolSize, maxActorInitAttempts);
system = new DefaultTbActorSystem(settings);
//創建不同actor執行的線程池
system.createDispatcher(APP_DISPATCHER_NAME, initDispatcherExecutor(APP_DISPATCHER_NAME, appDispatcherSize));
system.createDispatcher(TENANT_DISPATCHER_NAME, initDispatcherExecutor(TENANT_DISPATCHER_NAME, tenantDispatcherSize));
system.createDispatcher(DEVICE_DISPATCHER_NAME, initDispatcherExecutor(DEVICE_DISPATCHER_NAME, deviceDispatcherSize));
system.createDispatcher(RULE_DISPATCHER_NAME, initDispatcherExecutor(RULE_DISPATCHER_NAME, ruleDispatcherSize));
actorContext.setActorSystem(system);
//創建AppActor詳細分析見①createActor分析
appActor = system.createRootActor(APP_DISPATCHER_NAME, new AppActor.ActorCreator(actorContext));
actorContext.setAppActor(appActor);
//創建statsActor,暫時不關注
TbActorRef statsActor = system.createRootActor(TENANT_DISPATCHER_NAME, new StatsActor.ActorCreator(actorContext, "StatsActor"));
actorContext.setStatsActor(statsActor);
log.info("Actor system initialized.");
}
①DefaultTbActorSystem
類中方法createActor
分析
private TbActorRef createActor(String dispatcherId, TbActorCreator creator, TbActorId parent) {
//獲取initActorSystem時對應的線程池
Dispatcher dispatcher = dispatchers.get(dispatcherId);
if (dispatcher == null) {
log.warn("Dispatcher with id [{}] is not registered!", dispatcherId);
throw new RuntimeException("Dispatcher with id [" + dispatcherId + "] is not registered!");
}
//creator有不同的實現如AppActor,TenantActor,RuleChainActor, RuleNodeActor等
// AppActor, 則使用固定的NULL_UUID,UUID.fromString("13814000-1dd2-11b2-8080-808080808080")作為Id
// TenantActor 使用creator提供的tenantId作為Id
// RuleChainActor 使用creator提供的ruleChain作為Id
// RuleNodeActor 使用creator提供的ruleNodeId作為Id
TbActorId actorId = creator.createActorId();
//緩存使用,將已經創建的放入緩存
TbActorMailbox actorMailbox = actors.get(actorId);
if (actorMailbox != null) {
log.debug("Actor with id [{}] is already registered!", actorId);
} else {
Lock actorCreationLock = actorCreationLocks.computeIfAbsent(actorId, id -> new ReentrantLock());
actorCreationLock.lock();
try {
actorMailbox = actors.get(actorId);
if (actorMailbox == null) {
log.debug("Creating actor with id [{}]!", actorId);
//正式創建Actor,也就是調用構造函數
TbActor actor = creator.createActor();
//只允許AppActor的parent是空
TbActorRef parentRef = null;
if (parent != null) {
parentRef = getActor(parent);
if (parentRef == null) {
throw new TbActorNotRegisteredException(parent, "Parent Actor with id [" + parent + "] is not registered!");
}
}
//使用actor創建TbActorMailbox對象
TbActorMailbox mailbox = new TbActorMailbox(this, settings, actorId, parentRef, actor, dispatcher);
//放入緩存
actors.put(actorId, mailbox);
//用各自的線程池去進行初始化操作,具體的初始化操作由每種actor的init方法實現詳細分析如下②③④
mailbox.initActor();
actorMailbox = mailbox;
if (parent != null) {
//緩存,父子關系緩存
parentChildMap.computeIfAbsent(parent, id -> ConcurrentHashMap.newKeySet()).add(actorId);
}
} else {
log.debug("Actor with id [{}] is already registered!", actorId);
}
} finally {
actorCreationLock.unlock();
actorCreationLocks.remove(actorId);
}
}
return actorMailbox;
}
-
②
TenantActor
的init
方法:如果此應用不是以某一個tennant啟動的話或者
isolatedTbRuleEngine
是false,調用父類RuleChainManagerActor
的initRuleChains()
方法查詢該租戶下的rule_chain表。開始創建RuleChainActor
, 創建RuleChainActor
的parent是當前TenantActor
,創建的時候,又會調用到DefaultTbActorSystem
的createActor
方法; -
③
RuleChainActor
實現了ComponentActor
,ComponentActor
的init
方法調用了createProcessor(TbActorCtx ctx)
(由子類實現,創建了RuleChainActorMessageProcessor
實例)和initProcessor(TbActorCtx ctx)
(調用processor.start
也即RuleChainActorMessageProcessor
的start
方法,該方法查詢查詢relation表,條件為RULE_CHAIN和ruleChainId,再根據to_id去rule_node表獲取RuleNode
;1. 根據ruleNodeList開始初創建RuleNodeActor
2.initRoutes()
查詢relation,rule_node,rule_chain表,構造nodeRoutes的數據,並設置firstRuleNode); -
④
RuleNodeActor
實現了ComponentActor
,ComponentActor
的init
方法調用了createProcessor(TbActorCtx ctx)
(由子類實現,創建了RuleNodeActorMessageProcessor
實例)和initProcessor(TbActorCtx ctx)
, (調用processor.start
也即RuleNodeActorMessageProcessor
的start
方法,使用反射創建ruleNode的實例,並按照數據庫configuration的配置,調用init
方法);
DefaultActorService
中@EventListener(ApplicationReadyEvent.class)
標記的onApplicationEvent
開始進一步初始化流程:
調用appActor.tellWithHighPriority(new AppInitMsg());
將AppInitMsg
推入AppActor
的消息隊列里,然后嘗試處理這一消息,最終會交給AppActor
的doProcess
方法進行處理,由於還未ruleChainsInitialized
從未進行復制,此時AppActor開始初始化TennatActor,查詢tenant
表對所有tenant創建TenantActor
;
總結
-
initActorSystem
創建AppActor
和StatsActor
兩個Actor,並未創建其他的Actor; -
在收到
ApplicationReadyEvent
的時候,由於之前沒有初始化,所以會初始化該應用的所有TenantActor
, 根據②③④點分析,循環切遞歸的創建了所有的RuleChainActor
,RuleNodeActor
; -
關於
TbActorMailbox
的理解:mailbox理解為一個信箱,里面有一些信件(即入隊列的消息),這些信件有一些是高優先級,有些是普通優先級的,每次取信件的時候,都先看有沒高優先級的,先處理高優先級的信件,再處理普通優先級的信件。那么信件處理人是誰呢?信件的處理人就是每一個Actor.實際上的信件處理方法都是每一個Actor的doProcess(TbActorMsg msg)
方法; -
我們討論的每一個
RuleNodeActor
就是在前端RULE CHAINS界面看到的每一個節點。