Apollo配置中心源碼分析


Apollo 配置中心源碼分析

​ Apollo是攜程開源的一款分布式配置管理中心,能夠集中化管理應用不同環境、不同集群的配置,配置修改后能夠實時推送到應用端,並且具備規范的權限、流程治理等特性,適用於微服務配置管理場景。

Apollo配置發布和通知的過程

  1. 用戶在配置中心對配置進行修改並發布

  2. 配置中心通知Apollo客戶端有配置更新

  3. Apollo客戶端從配置中心拉取最新的配置、更新本地配置並通知到應用

image-20220301221014007

從Apollo模塊看配置發布流程

image-20220305161457345

Apollo四個核心模塊及其主要功能

  1. ConfigService

    • 提供配置獲取接口
    • 提供配置推送接口
    • 服務於Apollo客戶端
  2. AdminService

    • 提供配置管理接口
    • 提供配置修改發布接口
    • 服務於管理界面Portal
  3. Client

    • 為應用獲取配置,支持實時更新
    • 通過MetaServer獲取ConfigService的服務列表
    • 使用客戶端軟負載SLB方式調用ConfigService
  4. Portal

    • 配置管理界面
    • 通過MetaServer獲取AdminService的服務列表
    • 使用客戶端軟負載SLB方式調用AdminService

先對整體流程進行一個梳理:

* 用戶修改和發布配置是通過portal調用AdminService,把配置變更保存在數據庫中。

* 客戶端通過長輪詢訪問ConfigService實時監聽配置變更。默認超時時間是90秒。如果在超時前有配置變更,就會立即返回給客戶端。客戶端獲取變化的配置,根據進行實時更新。如果超時也沒有數據變更,就放304.客戶端重新發起新的請求。

* 配置服務ConfigService有一個定時任務,每秒去掃描數據庫,查看是否有新變更的數據。如果有數據變更就通知客戶端。

下面打算對Apollo在頁面修改配置后如何通知到客戶端過程的源碼進行分析。

說明:

  • Apollo版本為1.9.1.
  • 測試用的應用appid=apollo-demo,namespace=default,env=DEV,cluster=default

主要分為一下幾個部分

  1. 頁面發布配置(新增,修改和刪除)
  2. configService獲取到新發布的配置信息
  3. configService通知客戶端最新的配置變更
  4. 客戶端的同步更新Spring容器中注入的@Value的值
  5. Apollo 如何實現讓自己的配置優先級最高
一、 Apollo修改配置與發布配置
1.1頁面修改配置

修改name 舊值:張三 新值:張三1

URL: http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces/application/item

參數:

{"id":1,"namespaceId":1,"key":"name","value":"張三1","lineNum":1,"dataChangeCreatedBy":"apollo","dataChangeLastModifiedBy":"apollo","dataChangeCreatedByDisplayName":"apollo","dataChangeLastModifiedByDisplayName":"apollo","dataChangeCreatedTime":"2022-02-26T12:26:12.000+0800","dataChangeLastModifiedTime":"2022-02-26T12:26:12.000+0800","tableViewOperType":"update","comment":"修改姓名"}

根據上面的分析在頁面修改配置是portal調用AdminService保存到數據庫。所以我們到Apollo的portal模塊去查找請求。Apollo使用的是restful的請求方式,它的請求格式都是/參數名1/{參數值1}/參數名2/{參數值2}/……。所以我們就去portal查詢"/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/item")

@PutMapping("/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/item")
  public void updateItem(@PathVariable String appId, 
                         @PathVariable String env,
                         @PathVariable String clusterName, 
                         @PathVariable String namespaceName,
                         @RequestBody ItemDTO item) {
    checkModel(isValidItem(item));
    String username = userInfoHolder.getUser().getUserId();
    item.setDataChangeLastModifiedBy(username);
    configService.updateItem(appId, Env.valueOf(env), clusterName, namespaceName, item);
  }

單個更新配置時portal通過configService.updateItem()保存數據中

public void updateItem(String appId, Env env, 
                       String clusterName, 
                       String namespace, 
                       long itemId, 
                       ItemDTO item) {
restTemplate.put(env, "apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items/{itemId}",
item, appId, clusterName, namespace, itemId);
    }

這里就是portal通過restTemplate調用AdminService保存配置到數據庫。

AdminService 中代碼如下

 @PutMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items/{itemId}")
  public ItemDTO update(@PathVariable("appId") String appId,
                        @PathVariable("clusterName") String clusterName,
                        @PathVariable("namespaceName") String namespaceName,
                        @PathVariable("itemId") long itemId,
                        @RequestBody ItemDTO itemDTO) {
    Item managedEntity = itemService.findOne(itemId);
    if (managedEntity == null) {
      throw new NotFoundException("item not found for itemId " + itemId);
    }
    Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
    // In case someone constructs an attack scenario
    if (namespace == null || namespace.getId() != managedEntity.getNamespaceId()) {
      throw new BadRequestException("Invalid request, item and namespace do not match!");
    }
    Item entity = BeanUtils.transform(Item.class, itemDTO);
    ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();
    Item beforeUpdateItem = BeanUtils.transform(Item.class, managedEntity);
    //protect. only value,comment,lastModifiedBy can be modified
    managedEntity.setValue(entity.getValue());
    managedEntity.setComment(entity.getComment());
    managedEntity.setDataChangeLastModifiedBy(entity.getDataChangeLastModifiedBy());
    // 保存配置到 Item表中
    entity = itemService.update(managedEntity);
    builder.updateItem(beforeUpdateItem, entity);
    itemDTO = BeanUtils.transform(ItemDTO.class, entity);
    if (builder.hasContent()) {
      Commit commit = new Commit();
      commit.setAppId(appId);
      commit.setClusterName(clusterName);
      commit.setNamespaceName(namespaceName);
      commit.setChangeSets(builder.build());
      commit.setDataChangeCreatedBy(itemDTO.getDataChangeLastModifiedBy());
      commit.setDataChangeLastModifiedBy(itemDTO.getDataChangeLastModifiedBy());
      // 保存發布信息到 commit 表中
      commitService.save(commit);
    }
    return itemDTO;
  }

我們看下數據item表中的配置信息。里面記錄namespaceid,key,value,comment(配置的備注信息),可以根據上面信息查詢到配置信息。

image-20220305170636763

commit表中的信息。

image-20220305170732235

每次修改配置都會新插入一條記錄。其中changSets記錄了這次變更的類型和內容。

image-20220305171225028

每個changeSets中會按照createItemsupdateItemsdeleteItems分別記錄了新增,修改和刪除的配置項。每個分類里面又會記錄具體的新增,修改和刪除的具體配置信息。

1.2 查詢配置列表

url:http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces

image-20220305171521709

列表分別顯示了有兩條配置修改了,但是沒有發布。在上面標記了未發布的標簽。這個是怎么判斷的呢?

我們一起看下源碼吧。根據上面的地址,我們去portal中查詢 /apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces

@GetMapping("/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces")
public List<NamespaceBO> findNamespaces(@PathVariable String appId, 
                                        @PathVariable String env,
                                        @PathVariable String clusterName) {

 // 根據應用名,環境和集群查詢配置列表,根據namespece返回配置列表。
    List<NamespaceBO> namespaceBOs = namespaceService.findNamespaceBOs(
      appId, Env.valueOf(env), clusterName);
    for (NamespaceBO namespaceBO : namespaceBOs) {
      if (permissionValidator.shouldHideConfigToCurrentUser(
        appId, env, namespaceBO.getBaseInfo().getNamespaceName())) {
        namespaceBO.hideItems();
      }
    }
    return namespaceBOs;
  }

NamespaceBO中的內容。里面包含基本信息,以及namespace內的配置列表。item中的isModified表示配置是否修改,但是沒有發布。如果修改了,里面還會包含修改前后的值。

image-20220305173014035

namespaceService.findNamespaceBOs()是查詢該集群下所有namespaces和配置信息。現在看下namespaceService.findNamespaceBOs()的具體實現。

public List<NamespaceBO> findNamespaceBOs(String appId, Env env, String clusterName) {
// 根據查詢應用,環境和集群查詢當前的namespaces列表,
// 查詢的表 namespace jpa語句 namespaceRepository.findByAppIdAndClusterNameAndNamespaceName(appId, clusterName,namespaceName);
    List<NamespaceDTO> namespaces = namespaceAPI.findNamespaceByCluster(appId, env, clusterName);
    if (namespaces == null || namespaces.size() == 0) {
      throw new BadRequestException("namespaces not exist");
    }
    List<NamespaceBO> namespaceBOs = new LinkedList<>();
    for (NamespaceDTO namespace : namespaces) {
      NamespaceBO namespaceBO;
      try {
        //根據環境查詢得到NamespaceBO
        namespaceBO = transformNamespace2BO(env, namespace);
        namespaceBOs.add(namespaceBO);
      } catch (Exception e) {
        logger.error("parse namespace error. app id:{}, env:{}, clusterName:{}, namespace:{}",
                appId, env, clusterName, namespace.getNamespaceName(), e);
        throw e;
      }
    }
    return namespaceBOs;
  }

transformNamespace2BO作用就是查詢出namespace中哪些是修改的,哪些是刪除的。看下面代碼前的前置內容

  • apollo 對數據庫的操作都是使用JPA,查詢時@Where(clause = "isDeleted = 0") 默認排除了已刪除的

  • -對涉及到的幾張表的說明

​ release:每次發布生效的配置記錄。里面的Configurations 是對當前生效的配置列表的JSON串。已刪除的配置不會保存在里面。

​ item:保存配置的表。adminService中新增,修改和刪除配置都是更新這張表。里面是配置的最新值,但是配置的狀態可能是已發布的,也可能是已修改但未發布的。

​ commit:保存每次配置修改的記錄,里面記錄每次修改配置提交時的新增,修改和刪除的配置列表。

{"createItems":[],"updateItems":[{"oldItem":{"namespaceId":1,"key":"age","value":"21","comment":"年齡修改","lineNum":2,"id":2,"isDeleted":false,"dataChangeCreatedBy":"apollo","dataChangeCreatedTime":"2022-02-26 12:26:23","dataChangeLastModifiedBy":"apollo","dataChangeLastModifiedTime":"2022-03-05 09:56:27"},"newItem":{"namespaceId":1,"key":"age","value":"22","comment":"年齡修改2","lineNum":2,"id":2,"isDeleted":false,"dataChangeCreatedBy":"apollo","dataChangeCreatedTime":"2022-02-26 12:26:23","dataChangeLastModifiedBy":"apollo","dataChangeLastModifiedTime":"2022-03-05 21:35:48"}}],"deleteItems":[]}

  • 如何判斷配置是否發布

如果在item表中存在值跟最新發布生效的配置值不一樣,則可能是新增或者修改的值但是為發布

  • 如何判斷配置已刪除

​ 查詢最后一次發布記錄,獲取最后一次發布配置的時間。然后查詢commit表中在最后一次發布配置后,所有的commit記錄。然后從里面取出所有的刪除配置列表。就得到的刪除但沒有發布的配置列表

private NamespaceBO transformNamespace2BO(Env env, NamespaceDTO namespace) {
    NamespaceBO namespaceBO = new NamespaceBO();
    namespaceBO.setBaseInfo(namespace);
    String appId = namespace.getAppId();
    String clusterName = namespace.getClusterName();
    String namespaceName = namespace.getNamespaceName();
    fillAppNamespaceProperties(namespaceBO);
    List<ItemBO> itemBOs = new LinkedList<>();
    namespaceBO.setItems(itemBOs);
    //latest Release
    ReleaseDTO latestRelease;
    Map<String, String> releaseItems = new HashMap<>();
    Map<String, ItemDTO> deletedItemDTOs = new HashMap<>();
    // 查詢最后一次發布記錄,里面保存了最新發布的,已經生效的的所有配置信息,不包括只刪除的配置,json串保存。而items中的是最新的值,但可能是已發布的頁可能是未發布的配置。
    // 查詢的表 Release  jpa語句  releaseRepository.findFirstByAppIdAndClusterNameAndNamespaceNameAndIsAbandonedFalseOrderByIdDesc(appId,clusterName,namespaceName);
    latestRelease = releaseService.loadLatestRelease(appId, env, clusterName, namespaceName);
    if (latestRelease != null) {
      releaseItems = GSON.fromJson(latestRelease.getConfigurations(), GsonType.CONFIG);
    }
    //not Release config items 開始處理未發布的配置
    // 查詢namespace下未刪除的配置列表。列表中的內容可能有未發布的配置
    // 查詢的表 Item List<Item> items = itemRepository.findByNamespaceIdOrderByLineNumAsc(namespaceId);
    List<ItemDTO> items = itemService.findItems(appId, env, clusterName, namespaceName);
    additionalUserInfoEnrichService
        .enrichAdditionalUserInfo(items, BaseDtoUserInfoEnrichedAdapter::new);
    int modifiedItemCnt = 0;
    for (ItemDTO itemDTO : items) {
      // 判斷內容是否更改,並設置修改前和修改后的值。通過對比最后一次發布記錄中的值與當前最新的值是否一致,如果不一致說明是修改后沒有發布。
      ItemBO itemBO = transformItem2BO(itemDTO, releaseItems);
      if (itemBO.isModified()) {
        modifiedItemCnt++;
      }
      itemBOs.add(itemBO);
    }
    //deleted items 開始處理已刪除的配置
    // 調用adminService 獲取最后一次發布后的已刪除的配置列表
    itemService.findDeletedItems(appId, env, clusterName, namespaceName).forEach(item -> {
      deletedItemDTOs.put(item.getKey(),item);
    });
    List<ItemBO> deletedItems = parseDeletedItems(items, releaseItems, deletedItemDTOs);
    itemBOs.addAll(deletedItems);
    modifiedItemCnt += deletedItems.size();
    namespaceBO.setItemModifiedCnt(modifiedItemCnt);
    return namespaceBO;
  }
1.3 發布配置

url:http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces/application/releases

參數:{"releaseTitle":"20220305225621-release","releaseComment":"發布刪除的111","isEmergencyPublish":false}

  public ReleaseDTO createRelease(@PathVariable String appId,
                                  @PathVariable String env, @PathVariable String clusterName,
                                  @PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {
    model.setAppId(appId);
    model.setEnv(env);
    model.setClusterName(clusterName);
    model.setNamespaceName(namespaceName);
    if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {
      throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));
    }
// 插入release記錄
    ReleaseDTO createdRelease = releaseService.publish(model);
    ConfigPublishEvent event = ConfigPublishEvent.instance();
    event.withAppId(appId)
            .withCluster(clusterName)
            .withNamespace(namespaceName)
            .withReleaseId(createdRelease.getId())
            .setNormalPublishEvent(true)
            .setEnv(Env.valueOf(env));
// 發出發布event
    publisher.publishEvent(event);
    return createdRelease;
  }

releaseService.publish(model) 調用adminService 中的插入release記錄。adminService代碼如下:

  @PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
  public ReleaseDTO publish(@PathVariable("appId") String appId,
                            @PathVariable("clusterName") String clusterName,
                            @PathVariable("namespaceName") String namespaceName,
                            @RequestParam("name") String releaseName,
                            @RequestParam(name = "comment", required = false) String releaseComment,
                            @RequestParam("operator") String operator,
                            @RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish) {
    Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
    if (namespace == null) {
      throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId,
              clusterName, namespaceName));
    }
    // 保存發布記錄
    Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);
    //send release message 發送發布消息
    Namespace parentNamespace = namespaceService.findParentNamespace(namespace);
    String messageCluster;
    if (parentNamespace != null) {
      messageCluster = parentNamespace.getClusterName();
    } else {
      messageCluster = clusterName;
    }
    // 實際發布信息
    messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName),
            Topics.APOLLO_RELEASE_TOPIC);
    return BeanUtils.transform(ReleaseDTO.class, release);
  }

保存發布記錄 releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);

*查詢namespace下所有未刪除的配置列表

*先查詢最新的發布記錄,獲取上次最新發布記錄的id。

*組裝release信息,插入到數據庫中

發送消息通知ConfigService有新配置發布

Admin Service在配置發布后會往ReleaseMessage表插入一條消息記錄,消息內容就是配置發布的AppId+Cluster+Namespace。這里發布后需要通知ConfigService有新的配置發布。configService獲取新發布的配置信息,推送給client。

二、配置發布后的實時推送設計

image-20220306154605993

上圖簡要描述了配置發布的大致過程:

  1. 用戶在Portal操作配置發布
  2. Portal調用Admin Service的接口操作發布
  3. Admin Service發布配置后,發送ReleaseMessage給各個Config Service
  4. Config Service收到ReleaseMessage后,通知對應的客戶端

####### 2.1 發送ReleaseMessage的實現方式

Admin Service在配置發布后,需要通知所有的Config Service有配置發布,從而Config Service可以通知對應的客戶端來拉取最新的配置。

從概念上來看,這是一個典型的消息使用場景,Admin Service作為producer發出消息,各個Config Service作為consumer消費消息。通過一個消息組件(Message Queue)就能很好的實現Admin Service和Config Service的解耦。

在實現上,考慮到Apollo的實際使用場景,以及為了盡可能減少外部依賴,我們沒有采用外部的消息中間件,而是通過數據庫實現了一個簡單的消息隊列。

實現方式如下:

  1. Admin Service在配置發布后會往ReleaseMessage表插入一條消息記錄,消息內容就是配置發布的AppId+Cluster+Namespace,參見DatabaseMessageSender

  2. Config Service有一個線程會每秒掃描一次ReleaseMessage表,看看是否有新的消息記錄,參見ReleaseMessageScanner

  3. Config Service如果發現有新的消息記錄,那么就會通知到所有的消息監聽器(ReleaseMessageListener),如NotificationControllerV2,消息監聽器的注冊過程參見ConfigServiceAutoConfiguration

  4. NotificationControllerV2得到配置發布的AppId+Cluster+Namespace后,會通知對應的客戶端

示意圖如下:

image-20220306155452839

現在看下Apollo源碼中的具體實現過程

2.1 configService 定時掃描 releaseMessage

在Java配置類 ConfigServiceAutoConfiguration 中 配置了注冊ReleaseMessageScanner到spring容器中。

  @Bean
  public ReleaseMessageScanner releaseMessageScanner() {
      // ReleaseMessageScanner 構造方法中初始化一個定時執行的線程池
      ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
    	// releaseMessageScanner注冊監聽器列表,獲取到信息發布的消息,會調用監聽器列表
      //0. handle release message cache
      releaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);
      //1. handle gray release rule
      releaseMessageScanner.addMessageListener(grayReleaseRulesHolder);
      //2. handle server cache
      releaseMessageScanner.addMessageListener(configService);
      releaseMessageScanner.addMessageListener(configFileController);
      //3. notify clients 通知客戶端
      releaseMessageScanner.addMessageListener(notificationControllerV2);
      releaseMessageScanner.addMessageListener(notificationController);
      return releaseMessageScanner;
    }

看下ReleaseMessageScanner構造方法

public ReleaseMessageScanner() {
    listeners = Lists.newCopyOnWriteArrayList();
  	// 初始化一個定時每秒執行的線程池 executorService
    executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
        .create("ReleaseMessageScanner", true));
    missingReleaseMessages = Maps.newHashMap();
  }

同時,我們注意到ReleaseMessageScanner實現了InitializingBean接口,通過afterPropertiesSet方法對spring進行擴展。

public class ReleaseMessageScanner implements InitializingBean {
    @Override
  public void afterPropertiesSet() throws Exception {
    // 獲取配置掃描的時間間隔
    databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();
    // 初始化屬性maxIdScanned,從ReleaseMessage表中獲取最大ID
    maxIdScanned = loadLargestMessageId();
    // 定時線程池執行定時掃描ReleaseMessage的任務
    executorService.scheduleWithFixedDelay(() -> {
      try {
        // 掃描ReleaseMessage最新發布,如果有新的發布配置就通知監聽器
        scanMessages();
      } catch (Throwable ex) {
        logger.error("Scan and send message failed", ex);
      } 
    }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);
  }
}

查看scanMessages()的內容

 private void scanMessages() {
    boolean hasMoreMessages = true;
   // 一致掃描,直到沒有新的發布消息
    while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
      hasMoreMessages = scanAndSendMessages();
    }
  }

  private boolean scanAndSendMessages() {
    //current batch is 500,每次查詢比maxIdScanned大的前500條發布記錄信息
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
      return false;
    }
    // 通知消息監聽器對發布的消息進行處理,這里主要是NotificationControllerV2通知客戶端新發布的配置
    fireMessageScanned(releaseMessages);
    // 獲取當前掃描到的最大ID
    long newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();
    // 更新當前的掃描到的最大ID
    maxIdScanned = newMaxIdScanned;
    // 是否繼續循掃描
    return messageScanned == 500;
  }

 private void fireMessageScanned(Iterable<ReleaseMessage> messages) {
    for (ReleaseMessage message : messages) {
      for (ReleaseMessageListener listener : listeners) {
        try {
          listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
        } catch (Throwable ex) {
          logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
        }
      }
    }
  }
2.2 NotificationControllerV2通知客戶端配置更新

實現方式如下:

  1. 客戶端會發起一個Http請求到Config Service的notifications/v2接口,也就是NotificationControllerV2,參見RemoteConfigLongPollService

  2. NotificationControllerV2不會立即返回結果,而是通過Spring DeferredResult把請求掛起

  3. 如果在60秒內沒有該客戶端關心的配置發布,那么會返回Http狀態碼304給客戶端

  4. 如果有該客戶端關心的配置發布,NotificationControllerV2會調用DeferredResult的setResult方法,傳入有配置變化的namespace信息,同時該請求會立即返回。客戶端從返回的結果中獲取到配置變化的namespace后,會立即請求Config Service獲取該namespace的最新配置。

public class NotificationControllerV2 implements ReleaseMessageListener {
  @Override
  public void handleMessage(ReleaseMessage message, String channel) {
    String content = message.getMessage();
    // 只處理發布消息的消息
    if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
      return;
    }
		// 獲取發布信息的namespace。比如content內容為 apollo-demo+default+application,namespace為application
    String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
    if (Strings.isNullOrEmpty(changedNamespace)) {
      logger.error("message format invalid - {}", content);
      return;
    }
    // deferredResults 就是客戶端請求到ConfigService請求掛起的集合。相當於客戶端的集合
    if (!deferredResults.containsKey(content)) {
      return;
    }
    //create a new list to avoid ConcurrentModificationException
    List<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));
    // 創建配置變更的通知對象
    ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());
    configNotification.addMessage(content, message.getId());
    //do async notification if too many clients 
    //通知所有請求過來的客戶端有新的配置發布。如果客戶端太多就執行異步操作
    if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
      largeNotificationBatchExecutorService.submit(() -> {
        for (int i = 0; i < results.size(); i++) {
          if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
            try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
            } catch (InterruptedException e) {
              //ignore
            }
          }
          logger.debug("Async notify {}", results.get(i));
          results.get(i).setResult(configNotification);
        }
      });
      return;
    }
    logger.debug("Notify {} clients for key {}", results.size(), content);
    for (DeferredResultWrapper result : results) {
      // result.setResult()后,客戶端就獲得了新發布的namespace信息,掛起的請求就立即返回了。
      result.setResult(configNotification);
    }
    logger.debug("Notification completed");
  }
}

通過上面的操作,在用戶發布新的配置后,adminService就新增一個releaseMessage到數據庫中。configService定時掃描獲取新增的發布消息,調用NotificationControllerV2后通知所有的客戶端哪個namespace有新的配置發布。這是客戶端就獲取到了有發布配置的namespace,可以請求configService拉取這個namespace的最新配置到本地。

三、客戶端實時通知和定時拉取配置

image-20220306174842290

上圖簡要描述了Apollo客戶端的實現原理:

  1. 客戶端和服務端保持了一個長連接,從而能第一時間獲得配置更新的推送。(通過Http Long Polling實現)
  2. 客戶端還會定時從Apollo配置中心服務端拉取應用的最新配置。
    • 這是一個fallback機制,為了防止推送機制失效導致配置不更新
    • 客戶端定時拉取會上報本地版本,所以一般情況下,對於定時拉取的操作,服務端都會返回304 - Not Modified
    • 定時頻率默認為每5分鍾拉取一次,客戶端也可以通過在運行時指定System Property: apollo.refreshInterval來覆蓋,單位為分鍾。
  3. 客戶端從Apollo配置中心服務端獲取到應用的最新配置后,會保存在內存中
  4. 客戶端會把從服務端獲取到的配置在本地文件系統緩存一份
    • 在遇到服務不可用,或網絡不通的時候,依然能從本地恢復配置
  5. 應用程序可以從Apollo客戶端獲取最新的配置、訂閱配置更新通知
3.1 客戶端初始化過程

Apollo是在PropertySourcesProcessor中實現了在spring初始化時獲取配置和設置自動更新配置的以及實現將Apollo配置的優先級設置為最高的。BeanFactoryPostProcessor 是spring對bean實例化前的擴展接口,可以對beanDefine進行修改。

public class PropertySourcesProcessor implements BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {
  
  @Override
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    this.configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    // 初始化和拉取配置,並這是Apollo的配置是最先生效的。同時設置實時和定時從configService獲取配置
    initializePropertySources();
    // 運行時自動更新配置
    initializeAutoUpdatePropertiesFeature(beanFactory);
  }
}

我們先看initializePropertySources方法。該方法就是獲取應用中設置的所有namespace從Apollo獲取配置信息,然后設置Apollo的配置的優先級。

Apollo是如何設置自己配置優先於別的配置文件呢?

Spring從3.1版本開始增加了ConfigurableEnvironmentPropertySource

  • ConfigurableEnvironment
    • Spring的ApplicationContext會包含一個Environment(實現ConfigurableEnvironment接口)
    • ConfigurableEnvironment自身包含了很多個PropertySource
  • PropertySource
    • 屬性源
    • 可以理解為很多個Key - Value的屬性配置

在運行時的結構形如:

image-20220306182714638

需要注意的是,PropertySource之間是有優先級順序的,如果有一個Key在多個property source中都存在,那么在前面的property source優先。

所以對上圖的例子:

  • env.getProperty(“key1”) -> value1
  • env.getProperty(“key2”) -> value2
  • env.getProperty(“key3”) -> value4

在理解了上述原理后,Apollo和Spring/Spring Boot集成的手段就呼之欲出了:在應用啟動階段,Apollo從遠端獲取配置,然后組裝成PropertySource並插入到第一個即可,如下圖所示:

image-20220306182851641

 private void initializePropertySources() {
    if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME)) {
      //already initialized
      return;
    }
    CompositePropertySource composite;
    if (configUtil.isPropertyNamesCacheEnabled()) {
      composite = new CachedCompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);
    } else {
      composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);
    }
    //sort by order asc
    ImmutableSortedSet<Integer> orders = ImmutableSortedSet.copyOf(NAMESPACE_NAMES.keySet());
    Iterator<Integer> iterator = orders.iterator();
    while (iterator.hasNext()) {
      int order = iterator.next();
      // 按照配置的Apollo的各個nameSpace的生效順序,獲取namespace
      for (String namespace : NAMESPACE_NAMES.get(order)) {
        // 重點 從Apollo的configservice獲取nameSpace的配置的信息
        Config config = ConfigService.getConfig(namespace);
        composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
      }
    }
    // clean up
    NAMESPACE_NAMES.clear();
    // add after the bootstrap property source or to the first
    // 設置Apollo配置的優先級最高,在environment.getPropertySources()位置為0
    if (environment.getPropertySources()
        .contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {
      // ensure ApolloBootstrapPropertySources is still the first
      ensureBootstrapPropertyPrecedence(environment);
      environment.getPropertySources()
          .addAfter(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME, composite);
    } else {
      environment.getPropertySources().addFirst(composite);
    }
  }

我們看下應用啟動是Apollo初始化獲取配置的過程 ConfigService.getConfig(namespace);

  public static Config getConfig(String namespace) {
    // 入口 ,重點看 getConfig
    return s_instance.getManager().getConfig(namespace);
  }

看下com.ctrip.framework.apollo.internals.DefaultConfigManager#getConfig()方法

  @Override
  public Config getConfig(String namespace) {
    // 從內存中獲取配置
    Config config = m_configs.get(namespace);
    // 雙重檢驗
    if (config == null) {
      synchronized (this) {
        config = m_configs.get(namespace);
        if (config == null) {
          ConfigFactory factory = m_factoryManager.getFactory(namespace);
          // 創建配置
          config = factory.create(namespace);
          // 設置nameSpace的配置
          m_configs.put(namespace, config);
        }
      }
    }
    return config;
  }
 com.ctrip.framework.apollo.spi.DefaultConfigFactory#create
   
 public Config create(String namespace) {
    // 獲取namespace的格式,一般是properties
    ConfigFileFormat format = determineFileFormat(namespace);
    // YML和yaml文件
    if (ConfigFileFormat.isPropertiesCompatible(format)) {
      return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
    }
    //重點 createLocalConfigRepository(namespace) 開啟定時任務定時從遠端拉取進行同步和長輪詢
    return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
  }

protected Config createRepositoryConfig(String namespace, 
                                        ConfigRepository configRepository) {
    return new DefaultConfig(namespace, configRepository);
  }

看下Apollo創建配置過程 com.ctrip.framework.apollo.spi.DefaultConfigFactory#create

public Config create(String namespace) {
    // 獲取namespace的格式,一般是properties
    ConfigFileFormat format = determineFileFormat(namespace);
    // YML和yaml文件
    if (ConfigFileFormat.isPropertiesCompatible(format)) {
      return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
    }
    // 重點 createLocalConfigRepository(namespace) 開啟定時任務定時從遠端拉取進行同步和和長輪詢
    return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
  }



 LocalFileConfigRepository createLocalConfigRepository(String namespace) {
  // createRemoteConfigRepository 客戶端連接遠端,本地配置與遠端配置進行同步,啟動定時拉起配置和發起對ConfigService請求獲取實時的配置變更
  return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
  }

我們看下RemoteConfigRepository的構造方法

public RemoteConfigRepository(String namespace) {
  m_namespace = namespace;
  m_configCache = new AtomicReference<>();
  m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
  m_httpClient = ApolloInjector.getInstance(HttpClient.class);
  m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
  remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
  m_longPollServiceDto = new AtomicReference<>();
  m_remoteMessages = new AtomicReference<>();
  m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
  m_configNeedForceRefresh = new AtomicBoolean(true);
  m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
      m_configUtil.getOnErrorRetryInterval() * 8);
  // 從遠端拉取和本地配置進行同步,超時時間為5秒
  this.trySync();
  // 定時周期拉取最新(主動取configService拉取配置到本地,每5秒拉取一次,通過調用trySync(),)
  this.schedulePeriodicRefresh();
  // 長輪詢實時刷新(遠端推送)
  this.scheduleLongPollingRefresh();
}
3.2 拉取遠端配置同步到本地
 protected boolean trySync() {
    try {
      sync();
      return true;
    } catch (Throwable ex) {
  logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil.getDetailMessage(ex));
    }
    return false;
  }

  @Override
  protected synchronized void sync() {
    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
    try {
      // 獲取本地內存當前的配置
      ApolloConfig previous = m_configCache.get();
      // 獲取遠端的當前配置
      ApolloConfig current = loadApolloConfig();
      //reference equals means HTTP 304
      if (previous != current) {
        logger.debug("Remote Config refreshed!");
        // 遠端配置覆蓋本地配置
        m_configCache.set(current);
        // 遠端配置有更新時,通知本地配置變更。
        this.fireRepositoryChange(m_namespace, this.getConfig());
      }
    } catch (Throwable ex) {
      throw ex;
    }
  }

   for (RepositoryChangeListener listener : m_listeners) {
      try {
        // 調用listener更新配置
        listener.onRepositoryChange(namespace, newProperties);
      } catch (Throwable ex) {
   logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
      }
    }
  }

遠端配置同步后,如果有更新就會更新本地緩存的配置文件。實現就在 com.ctrip.framework.apollo.internals.LocalFileConfigRepository#onRepositoryChange

  @Override
  public void onRepositoryChange(String namespace, Properties newProperties) {
    if (newProperties.equals(m_fileProperties)) {
      return;
    }
    Properties newFileProperties = propertiesFactory.getPropertiesInstance();
    newFileProperties.putAll(newProperties);
    // 更新本地緩存
    updateFileProperties(newFileProperties, m_upstream.getSourceType());
    this.fireRepositoryChange(namespace, newProperties);
  }


  private synchronized void updateFileProperties(Properties newProperties, ConfigSourceType sourceType) {
    this.m_sourceType = sourceType;
    // 判斷本地配置列表跟遠端配置列表是否一致。Hashtable比較里面key的值是否一致
    if (newProperties.equals(m_fileProperties)) {
      return;
    }
    // 否則遠端的配置覆蓋本地配置
    this.m_fileProperties = newProperties;
    // 寫入到本地的配置文件中
    persistLocalCacheFile(m_baseDir, m_namespace);
  }

現在我們一起看下client如何拉取從configService拉取配置的。主要流程如下

** 獲取遠端configService地址列表,負載均衡選擇一個去拉取配置

** 拼接url並請求configService。url結構如下:http://192.168.100.2:8080/configs/apollo-demo/default/application?ip=192.168.100.2。com.ctrip.framework.apollo.configservice.controller.ConfigController#queryConfig

代碼如下

private ApolloConfig loadApolloConfig() {
    // 本地拉取遠端配置限流每5秒一次
    if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
      //wait at most 5 seconds
      try {
        TimeUnit.SECONDS.sleep(5);
      } catch (InterruptedException e) {
      }
    }
    String appId = m_configUtil.getAppId();
    String cluster = m_configUtil.getCluster();
    String dataCenter = m_configUtil.getDataCenter();
    String secret = m_configUtil.getAccessKeySecret();
    Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
    int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
    long onErrorSleepTime = 0; // 0 means no sleep
    Throwable exception = null;
    // 獲取遠端的配置服務configServie地址列表
    List<ServiceDTO> configServices = getConfigServices();
    String url = null;
    retryLoopLabel:
    for (int i = 0; i < maxRetries; i++) {
      List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
      // 對configServie地址列表進行亂序,相當於隨機選取一個服務進行調用。負載均衡
      Collections.shuffle(randomConfigServices);
      //Access the server which notifies the client first
      // 優先訪問通知過客戶端的服務,放在列表的第一位
      if (m_longPollServiceDto.get() != null) {
        randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
      }
      for (ServiceDTO configService : randomConfigServices) {
        if (onErrorSleepTime > 0) {
          logger.warn(
              "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
              onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
          try {
            m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
          } catch (InterruptedException e) {
            //ignore
          }
        }
        // 組裝url http://192.168.100.2:8080/configs/apollo-demo/default/application?ip=192.168.100.2 請求的是configService的 ConfigController 的/{appId}/{clusterName}/{namespace:.+}
        url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,dataCenter, m_remoteMessages.get(), m_configCache.get());
        logger.debug("Loading config from {}", url);
        HttpRequest request = new HttpRequest(url);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }
        try {
            // 從遠端獲取配置信息,
          HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
          m_configNeedForceRefresh.set(false);
          m_loadConfigFailSchedulePolicy.success();
          // 如果請求返回304,說明配置沒變化,返回本地緩存的配置對象
          if (response.getStatusCode() == 304) {
            logger.debug("Config server responds with 304 HTTP status code.");
            return m_configCache.get();
          }
          // 否則返回實際返回的配置信息
          ApolloConfig result = response.getBody();
          logger.debug("Loaded config for {}: {}", m_namespace, result);
          return result;
        } catch (ApolloConfigStatusCodeException ex) {
          ApolloConfigStatusCodeException statusCodeException = ex;
          //config not found
          if (ex.getStatusCode() == 404) {
            String message = String.format(
                "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                    "please check whether the configs are released in Apollo!",
                appId, cluster, m_namespace);
            statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
                message);
          }
          exception = statusCodeException;
          if(ex.getStatusCode() == 404) {
            break retryLoopLabel;
          }
        } catch (Throwable ex) {
          exception = ex;
        } 
        // if force refresh, do normal sleep, if normal config load, do exponential sleep
        onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
            m_loadConfigFailSchedulePolicy.fail();
      }

    }
    String message = String.format(
        "Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",
        appId, cluster, m_namespace, url);
    throw new ApolloConfigException(message, exception);
  }

接下來看下configService中如何獲取配置的。主要邏輯就是configservice獲取release表中最新的發布記錄,得到最新發布記錄的releaseKey,與客戶端帶來的clientSideReleaseKey比較是否一致。如果一致,說明沒有配置變化,返回304.否則就把最新發布的配置返回。

 @GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")
  public ApolloConfig queryConfig(@PathVariable String appId,
                                  @PathVariable String clusterName,
                                  @PathVariable String namespace,
                                  @RequestParam(value = "dataCenter", required = false) String dataCenter,
                                  @RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,
                                  @RequestParam(value = "ip", required = false) String clientIp,
                                  @RequestParam(value = "messages", required = false) String messagesAsString,
                                  HttpServletRequest request, HttpServletResponse response) throws IOException {
    String originalNamespace = namespace;
    //strip out .properties suffix
    namespace = namespaceUtil.filterNamespaceName(namespace);
    //fix the character case issue, such as FX.apollo <-> fx.apollo
    namespace = namespaceUtil.normalizeNamespace(appId, namespace);

    if (Strings.isNullOrEmpty(clientIp)) {
      clientIp = tryToGetClientIp(request);
    }

    ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);

    List<Release> releases = Lists.newLinkedList();

    String appClusterNameLoaded = clusterName;
    if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
      // 查找Release表中最新的發布記錄
      Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
          dataCenter, clientMessages);

      if (currentAppRelease != null) {
        releases.add(currentAppRelease);
        //we have cluster search process, so the cluster name might be overridden
        appClusterNameLoaded = currentAppRelease.getClusterName();
      }
    }

    //if namespace does not belong to this appId, should check if there is a public configuration
    if (!namespaceBelongsToAppId(appId, namespace)) {
      Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
          dataCenter, clientMessages);
      if (Objects.nonNull(publicRelease)) {
        releases.add(publicRelease);
      }
    }

    if (releases.isEmpty()) {
      response.sendError(HttpServletResponse.SC_NOT_FOUND,
          String.format(
              "Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
              appId, clusterName, originalNamespace));
      Tracer.logEvent("Apollo.Config.NotFound",
          assembleKey(appId, clusterName, originalNamespace, dataCenter));
      return null;
    }

    auditReleases(appId, clusterName, dataCenter, clientIp, releases);

    String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
            .collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
// 比較客戶端請求帶來的clientSideReleaseKey與服務端是否一致,如果一致說明沒有新的配置發布,返回304
    if (mergedReleaseKey.equals(clientSideReleaseKey)) {
      // Client side configuration is the same with server side, return 304
      response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
      Tracer.logEvent("Apollo.Config.NotModified",
          assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
      return null;
    }

    ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
        mergedReleaseKey);
    apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));

    Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
        originalNamespace, dataCenter));
    return apolloConfig;
  }
3.3 客戶端定時拉取同步遠端配置

講完了客戶端同步遠端配置,我們重新回到RemoteConfigRepository的構造方法。

 public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpClient = ApolloInjector.getInstance(HttpClient.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    // 從遠端拉取和本地配置進行同步,超時時間為5秒
    this.trySync();
    // 定時周期拉取最新(主動取configService拉取配置到本地,每5秒拉取一次,通過調用trySync(),)
    this.schedulePeriodicRefresh();
    // 長輪詢實時刷新(遠端推送)
    this.scheduleLongPollingRefresh();
  }

接下來查看定時周期拉取配置,同步本地配置,避免長輪詢失敗導致本地與遠端配置不一致。是一種兜底的操作。

代碼很簡單就是啟動一個定時線程池,定時調用同步配置的trySync()方法。每5分鍾定時拉取一次。

  private void schedulePeriodicRefresh() {
    logger.debug("Schedule periodic refresh with interval: {} {}",
        m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
    m_executorService.scheduleAtFixedRate(
        new Runnable() {
          @Override
          public void run() {
            logger.debug("refresh config for namespace: {}", m_namespace);
            trySync();
          }
        }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
        m_configUtil.getRefreshIntervalTimeUnit());
  }

3.3 客戶端長輪詢實時同步配置
  private void scheduleLongPollingRefresh() {
    // submit 方法中進行長輪詢
    remoteConfigLongPollService.submit(m_namespace, this);
  }

  public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
    boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
    m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
    if (!m_longPollStarted.get()) {
      startLongPolling();
    }
    return added;
  }


private void startLongPolling() {
    if (!m_longPollStarted.compareAndSet(false, true)) {
      //already started
      return;
    }
    try {
      final String appId = m_configUtil.getAppId();
      final String cluster = m_configUtil.getCluster();
      final String dataCenter = m_configUtil.getDataCenter();
      final String secret = m_configUtil.getAccessKeySecret();
      final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
      m_longPollingService.submit(new Runnable() {
        @Override
        public void run() {
          // 初始化延遲2秒
          if (longPollingInitialDelayInMills > 0) {
            try {
              logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
              TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
            } catch (InterruptedException e) {
              //ignore
            }
          }
          // 定時任務長輪詢查詢是否存在新的變更
          doLongPollingRefresh(appId, cluster, dataCenter, secret);
        }
      });
    } catch (Throwable ex) {
      m_longPollStarted.set(false);
      ApolloConfigException exception =
          new ApolloConfigException("Schedule long polling refresh failed", ex);
      logger.warn(ExceptionUtil.getDetailMessage(exception));
    }
  }
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;
    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
      if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
        //wait at most 5 seconds
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }
      }
      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
      String url = null;
      try {
        // 如果是第一次長輪詢
        if (lastServiceDto == null) {
          // 獲取遠端的配置服務列表
          List<ServiceDTO> configServices = getConfigServices();
          // 隨機取一個configServcie進行長輪詢
          lastServiceDto = configServices.get(random.nextInt(configServices.size()));
        }
        // m_notifications 存儲每個namespace對應的releaseMessageId
        url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications);
        // 長輪詢url=http://192.168.100.2:8080/notifications/v2?cluster=default&appId=apollo-demo&ip=192.168.100.2&notifications=[{"namespaceName":"application","notificationId":-1}]
        logger.debug("Long polling from {}", url);
        HttpRequest request = new HttpRequest(url);
         //90 seconds, should be longer than server side's long polling timeout, which is now 60 seconds
        // 客戶端請求超時時間為90 秒,應該比服務器端的長輪詢超時時間長,服務端現在是 60 秒
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }
        final HttpResponse<List<ApolloConfigNotification>> response =
            m_httpClient.doGet(request, m_responseType);
        logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
//        客戶端從Apollo配置中心服務端獲取到應用的最新配置后,會保存在內存中
//        客戶端會把從服務端獲取到的配置在本地文件系統緩存一份
        if (response.getStatusCode() == 200 && response.getBody() != null) {
//      response.getBody()    ApolloConfigNotification{namespaceName='application', notificationId=971}
          // 根據namespace 和 notificationId 進行更新,已經更新的記錄
          updateNotifications(response.getBody());
          updateRemoteNotifications(response.getBody());
          // 重點 通知本地拉取配置
          notify(lastServiceDto, response.getBody());
        }
       //try to load balance 如果配置沒有變化,就設置lastServiceDto為空,下次隨機獲取一個服務地址列表
        if (response.getStatusCode() == 304 && random.nextBoolean()) {
          lastServiceDto = null;
        }
        m_longPollFailSchedulePolicyInSecond.success();
      } catch (Throwable ex) {
        lastServiceDto = null;
        long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
        logger.warn(
            "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
            sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
        try {
          TimeUnit.SECONDS.sleep(sleepTimeInSecond);
        } catch (InterruptedException ie) {
          //ignore
        }
      } 
    }
  }

長輪詢的狀態碼如果是200,會根據返回的結果然本地重新拉取配置。返回結果就是namespace和releaseMessageId。

我們看下configService長輪詢返回結果的代碼。主要就是把請求轉換為deferredResultWrapper 存儲起來,對應的key是releaseMessage中的message(比如 apollo-demo+default+application),value是所有請求到該configService的請求轉換的deferredResultWrapper的集合,如果configService掃描到有新的releaseMessage,就會查找根據message查詢出所有的client,設置deferredResultWrapper的結果,讓請求提前結束。否則請求會一直等待直到超時返回。

@GetMapping
  public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
      @RequestParam(value = "appId") String appId,
      @RequestParam(value = "cluster") String cluster,
      @RequestParam(value = "notifications") String notificationsAsString,
      @RequestParam(value = "dataCenter", required = false) String dataCenter,
      @RequestParam(value = "ip", required = false) String clientIp) {
    List<ApolloConfigNotification> notifications = null;

    try {
      notifications =
          gson.fromJson(notificationsAsString, notificationsTypeReference);
    } catch (Throwable ex) {
      Tracer.logError(ex);
    }

    if (CollectionUtils.isEmpty(notifications)) {
      throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
    }
    
    Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);

    if (CollectionUtils.isEmpty(filteredNotifications)) {
      throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
    }
    //這里是構建DeferredResult對象,一會存儲起來。NotificationControllerV2不會立即返回結果,而是通過Spring DeferredResult 把請求掛起。
    //如果在60秒內沒有該客戶端關心的配置發布,那么會返回Http狀態碼304給客戶端
    DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());
    Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());
    Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());
    
    for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {
      String normalizedNamespace = notificationEntry.getKey();
      ApolloConfigNotification notification = notificationEntry.getValue();
      namespaces.add(normalizedNamespace);
      clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());
      if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {
        deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);
      }
    }

    Multimap<String, String> watchedKeysMap =
        watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);

    Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());

    /**
     * 1、set deferredResult before the check, for avoid more waiting
     * If the check before setting deferredResult,it may receive a notification the next time
     * when method handleMessage is executed between check and set deferredResult.
     * 在check之前設置deferredResult,避免更多的等待。如果在設置deferredResult之前進行check,
     * 。下次在check和set deferredResult之間執行handleMessage方法時可能會收到通知。
     */
    deferredResultWrapper
          .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));

    deferredResultWrapper.onCompletion(() -> {
      //unregister all keys
      for (String key : watchedKeys) {
        deferredResults.remove(key, deferredResultWrapper);
      }
      logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
    });

    //register all keys 存儲請求對應的deferredResultWrapper映射關系
    for (String key : watchedKeys) {
      this.deferredResults.put(key, deferredResultWrapper);
    }

    logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
    logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
        watchedKeys, appId, cluster, namespaces, dataCenter);

    /**
     * 2、check new release
     * 查詢最新的releaseMessage中的最新消息記錄,從緩存中獲取的
     */
    List<ReleaseMessage> latestReleaseMessages =
        releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);

    /**
     * Manually close the entity manager.
     * Since for async request, Spring won't do so until the request is finished,
     * which is unacceptable since we are doing long polling - means the db connection would be hold
     * for a very long time
     * 手動關閉實體管理器。由於對於異步請求,Spring 在請求完成之前不會這樣做,這是不可接受的,因為我們正在進行長時間輪詢 - 意味着數據庫連接將保持很長時間
     */
    entityManagerUtil.closeEntityManager();
/**
 * 如果有該客戶端關心的配置發布,NotificationControllerV2會調用DeferredResult的setResult方法,傳入有配置變化的namespace信息,同時該請求會立即返回。
 * 客戶端從返回的結果中獲取到配置變化的namespace后,會立即請求Config Service獲取該namespace的最新配置。
 */
// 查詢當前是否有未同步的已發布的releaseMessage,如果有就直接返回
    List<ApolloConfigNotification> newNotifications =
        getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
            latestReleaseMessages);

    if (!CollectionUtils.isEmpty(newNotifications)) {
      deferredResultWrapper.setResult(newNotifications);
    }
    // 等待直到超時才返回304或者listener監聽到發布配置調用 handleMessage() deferredResultWrapper.setResult()提前返回。
    return deferredResultWrapper.getResult();
  }

如果有配置變更,長輪詢返回namespace和releaseMessageId。客戶端會對該namespace進行拉取配置。每個namespace會對應一個單門同步遠端配置的對象。

private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
  if (notifications == null || notifications.isEmpty()) {
    return;
  }
  for (ApolloConfigNotification notification : notifications) {
    String namespaceName = notification.getNamespaceName();
    //create a new list to avoid ConcurrentModificationException
    List<RemoteConfigRepository> toBeNotified =
        Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
    ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
    ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
    //since .properties are filtered out by default, so we need to check if there is any listener for it
    toBeNotified.addAll(m_longPollNamespaces
        .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
    for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
      try {
        // 從遠端拉取配置同步到本地
        remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
      } catch (Throwable ex) {
        Tracer.logError(ex);
      }
    }
  }
}

可以看到還是通過trySync()進行同步本地配置

  public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
    m_longPollServiceDto.set(longPollNotifiedServiceDto);
    m_remoteMessages.set(remoteMessages);
    m_executorService.submit(new Runnable() {
      @Override
      public void run() {
        m_configNeedForceRefresh.set(true);
        trySync();
      }
    });
  }
3.4 Apollo配置在spring中placeholder在運行時自動更新

https://github.com/apolloconfig/apollo/pull/972

回到Apollo客戶端啟動的代碼

public class PropertySourcesProcessor implements BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {
  
  @Override
  public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    this.configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    // 初始化和拉取配置,並設置Apollo的配置是最先生效的。同時設置實時和定時從configService獲取配置
    initializePropertySources();
    // 運行時自動更新配置
    initializeAutoUpdatePropertiesFeature(beanFactory);
  }
}


上面已經講完了客戶端初始化和同步配置的功能。下面繼續講運行時占位符自動更新的功能。

如果要實現Spring占位符值在Apollo配置變更后自動更新,需要滿足下面的前提:

1,能監聽Apollo的配置變化(Apollo可以為配置添加監聽器實現)

2,在Spring容器初始化時把xml和Bean中的占位符與對應bean的映射關系保存起來。在配置變化時根據Apollo變更的key,查找出哪些bean中的屬性需要自動更新,並進行更新操作。(Apollo中新增了ApolloProcessor,SpringValueProcessor,ApolloAnnotationProcessor 進行占位符的注冊)

spring占位符自動更新的邏輯就是在spring容器實初始化時新增了ApolloProcessor擴展了BeanPostProcessor,並新增子類子類 SpringValueProcessor(處理Spring中作用在field和method上的@Value和xml文件中的占位符)與ApolloAnnotationProcessor(Apollo自定義注解ApolloJsonValue)中的占位符組裝成SpringValue對象,並注冊在springValueRegistry中。key是占位符名稱,value是SpringValue列表(因為一個占位符可能用在多個Bean對象中)。然后為所有的Apollo的Namespace對象添加監聽器,一旦配置變化。就到springValueRegistry根據變更的key產找SpringValue,如果存在就更新對應Bean的屬性值。

  private void initializeAutoUpdatePropertiesFeature(ConfigurableListableBeanFactory beanFactory) {
    if (!configUtil.isAutoUpdateInjectedSpringPropertiesEnabled() ||
        !AUTO_UPDATE_INITIALIZED_BEAN_FACTORIES.add(beanFactory)) {
      return;
    }

    // 創建一個Apollo配置變更的監聽器,用於監聽配置變化,查看AutoUpdateConfigChangeListener 的onChange方法
    AutoUpdateConfigChangeListener autoUpdateConfigChangeListener = new AutoUpdateConfigChangeListener(
        environment, beanFactory);
    // 對所有的配置文件添加自動更新的監聽器,
    List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources();
    for (ConfigPropertySource configPropertySource : configPropertySources) {
      configPropertySource.addChangeListener(autoUpdateConfigChangeListener);
    }
  }

重點關注 AutoUpdateConfigChangeListener中的onChange方法。

  @Override
  public void onChange(ConfigChangeEvent changeEvent) {
    Set<String> keys = changeEvent.changedKeys();
    if (CollectionUtils.isEmpty(keys)) {
      return;
    }
    for (String key : keys) {
      // 1. check whether the changed key is relevant。查找變更的key關聯的bean列表信息。每注入一個bean中就多一個SpringValue對象
      Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);
      if (targetValues == null || targetValues.isEmpty()) {
        continue;
      }

      // 2. update the value
      for (SpringValue val : targetValues) {
        updateSpringValue(val);
      }
    }
  }

private void updateSpringValue(SpringValue springValue) {
    try {
      Object value = resolvePropertyValue(springValue);
      springValue.update(value);

      logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,
          springValue);
    } catch (Throwable ex) {
      logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);
    }
  }

public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
    if (isField()) {
      injectField(newVal);
    } else {
      injectMethod(newVal);
    }
  }

  private void injectField(Object newVal) throws IllegalAccessException {
    Object bean = beanRef.get();
    if (bean == null) {
      return;
    }
    boolean accessible = field.isAccessible();
    field.setAccessible(true);
    field.set(bean, newVal);
    field.setAccessible(accessible);
  }

  private void injectMethod(Object newVal)
      throws InvocationTargetException, IllegalAccessException {
    Object bean = beanRef.get();
    if (bean == null) {
      return;
    }
    methodParameter.getMethod().invoke(bean, newVal);
  }

那么Apollo怎樣將占位符的和bean信息都保存到springValueRegistry中的呢?

其實是調用了com.ctrip.framework.apollo.spring.annotation.ApolloProcessor#postProcessBeforeInitialization

public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {

  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName)
      throws BeansException {
    Class clazz = bean.getClass();
    for (Field field : findAllField(clazz)) {
      processField(bean, beanName, field);
    }
    for (Method method : findAllMethod(clazz)) {
      processMethod(bean, beanName, method);
    }
    return bean;
  }

  /**
   * subclass should implement this method to process field
   */
  protected abstract void processField(Object bean, String beanName, Field field);

  /**
   * subclass should implement this method to process method
   */
  protected abstract void processMethod(Object bean, String beanName, Method method);

com.ctrip.framework.apollo.spring.annotation.SpringValueProcessor#processField
@Override
  protected void processField(Object bean, String beanName, Field field) {
    // register @Value on field
    Value value = field.getAnnotation(Value.class);
    if (value == null) {
      return;
    }
    Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());

    if (keys.isEmpty()) {
      return;
    }

    for (String key : keys) {
      SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
      springValueRegistry.register(beanFactory, key, springValue);
      logger.debug("Monitoring {}", springValue);
    }
  }

處理@ApolloJsonValue

 @Override
  protected void processField(Object bean, String beanName, Field field) {
    this.processApolloConfig(bean, field);
    this.processApolloJsonValue(bean, beanName, field);
  }

 private void processApolloJsonValue(Object bean, String beanName, Field field) {
    ApolloJsonValue apolloJsonValue = AnnotationUtils.getAnnotation(field, ApolloJsonValue.class);
    if (apolloJsonValue == null) {
      return;
    }
    String placeholder = apolloJsonValue.value();
    Object propertyValue = placeholderHelper
        .resolvePropertyValue(this.configurableBeanFactory, beanName, placeholder);

    // propertyValue will never be null, as @ApolloJsonValue will not allow that
    if (!(propertyValue instanceof String)) {
      return;
    }

    boolean accessible = field.isAccessible();
    field.setAccessible(true);
    ReflectionUtils
        .setField(field, bean, parseJsonValue((String) propertyValue, field.getGenericType()));
    field.setAccessible(accessible);

    if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled()) {
      Set<String> keys = placeholderHelper.extractPlaceholderKeys(placeholder);
      for (String key : keys) {
        SpringValue springValue = new SpringValue(key, placeholder, bean, beanName, field, true);
        springValueRegistry.register(this.configurableBeanFactory, key, springValue);
        logger.debug("Monitoring {}", springValue);
      }
    }
  }
參考文檔


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM