- 配置信息的發布
- 配置信息發布請求URL: POST: /v1/cs/configs
- nacos在STANDALONE模式或集群模式沒有指定用mysql情況下使用derby數據庫,在集群模式且指定mysql情況下使用mysql
- 持續化到數據庫后立馬同步集群中的其他節點
/**
* 增加或更新非聚合數據。
*
* @throws NacosException
*/
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam("content") String content,
@RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema)
throws NacosException {
//省略參數驗證代碼
.....................
//persistService.insertOrUpdate插入或更新數據庫
//EventDispatcher.fireEvent 通知集群中其他節點
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
return true;
}
- EventDispatcher.fireEvent 觸發 AsyncNotifyService.onEvent事件
- AsyncNotifyService.onEvent事件中遍歷serverList通知集群各個節點
- 如果某個節點不健康或同步失敗延遲一段時間后重新通知:schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
@Service
public class AsyncNotifyService extends AbstractEventListener {
//onEvent事件遍歷serverList通知集群各個節點
@Override
public void onEvent(Event event) {
// 並發產生 ConfigDataChangeEvent
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
List<?> ipList = serverListService.getServerList();
// 其實這里任何類型隊列都可以
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (int i = 0; i < ipList.size(); i++) {
queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta));
}
EXECUTOR.execute(new AsyncTask(httpclient, queue));
}
}
// 同步任務邏輯
class AsyncTask implements Runnable {
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!queue.isEmpty()) {
NotifySingleTask task = queue.poll();
String targetIp = task.getTargetIP();
if (serverListService.getServerList().contains(
targetIp)) {
// 啟動健康檢查且有不監控的ip則直接把放到通知隊列,否則通知
if (serverListService.isHealthCheck() && ServerListService.getServerListUnhealth().contains(targetIp)) {
// target ip 不健康,則放入通知列表中
// get delay time and set fail count to the task
// schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
asyncTaskExecute(task);
} else {
// /v1/cs/communication/dataChange?dataId={2}&group={3}
HttpGet request = new HttpGet(task.url);
request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP);
httpclient.execute(request, new AsyncNotifyCallBack(httpclient, task));
}
}
}
}
}
class AsyncNotifyCallBack implements FutureCallback<HttpResponse> {
// 同步其他節點失敗
@Override
public void failed(Exception ex) {
long delayed = System.currentTimeMillis() - task.getLastModified();
//get delay time and set fail count to the task
//延遲一段時間后重新通知
//schedule(asyncTask, delay, TimeUnit.MILLISECONDS);
asyncTaskExecute(task);
MetricsMonitor.getConfigNotifyException().increment();
}
}
}
- 配置信息的獲取
- 讀取配置信息的時候加讀鎖
- lockResult>0 加鎖成功,獲取配置信息后返回HttpServletResponse.SC_OK 或 HttpServletResponse.SC_NOT_FOUND
- lockResult==0對應的配置信息不存在返回 HttpServletResponse.SC_NOT_FOUND
- lockResult<0 已有線程加了寫鎖返回 HttpServletResponse.SC_CONFLICT
- 單機且不用mysql情況下從數據庫中獲取,其他配置從文件中獲取配置信息(本地文件相當於mysql的緩存)
@Service
public class ConfigServletInner {
/**
* 同步配置獲取接口
*/
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
String tenant, String tag, String clientIp) throws IOException, ServletException {
int lockResult = tryConfigReadLock(groupKey);
if (lockResult > 0) {
//代碼挺多總體邏輯歸納如下
//單機且不用mysql情況下從數據庫中獲取,其他配置從文件中獲取配置信息
//數據庫情況下判斷從config_info,還是config_info_beta,還是config_info_tag表獲取數據
//文件情況下判斷從還是BASE_DIR還是BETA_DIR,還是TAG_DIR目錄獲取配置文件
//本地文件獲取比mysql中獲取更快,本地文件相當於mysql的緩存
}
}
}
為了查詢效率,mysql中的數據緩存到每個集群節點的文件中,緩存時機有3中情況
- 當nacos啟動的時候如果當前時間距離最后一次心跳時間超過6個小時,則全量緩存mysql數據
- 啟動定時任務每6個小時重新全量緩存mysql數據
- @GetMapping("/dataChange")時處理單個數據
//@PostConstruct init() 調用 dumpConfigInfo
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
int timeStep = 6;
Boolean isAllDump = true;
// initial dump all
FileInputStream fis = null;
Timestamp heartheatLastStamp = null;
try {
if (isQuickStart()) {
File heartbeatFile = DiskUtil.heartBeatFile();
if (heartbeatFile.exists()) {
fis = new FileInputStream(heartbeatFile);
String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
//當前時間距離最后一次心跳時間超過6個小時
if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime() < timeStep * 60 * 60 * 1000) {
isAllDump = false;
}
}
}
if (isAllDump) {
LogUtil.defaultLog.info("start clear all config-info.");
DiskUtil.clearAll();
//
dumpAllProcessor.process(DumpAllTask.TASK_ID, new DumpAllTask());
}
} catch (IOException e) {
LogUtil.fatalLog.error("dump config fail" + e.getMessage());
throw e;
}
}
@PostConstruct
public void init() {
//啟動定時任務定時緩存數據
TimerTaskService.scheduleWithFixedDelay(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
}
/**
* 通知配置信息改變
*/
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request,
@RequestParam("dataId") String dataId, @RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY)
String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = dataId.trim();
group = group.trim();
String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
String isBetaStr = request.getHeader("isBeta");
//其他節點通知變更后緩存數據
if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
- 配置信息的輪詢