Nacos 配置中心原理


在之前的项目中用到了Nacos作为微服务的注册中心与配置中心,配置中心可以动态更新配置,使得我们在改动配置后无需重启服务即可直接生效。我们也可以在nacos的后台直接修改配置文件,这极大地增强了系统的运维能力。

下面就来探究一下Nacos服务配置实时更新的底层原理。

官方demo

首先来参考一下官方的demo https://github.com/nacos-group/nacos-spring-boot-project/tree/master/nacos-spring-boot-samples/nacos-config-sample

 

 直接看ConfigApplication

public class ConfigApplication {

	public static final String content = "dept=Aliware\ngroup=Alibaba";

	public static final String DATA_ID = "test";

	public static void main(String[] args) {
		SpringApplication.run(ConfigApplication.class, args);
	}

	@Bean
	@Order(Ordered.LOWEST_PRECEDENCE)
	public CommandLineRunner firstCommandLineRunner() {
		return new FirstCommandLineRunner();
	}

	@Bean
	@Order(Ordered.LOWEST_PRECEDENCE - 1)
	public CommandLineRunner secondCommandLineRunner() {
		return new SecondCommandLineRunner();
	}

	@Bean
	public Foo foo() {
		return new Foo();
	}

	@Configuration
	@ConditionalOnProperty(prefix = "people", name = "enable", havingValue = "true")
	protected static class People {

		@Bean
		public Object object() {
			System.err.println("[liaochuntao] : " + this.getClass().getCanonicalName());
			return new Object();
		}

	}

	public static class FirstCommandLineRunner implements CommandLineRunner {

		@NacosInjected
		private ConfigService configService;

		@Override
		public void run(String... args) throws Exception {
			if (configService.publishConfig(DATA_ID, Constants.DEFAULT_GROUP, content)) {
				Thread.sleep(200);
				System.out.println("First runner success: " + configService
						.getConfig(DATA_ID, Constants.DEFAULT_GROUP, 5000));
			}
			else {
				System.out.println("First runner error: publish config error");
			}
		}
	}

	public static class SecondCommandLineRunner implements CommandLineRunner {

		@NacosValue("${dept:unknown}")
		private String dept;

		@NacosValue("${group:unknown}")
		private String group;

		@Autowired
		private Foo foo;

		@Override
		public void run(String... args) throws Exception {
			System.out.println("Second runner. dept: " + dept + ", group: " + group);
			System.out.println("Second runner. foo: " + foo);
		}
	}

}

  可以看到静态类FirstCommandLineRunner中注入了一个ConfigService,可以推断这个ConfigService是配置中心实现的核心。下面就来分析一下ConfigService和配置中心实现的流程。

 ConfigService

com/alibaba/nacos/api/config/ConfigFactory工厂类,提供了一个方法,用反射的方式创建实现了ConfigService接口的实例。这里并没有使用单例,也就是说每次调用都会重新创建一个 ConfigService的实例。

public class ConfigFactory {
    
    public static ConfigService createConfigService(Properties properties) throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor constructor = driverImplClass.getConstructor(Properties.class);
            ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
            return vendorImpl;
        } catch (Throwable e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
    }
    
    public static ConfigService createConfigService(String serverAddr) throws NacosException {
        Properties properties = new Properties();
        properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
        return createConfigService(properties);
    }
}

  

点进ConfigService接口,此接口提供了对配置信息以及它的监听器的操作功能,包括获取、发布和移除。给配置信息添加一个监听器,当配置信息被修改时,客户端将使用传入的监听器回调

public interface ConfigService {
    
    
    String getConfig(String dataId, String group, long timeoutMs) throws NacosException;    
    
    String getConfigAndSignListener(String dataId, String group, long timeoutMs, Listener listener)
            throws NacosException;
        
    void addListener(String dataId, String group, Listener listener) throws NacosException;
       
    boolean publishConfig(String dataId, String group, String content) throws NacosException;
        
    boolean publishConfig(String dataId, String group, String content, String type) throws NacosException;
        
    boolean removeConfig(String dataId, String group) throws NacosException;    
   
    void removeListener(String dataId, String group, Listener listener);
        
    String getServerStatus();
        
    void shutDown() throws NacosException;
}

 

再看监听器接口,第一个方法可以获得一个线程池来执行方法调用。第二个方法来监听配置信息

public interface Listener {
       
    Executor getExecutor();
      
    void receiveConfigInfo(final String configInfo);
}

  

 实例化ConfigService

查看上面反射创建的 com.alibaba.nacos.client.config.NacosConfigService类的构造方法

public class NacosConfigService implements ConfigService {
    
    private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class);
    
    private static final long POST_TIMEOUT = 3000L;   
    
    private final HttpAgent agent;
    
    /**
     * long polling. 长轮询
     */
    private final ClientWorker worker;
    
    private String namespace;
    
    private final String encode;
    
    private final ConfigFilterChainManager configFilterChainManager = new ConfigFilterChainManager();
    
    public NacosConfigService(Properties properties) throws NacosException {
        ValidatorUtils.checkInitParam(properties);
        String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
        if (StringUtils.isBlank(encodeTmp)) {
            this.encode = Constants.ENCODE;
        } else {
            this.encode = encodeTmp.trim();
        }
        initNamespace(properties);
        
        this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        this.agent.start();
        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
    }

 //以下省略  

}

  实例化的时候主要初始化了两个对象,一个是HTTPAgent,另一个是长轮询的ClientWorker。

HttpAgent

其中 agent 是通过装饰者模式实现的。ServerHttpAgent 是实际工作的类,MetricsHttpAgent 在内部也是调用了 ServerHttpAgent 的方法,另外加上了一些统计操作,所以我们只需要关心 ServerHttpAgent 的功能就可以了。

 

ClientWorker

下面是ClientWorker的构造方法
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
            final Properties properties) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;
        
        // Initialize the timeout parameter
        
        init(properties);
        
        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                t.setDaemon(true);
                return t;
            }
        });
        
        this.executorService = Executors
                .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                        t.setDaemon(true);
                        return t;
                    }
                });
        
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

  

可以看到 ClientWorker 除了将 HttpAgent 维持在自己内部,还创建了两个线程池:

第一个线程池是一个普通的线程池,从 ThreadFactory 的名称可以看到这个线程池是做长轮询的。

第二个线程池是只拥有一个线程用来执行定时任务的 executor,executor 每隔 10ms 就会执行一次 checkConfigInfo() 方法。

scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

第一个command参数是任务实例,第二个initialDelay参数是初始换延迟时间,第三个delay参数是延迟间隔时间,第四个unit参数是时间单元。不管任务command执行的时间是多长,下一次任务的执行时间都是上一次任务执行完后在等待延迟间隔delay时间后执行下一次任务。

再来看一下checkConfigInfo方法
public void checkConfigInfo() {
        // 分配任务
        int listenerSize = cacheMap.size();
        // 向上取整
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // 任务列表是无序的,所以当它改变的时候可能会出现问题
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }

  

可以看到,checkConfigInfo 方法是取出了一批任务,然后提交给 executorService 线程池去执行,执行的任务就是 LongPollingRunnable,每个任务都有一个 taskId。

现在我们来看看 LongPollingRunnable 做了什么,主要分为两部分,第一部分是检查本地的配置信息,第二部分是获取服务端的配置信息然后更新到本地。

本地检查

首先取出与该 taskId 相关的 CacheData,然后对 CacheData 进行检查,包括本地配置检查和监听器的 md5 检查,本地检查主要是做一个故障容错,当服务端挂掉后,Nacos 客户端可以从本地的文件系统中获取相关的配置信息。
@Override
    public void run() {
            
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }               
            // 省略部分代码                          
        } catch (Throwable e) {       
            //如果轮训任务异常,则该任务的下一次执行时间将延迟
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }

 

服务端检查

通过 checkUpdateDataIds() 方法从服务端获取那些值发生了变化的 dataId 列表,

通过 getServerConfig 方法,根据 dataId 到服务端获取最新的配置信息,接着将最新的配置信息保存到 CacheData 中。

最后调用 CacheData 的 checkListenerMd5 方法,可以看到该方法在第一部分也被调用过。通过对比MD5值可以快速发现是否改变过配置。

@Override
    public void run() {

        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // 省略

            // 服务端检查
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);//从服务端获取那些值发生了变化的 dataId 列表
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }

            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);//根据 dataId 到服务端获取最新的配置信息,接着将最新的配置信息保存到 CacheData 中。
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                        agent.getName(), dataId, group, tenant, cache.getMd5(),
                        ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String
                        .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                            agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                    .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();//检查是否更改配置
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();

            executorService.execute(this);

        } catch (Throwable e) {
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }

  

添加 Listener

使用ClientWorker 的 addTenantListeners 方法来为 ConfigService 添加一个 Listener 

public void addListeners(String dataId, String group, List<? extends Listener> listeners) {
        group = null2defaultGroup(group);
        CacheData cache = addCacheDataIfAbsent(dataId, group);
        for (Listener listener : listeners) {
            cache.addListener(listener);
        }
    }

  

该方法分为两个部分,首先根据 dataId,group 和当前的场景获取一个 CacheData 对象,然后将当前要添加的 listener 对象添加到 CacheData 中去。

也就是说 listener 最终是被这里的 CacheData 所持有了,那 listener 的回调方法 receiveConfigInfo 就应该是在 CacheData 中触发的。

 

可以发现 CacheData 是出现频率非常高的一个类,在 LongPollingRunnable 的任务中,几乎所有的方法都围绕着 CacheData 类,现在添加 Listener 的时候,实际上该 Listener 也被委托给了 CacheData。接下来看一下 CacheData 类的代码。

CacheData

首先看一下 CacheData 中的成员变量

    private final String name;
    private final ConfigFilterChainManager configFilterChainManager;
    public final String dataId;
    public final String group;
    public final String tenant;
    private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
    private volatile String md5;

    /**
     * 是否使用本地配置
     */
    private volatile boolean isUseLocalConfig = false;

    /**
     * 上一次修改时间
     */
    private volatile long localConfigLastModified;
    private volatile String content;
    private int taskId;
    private volatile boolean isInitializing = true;
    private String type;

  

可以看到除了 dataId,group,content,taskId 这些跟配置相关的属性,还有两个比较重要的属性:listeners、md5。

listeners 是该 CacheData 所关联的所有监听器,不过不是保存的原始的 Listener 对象,而是包装后的 ManagerListenerWrap 对象,该对象除了持有 Listener 对象,还持有了一个 lastCallMd5 属性。

另外一个属性 md5 就是根据当前对象的 content 计算出来的 md5 值,用于快速判断配置信息是否修改过。

触发回调

ConfigService 的 Listener 是在什么时候触发回调方法 receiveConfigInfo 的?

在 ClientWorker 中的定时任务中,启动了一个长轮询的任务:LongPollingRunnable,该任务多次执行了 cacheData.checkListenerMd5() 方法
void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, wrap);
            }
        }
    }

  

该方法会检查 CacheData 当前的 md5 与 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就执行一个安全的监听器的通知方法:safeNotifyListener,通知Listener 的使用者,该 Listener 所关注的配置信息已经发生改变了。
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
            final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;

        Runnable job = new Runnable() {
            @Override
            public void run() {
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
                try {
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
                    // 执行回调之前先将线程classloader设置为具体webapp的classloader,以免回调方法中调用spi接口是出现异常或错用(多应用部署才会有该问题)。
                    Thread.currentThread().setContextClassLoader(appClassLoader);

                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();//获取最新的配置信息
                    listener.receiveConfigInfo(contentTmp);//回调方法,传入最新的信息

                    // 比较这次和上次的内容
                    if (listener instanceof AbstractConfigChangeListener) {
                        Map data = ConfigChangeHandler.getInstance()
                                .parseChangeData(listenerWrap.lastContent, content, type);
                        ConfigChangeEvent event = new ConfigChangeEvent(data);
                        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                        listenerWrap.lastContent = content;
                    }

                    listenerWrap.lastCallMd5 = md5;//更新md5值
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                            listener);
                } catch (NacosException ex) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                            name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                } catch (Throwable t) {
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                            group, md5, listener, t.getCause());
                } finally {
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            }
        };

        final long startNotify = System.currentTimeMillis();
        try {
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                job.run();
            }
        } catch (Throwable t) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                    group, md5, listener, t.getCause());
        }
        final long finishNotify = System.currentTimeMillis();
        LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                name, (finishNotify - startNotify), dataId, group, md5, listener);
    }

  

可以看到在 safeNotifyListener 方法中,注释部分,先获取最新的配置信息,调用 Listener 的回调方法,将最新的配置信息作为参数传入,这样 Listener 的使用者就能接收到变更后的配置信息了,最后更新 ListenerWrap 的 md5 值。
 
CacheData 的 md5 值在上面的 LongPollingRunnable 所执行的任务中,在获取服务端发生变更的配置信息时,将最新的 content 数据写入了 CacheData 中。
public void setContent(String content) {
        this.content = content;
        this.md5 = getMd5String(this.content);
 }

  

在长轮询的任务中,当服务端配置信息发生变更时,客户端将最新的数据获取下来之后,保存在了 CacheData 中,同时更新了该 CacheData 的 md5 值,所以当下次执行 checkListenerMd5 方法时,就会发现当前 listener 所持有的 md5 值已经和 CacheData 的 md5 值不一样了,也就意味着服务端的配置信息发生改变了,这时就需要将最新的数据通知给 Listener 的持有者。

以上就是配置中心的完整流程,可以发现,Nacos 并不是通过推的方式将服务端最新的配置信息发送给客户端的,而是客户端维护了一个长轮询的任务,定时去拉取发生变更的配置信息,然后将最新的数据推送给 Listener 的持有者。

总结

客户端是通过一个定时任务来检查自己监听的配置项的数据的,一旦服务端的数据发生变化时,客户端将会获取到最新的数据,并将最新的数据保存在一个 CacheData 对象中,然后会重新计算 CacheData 的 md5 属性的值,此时就会对该 CacheData 所绑定的 Listener 触发 receiveConfigInfo 回调。

Nacos 服务信息"推送"本质还是拉模式,具体流程为:

1、Nacos 客户端会循环请求服务端变更的数据,并且超时时间设置为 30s,当配置发生变化时,请求的响应会立即返回;

2、服务端数据变更后,找到具体的客户端请求中的 response,然后直接将结果写入 response 中;

3、Nacos 客户端就能够实时感知到服务端配置发生了变化。




参考文章:https://www.jianshu.com/p/38b5452c9fec
https://blog.csdn.net/qq40988670/article/details/105966202




免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM