server 保留 2 份配置文件:一份在 mysql,一份在本地磁盤,同時在內存中緩存配置文件的 md5 值。當客戶端獲取配置時,server 直接返回本地磁盤上的文件,底層使用的是 sendfile 機制(即下面的 FileChannel#transferTo)。
// NOTE(review): excerpt only — as written `fis` is still null when it is dereferenced below.
// Presumably the full method opens the stream on the local disk copy in between; that step is not shown. TODO confirm.
FileInputStream fis = null;
// Transfers the whole file (offset 0, length = channel size) into a channel wrapping the response output stream.
fis.getChannel().transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
用戶發布配置
ConfigController#publishConfig
// 更新數據庫 persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); // 發布事件 EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
整個事件流
public class EventDispatcher { /** * add event listener */ static public void addEventListener(AbstractEventListener listener) { for (Class<? extends Event> type : listener.interest()) { getEntry(type).listeners.addIfAbsent(listener); } } /** * fire event, notify listeners. */ static public void fireEvent(Event event) { if (null == event) { throw new IllegalArgumentException(); } for (AbstractEventListener listener : getEntry(event.getClass()).listeners) { try { listener.onEvent(event); } catch (Exception e) { log.error(e.toString(), e); } } } /** * For only test purpose */ static public void clear() { LISTENER_HUB.clear(); } /** * get event listener for eventType. Add Entry if not exist. * 獲取事件監聽器,如果沒有則新建 Entry */ static Entry getEntry(Class<? extends Event> eventType) { for (; ; ) { for (Entry entry : LISTENER_HUB) { if (entry.eventType == eventType) { return entry; } } Entry tmp = new Entry(eventType); /** * false means already exists */ if (LISTENER_HUB.addIfAbsent(tmp)) { return tmp; } } } // 把事件和監聽器關聯起來 static private class Entry { final Class<? extends Event> eventType; final CopyOnWriteArrayList<AbstractEventListener> listeners; Entry(Class<? extends Event> type) { eventType = type; listeners = new CopyOnWriteArrayList<AbstractEventListener>(); } @Override public boolean equals(Object obj) { if (null == obj || obj.getClass() != getClass()) { return false; } if (this == obj) { return true; } return eventType == ((Entry)obj).eventType; } @Override public int hashCode() { return super.hashCode(); } } static private final Logger log = LoggerFactory.getLogger(EventDispatcher.class); static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>(); public interface Event { } static public abstract class AbstractEventListener { public AbstractEventListener() { // 執行 AsyncNotifyService 構造函數,把事件和監聽器關聯起來 EventDispatcher.addEventListener(this); } /** * 感興趣的事件列表 * * @return event list */ abstract public List<Class<? 
extends Event>> interest(); /** * 處理事件 * * @param event event */ abstract public void onEvent(Event event); } }
AsyncNotifyService
@Service public class AsyncNotifyService extends AbstractEventListener { @Override public List<Class<? extends Event>> interest() { List<Class<? extends Event>> types = new ArrayList<Class<? extends Event>>(); // 觸發配置變更同步通知 types.add(ConfigDataChangeEvent.class); return types; } @Override public void onEvent(Event event) { // 並發產生 ConfigDataChangeEvent if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; List<?> ipList = serverListService.getServerList(); // 其實這里任何類型隊列都可以 Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); for (int i = 0; i < ipList.size(); i++) { queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, (String) ipList.get(i), evt.isBeta)); } EXECUTOR.execute(new AsyncTask(httpclient, queue)); } } }
由以上可見,ConfigDataChangeEvent 事件由 AsyncNotifyService.onEvent 負責處理。
向集群中所有節點(包括自己)發送請求
/v1/cs/communication/dataChange?dataId=oo.yml&group=xx&tenant=dev
節點處理配置變更請求
CommunicationController#notifyConfigInfo
用數據庫中的數據更新磁盤上的文件緩存
// Ask DumpService to dump this configuration; per the surrounding text this refreshes the on-disk cache file from the database.
// NOTE(review): six arguments are passed here while the dump(...) shown next takes seven (isBeta) — presumably an overload; confirm.
dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
// DumpService#dump public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { String groupKey = GroupKey2.getKey(dataId, group, tenant); dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); }
@Service public class DumpService { @Autowired private Environment env; @Autowired PersistService persistService; @PostConstruct public void init() { LogUtil.defaultLog.warn("DumpService start"); DumpProcessor processor = new DumpProcessor(this); dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager"); // 設置默認處理器 dumpTaskMgr.setDefaultTaskProcessor(processor); } }
TaskManager
public TaskManager(String name) { this.name = name; if (null != name && name.length() > 0) { this.processingThread = new Thread(new ProcessRunnable(), name); } else { this.processingThread = new Thread(new ProcessRunnable()); } this.processingThread.setDaemon(true); this.closed.set(false); this.processingThread.start(); } class ProcessRunnable implements Runnable { @Override public void run() { while (!TaskManager.this.closed.get()) { try { Thread.sleep(100); TaskManager.this.process(); } catch (Throwable e) { } } } } public void addTask(String type, AbstractTask task) { this.lock.lock(); try { AbstractTask oldTask = tasks.put(type, task); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); if (null != oldTask) { task.merge(oldTask); } } finally { this.lock.unlock(); } } protected void process() { for (Map.Entry<String, AbstractTask> entry : this.tasks.entrySet()) { AbstractTask task = null; this.lock.lock(); try { // 獲取任務 task = entry.getValue(); if (null != task) { if (!task.shouldProcess()) { // 任務當前不需要被執行,直接跳過 continue; } // 先將任務從任務Map中刪除 this.tasks.remove(entry.getKey()); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); } } finally { this.lock.unlock(); } if (null != task) { // 獲取任務處理器 TaskProcessor processor = this.taskProcessors.get(entry.getKey()); if (null == processor) { // DumpTask 使用的是默認處理器,即 DumpProcessor processor = this.getDefaultTaskProcessor(); } if (null != processor) { boolean result = false; try { // 處理任務 result = processor.process(entry.getKey(), task); } catch (Throwable t) { log.error("task_fail", "處理task失敗", t); } if (!result) { // 任務處理失敗,設置最后處理時間 task.setLastProcessTime(System.currentTimeMillis()); // 將任務重新加入到任務Map中 this.addTask(entry.getKey(), task); } } } } if (tasks.isEmpty()) { this.lock.lock(); try { this.notEmpty.signalAll(); } finally { this.lock.unlock(); } } }
因此執行的是 DumpProcessor#process,讀取數據庫中的配置,更新本地磁盤文件,同時生成 LocalDataChangeEvent 事件。
處理 LocalDataChangeEvent 事件
// com.alibaba.nacos.config.server.service.LongPollingService#onEvent public void onEvent(Event event) { if (isFixedPolling()) { // ignore } else { if (event instanceof LocalDataChangeEvent) { LocalDataChangeEvent evt = (LocalDataChangeEvent)event; scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps)); } } }
取消線程池中的定時任務,發送響應(變化的配置文件 id)給客戶端
com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
客觀地講,nacos 的代碼細節不優雅,還在發展中。