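ThingsBoard Hub
ThingsBoard discussion forum: https://forum.iotschool.com/topics/node8
You are welcome to join the ThingsBoard customization (secondary development) discussion group: 121202538
ThingsBoard Source Code Analysis 5: How MQTT Connections Are Received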
1. MQTT server
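To accept MQTT connections from devices, ThingsBoard must contain an MQTT server. The server is created by the MqttTransportService class.
It is a Netty-based MQTT server that registers MqttTransportServerInitializer as its channel initializer. The initializer adds Netty's MqttDecoder and MqttEncoder to the ChannelPipeline, so we can ignore MQTT message encoding and decoding. More importantly, it also adds MqttTransportHandler.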
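To show what MqttDecoder saves us from, here is a minimal stdlib-only sketch (not Netty's code) that reads the username from a raw MQTT 3.1.1 CONNECT packet. The class and method names are hypothetical. The sketch assumes a well-formed packet and ignores MQTT 5 properties.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper (not Netty's code): reads the username from an
// MQTT 3.1.1 CONNECT packet, the kind of work MqttDecoder does for us.
class MqttConnectSketch {
    private final byte[] buf;
    private int pos;

    private MqttConnectSketch(byte[] buf) { this.buf = buf; }

    private int readByte() { return buf[pos++] & 0xFF; }

    private int readUint16() { return (readByte() << 8) | readByte(); }

    // Fields are prefixed with a 2-byte big-endian length
    private byte[] readField() {
        int len = readUint16();
        byte[] field = Arrays.copyOfRange(buf, pos, pos + len);
        pos += len;
        return field;
    }

    /** Returns the username from a CONNECT packet, or null if none was sent. */
    static String username(byte[] packet) {
        MqttConnectSketch r = new MqttConnectSketch(packet);
        if ((r.readByte() >> 4) != 1) {
            throw new IllegalArgumentException("not a CONNECT packet");
        }
        // Remaining length: 7 bits per byte, high bit set means "more bytes follow"
        int remaining = 0, multiplier = 1, b;
        do {
            b = r.readByte();
            remaining += (b & 0x7F) * multiplier;
            multiplier *= 128;
        } while ((b & 0x80) != 0);
        if (r.pos + remaining != packet.length) {
            throw new IllegalArgumentException("length mismatch");
        }
        r.readField();              // protocol name "MQTT"
        r.readByte();               // protocol level (4 = 3.1.1)
        int flags = r.readByte();   // connect flags
        r.readUint16();             // keep-alive in seconds
        r.readField();              // client identifier
        if ((flags & 0x04) != 0) {  // will flag: skip will topic and will message
            r.readField();
            r.readField();
        }
        boolean hasUsername = (flags & 0x80) != 0;
        return hasUsername ? new String(r.readField(), StandardCharsets.UTF_8) : null;
    }

    // Builds a minimal CONNECT packet (clean session, username, no password) for testing
    static byte[] buildConnect(String clientId, String username) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeField(body, "MQTT");
        body.write(4);              // protocol level 3.1.1
        body.write(0x80 | 0x02);    // username flag + clean session
        body.write(0);
        body.write(60);             // keep-alive = 60 s
        writeField(body, clientId);
        writeField(body, username);

        ByteArrayOutputStream packet = new ByteArrayOutputStream();
        packet.write(0x10);         // packet type 1 (CONNECT), flags 0
        int len = body.size();
        do {                        // encode the remaining length
            int digit = len % 128;
            len /= 128;
            packet.write(len > 0 ? digit | 0x80 : digit);
        } while (len > 0);
        byte[] bodyBytes = body.toByteArray();
        packet.write(bodyBytes, 0, bodyBytes.length);
        return packet.toByteArray();
    }

    private static void writeField(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.write(bytes.length >> 8);
        out.write(bytes.length & 0xFF);
        out.write(bytes, 0, bytes.length);
    }
}
```

In ThingsBoard's token authentication, this username field is where the device's access token arrives.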
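2. Handling the connection in MqttTransportHandler
For this example, we first create a tenant and a tenant administrator, and then add a device. We use MQTT Box to simulate a hardware device: copy the device's ACCESS TOKEN into MQTT Box's Username field, then connect to the ThingsBoard backend.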
Screenshots of the setup (open them if the image is hard to read):
- Standard: https://cdn.iotschool.com/photo/2020/00e26598-e91a-4a08-b557-18b204bec6c9.png?x-oss-process=image/resize,w_1920
- High resolution: https://p.pstatp.com/origin/137b60001339a846253dd
Because SSL is not used, the handler calls processAuthTokenConnect once the connection request arrives:
private void processAuthTokenConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
    String userName = msg.payload().userName();
    log.info("[{}] Processing connect msg for client with user name: {}!", sessionId, userName);
    if (StringUtils.isEmpty(userName)) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
        ctx.close();
    } else {
        // Take the userName, build a protobuf message (easy to transmit and parse) and hand it to transportService.
        // How process handles it follows from the DefaultTransportService analysis in part 3; see the walkthrough below.
        transportService.process(ValidateDeviceTokenRequestMsg.newBuilder().setToken(userName).build(),
                new TransportServiceCallback<ValidateDeviceCredentialsResponseMsg>() {
                    @Override
                    public void onSuccess(ValidateDeviceCredentialsResponseMsg msg) {
                        onValidateDeviceResponse(msg, ctx);
                    }

                    @Override
                    public void onError(Throwable e) {
                        log.trace("[{}] Failed to process credentials: {}", address, userName, e);
                        ctx.writeAndFlush(createMqttConnAckMsg(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE));
                        ctx.close();
                    }
                });
    }
}
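The decision logic of processAuthTokenConnect, together with the onValidateDeviceResponse callback it eventually triggers, reduces to choosing a CONNACK return code. The sketch below models this with CompletableFuture. The validator function is a hypothetical stand-in for transportService.process. The numeric codes are the standard MQTT 3.1.1 CONNACK codes.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical model of processAuthTokenConnect + onValidateDeviceResponse.
// Return values are MQTT 3.1.1 CONNACK return codes.
class ConnectFlowSketch {
    static final int ACCEPTED = 0;
    static final int SERVER_UNAVAILABLE = 3;
    static final int BAD_USER_NAME_OR_PASSWORD = 4;
    static final int NOT_AUTHORIZED = 5;

    /**
     * validator stands in for transportService.process: it resolves to the device
     * name if the token is known, to Optional.empty() if not, or fails on error.
     */
    static CompletableFuture<Integer> connect(String userName,
            Function<String, CompletableFuture<Optional<String>>> validator) {
        if (userName == null || userName.isEmpty()) {
            // Refused immediately, without asking the core service
            return CompletableFuture.completedFuture(BAD_USER_NAME_OR_PASSWORD);
        }
        return validator.apply(userName).handle((device, error) -> {
            if (error != null) {
                return SERVER_UNAVAILABLE;                       // the onError branch
            }
            return device.isPresent() ? ACCEPTED : NOT_AUTHORIZED; // onSuccess -> onValidateDeviceResponse
        });
    }
}
```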
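- The process method of DefaultTransportService builds an asynchronous task. On success it invokes the onSuccess consumer, and on failure it invokes the onFailure consumer;
- The device-validation task is handed to transportApiRequestTemplate.send: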
public ListenableFuture<Response> send(Request request) {
    if (tickSize > maxPendingRequests) {
        return Futures.immediateFailedFuture(new RuntimeException("Pending request map is full!"));
    }
    UUID requestId = UUID.randomUUID();
    request.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
    // From the analysis in part 3, this topic is tb_transport.api.responses.localHostName
    request.getHeaders().put(RESPONSE_TOPIC_HEADER, stringToBytes(responseTemplate.getTopic()));
    request.getHeaders().put(REQUEST_TIME, longToBytes(System.currentTimeMillis()));
    // SettableFuture comes from Google's Guava library (see the basics in part 1): a future whose result can be set manually
    SettableFuture<Response> future = SettableFuture.create();
    ResponseMetaData<Response> responseMetaData = new ResponseMetaData<>(tickTs + maxRequestTimeout, future);
    // Put the future into pendingRequests ②
    pendingRequests.putIfAbsent(requestId, responseMetaData);
    log.trace("[{}] Sending request, key [{}], expTime [{}]", requestId, request.getKey(), responseMetaData.expTime);
    // Send the message to the message queue; the topic is tb_transport.api.requests
    requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
        @Override
        public void onSuccess(TbQueueMsgMetadata metadata) {
            log.trace("[{}] Request sent: {}", requestId, metadata);
        }

        @Override
        public void onFailure(Throwable t) {
            pendingRequests.remove(requestId);
            future.setException(t);
        }
    });
    return future;
}
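The request side of send() is a correlation pattern: give each request a random id, park a future under that id, and hand the caller the future. The hypothetical sketch below shows this pattern, with CompletableFuture swapped in for Guava's SettableFuture. The Transport interface is an assumed stand-in for the queue producer. Unlike the original, which compares a periodically sampled tickSize, the sketch checks the live map size.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of the request side of send(): CompletableFuture replaces
// Guava's SettableFuture, and Transport stands in for requestTemplate.send.
class RequestTemplateSketch<Req, Resp> {
    // Queue producer stand-in: delivers the request and reports failures via onFailure
    interface Transport<R> {
        void send(UUID requestId, R request, Consumer<Throwable> onFailure);
    }

    private final Map<UUID, CompletableFuture<Resp>> pendingRequests = new ConcurrentHashMap<>();
    private final int maxPendingRequests;
    private final Transport<Req> transport;

    RequestTemplateSketch(int maxPendingRequests, Transport<Req> transport) {
        this.maxPendingRequests = maxPendingRequests;
        this.transport = transport;
    }

    CompletableFuture<Resp> send(Req request) {
        if (pendingRequests.size() >= maxPendingRequests) {
            return CompletableFuture.failedFuture(new RuntimeException("Pending request map is full!"));
        }
        UUID requestId = UUID.randomUUID();              // travels as REQUEST_ID_HEADER
        CompletableFuture<Resp> future = new CompletableFuture<>();
        pendingRequests.putIfAbsent(requestId, future);  // step ② in the article
        transport.send(requestId, request, error -> {    // like TbQueueCallback.onFailure
            pendingRequests.remove(requestId);
            future.completeExceptionally(error);
        });
        return future;                                   // completed later, when the response arrives
    }

    int pendingCount() {
        return pendingRequests.size();
    }
}
```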
- From the analysis of TbCoreTransportApiService in part 3, we see that the requestTemplate member of DefaultTbQueueResponseTemplate (that is, the consumer) subscribes to exactly the tb_transport.api.requests messages:
......
requests.forEach(request -> {
    long currentTime = System.currentTimeMillis();
    long requestTime = bytesToLong(request.getHeaders().get(REQUEST_TIME));
    if (requestTime + requestTimeout >= currentTime) {
        byte[] requestIdHeader = request.getHeaders().get(REQUEST_ID_HEADER);
        if (requestIdHeader == null) {
            log.error("[{}] Missing requestId in header", request);
            return;
        }
        // Read the response topic so the reply goes back to where the request came from; here it is tb_transport.api.responses.localHostName
        byte[] responseTopicHeader = request.getHeaders().get(RESPONSE_TOPIC_HEADER);
        if (responseTopicHeader == null) {
            log.error("[{}] Missing response topic in header", request);
            return;
        }
        UUID requestId = bytesToUuid(requestIdHeader);
        String responseTopic = bytesToString(responseTopicHeader);
        try {
            pendingRequestCount.getAndIncrement();
            // Let the handler process the message
            AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
                    response -> {
                        pendingRequestCount.decrementAndGet();
                        response.getHeaders().put(REQUEST_ID_HEADER, uuidToBytes(requestId));
                        // Send the result of handler.handle back to the requester on topic tb_transport.api.responses.localHostName
                        responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
                    },
                    e -> {
                        pendingRequestCount.decrementAndGet();
                        if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
                            log.warn("[{}] Timeout to process the request: {}", requestId, request, e);
                        } else {
                            log.trace("[{}] Failed to process the request: {}", requestId, request, e);
                        }
                    },
                    requestTimeout,
                    timeoutExecutor,
                    callbackExecutor);
.......
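Before the handler runs, the response template makes three checks on every request. It drops expired requests, drops requests missing either header, and otherwise addresses the reply to the topic named in the request. The hypothetical sketch below shows these checks. Header names, the string-typed headers, and the synchronous handler are simplifications, whereas the original handler is asynchronous and has a timeout.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the per-request checks in the response template:
// drop expired requests and requests missing a header, otherwise handle the
// request and address the reply to the topic named in the request.
class ResponseTemplateSketch {
    static final String REQUEST_ID_HEADER = "requestId";        // header names are illustrative
    static final String RESPONSE_TOPIC_HEADER = "responseTopic";
    static final String REQUEST_TIME = "requestTime";

    static final class Reply {
        final String topic;      // where the reply is published
        final String requestId;  // echoed so the sender can find its pending future
        final String body;
        Reply(String topic, String requestId, String body) {
            this.topic = topic;
            this.requestId = requestId;
            this.body = body;
        }
    }

    static Optional<Reply> process(Map<String, String> headers, String body,
                                   long now, long requestTimeout,
                                   Function<String, String> handler) {
        // A missing request time is treated as 0, i.e. as already expired
        long requestTime = Long.parseLong(headers.getOrDefault(REQUEST_TIME, "0"));
        if (requestTime + requestTimeout < now) {
            return Optional.empty();                 // expired: silently dropped
        }
        String requestId = headers.get(REQUEST_ID_HEADER);
        String responseTopic = headers.get(RESPONSE_TOPIC_HEADER);
        if (requestId == null || responseTopic == null) {
            return Optional.empty();                 // ThingsBoard logs an error here
        }
        // The reply goes back where the request came from, tagged with the same id
        return Optional.of(new Reply(responseTopic, requestId, handler.apply(body)));
    }
}
```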
- The actual validation logic:
@Override
public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProtoQueueMsg<TransportApiRequestMsg> tbProtoQueueMsg) {
    TransportApiRequestMsg transportApiRequestMsg = tbProtoQueueMsg.getValue();
    // Check whether the protobuf message carries a token-validation request
    if (transportApiRequestMsg.hasValidateTokenRequestMsg()) {
        ValidateDeviceTokenRequestMsg msg = transportApiRequestMsg.getValidateTokenRequestMsg();
        // validateCredentials looks up the device info; the result is passed to the second argument (a Function) for further processing
        return Futures.transform(validateCredentials(msg.getToken(), DeviceCredentialsType.ACCESS_TOKEN),
                value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()),
                MoreExecutors.directExecutor());
    }
......
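Futures.transform chains a mapping Function onto an asynchronous lookup. The hypothetical sketch below shows the same shape, with CompletableFuture.thenApply playing the role of that Function. It also assumes an in-memory token-to-device map in place of the real credentials lookup.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of handle(): an asynchronous token lookup whose result is
// mapped into a response carrying the request's key. thenApply plays the role
// of the Function passed to Guava's Futures.transform.
class TokenValidationSketch {
    static final class Response {
        final String key;                    // copied from the request, like tbProtoQueueMsg.getKey()
        final Optional<String> deviceName;   // empty when the token is unknown
        Response(String key, Optional<String> deviceName) {
            this.key = key;
            this.deviceName = deviceName;
        }
    }

    // Stand-in for validateCredentials(token, ACCESS_TOKEN): an in-memory
    // token -> device name map instead of a database query.
    static CompletableFuture<Optional<String>> validateCredentials(Map<String, String> credentials, String token) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(credentials.get(token)));
    }

    static CompletableFuture<Response> handle(Map<String, String> credentials, String key, String token) {
        return validateCredentials(credentials, token)
                .thenApply(device -> new Response(key, device));
    }
}
```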
- Once the deviceInfo is found via the device's access token, it is published through the message queue on topic tb_transport.api.responses.localHostName. As analyzed in part 3, the transportApiRequestTemplate of DefaultTransportService subscribes to this topic:
List<Response> responses = responseTemplate.poll(pollInterval);
if (responses.size() > 0) {
    log.trace("Polling responses completed, consumer records count [{}]", responses.size());
} else {
    continue;
}
responses.forEach(response -> {
    byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER);
    UUID requestId;
    if (requestIdHeader == null) {
        log.error("[{}] Missing requestId in header and body", response);
    } else {
        requestId = bytesToUuid(requestIdHeader);
        log.trace("[{}] Response received: {}", requestId, response);
        // See ② above: the validation future was put into pendingRequests; now take it out by its requestId
        ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId);
        if (expectedResponse == null) {
            log.trace("[{}] Invalid or stale request", requestId);
        } else {
            // Complete the SettableFuture with the response
            expectedResponse.future.set(response);
        }
    }
......
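The response side closes the loop. A response removes the entry registered under its request id and completes that entry's future, and a response with an unknown id is "invalid or stale". The hypothetical sketch below shows this. The expire method is an assumption about how ResponseMetaData's expTime is used, because the excerpt above elides that part: it fails overdue entries so that callers do not wait forever.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of the response side: each pending entry holds a future
// and an expiry time (like ResponseMetaData); a response completes the entry
// registered under its request id, and unknown ids are treated as stale.
class ResponseCorrelationSketch<Resp> {
    private static final class Pending<R> {
        final CompletableFuture<R> future = new CompletableFuture<>();
        final long expTime;
        Pending(long expTime) { this.expTime = expTime; }
    }

    private final Map<UUID, Pending<Resp>> pendingRequests = new ConcurrentHashMap<>();

    CompletableFuture<Resp> register(UUID requestId, long expTime) {
        Pending<Resp> p = new Pending<>(expTime);
        pendingRequests.put(requestId, p);
        return p.future;
    }

    /** Returns false for an invalid or stale request id. */
    boolean onResponse(UUID requestId, Resp response) {
        Pending<Resp> p = pendingRequests.remove(requestId);
        if (p == null) {
            return false;                    // "Invalid or stale request"
        }
        p.future.complete(response);
        return true;
    }

    // Assumed use of expTime: fail entries whose deadline has passed
    void expire(long now) {
        for (Map.Entry<UUID, Pending<Resp>> e : pendingRequests.entrySet()) {
            if (e.getValue().expTime < now && pendingRequests.remove(e.getKey(), e.getValue())) {
                e.getValue().future.completeExceptionally(new TimeoutException("request " + e.getKey() + " timed out"));
            }
        }
    }
}
```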
- The asynchronous request issued by DefaultTransportService's process method now has its result. The onSuccess callback therefore runs, which is MqttTransportHandler's onValidateDeviceResponse:
private void onValidateDeviceResponse(ValidateDeviceCredentialsResponseMsg msg, ChannelHandlerContext ctx) {
    if (!msg.hasDeviceInfo()) {
        ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_REFUSED_NOT_AUTHORIZED));
        ctx.close();
    } else {
        deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
        sessionInfo = SessionInfoProto.newBuilder()
                .setNodeId(context.getNodeId())
                .setSessionIdMSB(sessionId.getMostSignificantBits())
                .setSessionIdLSB(sessionId.getLeastSignificantBits())
                .setDeviceIdMSB(msg.getDeviceInfo().getDeviceIdMSB())
                .setDeviceIdLSB(msg.getDeviceInfo().getDeviceIdLSB())
                .setTenantIdMSB(msg.getDeviceInfo().getTenantIdMSB())
                .setTenantIdLSB(msg.getDeviceInfo().getTenantIdLSB())
                .setDeviceName(msg.getDeviceInfo().getDeviceName())
                .setDeviceType(msg.getDeviceInfo().getDeviceType())
                .build();
        // Create a SessionEvent.OPEN message and call sendToDeviceActor with the sessionInfo
        transportService.process(sessionInfo, DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
.......
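The SessionInfoProto built above carries each id (session, device, tenant) as a pair of MSB/LSB longs, because protobuf has no UUID type. A minimal sketch of the round trip with java.util.UUID follows. The class name is hypothetical.

```java
import java.util.UUID;

// Sketch of how a UUID travels in SessionInfoProto-style fields: split into
// its two 64-bit halves, then rebuilt on the receiving side.
class UuidBitsSketch {
    static long[] split(UUID id) {
        return new long[] { id.getMostSignificantBits(), id.getLeastSignificantBits() };
    }

    static UUID join(long msb, long lsb) {
        return new UUID(msb, lsb);
    }
}
```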
- The implementation of sendToDeviceActor:
protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback<Void> callback) {
    // Create the tpi: a fixed partition id is chosen; the topic is tb_core and the fullTopicName is tb_core.<int>, e.g. tb_core.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo));
    ......
    // Send to the message queue with tbCoreMsgProducer, carrying toDeviceActorMsg
    tbCoreMsgProducer.send(tpi,
            new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
                    ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
            callback != null ? new TransportTbQueueCallback(callback) : null);
}
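The "fixed partition id" in partitionService.resolve means that the same device always maps to the same partition. As a result, its messages stay on one consumer and keep their order. The hypothetical sketch below shows that idea with a plain hash-modulo. The hash function and partition count are assumptions, and ThingsBoard's partition service uses its own hashing.

```java
import java.util.UUID;

// Hypothetical partition resolution: hash the entity id onto one of N
// partitions so that a device's messages always land on the same partition.
class PartitionSketch {
    static String resolve(String topic, UUID entityId, int partitions) {
        // floorMod keeps the index non-negative even for negative hash codes
        int partition = Math.floorMod(entityId.hashCode(), partitions);
        return topic + "." + partition;   // e.g. tb_core.1
    }
}
```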
- From the analysis of DefaultTbCoreConsumerService in part 2, we know that its consumer subscribes to messages on this topic:
try {
    ToCoreMsg toCoreMsg = msg.getValue();
    if (toCoreMsg.hasToSubscriptionMgrMsg()) {
        log.trace("[{}] Forwarding message to subscription manager service {}", id, toCoreMsg.getToSubscriptionMgrMsg());
        forwardToSubMgrService(toCoreMsg.getToSubscriptionMgrMsg(), callback);
    } else if (toCoreMsg.hasToDeviceActorMsg()) {
        log.trace("[{}] Forwarding message to device actor {}", id, toCoreMsg.getToDeviceActorMsg());
        // Handled by this method
        forwardToDeviceActor(toCoreMsg.getToDeviceActorMsg(), callback);
    }
......
- forwardToDeviceActor handles the message as follows:
private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg, TbCallback callback) {
    if (statsEnabled) {
        stats.log(toDeviceActorMsg);
    }
    // Create a message of type TRANSPORT_TO_DEVICE_ACTOR_MSG and hand it to AppActor
    actorContext.tell(new TransportToDeviceActorMsgWrapper(toDeviceActorMsg, callback));
}
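- Following summary point 3 of part 4, we can go straight to AppActor's doProcess method to see how this message type is handled. Tracing it shows that AppActor forwards the message to TenantActor, which creates a DeviceActor and forwards the message to it;
- The DeviceActor processes this message type as follows: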
protected boolean doProcess(TbActorMsg msg) {
    switch (msg.getMsgType()) {
        case TRANSPORT_TO_DEVICE_ACTOR_MSG:
            // Hand the TransportToDeviceActorMsgWrapper to the processor, which goes on to call processSessionStateMsgs
            processor.process(ctx, (TransportToDeviceActorMsgWrapper) msg);
            break;
        case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
        ......
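The AppActor -> TenantActor -> DeviceActor forwarding traced in the previous step is a routing tree whose children are created on first use. The sketch below is hypothetical and single-threaded. Real ThingsBoard actors have mailboxes and process messages asynchronously, and this sketch models only the routing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, synchronous sketch of the forwarding chain: the app actor routes
// by tenant, the tenant actor routes by device, and children are created lazily.
class ActorRoutingSketch {
    static final class DeviceActor {
        final List<String> received = new ArrayList<>();
        void tell(String msg) { received.add(msg); }
    }

    static final class TenantActor {
        final Map<UUID, DeviceActor> devices = new ConcurrentHashMap<>();
        void tell(UUID deviceId, String msg) {
            // Create the device actor on the first message for this device
            devices.computeIfAbsent(deviceId, id -> new DeviceActor()).tell(msg);
        }
    }

    static final class AppActor {
        final Map<UUID, TenantActor> tenants = new ConcurrentHashMap<>();
        void tell(UUID tenantId, UUID deviceId, String msg) {
            tenants.computeIfAbsent(tenantId, id -> new TenantActor()).tell(deviceId, msg);
        }
    }
}
```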
- processSessionStateMsgs handles it as follows:
private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
    UUID sessionId = getSessionId(sessionInfo);
    if (msg.getEvent() == SessionEvent.OPEN) {
        .....
        sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
        if (sessions.size() == 1) {
            // Will call pushRuleEngineMessage(stateData, CONNECT_EVENT);
            reportSessionOpen();
        }
        // Will call pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
        systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
        dumpSessions();
    }
    ....
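In the OPEN branch, every new session reports activity, but only the device's first session reports a connect. The hypothetical sketch below isolates that rule. The event list stands in for pushing messages to the rule engine.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the OPEN branch of processSessionStateMsgs: only the
// first session of a device yields CONNECT_EVENT; every open yields ACTIVITY_EVENT.
class SessionTrackerSketch {
    final Map<UUID, String> sessions = new LinkedHashMap<>();  // sessionId -> nodeId
    final List<String> events = new ArrayList<>();             // stand-in for rule engine messages

    void onSessionOpen(UUID sessionId, String nodeId) {
        sessions.put(sessionId, nodeId);
        if (sessions.size() == 1) {
            events.add("CONNECT_EVENT");   // reportSessionOpen()
        }
        events.add("ACTIVITY_EVENT");      // onDeviceActivity(...)
    }
}
```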
- Because CONNECT_EVENT and ACTIVITY_EVENT differ only in type, only CONNECT_EVENT is analyzed below. The event eventually reaches pushMsgToRuleEngine:
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) {
    if (tenantId.isNullUid()) {
        if (entityId.getEntityType().equals(EntityType.TENANT)) {
            tenantId = new TenantId(entityId.getId());
        } else {
            log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
            return;
        }
    }
    // As in sendToDeviceActor above; an example fullTopicName of the tpi created here is tb_rule_engine.main.1
    TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
    log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
    ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
            .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
            .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
            .setTbMsg(TbMsg.toByteString(tbMsg)).build();
    producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback);
    toRuleEngineMsgs.incrementAndGet();
}
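- As analyzed in part 2, DefaultTbRuleEngineConsumerService subscribes to messages on topic tb_rule_engine.main.1. On receiving a message, it calls forwardToRuleEngineActor, which wraps it as a QUEUE_TO_RULE_ENGINE_MSG message and hands it to AppActor for dispatch;
- AppActor hands it to TenantActor, and TenantActor hands it to the root rule chain (RootRuleChain). That RuleChainActor then hands it to its firstRuleNode, which is a specific RuleNodeActor;
- Open the RULE CHAINS page in the frontend and you will see that MESSAGE TYPE SWITCH is the first node to receive input. In the database, the first_rule_node_id configured in the rule_chain table indeed points to this TbMsgTypeSwitchNode;
- In TbMsgTypeSwitchNode's onMsg method (every rule node processes messages in onMsg), the relation type is chosen from the messageType (CONNECT_EVENT here), and ctx.tellNext(msg, relationType) is called;
- DefaultTbContext then creates a RuleNodeToRuleChainTellNextMsg of type RULE_TO_RULE_CHAIN_TELL_NEXT_MSG and hands it to RuleChainActor;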
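The core of TbMsgTypeSwitchNode described above is a lookup from message type to relation type, with a fallback for unknown types. The hypothetical sketch below is partial. Its relation names follow the labels seen in the default rule chain (for example "Post telemetry" and "Connect Event"), but the table is illustrative, not a copy of the node's code.

```java
import java.util.Map;

// Hypothetical, partial sketch of the TbMsgTypeSwitchNode idea: the message
// type decides the relation type, and unknown types fall through to "Other".
class MsgTypeSwitchSketch {
    private static final Map<String, String> RELATIONS = Map.of(
            "POST_TELEMETRY_REQUEST", "Post telemetry",
            "POST_ATTRIBUTES_REQUEST", "Post attributes",
            "CONNECT_EVENT", "Connect Event",
            "DISCONNECT_EVENT", "Disconnect Event",
            "ACTIVITY_EVENT", "Activity Event");

    // The result is what the node would pass to ctx.tellNext(msg, relationType)
    static String relationType(String messageType) {
        return RELATIONS.getOrDefault(messageType, "Other");
    }
}
```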
- Next we enter the onTellNext method of RuleChainActorMessageProcessor:
private void onTellNext(TbMsg msg, RuleNodeId originatorNodeId, Set<String> relationTypes, String failureMessage) {
    try {
        checkActive(msg);
        // Originator of the message
        EntityId entityId = msg.getOriginator();
        // Create a tpi, which may be used below
        TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
        // Find the related rule nodes. This effectively queries the relation table for relations from the originating node
        // whose relation_type matches the relationType set in TbMsgTypeSwitchNode; for Connect Event no RuleNodeId matches.
        List<RuleNodeRelation> relations = nodeRoutes.get(originatorNodeId).stream()
                .filter(r -> contains(relationTypes, r.getType()))
                .collect(Collectors.toList());
        int relationsCount = relations.size();
        // No relation matches Connect Event, so the message has finished passing through the rule engine
        if (relationsCount == 0) {
            log.trace("[{}][{}][{}] No outbound relations to process", tenantId, entityId, msg.getId());
            if (relationTypes.contains(TbRelationTypes.FAILURE)) {
                RuleNodeCtx ruleNodeCtx = nodeActors.get(originatorNodeId);
                if (ruleNodeCtx != null) {
                    msg.getCallback().onFailure(new RuleNodeException(failureMessage, ruleChainName, ruleNodeCtx.getSelf()));
                } else {
                    log.debug("[{}] Failure during message processing by Rule Node [{}]. Enable and see debug events for more info", entityId, originatorNodeId.getId());
                    msg.getCallback().onFailure(new RuleEngineException("Failure during message processing by Rule Node [" + originatorNodeId.getId().toString() + "]"));
                }
            } else {
                msg.getCallback().onSuccess();
            }
        } else if (relationsCount == 1) {
            // Example: the Post telemetry type does match a rule node, implemented by TbMsgTimeseriesNode, which then handles the message
            for (RuleNodeRelation relation : relations) {
                log.trace("[{}][{}][{}] Pushing message to single target: [{}]", tenantId, entityId, msg.getId(), relation.getOut());
                pushToTarget(tpi, msg, relation.getOut(), relation.getType());
            }
        } else {
            MultipleTbQueueTbMsgCallbackWrapper callbackWrapper = new MultipleTbQueueTbMsgCallbackWrapper(relationsCount, msg.getCallback());
            log.trace("[{}][{}][{}] Pushing message to multiple targets: [{}]", tenantId, entityId, msg.getId(), relations);
            for (RuleNodeRelation relation : relations) {
                EntityId target = relation.getOut();
                putToQueue(tpi, msg, callbackWrapper, target);
            }
        }
    } catch (RuleNodeException rne) {
        msg.getCallback().onFailure(rne);
    } catch (Exception e) {
        msg.getCallback().onFailure(new RuleEngineException("onTellNext - " + e.getMessage()));
    }
}
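onTellNext branches on the number of matching relations. With no match the message is done, with one match the message is pushed straight to that target, and with several matches all targets share one counting callback. The hypothetical sketch below shows this fan-out. MultiCallback only imitates the role of MultipleTbQueueTbMsgCallbackWrapper: it reports success once every target has succeeded and forwards the first failure. The FAILURE-relation branch is omitted.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

// Hypothetical sketch of onTellNext's fan-out by number of matching relations.
class TellNextSketch {
    interface Callback { void onSuccess(); void onFailure(Throwable t); }

    static final class Relation {
        final String type;
        final String target;
        Relation(String type, String target) { this.type = type; this.target = target; }
    }

    // Succeeds once after all targets succeed; forwards only the first failure
    static final class MultiCallback implements Callback {
        private final AtomicInteger remaining;
        private final AtomicBoolean failed = new AtomicBoolean();
        private final Callback delegate;
        MultiCallback(int count, Callback delegate) {
            this.remaining = new AtomicInteger(count);
            this.delegate = delegate;
        }
        public void onSuccess() {
            if (remaining.decrementAndGet() == 0 && !failed.get()) delegate.onSuccess();
        }
        public void onFailure(Throwable t) {
            if (failed.compareAndSet(false, true)) delegate.onFailure(t);
        }
    }

    // "deliver" stands in for pushToTarget/putToQueue: it hands the message to a target node
    static void tellNext(List<Relation> routes, Set<String> relationTypes, Callback callback,
                         BiConsumer<String, Callback> deliver) {
        List<Relation> relations = routes.stream()
                .filter(r -> relationTypes.contains(r.type))
                .collect(Collectors.toList());
        if (relations.isEmpty()) {
            callback.onSuccess();            // nothing downstream (e.g. Connect Event): processing is finished
        } else if (relations.size() == 1) {
            deliver.accept(relations.get(0).target, callback);
        } else {
            Callback shared = new MultiCallback(relations.size(), callback);
            relations.forEach(r -> deliver.accept(r.target, shared));
        }
    }
}
```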
What's more:
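As in the example above, telemetry data (Post telemetry) is further processed by TbMsgTimeseriesNode's onMsg. That step stores the data and then pushes updates over WebSocket (if WebSocket sessions exist) or sends other notifications. We won't go into the details here.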
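Summary:
- Handling an MQTT connection in fact walks through the whole rule engine. Other message types, such as telemetry, attribute updates, and sending and receiving RPC requests, follow broadly the same flow;
- When tracing how messages flow, always be clear about which topic is subscribed to or published to, so that you do not lose your way;
- The actor model dispatches each message step by step from AppActor according to its type, until a suitable RuleNode processes it;
- Protobuf messages are easy to serialize, transmit, and parse, so ThingsBoard uses them heavily. The generated classes are not very readable, though, and reading queue.proto directly gives a better feel for them.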
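Given the author's limited expertise, this article only outlines the general flow and probably contains mistakes. Corrections are welcome.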