我們從原生SDK代碼中入手,可以發現最核心的兩行代碼:
ConfigService configService=NacosFactory.createConfigService(properties);
String content=configService.getConfig(dataId,groupId,3000);
首先我們先來看 NacosFactory.createConfigService :
public static ConfigService createConfigService(Properties properties) throws NacosException { try { Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService"); Constructor constructor = driverImplClass.getConstructor(Properties.class);
//調用反射創建一個NacosConfigService實例 ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties); return vendorImpl; } catch (Throwable e) { throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e); } }
這一步的代碼很簡單,及通過類的全類名通過反射創建一個 NacosConfigService 實例,我們跟進該類的構造方法:
public NacosConfigService(Properties properties) throws NacosException { String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { encode = Constants.ENCODE; } else { encode = encodeTmp.trim(); }//初始化命名空間 initNamespace(properties); agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); agent.start(); worker = new ClientWorker(agent, configFilterChainManager, properties); }
這一步主要初始化了 agent 與 worker 兩個實例。這里又看到熟悉的包裝器模式,將ServerHttpAgent 包裝成MetricsHttpAgent,這里我們需要知道,其中MetricsHttpAgent是對ServerHttpAgent功能的拓展,核心功能還是由ServerHttpAgent去實現,接下去我們來看一下 worker 的初始化,從名字上看能知道 最后真的工作的是他:
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter // 初始化一些參數
init(properties); //創建了一個定時任務的線程池
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); //創建了一個保持長連接的線程池
executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); //創建了一個延遲任務線程池來每隔10ms來檢查配置信息的線程池
executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
這一步創建了兩個線程池,第一個線程池負責與配置中心進行數據的交互,並且啟動后延遲1ms,之后每隔10ms對配置信息進行定時檢查,第二個線程池則是負責保持一個長連接。我們再服務啟動之后便會執行 checkConfigInfo(),跟進去看看:
public void checkConfigInfo() { // 分任務(解決大數據量的傳輸問題)
int listenerSize = cacheMap.get().size(); // 向上取整為批數,分批次進行檢查 // ParamUtil.getPerTaskConfigSize() =3000
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); // currentLongingTaskCount =0
if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判斷任務是否在執行 這塊需要好好想想。 任務列表現在是無序的。變化過程可能有問題
executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
這里主要是先去除緩存中 Map<String, CacheData> 的數量,為避免處理過量的數據,這里對緩存數據進行了分組,最后創建 LongPollingRunnable 去執行,可以知道 這里會進入 LongPollingRunnable 的 Run 方法:
public void run() { List<CacheData> cacheDatas = new ArrayList<CacheData>(); List<String> inInitializingCacheList = new ArrayList<String>(); try { // check failover config
for (CacheData cacheData : cacheMap.get().values()) { if (cacheData.getTaskId() == taskId) { cacheDatas.add(cacheData); try { //檢查本地配置
checkLocalConfig(cacheData); if (cacheData.isUseLocalConfigInfo()) {
//檢查緩存的MD5 cacheData.checkListenerMd5(); } } catch (Exception e) { LOGGER.error("get local config info error", e); } } } //檢查服務端配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); //將配置設置進緩存
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); } catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished
LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); } }
總的來說,該方法主要流程是先檢查本地緩存,再檢查服務端的配置,由改變最后再回寫到本地及加載到緩存。
private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant;
//本地文件緩存 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); // 沒有 -> 有 //不使用本地配置,但是持久化文件存在,需要讀取文件加載至內存
if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒有。不通知業務監聽器,從server拿到配置后通知。 //使用本地配置,但是持久化文件不存在
if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變更 //使用本地配置,持久化文件存在,緩存跟文件最后修改時間不一致
if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path.lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); String md5 = MD5.getInstance().getMD5String(content); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn("[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
本地檢查主要是通過是否使用本地配置,繼而尋找持久化緩存文件,再通過判斷文件的最后修改事件與本地緩存的版本是否一致來判斷是否由變更。本地檢查完畢,如果使用本地配置會進入下列代碼:
if (cacheData.isUseLocalConfigInfo()) { //檢查緩存的MD5
cacheData.checkListenerMd5(); }
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
//MD5由變更,說明數據變更
if (!md5.equals(wrap.lastCallMd5)) {
//執行回調
safeNotifyListener(dataId, group, content, md5, wrap);
}
}
}
本地檢查完畢會進行遠程服務器檢查:
//檢查服務端配置
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
這里會去獲取一個發生變化的GroupKeys 集合:
/** * 從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws IOException { StringBuilder sb = new StringBuilder(); for (CacheData cacheData : cacheDatas) { if (!cacheData.isUseLocalConfigInfo()) { sb.append(cacheData.dataId).append(WORD_SEPARATOR); sb.append(cacheData.group).append(WORD_SEPARATOR); if (StringUtils.isBlank(cacheData.tenant)) { sb.append(cacheData.getMd5()).append(LINE_SEPARATOR); } else { sb.append(cacheData.getMd5()).append(WORD_SEPARATOR); sb.append(cacheData.getTenant()).append(LINE_SEPARATOR); } if (cacheData.isInitializing()) { // cacheData 首次出現在cacheMap中&首次check更新
inInitializingCacheList .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant)); } } } boolean isInitializingCacheList = !inInitializingCacheList.isEmpty(); return checkUpdateConfigStr(sb.toString(), isInitializingCacheList); }
這里將可能發生變化的配置信息封裝成一個 StringBuilder ,繼而調用 checkUpdateConfigStr:
/** * 從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try {//發起一個Post請求 HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
就這樣從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回NULL。獲取到這個列表以后就便利這個列表,去服務器端獲取對應變更后的配置:
for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { String content = getServerConfig(dataId, group, tenant, 3000L); //將配置設置進緩存
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(content); LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(content)); } catch (NacosException ioe) { String message = String.format( "[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } }
這里會發起請求從服務器端獲取配置:getServerConfig:
public String getServerConfig(String dataId, String group, String tenant, long readTimeout) throws NacosException { if (StringUtils.isBlank(group)) { group = Constants.DEFAULT_GROUP; } HttpResult result = null; try { List<String> params = null; if (StringUtils.isBlank(tenant)) { params = Arrays.asList("dataId", dataId, "group", group); } else { params = Arrays.asList("dataId", dataId, "group", group, "tenant", tenant); } result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout); } catch (IOException e) { String message = String.format( "[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, e); throw new NacosException(NacosException.SERVER_ERROR, e); } switch (result.code) { case HttpURLConnection.HTTP_OK: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.content); return result.content; case HttpURLConnection.HTTP_NOT_FOUND: LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null); return null; case HttpURLConnection.HTTP_CONFLICT: { LOGGER.error( "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
+ "tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(NacosException.CONFLICT, "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } case HttpURLConnection.HTTP_FORBIDDEN: { LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); throw new NacosException(result.code, result.content); } default: { LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(), dataId, group, tenant, result.code); throw new NacosException(result.code, "http error, code=" + result.code + ",dataId=" + dataId + ",group=" + group + ",tenant=" + tenant); } } }
通過初始化時候的 agent.httpGet 去發起一個Get請求,就這樣變更本例的配置,當從遠程服務器獲取玩配置以后還有一個循環:
for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { cacheData.checkListenerMd5(); cacheData.setInitializing(false); } }
這個循環主要是對有變化的配置進行監聽回調。整個流程就差不都完成了,最后來一張流程圖:
長輪訓的時間間隔:
我們知道客戶端會有一個長輪訓的任務去檢查服務器端的配置是否發生了變化,如果發生了變更,那么客戶端會拿到變更的 groupKey 再根據 groupKey 去獲取配置項的最新值更新到本地的緩存以及文件中,那么這種每次都靠客戶端去請求,那請求的時間間隔設置多少合適呢?
如果間隔時間設置的太長的話有可能無法及時獲取服務端的變更,如果間隔時間設置的太短的話,那么頻繁的請求對於服務端來說無疑也是一種負擔,所以最好的方式是客戶端每隔一段長度適中的時間去服務端請求,而在這期間如果配置發生變更,服務端能夠主動將變更后的結果推送給客戶端,這樣既能保證客戶端能夠實時感知到配置的變化,也降低了服務端的壓力。 我們來看看nacos設置的間隔時間是多久。
長輪訓的概念:
客戶端發起一個請求到服務端,服務端收到客戶端的請求后,並不會立刻響應給客戶端,而是先把這個請求hold住,然后服務端會在hold住的這段時間檢查數據是否有更新,如果有,則響應給客戶端,如果一直沒有數據變更,則達到一定的時間(長輪訓時間間隔)才返回。
長輪訓典型的場景有: 掃碼登錄、掃碼支付。
客戶端長輪訓:
在ClientWorker這個類里面,找到 checkUpdateConfigStr 這個方法,這里面就是去服務器端查詢發生變化的groupKey。
/** * 從Server獲取值變化了的DataID列表。返回的對象里只有dataId和group是有效的。 保證不返回NULL。 */ List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws IOException { List<String> params = Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString); List<String> headers = new ArrayList<String>(2); headers.add("Long-Pulling-Timeout"); headers.add("" + timeout); // told server do not hang me up if new initializing cacheData added in
if (isInitializingCacheList) { headers.add("Long-Pulling-Timeout-No-Hangup"); headers.add("true"); } if (StringUtils.isBlank(probeUpdateString)) { return Collections.emptyList(); } try {//客戶端發送的請求地址是: /v1/cs/configs/listener HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout); if (HttpURLConnection.HTTP_OK == result.code) { setHealthServer(true); return parseUpdateDataIdResponse(result.content); } else { setHealthServer(false); LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(), result.code); } } catch (IOException e) { setHealthServer(false); LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e); throw e; } return Collections.emptyList(); }
這個方法最終會發起http請求,注意這里面有一個 timeout 的屬性,
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), timeout);
timeout是在init這個方法中賦值的,默認情況下是30秒,可以通過configLongPollTimeout進行修改
private void init(Properties properties) { // 默認長輪詢的事件就是30S
timeout = Math.max(NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT), //public static final int CONFIG_LONG_POLL_TIMEOUT = 30000; //public static final int MIN_CONFIG_LONG_POLL_TIMEOUT = 10000;
Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT); taskPenaltyTime = NumberUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME); enableRemoteSyncConfig = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG)); }
所以從這里得出的一個基本結論是:客戶端發起一個輪詢請求,超時時間是30s。 那么客戶端為什么要等待30s才超時呢?不是越快越好嗎? 我們可以在nacos的日志目錄下 $NACOS_HOME/nacos/logs/config-client-request.log 文件.
可以看到一個現象,在配置沒有發生變化的情況下,客戶端會等29.5s以上,才請求到服務器端的結果。然后客戶端拿到服務器端的結果之后,在做后續的操作。當服務器端頻繁的修改,那么服務器端頻繁客戶端進行推送.
服務端的處理:
服務端是如何處理客戶端的請求的?那么同樣,我們需要思考幾個問題:
- 客戶端的長輪訓響應時間受到哪些因素的影響
- 客戶端的超時時間為什么要設置30s
- 客戶端發送的請求地址是: /v1/cs/configs/listener 找到服務端對應的方法
nacos是使用spring mvc提供的rest api,其中有個類是 ConfigController ,我們在其中找到了Post 請求的 listener 路徑的接口方法:
/** * 比較MD5 */ @RequestMapping(value = "/listener", method = RequestMethod.POST) public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); String probeModify = request.getParameter("Listening-Configs"); if (StringUtils.isBlank(probeModify)) { throw new IllegalArgumentException("invalid probeModify"); } probeModify = URLDecoder.decode(probeModify, Constants.ENCODE); Map<String, String> clientMd5Map; try { clientMd5Map = MD5Util.getClientMd5Map(probeModify); } catch (Throwable e) { throw new IllegalArgumentException("invalid probeModify"); } // do long-polling
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length()); }
先是獲取了客戶端的MD5集合,這里面會調用inner.doPollingConfig進行處理,這個方法中,兼容了長輪訓和短輪詢的邏輯,我們只需要關注長輪訓的部分:
/** * 輪詢接口 */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException, ServletException { // 長輪詢
if (LongPollingService.isSupportLongPolling(request)) { longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize); return HttpServletResponse.SC_OK + ""; }
......//省略代碼
}
這里我們進入長輪詢的代碼塊:
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) { //超時時間 String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER); String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER); String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER); String tag = req.getHeader("Vipserver-Tag"); int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500); /** * 提前500ms返回響應,為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動 add delay time for LoadBalance */
long timeout = Math.max(10000, Long.parseLong(str) - delayTime); if (isFixedPolling()) { timeout = Math.max(10000, getFixedPollingInterval()); // do nothing but set fix polling timeout
} else { long start = System.currentTimeMillis(); List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map); if (changedGroups.size() > 0) { generateResponse(req, rsp, changedGroups); LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup", RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize, changedGroups.size()); return; } } String ip = RequestUtil.getRemoteIp(req); // 一定要由HTTP線程調用,否則離開后容器會立即發送響應
final AsyncContext asyncContext = req.startAsync(); // AsyncContext.setTimeout()的超時時間不准,所以只能自己控制
asyncContext.setTimeout(0L); scheduler.execute( new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag)); }
這個方法是把客戶端的長輪訓請求添加到任務中去。獲得客戶端傳遞過來的超時時間,並且進行本地計算,提前500ms返回響應,這就能解釋為什么客戶端響應超時時間是29.5+了。當然如果 isFixedPolling=true 的情況下,不會提前返回響應根據客戶端請求過來的md5和服務器端對應的group下對應內容的md5進行比較,如果不一致,則通過 generateResponse 將結果返回如果配置文件沒有發生變化,則通過 scheduler.execute 啟動了一個定時任務,將客戶端的長輪詢請求封裝成一個叫 ClientLongPolling 的任務,交給 scheduler 去執行,那么接下去一定會進入ClientLongPolling 的Run 方法:
public void run() { asyncTimeoutFuture = scheduler.schedule(new Runnable() { @Override public void run() { try { getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis()); /** * 刪除訂閱關系 */ allSubs.remove(ClientLongPolling.this); if (isFixedPolling()) { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "fix", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); List<String> changedGroups = MD5Util.compareMd5( (HttpServletRequest)asyncContext.getRequest(), (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
//有變化立即執行返回 if (changedGroups.size() > 0) { sendResponse(changedGroups); } else { sendResponse(null); } } else { LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout", RequestUtil.getRemoteIp((HttpServletRequest)asyncContext.getRequest()), "polling", clientMd5Map.size(), probeRequestSize); sendResponse(null); } } catch (Throwable t) { LogUtil.defaultLog.error("long polling error:" + t.getMessage(), t.getCause()); } } //延遲29.5秒后執行 }, timeoutTime, TimeUnit.MILLISECONDS); allSubs.add(this); }
在run方法中,通過scheduler.schedule實現了一個定時任務,它的delay時間正好是前面計算的29.5s。在這個任務中,會通過MD5Util.compareMd5來進行計算那另外一個,當數據發生變化以后,肯定不能等到29.5s之后才通知呀,那怎么辦呢?我們發現有一個allSubs 的東西,它似乎和發布訂閱有關系。那是不是有可能當前的clientLongPolling訂閱了數據變化的事件呢?allSubs是一個隊列,隊列里面放了ClientLongPolling這個對象。這個隊列似乎和配置變更有某種關聯關系:
/** * 長輪詢訂閱關系 */ final Queue<ClientLongPolling> allSubs;
注釋里寫明了他是和長輪詢訂閱相關的,接着我們先來看一下他所歸屬的類的類圖:
發現LongPollingService集成了AbstractEventListener,事件監聽.
AbstractEventListener:
static public abstract class AbstractEventListener { public AbstractEventListener() { /** * automatic register */ EventDispatcher.addEventListener(this); } /** * 感興趣的事件列表 * * @return event list */
abstract public List<Class<? extends Event>> interest(); /** * 處理事件 * * @param event event */
abstract public void onEvent(Event event); }
這里面有一個抽象的onEvent方法,明顯是用來處理事件的方法,而抽象方法必須由子類實現,所以意味着LongPollingService里面肯定實現了onEvent方法:
public void onEvent(Event event) { if (isFixedPolling()) { // ignore
} else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }
所以到了這里,肯定是修改了配置之后會有一個觸發點去出發該事件,當匹配上事件類型,那么就會去執行這個回調,這個事件的實現方法中判斷事件類型是否為LocalDataChangeEvent,通過scheduler.execute執行DataChangeTask這個任務:
public void run() { try { ConfigService.getContentBetaMd5(groupKey); for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) { ClientLongPolling clientSub = iter.next(); if (clientSub.clientMd5Map.containsKey(groupKey)) { // 如果beta發布且不在beta列表直接跳過
if (isBeta && !betaIps.contains(clientSub.ip)) { continue; } // 如果tag發布且不在tag列表直接跳過
if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) { continue; } getRetainIps().put(clientSub.ip, System.currentTimeMillis()); iter.remove(); // 刪除訂閱關系
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance", RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()), "polling", clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey); clientSub.sendResponse(Arrays.asList(groupKey)); } } } catch (Throwable t) { LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause()); } }
這個是數據變化的任務,最讓人興奮的應該是,它里面有一個循環迭代器,從allSubs里面獲得ClientLongPolling最后通過clientSub.sendResponse把數據返回到客戶端。所以,這也就能夠理解為何數據變化能夠實時觸發更新了。
那么接下來還有一個疑問是,數據變化之后是如何觸發事件的呢? 所以我們定位到數據變化的請求類中,在ConfigController這個類中,找到POST請求的方法找到配置變更的位置:
/** * 增加或更新非聚合數據。 * * @throws NacosException */ @RequestMapping(method = RequestMethod.POST) @ResponseBody public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, @RequestParam("dataId") String dataId, @RequestParam("group") String group, @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, @RequestParam("content") String content, @RequestParam(value = "tag", required = false) String tag, @RequestParam(value = "appName", required = false) String appName, @RequestParam(value = "src_user", required = false) String srcUser, @RequestParam(value = "config_tags", required = false) String configTags, @RequestParam(value = "desc", required = false) String desc, @RequestParam(value = "use", required = false) String use, @RequestParam(value = "effect", required = false) String effect, @RequestParam(value = "type", required = false) String type, @RequestParam(value = "schema", required = false) String schema) throws NacosException { final String srcIp = RequestUtil.getRemoteIp(request); String requestIpApp = RequestUtil.getAppName(request); ParamUtils.checkParam(dataId, group, "datumId", content); ParamUtils.checkParam(tag); Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10); ......//省略代碼 final Timestamp time = TimeUtils.getCurrentTime(); String betaIps = request.getHeader("betaIps"); ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); if (StringUtils.isBlank(betaIps)) { if (StringUtils.isBlank(tag)) { persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); } else { persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); } } else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false); EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); } ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content); return true; }
發現數據持久化之后,會通過EventDispatcher進行事件發布EventDispatcher.fireEvent 但是這個事件似乎不是我們所關心的時間,原因是這里發布的事件是ConfigDataChangeEvent , 而LongPollingService感興趣的事件是 LocalDataChangeEvent。
在Nacos中有一個DumpService,它會定時把變更后的數據dump到磁盤上,DumpService在spring啟動之后,會調用init方法啟動幾個dump任務。然后在任務執行結束之后,會觸發一個LocalDataChangeEvent 的事件:
@PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); DumpAllProcessor dumpAllProcessor = new DumpAllProcessor(this); DumpAllBetaProcessor dumpAllBetaProcessor = new DumpAllBetaProcessor(this); DumpAllTagProcessor dumpAllTagProcessor = new DumpAllTagProcessor(this);
......//省略代碼
}
其中在 DumpProcessor的 process方法中會調用 ConfigService 的相關API對數據進行操作,其中調用 remove 后會傳播這么一個事件:
/** * 刪除配置文件,刪除緩存。 */
static public boolean remove(String dataId, String group, String tenant) { final String groupKey = GroupKey2.getKey(dataId, group, tenant); final int lockResult = tryWriteLock(groupKey); /** * 數據不存在 */
if (0 == lockResult) { dumpLog.info("[remove-ok] {} not exist.", groupKey); return true; } /** * 加鎖失敗 */
if (lockResult < 0) { dumpLog.warn("[remove-error] write lock failed. {}", groupKey); return false; } try { if (!STANDALONE_MODE || PropertyUtil.isStandaloneUseMysql()) { DiskUtil.removeConfigInfo(dataId, group, tenant); } CACHE.remove(groupKey); EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey)); return true; } finally { releaseWriteLock(groupKey); } }
簡單總結一下剛剛分析的整個過程。
- 客戶端發起長輪訓請求,
- 服務端收到請求以后,先比較服務端緩存中的數據是否相同,如果不通,則直接返回
- 如果相同,則通過schedule延遲29.5s之后再執行比較
- 為了保證當服務端在29.5s之內發生數據變化能夠及時通知給客戶端,服務端采用事件訂閱的方式來監聽服務端本地數據變化的事件,一旦收到事件,則觸發DataChangeTask的通知,並且遍歷allStubs隊列中的ClientLongPolling,把結果寫回到客戶端,就完成了一次數據的推送
- 如果 DataChangeTask 任務完成了數據的 “推送” 之后,ClientLongPolling 中的調度任務又開始執行了怎么辦呢?很簡單,只要在進行 “推送” 操作之前,先將原來等待執行的調度任務取消掉就可以了,這樣就防止了推送操作寫完響應數據之后,調度任務又去寫響應數據,這時肯定會報錯的。所以,在ClientLongPolling方法中,最開始的一個步驟就是刪除訂閱事件
所以總的來說,Nacos采用推+拉的形式,來解決最開始關於長輪訓時間間隔的問題。當然,30s這個時間是可以設置的,而之所以定30s,應該是一個經驗值。
集群選舉:
Nacos支持集群模式,很顯然。而一旦涉及到集群,就涉及到主從,那么nacos是一種什么樣的機制來實現的集群呢?
nacos的集群類似於zookeeper, 它分為leader角色和follower角色, 那么從這個角色的名字可以看出來,這個集群存在選舉的機制。 因為如果自己不具備選舉功能,角色的命名可能就是master/slave了,
在nacos的解壓目錄nacos/的conf目錄下,有配置文件cluster.conf,請每行配置成ip:port。(請配置3個或3個以上節點)。首先我們進入conf目錄下,默認只有一個cluster.conf.example文件,我們需要自行復制一份,修改名稱為cluster.conf。然后使用vi編輯器 打開cluster.config,按a/i/o 鍵可進入插入模式,輸入以下內容
#ip:port 10.51.10.128:8848
10.51.10.129:8848
10.51.10.130:8848
然后重新啟動就好了。
選舉算法:
Nacos集群采用 raft 算法來實現,它是相對zookeeper的選舉算法較為簡單的一種。選舉算法的核心在 RaftCore 中,包括數據的處理和數據同步.
raft算法動畫演示地址:http://thesecretlivesofdata.com/raft/ 。可以很直觀的看到整個算法選舉的過程。
在Raft中,節點有三種角色:
- Leader:負責接收客戶端的請求
- Candidate:用於選舉Leader的一種角色
- Follower:負責響應來自Leader或者Candidate的請求
選舉分為兩個節點:
- 服務啟動的時候
- leader掛了的時候
所有節點啟動的時候,都是follower狀態。 如果在一段時間內如果沒有收到leader的心跳(可能是沒有leader,也可能是leader掛了),那么follower會變成Candidate。然后發起選舉,選舉之前,會增加term,這個term和zookeeper中的epoch的道理是一樣的。
follower會投自己一票,並且給其他節點發送票據vote,等到其他節點回復在這個過程中,可能出現幾種情況
- 收到過半的票數通過,則成為leader
- 被告知其他節點已經成為leader,則自己切換為follower
- 一段時間內沒有收到過半的投票,則重新發起選舉
約束條件在任一term中,單個節點最多只能投一票
選舉的幾種情況:
- 第一種情況,贏得選舉之后,leader會給所有節點發送消息,避免其他節點觸發新的選舉
- 第二種情況,比如有三個節點A B C。A B同時發起選舉,而A的選舉消息先到達C,C給A投了一票,當B的消息到達C時,已經不能滿足上面提到的第一個約束,即C不會給B投票,而A和B顯然都不會給對方投票。A勝出之后,會給B,C發心跳消息,節點B發現節點A的term不低於自己的term,知道有已經有Leader了,於是轉換成follower
- 第三種情況, 沒有任何節點獲得majority投票,可能是平票的情況。加入總共有四個節點(A/B/C/D),Node C、Node D同時成為了candidate,但Node A投了NodeD一票,NodeB投了Node C一票,這就出現了平票 split vote的情況。這個時候大家都在等啊等,直到超時后重新發起選舉。如果出現平票的情況,那么就延長了系統不可用的時間,因此raft引入了randomized election timeouts來盡量避免平票情況,
在動畫演示中可以看到選舉超時后,即每個小球外圍都變化先消失的座位候選人,接着發出請求讓其他人投票選舉自己,同時修改Term:
與Zookeeper一樣,對於事務操作,請求會轉發給leader,非事務操作上,可以任意一個節點來處理.