ThingsBoard 二次開發之源碼分析 3-啟動分析 2


thingsboard聚集地

Thingsboard 話題討論區:https://forum.iotschool.com/topics/node8

歡迎大家加入thingsboard 二次開發討論群:121202538

thingsboard交流QQ群 121202538

ThingsBoard源碼分析3-啟動分析2

以下的分析環境基於內存消息隊列和默認配置

1. DefaultTransportService

DefaultTransportService啟動

分析初始化方法:

@PostConstruct
public void init() {
    //根據配置判斷是否創建限流
    if (rateLimitEnabled) {
        //Just checking the configuration parameters
        new TbRateLimits(perTenantLimitsConf);
        new TbRateLimits(perDevicesLimitsConf);
    }
    this.schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("transport-scheduler"));
    this.transportCallbackExecutor = Executors.newWorkStealingPool(20);
    this.schedulerExecutor.scheduleAtFixedRate(this::checkInactivityAndReportActivity, new Random().nextInt((int) sessionReportTimeout), sessionReportTimeout, TimeUnit.MILLISECONDS);
  	//transportApiRequestTemplate的創建見下分析①,transportApiRequestTemplate中包含了
    //一個生產者producerTemplate(requestTemplate)     topic:tb_transport.api.responses   ②
    //和一個消費者consumerTemplate (responseTemplate)   topic:tb_transport.api.responses.localHostName ③
    transportApiRequestTemplate = queueProvider.createTransportApiRequestTemplate();
    //此處的producerProvider bean的創建是按照配置文件的值創建的,TbQueueProducerProvider有三個實現類,使用ConditionalOnExpression注解,讀取service.type的值(默認monolith),所以該Bean的實現類是TbCoreQueueProducerProvider,此類的@PostConstruct標記的init()方法初始化的,該類TbCoreQueueProducerProvider初始化了一下變量:
   // 1.toTbCore                    topic:tb_core
   // 2.toTransport                 topic:tb_transport.notifications
   // 3.toRuleEngine                topic:tb_rule_engine
   // 4.toRuleEngineNotifications   topic:tb_rule_engine
   // 5.toTbCoreNotifications       topic:tb_core
    ruleEngineMsgProducer = producerProvider.getRuleEngineMsgProducer();
    tbCoreMsgProducer = producerProvider.getTbCoreMsgProducer();
    transportNotificationsConsumer = queueProvider.createTransportNotificationsConsumer();
    //fullTopic = topic:tb_transport.notifications.localHostName  
    TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceInfoProvider.getServiceId());
    transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
    //見④分析
    transportApiRequestTemplate.init();
    mainConsumerExecutor.execute(() -> {
        while (!stopped) {
            try {
                List<TbProtoQueueMsg<ToTransportMsg>> records = transportNotificationsConsumer.poll(notificationsPollDuration);
                if (records.size() == 0) {
                    continue;
                }
                records.forEach(record -> {
                    try {
                        processToTransportMsg(record.getValue());
                    } catch (Throwable e) {
                        log.warn("Failed to process the notification.", e);
                    }
                });
                transportNotificationsConsumer.commit();
            } catch (Exception e) {
                if (!stopped) {
                    log.warn("Failed to obtain messages from queue.", e);
                    try {
                        Thread.sleep(notificationsPollDuration);
                    } catch (InterruptedException e2) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", e2);
                    }
                }
            }
        }
    });
}

createTransportApiRequestTemplate In InMemoryTbTransportQueueFactory,因為我們沒有啟用相應的消息隊列中間件,我們分析InMemoryTbTransportQueueFactory:

public TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> createTransportApiRequestTemplate() {
    //根據配置文件值queue.transport_api.requests_topic獲取到的topic是tb_transport.api.requests創建了生產者
    InMemoryTbQueueProducer<TbProtoQueueMsg<TransportApiRequestMsg>> producerTemplate =
            new InMemoryTbQueueProducer<>(transportApiSettings.getRequestsTopic());
    //根據配置文件值queue.transport_api.responses_topic獲取到的topic是tb_transport.api.responses
    //加上serviceId(我們在第二篇分析中提到,本機的HostName作為serviceId,其topic就是tb_transport.api.responses.localHostName
    InMemoryTbQueueConsumer<TbProtoQueueMsg<TransportApiResponseMsg>> consumerTemplate =
            new InMemoryTbQueueConsumer<>(transportApiSettings.getResponsesTopic() + "." + serviceInfoProvider.getServiceId());
    //使用建造者模式返回了TbQueueRequestTemplate實例,其中包含了一個消費者和一個生產者
    DefaultTbQueueRequestTemplate.DefaultTbQueueRequestTemplateBuilder
            <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> templateBuilder = DefaultTbQueueRequestTemplate.builder();

    templateBuilder.queueAdmin(new TbQueueAdmin() {
        @Override
        public void createTopicIfNotExists(String topic) {}

        @Override
        public void destroy() {}
    });

    templateBuilder.requestTemplate(producerTemplate);
    templateBuilder.responseTemplate(consumerTemplate);
    templateBuilder.maxPendingRequests(transportApiSettings.getMaxPendingRequests());
    templateBuilder.maxRequestTimeout(transportApiSettings.getMaxRequestsTimeout());
    templateBuilder.pollInterval(transportApiSettings.getResponsePollInterval());
    return templateBuilder.build();
}

init() in DefaultTbQueueRequestTemplate:

public void init() {
    queueAdmin.createTopicIfNotExists(responseTemplate.getTopic());
    //按照是使用的中間件,實現不同的初始化方法,Inmemory該方法體為空
    this.requestTemplate.init();
    tickTs = System.currentTimeMillis();
    //見③,訂閱主題為 tb_transport.api.responses.localHostName
    responseTemplate.subscribe();
    executor.submit(() -> {
        long nextCleanupMs = 0L;
        while (!stopped) {
            try {
                //從消息隊列里面獲取消息
                List<Response> responses = responseTemplate.poll(pollInterval);
                ...........

2.TbCoreTransportApiService

TbCoreTransportApiService初始化

  • PostConstruct注解方法:

@PostConstruct
public void init() {
this.transportCallbackExecutor = Executors.newWorkStealingPool(maxCallbackThreads);
//topic是配置文件queue.transport_api.responses_topic的值默認為:tb_transport.api.responses ⑤
TbQueueProducer<TbProtoQueueMsg > producer = tbCoreQueueFactory.createTransportApiResponseProducer();
//topic是配置文件queue.transport_api.requests_topic的值,默認為:tb_transport.api.requests ⑥
TbQueueConsumer<TbProtoQueueMsg > consumer = tbCoreQueueFactory.createTransportApiRequestConsumer();

DefaultTbQueueResponseTemplate.DefaultTbQueueResponseTemplateBuilder
        <TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> builder = DefaultTbQueueResponseTemplate.builder();
builder.requestTemplate(consumer);
builder.responseTemplate(producer);
builder.maxPendingRequests(maxPendingRequests);
builder.requestTimeout(requestTimeout);
builder.pollInterval(responsePollDuration);
builder.executor(transportCallbackExecutor);
builder.handler(transportApiService);
transportApiTemplate = builder.build();

- `@EventListener(ApplicationReadyEvent.class)`注解方法,調用了`transportApiTemplate.init(transportApiService);``transportApiTemplate`即上一步創建的`DefaultTbQueueResponseTemplate`對象`init()`方法為:

```java
@Override
public void init(TbQueueHandler<Request, Response> handler) {
   //按照是使用的中間件,實現不同的初始化方法,Inmemory該方法體為空
    this.responseTemplate.init();
    //見⑥,訂閱主題為tb_transport.api.requests
    requestTemplate.subscribe();
    loopExecutor.submit(() -> {
        while (!stopped) {
            try {
                while (pendingRequestCount.get() >= maxPendingRequests) {
                    try {
                        Thread.sleep(pollInterval);
                    } catch (InterruptedException e) {
                        log.trace("Failed to wait until the server has capacity to handle new requests", e);
                    }
                }
                List<Request> requests = requestTemplate.poll(pollInterval);
                ...........

總結

DefaultTransportServiceTbCoreTransportApiService方法的啟動並不是很復雜,我們需要將主要的關注點放在兩個Bean初始化消費者和生產者的topic上面,thingsboard將使用中間件將消息解耦,如果按照傳統的調試方法很容易找不到消息的流向,此時我們將topic作為關鍵的切入點,方便后面整個數據流的分析。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM