在之前的項目中用到了Nacos作為微服務的注冊中心與配置中心,配置中心可以動態更新配置,使得我們在改動配置后無需重啟服務即可直接生效。我們也可以在nacos的后台直接修改配置文件,這極大地增強了系統的運維能力。
下面就來探究一下Nacos服務配置實時更新的底層原理。
官方demo
首先來參考一下官方的demo https://github.com/nacos-group/nacos-spring-boot-project/tree/master/nacos-spring-boot-samples/nacos-config-sample

直接看ConfigApplication
public class ConfigApplication {
public static final String content = "dept=Aliware\ngroup=Alibaba";
public static final String DATA_ID = "test";
public static void main(String[] args) {
SpringApplication.run(ConfigApplication.class, args);
}
@Bean
@Order(Ordered.LOWEST_PRECEDENCE)
public CommandLineRunner firstCommandLineRunner() {
return new FirstCommandLineRunner();
}
@Bean
@Order(Ordered.LOWEST_PRECEDENCE - 1)
public CommandLineRunner secondCommandLineRunner() {
return new SecondCommandLineRunner();
}
@Bean
public Foo foo() {
return new Foo();
}
@Configuration
@ConditionalOnProperty(prefix = "people", name = "enable", havingValue = "true")
protected static class People {
@Bean
public Object object() {
System.err.println("[liaochuntao] : " + this.getClass().getCanonicalName());
return new Object();
}
}
public static class FirstCommandLineRunner implements CommandLineRunner {
@NacosInjected
private ConfigService configService;
@Override
public void run(String... args) throws Exception {
if (configService.publishConfig(DATA_ID, Constants.DEFAULT_GROUP, content)) {
Thread.sleep(200);
System.out.println("First runner success: " + configService
.getConfig(DATA_ID, Constants.DEFAULT_GROUP, 5000));
}
else {
System.out.println("First runner error: publish config error");
}
}
}
public static class SecondCommandLineRunner implements CommandLineRunner {
@NacosValue("${dept:unknown}")
private String dept;
@NacosValue("${group:unknown}")
private String group;
@Autowired
private Foo foo;
@Override
public void run(String... args) throws Exception {
System.out.println("Second runner. dept: " + dept + ", group: " + group);
System.out.println("Second runner. foo: " + foo);
}
}
}
可以看到靜態類FirstCommandLineRunner中注入了一個ConfigService,可以推斷這個ConfigService是配置中心實現的核心。下面就來分析一下ConfigService和配置中心實現的流程。
ConfigService
com/alibaba/nacos/api/config/ConfigFactory工廠類,提供了一個方法,用反射的方式創建實現了ConfigService接口的實例。這里並沒有使用單例,也就是說每次調用都會重新創建一個 ConfigService的實例。
public class ConfigFactory {
public static ConfigService createConfigService(Properties properties) throws NacosException {
try {
Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
}
}
public static ConfigService createConfigService(String serverAddr) throws NacosException {
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
return createConfigService(properties);
}
}
點進ConfigService接口,此接口提供了對配置信息以及它的監聽器的操作功能,包括獲取、發布和移除。給配置信息添加一個監聽器,當配置信息被修改時,客戶端將使用傳入的監聽器回調
public interface ConfigService {
String getConfig(String dataId, String group, long timeoutMs) throws NacosException;
String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
throws NacosException;
void addListener(String dataId, String group, Listener listener) throws NacosException;
boolean publishConfig(String dataId, String group, String content) throws NacosException;
boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
boolean removeConfig(String dataId, String group) throws NacosException;
void removeListener(String dataId, String group, Listener listener);
String getServerStatus();
void shutDown() throws NacosException;
}
再看監聽器接口,第一個方法可以獲得一個線程池來執行方法調用。第二個方法來監聽配置信息
public interface Listener {
Executor getExecutor();
void receiveConfigInfo(final String configInfo);
}
實例化ConfigService
查看上面反射創建的 com.alibaba.nacos.client.config.NacosConfigService類的構造方法
public class NacosConfigService implements ConfigService {
private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class);
private static final long POST_TIMEOUT = 3000L;
private final HttpAgent agent;
/**
* long polling. 長輪詢
*/
private final ClientWorker worker;
private String namespace;
private final String encode;
private final ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();
public NacosConfigService(Properties properties) throws NacosException {
ValidatorUtils.checkInitParam(properties);
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
this.encode = Constants.ENCODE;
} else {
this.encode = encodeTmp.trim();
}
initNamespace(properties);
this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
this.agent.start();
this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
}
//以下省略
}
實例化的時候主要初始化了兩個對象,一個是HTTPAgent,另一個是長輪詢的ClientWorker。
HttpAgent
其中 agent 是通過裝飾者模式實現的。ServerHttpAgent 是實際工作的類,MetricsHttpAgent 在內部也是調用了 ServerHttpAgent 的方法,另外加上了一些統計操作,所以我們只需要關心 ServerHttpAgent 的功能就可以了。
ClientWorker
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
final Properties properties) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
// Initialize the timeout parameter
init(properties);
this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executorService = Executors
.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
t.setDaemon(true);
return t;
}
});
this.executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
checkConfigInfo();
} catch (Throwable e) {
LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);
}
可以看到 ClientWorker 除了將 HttpAgent 維持在自己內部,還創建了兩個線程池:
第一個線程池是一個普通的線程池,從 ThreadFactory 的名稱可以看到這個線程池是做長輪詢的。
第二個線程池是只擁有一個線程用來執行定時任務的 executor,executor 每隔 10ms 就會執行一次 checkConfigInfo() 方法。
scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
第一個command參數是任務實例,第二個initialDelay參數是初始換延遲時間,第三個delay參數是延遲間隔時間,第四個unit參數是時間單元。不管任務command執行的時間是多長,下一次任務的執行時間都是上一次任務執行完后在等待延遲間隔delay時間后執行下一次任務。
public void checkConfigInfo() {
// 分配任務
int listenerSize = cacheMap.size();
// 向上取整
int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
// 任務列表是無序的,所以當它改變的時候可能會出現問題
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
可以看到,checkConfigInfo 方法是取出了一批任務,然后提交給 executorService 線程池去執行,執行的任務就是 LongPollingRunnable,每個任務都有一個 taskId。
現在我們來看看 LongPollingRunnable 做了什么,主要分為兩部分,第一部分是檢查本地的配置信息,第二部分是獲取服務端的配置信息然后更新到本地。
本地檢查
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// check failover config
for (CacheData cacheData : cacheMap.values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
// 省略部分代碼
} catch (Throwable e) {
//如果輪訓任務異常,則該任務的下一次執行時間將延遲
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
服務端檢查
通過 checkUpdateDataIds() 方法從服務端獲取那些值發生了變化的 dataId 列表,
通過 getServerConfig 方法,根據 dataId 到服務端獲取最新的配置信息,接着將最新的配置信息保存到 CacheData 中。
最后調用 CacheData 的 checkListenerMd5 方法,可以看到該方法在第一部分也被調用過。通過對比MD5值可以快速發現是否改變過配置。
@Override
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
// 省略
// 服務端檢查
List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);//從服務端獲取那些值發生了變化的 dataId 列表
if (!CollectionUtils.isEmpty(changedGroupKeys)) {
LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
}
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
String[] ct = getServerConfig(dataId, group, tenant, 3000L);//根據 dataId 到服務端獲取最新的配置信息,接着將最新的配置信息保存到 CacheData 中。
CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(ct[0]);
if (null != ct[1]) {
cache.setType(ct[1]);
}
LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
agent.getName(), dataId, group, tenant, cache.getMd5(),
ContentUtils.truncateContent(ct[0]), ct[1]);
} catch (NacosException ioe) {
String message = String
.format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
agent.getName(), dataId, group, tenant);
LOGGER.error(message, ioe);
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
cacheData.checkListenerMd5();//檢查是否更改配置
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
executorService.execute(this);
} catch (Throwable e) {
LOGGER.error("longPolling error : ", e);
executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
}
}
添加 Listener
使用ClientWorker 的 addTenantListeners 方法來為 ConfigService 添加一個 Listener
public void addListeners(String dataId, String group, List<? extends Listener> listeners) {
group = null2defaultGroup(group);
CacheData cache = addCacheDataIfAbsent(dataId, group);
for (Listener listener : listeners) {
cache.addListener(listener);
}
}
該方法分為兩個部分,首先根據 dataId,group 和當前的場景獲取一個 CacheData 對象,然后將當前要添加的 listener 對象添加到 CacheData 中去。
也就是說 listener 最終是被這里的 CacheData 所持有了,那 listener 的回調方法 receiveConfigInfo 就應該是在 CacheData 中觸發的。
可以發現 CacheData 是出現頻率非常高的一個類,在 LongPollingRunnable 的任務中,幾乎所有的方法都圍繞着 CacheData 類,現在添加 Listener 的時候,實際上該 Listener 也被委托給了 CacheData。接下來看一下 CacheData 類的代碼。
CacheData
首先看一下 CacheData 中的成員變量
private final String name;
private final ConfigFilterChainManager configFilterChainManager;
public final String dataId;
public final String group;
public final String tenant;
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
private volatile String md5;
/**
* 是否使用本地配置
*/
private volatile boolean isUseLocalConfig = false;
/**
* 上一次修改時間
*/
private volatile long localConfigLastModified;
private volatile String content;
private int taskId;
private volatile boolean isInitializing = true;
private String type;
可以看到除了 dataId,group,content,taskId 這些跟配置相關的屬性,還有兩個比較重要的屬性:listeners、md5。
listeners 是該 CacheData 所關聯的所有監聽器,不過不是保存的原始的 Listener 對象,而是包裝后的 ManagerListenerWrap 對象,該對象除了持有 Listener 對象,還持有了一個 lastCallMd5 屬性。
另外一個屬性 md5 就是根據當前對象的 content 計算出來的 md5 值,用於快速判斷配置信息是否修改過。
觸發回調
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
safeNotifyListener(dataId, group, content, type, md5, wrap);
}
}
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final ManagerListenerWrap listenerWrap) {
final Listener listener = listenerWrap.listener;
Runnable job = new Runnable() {
@Override
public void run() {
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// 執行回調之前先將線程classloader設置為具體webapp的classloader,以免回調方法中調用spi接口是出現異常或錯用(多應用部署才會有該問題)。
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();//獲取最新的配置信息
listener.receiveConfigInfo(contentTmp);//回調方法,傳入最新的信息
// 比較這次和上次的內容
if (listener instanceof AbstractConfigChangeListener) {
Map data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, content, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = content;
}
listenerWrap.lastCallMd5 = md5;//更新md5值
LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
listener);
} catch (NacosException ex) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
group, md5, listener, t.getCause());
} finally {
Thread.currentThread().setContextClassLoader(myClassLoader);
}
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
job.run();
}
} catch (Throwable t) {
LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
group, md5, listener, t.getCause());
}
final long finishNotify = System.currentTimeMillis();
LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
name, (finishNotify - startNotify), dataId, group, md5, listener);
}
public void setContent(String content) {
this.content = content;
this.md5 = getMd5String(this.content);
}
總結
Nacos 服務信息"推送"本質還是拉模式,具體流程為:
1、Nacos 客戶端會循環請求服務端變更的數據,並且超時時間設置為 30s,當配置發生變化時,請求的響應會立即返回;
2、服務端數據變更后,找到具體的客戶端請求中的 response,然后直接將結果寫入 response 中;
3、Nacos 客戶端就能夠實時感知到服務端配置發生了變化。
參考文章:https://www.jianshu.com/p/38b5452c9fec
https://blog.csdn.net/qq40988670/article/details/105966202
