nacos的配置中心--服務端


一:配置服務端存儲模型

1.1:概述

Nacos Config提供了配置管理的功能,它允許用戶在nacos上配置key-value對,並在客戶端訂閱需要的配置。當配置發生變更時,訂閱的客戶端會獲得通知,隨后拉取最新的key-value對。

Config Server為了最大程度保證可用性采用了一種三層的存儲架構設計,mysql - 本地文件 - 內存緩存:

1.2:數據庫

Config Server所有的key-value配置信息都最終存儲在mysql數據庫中,當中包含四張核心表(table):

config_info - 存儲配置信息,包含id/data_id/group_id/tenant_id/content/md5/gmt_created/gmt_modified/app_name等列。當中data_id + group_id + tenant_id三者唯一確定一條key-value配置。

config_tags_relation - 存儲配置上附加的tag,包含id/tag_name/tag_type/data_id/group_id/tenant_id/nid等列。

config_info_beta - 存儲beta環境的特殊配置值,除了config_info表中的列之外新增了beta_ips列。

config_info_tag - 存儲某個tag對應的特殊配置值,除了config_info表中的列之外新增了tag_id列。

1.3:本地磁盤

mysql數據庫中存儲的是最終的配置信息,config server在啟動后會周期性(360min)的從mysql中將所有配置信息dump到本地文件系統中。配置會被存儲到一個特殊的目錄/{user.home}/nacos/data/config-data/{groupId}/{dataId}下,每條配置存在一個獨立的文件中。

config server中關於配置的讀取都是走本地文件系統中的dump,這種設計一方面提升了系統的可用性(防止mysql奔潰導致config不可用),另一方面極大降低了mysql數據庫的負載,使得config server的水平擴張變得非常容易。

1.4:緩存

當config server啟動時會一次性把mysql中存儲的所有配置dump到本地文件系統中,並設置一個定時器周期性(默認6h)做全量dump。config server也有一種quick start模式,允許重用文件系統中保留的配置數據,做增量dump。

配置信息的寫首先進入到mysql數據庫中。mysql插入成功之后server會生成一個ConfigDataChangeEvent事件,在AsyncNotifyService中將捕獲這個事件,對當前每一個config server發起/dataChange調用。

/dataChange調用在CommunicationController中被處理,通過ConfigService將變動的數據dump到本地文件中並更新內存緩存。

配置的讀取及訂閱都是從內存Cache + 本地文件中完成。

二、數據存儲

2.1:流程圖

整個流程分為兩大部分:

  • 入庫。插入mysql數據庫,發起ConfigDataChangeEvent,調用所有server上的/dataChange接口。

  • dump。server響應/dataChange請求,異步dump數據庫配置信息到本地。

2.2:ConfigController.publishConfig

處理config獲取/訂閱/變更相關的http請求。

2.2.1:入口

其中進行一系列的邏輯判斷,但是可以看出主要做了兩件事:持久化和事件發布。

public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
      @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
      @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
      @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
      @RequestParam(value = "appName", required = false) String appName,
      @RequestParam(value = "src_user", required = false) String srcUser,
      @RequestParam(value = "config_tags", required = false) String configTags,
      @RequestParam(value = "desc", required = false) String desc,
      @RequestParam(value = "use", required = false) String use,
      @RequestParam(value = "effect", required = false) String effect,
      @RequestParam(value = "type", required = false) String type,
      @RequestParam(value = "schema", required = false) String schema) throws NacosException {
      .......
          //進行持久化保存
      persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
              //配置更新事件
              ConfigChangePublisher
                      .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
      .......
      }

其中持久化主要交互到數據庫,進行配置數據的插入和歷史表的插入。

后面一步,封裝成為ConfigDataChangeEvent進行事件發布,只需要找到監聽位置進行跟蹤后續邏輯。事件的發布和監聽,是nacos自己寫的邏輯,將任務發布即將任務加入到某個任務隊列,另外有一個線程在阻塞監聽隊列的數據,一旦發現隊列中有數據,會根據任務類型找到任務的訂閱者,由訂閱者處理新發布的事件。

2.2.2 ConfigDataChangeEvent監聽

監聽事件的處理內容:

@Override
public void onEvent(Event event) {
  // Generate ConfigDataChangeEvent concurrently
  if (event instanceof ConfigDataChangeEvent) {
      ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
      long dumpTs = evt.lastModifiedTs;
      String dataId = evt.dataId;
      String group = evt.group;
      String tenant = evt.tenant;
      String tag = evt.tag;
      Collection<Member> ipList = memberManager.allMembers();
       
      // In fact, any type of queue here can be
      //初始化一個隊列
      Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
      for (Member member : ipList) {
          //遍歷nacos集群的成員,並在隊列中添加通知任務
          queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                  evt.isBeta));
      }
      //開辟新的任務,執行隊列中的任務
      ConfigExecutor.executeAsyncNotify(new AsyncTask(httpclient, queue));
  }
}
2.2.3 不同節點同步
private void executeAsyncInvoke() {
  while (!queue.isEmpty()) {
  //彈出隊列的數據
      NotifySingleTask task = queue.poll();
      String targetIp = task.getTargetIP();
      //判斷是否是集群成員
      if (memberManager.hasMember(targetIp)) {
          // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
          boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);
          //判斷節點是否為非監控節點,如果不健康則延時進行調用,並根據嘗試次數增加延時時長
          if (unHealthNeedDelay) {
              // target ip is unhealthy, then put it in the notification list
              ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                      task.getLastModified(), InetUtils.getSelfIp(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                      0, task.target);
              // get delay time and set fail count to the task
              asyncTaskExecute(task);
          } else {
          //對於健康節點,直接執行通知邏輯
              HttpGet request = new HttpGet(task.url);
              request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED,
                      String.valueOf(task.getLastModified()));
              request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIp());
              if (task.isBeta) {
                  request.setHeader("isBeta", "true");
              }
              //執行請求,並帶上回調函數
              httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
          }
      }
  }
}

對於非監控節點延長延時時間,主要是根據次數增減加時長

private static int getDelayTime(NotifySingleTask task) {
  int failCount = task.getFailCount();
  int delay = MIN_RETRY_INTERVAL + failCount * failCount * INCREASE_STEPS;
  if (failCount <= MAX_COUNT) {
      task.setFailCount(failCount + 1);
  }
  return delay;
}

對於回調函數的內容,無非就是對http請求成功或者失敗的判定,以及根據結果進行后續的邏輯處理:

無論是請求成功結果狀態碼不正確還是調用失敗,異或是取消,都會添加任務到隊列中進行重試。

 @Override
  public void completed(HttpResponse response) {
      long delayed = System.currentTimeMillis() - task.getLastModified();
      if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
          ......
      } else {
          ......
          asyncTaskExecute(task);
          .......
      }
      HttpClientUtils.closeQuietly(response);
  }
  @Override
  public void failed(Exception ex) {
      ......
      asyncTaskExecute(task);
      ......
  }
  @Override
  public void cancelled() {
      ......
      asyncTaskExecute(task);
      ......
  }
  private NotifySingleTask task;
  private CloseableHttpAsyncClient httpClient;
}
2.3:CommunicationController.notifyConfigInfo

該接口,對應與2.2中的通知邏輯,該部分主要進行兩件事情:

(1):將配置數據更新到緩存和磁盤文件之中;

(2):如果有客戶端長連接監聽配置信息的變化,此時會找到對應key的長連接的請求,並進行響應。

2.3.2 接收到請求
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
  dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
  dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}

dump的方法,根據dataId, group, tenant(命名空間編號),組裝成的key。

public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {
  String groupKey = GroupKey2.getKey(dataId, group, tenant);
  //添加任務
  dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));
}

//添加任務到tasks中,添加任務時候使用lock,大概是防止在任務添加的時候,執行該類的processor方法。

public void addTask(String type, AbstractTask task) {
  this.lock.lock();
  try {
      AbstractTask oldTask = tasks.put(type, task);
      MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
      if (null != oldTask) {
          task.merge(oldTask);
      }
  } finally {
      this.lock.unlock();
  }
}

該類中有一個processor的方法,該方法是當前類循環執行,每次執行間隙會休眠:

public void run() {
  while (!TaskManager.this.closed.get()) {
      try {
          Thread.sleep(100);
          TaskManager.this.process();
      } catch (Throwable e) {
          LogUtil.DUMP_LOG.error("execute dump process has error : {}", e);
      }
  }
}

在processor方法兩階段事情:

(1):判斷當前任務是否還需要繼續執行,不需要就將任務對隊列中去除;

(2):找到TaskProcessor,並調用result = processor.process(entry.getKey(), task);方法

protected void process() {
  for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) {
      AbstractTask task = null;
      this.lock.lock();
      try {
          // Getting task.
          //判斷當前任務是否還需要繼續執行,不需要就刪除
          task = entry.getValue();
          if (null != task) {
              if (!task.shouldProcess()) {
                  // If current task needn't to process, then it will skip.
                  continue;
              }
              // Remove task from task maps.
              this.tasks.remove(entry.getKey());
              MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
          }
      } finally {
          this.lock.unlock();
      }
       
      if (null != task) {
          // Getting task processor.找到對應的TaskProcessor,此時會使用默認this.processor = new DumpProcessor(this);
          TaskProcessor processor = this.taskProcessors.get(entry.getKey());
          if (null == processor) {
              // If has no related typpe processor, then it will use default processor.
              processor = this.getDefaultTaskProcessor();
          }
          if (null != processor) {
              boolean result = false;
              try {
                  // Execute the task.
                  result = processor.process(entry.getKey(), task);
              } catch (Throwable t) {
                  LOGGER.error("task_fail", "處理task失敗", t);
              }
              if (!result) {
                  // If task is executed failed, the set lastProcessTime.
                  task.setLastProcessTime(System.currentTimeMillis());
                   
                  // Add task into task map again.
                  this.addTask(entry.getKey(), task);
              }
          }
      }
  }
   
  if (tasks.isEmpty()) {
      this.lock.lock();
      try {
          this.notEmpty.signalAll();
      } finally {
          this.lock.unlock();
      }
  }
}
2.3.3 DumpProcessor.process

其中一大坨代碼,只是進行兩個操作:包裝數據和調用DumpConfigHandler.configDump(build.build())方法:

......
//包裝數據
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
      .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
......
//查詢數據,查出最新數據
  ConfigInfo4Beta cf = persistService.findConfigInfo(dataId, group, tenant);
  //如果查出是null的處理
  build.remove(Objects.isNull(cf));
  build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
  build.content(Objects.isNull(cf) ? null : cf.getContent());
  //進行進步調用
  return DumpConfigHandler.configDump(build.build());
}
2.3.4 DumpConfigHandler.configDump

其中的核心代碼:

//對比md5值之后,保存到緩存和本地文件
result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);

直接進入ConfigCacheService.dump:

開始部分:

嘗試獲取一次寫鎖,如果獲取失敗,就會直接返回,不再進行數據寫入,寫鎖是代碼層面實現,不深入。

final int lockResult = tryWriteLock(groupKey);
assert (lockResult != 0);

if (lockResult < 0) {
  DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
  return false;
}

接下來進入主題:

//獲取當前內容的md5值
final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
  DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                  + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
          lastModifiedTs);
} else if (!PropertyUtil.isDirectRead()) {
//保存到本地磁盤
  DiskUtil.saveToDisk(dataId, group, tenant, content);
}
//更新MD5值
updateMd5(groupKey, md5, lastModifiedTs);

其中保存本地磁盤不再細化,即找到目標文件,將數據寫入。后面updateMd5值:

public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
  CacheItem cache = makeSure(groupKey);
  if (cache.md5 == null || !cache.md5.equals(md5)) {
      cache.md5 = md5;
      cache.lastModifiedTs = lastModifiedTs;
      NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
  }
}

其中makerSure就是獲取緩存數據,上面已經敘述緩存中只保存對應的md5值和更新時間,讀寫鎖等信息,不包括具體的配置數據。

此時可以看到發布LocalDataChangeEvent事件,監聽該事件部分在下一個章節進行描述。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM