ThingsBoard 二次開發之源碼分析 2-啟動分析 1


thingsboard聚集地

Thingsboard 話題討論區:https://forum.iotschool.com/topics/node8

歡迎大家加入thingsboard 二次開發討論群:121202538

thingsboard交流QQ群 121202538

ThingsBoard源碼分析2-啟動分析1

以下的分析環境基於內存消息隊列和無注冊中心配置以及按照默認配置

1.Clustering mode

官網對集群模式有一部分介紹這可以幫助我們理解代碼為什么會這么做:

ThingsBoard adopts consistent hashing to ensure scalability and availability. Message from Device A that is received on a particular node may be forwarded to the other node based on the hash of the device ID. Although this introduces certain networking overhead, it allows to process all messages from a particular device using corresponding device actor on a determined server, which introduces the following advantages:

improve cache hit rate. Device attributes and other device related data are fetched by device actor on a specific server.
avoid race conditions. All messages for a particular device are processed on a determined server.
allows targeting server-side api calls based on the device id.

2. TbServiceInfoProvider

DefaultTbServiceInfoProvider init()方法由@PostConstruct標記,spring啟動的時候會自動調用該方法:

public void init() {
        if (StringUtils.isEmpty(serviceId)) {
            try {
              	//獲取本機的HostName作為serviceId
                serviceId = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                serviceId = org.apache.commons.lang3.RandomStringUtils.randomAlphabetic(10);
            }
        }
        log.info("Current Service ID: {}", serviceId);
 			  //serviceType是配置文件下service.type的值,默認為monolith
  			//serviceTypes將會是一個List包含TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR ①
        if (serviceType.equalsIgnoreCase("monolith")) { 
            serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
        } else {
            serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
        }
        ServiceInfo.Builder builder = ServiceInfo.newBuilder()
                .setServiceId(serviceId)
                .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList()));
        UUID tenantId;
  		 	//tenantIdStr是配置文件中service.tenant_id的值,默認情況下為空,isolatedTenant也就為空了
        if (!StringUtils.isEmpty(tenantIdStr)) {
            tenantId = UUID.fromString(tenantIdStr);
            isolatedTenant = new TenantId(tenantId);
        } else {
            tenantId = TenantId.NULL_UUID;
        }
  			//返回此 uuid 的 128 位值中的最高有效 64 位和最低64位
        builder.setTenantIdMSB(tenantId.getMostSignificantBits());
        builder.setTenantIdLSB(tenantId.getLeastSignificantBits());
				//ruleEngineSettings是一個TbQueueRuleEngineSettings的一個實例,讀取queue.rule-engine下的值
  			//ruleEngineSettings包含topic是tb_rule_engine,queue隊列有三個分別是②:
  			// 1. name: Main topic: tb_rule_engine.main partition: 10
  			// 2. name: HighPriority topic: tb_rule_engine.hp partition: 10
 			  // 3. name: SequentialByOriginator topic: tb_rule_engine.sq partition: 10
        if (serviceTypes.contains(ServiceType.TB_RULE_ENGINE) && ruleEngineSettings != null) {
            for (TbRuleEngineQueueConfiguration queue : ruleEngineSettings.getQueues()) {
                TransportProtos.QueueInfo queueInfo = TransportProtos.QueueInfo.newBuilder()
                        .setName(queue.getName())
                        .setTopic(queue.getTopic())
                        .setPartitions(queue.getPartitions()).build();
                builder.addRuleEngineQueues(queueInfo);
            }
        }
        serviceInfo = builder.build();
    }

3. PartitionService

PartitionService的默認實現是HashPartitionService

 @PostConstruct
    public void init() {
        //根據queue.partitions.hash_function_name的配置選擇以后做partition的hash方法,默認值是murmur3_128
        this.hashFunction = forName(hashFunctionName);
        //ConcurrentMap<ServiceQueue, Integer> partitionSizes
        //ServiceQueue 類成員ServiceType和字符串類型的queue name,構造函數如果不提供queue name或者queue name是null的話,ServiceQueue對象的的queue name是"Main"
      //corePartitions 是queue.core.partitions默認值10
        partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
      //coreTopic對應的配置文件鍵是queue.core.topic,默認值tb_core
        partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
      //根據DefaultTbServiceInfoProvider②的分析可以得出partitionTopics,partitionSizes的具體內容
        tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
            partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
            partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), 			queueConfiguration.getPartitions());
        });
    }

4. DiscoveryService

因為沒有使用Zookeeper做注冊中心,DiscoveryService的實現由DummyDiscoveryService實現,在收到Spring發送的ApplicationReadyEvent事件后,調用partitionService.recalculatePartitions方法:

public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
    //日志記錄
    logServiceInfo(currentService);
    //dummy Discovery將不包含otherService,Zookeeper注冊中心的實現將會有otherService
    otherServices.forEach(this::logServiceInfo);
    Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
    //展開ServiceInfo的serviceTypes和RuleEngineQueue,並添加到queueServicesMap
    addNode(queueServicesMap, currentService);
    for (ServiceInfo other : otherServices) {
        addNode(queueServicesMap, other);
    }
    queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));

    ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
    TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
    myPartitions = new ConcurrentHashMap<>();
   //創建了ServiceQueueKey和partitionIndex的list組合
    partitionSizes.forEach((serviceQueue, size) -> {
        ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
        for (int i = 0; i < size; i++) {
            ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
            if (currentService.equals(serviceInfo)) {
                ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
                myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
            }
        }
    });

    oldPartitions.forEach((serviceQueueKey, partitions) -> {
        if (!myPartitions.containsKey(serviceQueueKey)) {
            log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey);
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, Collections.emptySet()));
        }
    });
   //發送PartitionChangeEvent,創建的TopicPartitionInfo的topic是partitionTopics的topic,fullTopicName是topic+Index, DummyDiscovery每次tpiList包含了所有的partitionIndex, 此例0-9.例:tpiList其中的fullTopicName是tb_core.9,tb_rule_engine.main.3
    myPartitions.forEach((serviceQueueKey, partitions) -> {
        if (!partitions.equals(oldPartitions.get(serviceQueueKey))) {
            log.info("[{}] NEW PARTITIONS: {}", serviceQueueKey, partitions);
            Set<TopicPartitionInfo> tpiList = partitions.stream()
                    .map(partition -> buildTopicPartitionInfo(serviceQueueKey, partition))
                    .collect(Collectors.toSet());
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList));
        }
    });
    tpiCache.clear();

    if (currentOtherServices == null) {
        currentOtherServices = new ArrayList<>(otherServices);
    } else {
        Set<ServiceQueueKey> changes = new HashSet<>();
        Map<ServiceQueueKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
        Map<ServiceQueueKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
        currentOtherServices = otherServices;
        currentMap.forEach((key, list) -> {
            if (!list.equals(newMap.get(key))) {
                changes.add(key);
            }
        });
        currentMap.keySet().forEach(newMap::remove);
        changes.addAll(newMap.keySet());
        if (!changes.isEmpty()) {
            applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
        }
    }
}

5. DefaultTbCoreConsumerService

DefaultTbCoreConsumerService流程簡析

  • DefaultTbCoreConsumerService的父類是AbstractConsumerService, 而父類接收到ApplicationReadyEvent時,會調用子類的launchMainConsumers()應用設計模式模板方法模式,啟動了消費者線;其作用是:消費者線程按照固定的延時,無限循環去消息隊列提取消息,由於此時並未subscribed,也即未訂閱,將暫時不從消息隊列里取消息;
  • 前面討論到DiscoveryService發布了PartitionChangeEventDefaultTbCoreConsumerService實現了ApplicationListener<PartitionChangeEvent>接口,在接收到PartitionChangeEvent時,會做出相應的反應,調用onApplicationEvent,訂閱了的fullTopicName是tb_core.0-9 每次從消息隊列進行poll的時候,都會檢查這些fullTopicName里面是否有消息准備就緒;

6. DefaultTbRuleEngineConsumerService

DefaultTbRuleEngineConsumerService初始化

  • @PostConstruct注解初始化了ruleEngine的消費者,按照TbServiceInfoProviderbean的初始化②,可以知道,初始化了三個consumer, 放到了consumers的map變量里,key是queue的name;

  • DefaultTbRuleEngineConsumerService的父類也是AbstractConsumerService, 跟DefaultTbCoreConsumerService一樣,父類接收到ApplicationReadyEvent時,調用該類的launchMainConsumers()方法,啟動了三個消費者線程,由於還未訂閱,所以暫時不從消息隊列里面獲取消息;

  • DefaultTbCoreConsumerService一樣,收到PartitionChangeEvent時,啟動訂閱主題tb_rule_engine.main.0-9,tb_rule_engine.sq.0-9,tb_rule_engine.hp.0-9

    public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
        //此時serviceType是ServiceType.TB_RULE_ENGINE,過濾了其他的type
        if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
            ServiceQueue serviceQueue = partitionChangeEvent.getServiceQueueKey().getServiceQueue();
            log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), partitionChangeEvent.getPartitions());
            //根絕queueName做區分,讓三個consumer分別訂閱了三個主題的列表tb_rule_engine.main.0-9,tb_rule_engine.sq.0-9,tb_rule_engine.hp.0-9
            consumers.get(serviceQueue.getQueue()).subscribe(partitionChangeEvent.getPartitions());
        }
    }
    

總結

通過上面類的初始化一系列過程,我們有了大概的印象:

  1. 創建了ServiceInfo對象,並根據配置文件我們得知了tb_core,和tb_rule_engine的一些配置信息;

  2. DiscoveryService根據注冊中心,和其他的服務提供者,會計算相應的partition index,並發布PartitionChangeEvent;

  3. AbstractConsumerService的兩個實現類在接收到PartitionChangeEvent之前,都啟動了一個或多個線程,在接收到此消息的時候, 都會使自己的consumer訂閱相應的主題等待消息的到來。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM