更多MyCat源碼分析,請戳MyCat源碼分析系列
MyCat配置信息
除了一些默認的配置參數,大多數的MyCat配置信息是通過讀取若干.xml/.properties文件獲取的,主要包括:
1)server.xml:系統和用戶相關配置
2)schema.xml:虛擬庫、表、數據節點配置等
3)rule.xml:分片規則設置
4)cacheservice.properties:緩存相關設置
5)dnindex.properties:datahost主從切換配置文件
6)sequence_conf.properties:本地全局序列號配置文件
而在代碼層面,與配置相關的類主要包括3個:
1)MycatConfig:最為重要的配置類
2)ReloadConfig:用於通過管理端口執行mysql> reload @@config或config_all命令,重新載入配置文件
3)RollbackConfig:用於通過管理端口執行mysql> rollback @@config命令,將配置信息回滾至reload之前的狀態
接下來重點介紹MycatConfig,它最關鍵的屬性如下:
private volatile SystemConfig system; private volatile MycatCluster cluster; private volatile MycatCluster _cluster; private volatile QuarantineConfig quarantine; private volatile QuarantineConfig _quarantine; private volatile Map<String, UserConfig> users; private volatile Map<String, UserConfig> _users; private volatile Map<String, SchemaConfig> schemas; private volatile Map<String, SchemaConfig> _schemas; private volatile Map<String, PhysicalDBNode> dataNodes; private volatile Map<String, PhysicalDBNode> _dataNodes; private volatile Map<String, PhysicalDBPool> dataHosts; private volatile Map<String, PhysicalDBPool> _dataHosts;
- SystemConfig:包含了諸多系統相關的配置參數(如端口、編碼、線程池大小、BufferPool大小、隔離級別等)
- MycatCluster:MyCat集群配置信息
- QuarantineConfig:用戶權限隔離(黑白名單)
- UserConfig:用戶配置,包含用戶名/密碼和允許訪問的虛擬庫
- SchemaConfig:虛擬庫配置,包括所有下屬的表配置(
TableConfig
)以及這些表涉及的datanode - PhysicalDBNode:datanode相關,對應一個數據庫實例中的數據庫
- PhysicalDBPool:datahost相關,里面包含了DataHostConfig配置,包括所有writeHosts和readHosts以及讀寫分離類型等
注意到除了SystemConfig,其余屬性都還有一個前綴加了_的同名屬性,這些屬性其實是作為備份的,用於reload/rollback配置文件時的切換。reload和rollback相關的方法如下:
public void reload(Map<String, UserConfig> users, Map<String, SchemaConfig> schemas, Map<String, PhysicalDBNode> dataNodes, Map<String, PhysicalDBPool> dataHosts, MycatCluster cluster, QuarantineConfig quarantine,boolean reloadAll) { apply(users, schemas, dataNodes, dataHosts, cluster, quarantine,reloadAll); this.reloadTime = TimeUtil.currentTimeMillis(); this.status = reloadAll?RELOAD_ALL:RELOAD; } public void rollback(Map<String, UserConfig> users, Map<String, SchemaConfig> schemas, Map<String, PhysicalDBNode> dataNodes, Map<String, PhysicalDBPool> dataHosts, MycatCluster cluster, QuarantineConfig quarantine) { apply(users, schemas, dataNodes, dataHosts, cluster, quarantine,status==RELOAD_ALL); this.rollbackTime = TimeUtil.currentTimeMillis(); this.status = ROLLBACK; }
在reload的時候注意到有reload和reload_all,區別就在於前者不會重新加載與datahost/datanode相關的更改,而后者會。
啟動流程
MyCat的啟動類為MycatStartup,而主體為MycatServer,其中主要分為兩個步驟:
1)初始化:此過程在MycatServer構造函數時執行,包括配置文件的讀取、CacheService和RouteService的創建等
public MycatServer() { this.config = new MycatConfig(); this.timer = new Timer(NAME + "Timer", true); this.sqlRecorder = new SQLRecorder(config.getSystem() .getSqlRecordCount()); this.isOnline = new AtomicBoolean(true); cacheService = new CacheService(); routerService = new RouteService(cacheService); // load datanode active index from properties dnIndexProperties = loadDnIndexProps(); try { sqlInterceptor = (SQLInterceptor) Class.forName( config.getSystem().getSqlInterceptor()).newInstance(); } catch (Exception e) { throw new RuntimeException(e); } catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath() + File.separator + "catlet", config.getSystem() .getCatletClassCheckSeconds()); this.startupTime = TimeUtil.currentTimeMillis(); }
2)運行:此過程由startup()方法觸發,包括處理器對象創建、bufferpool創建、處理線程池創建、AIOConnector/NIOConnector創建與啟動、兩個AIOAcceptor/NIOAcceptor創建與啟動、后端數據庫的初始連接建立、定時器線程池/定時任務創建與啟動
public void startup() throws IOException { SystemConfig system = config.getSystem(); int processorCount = system.getProcessors(); // server startup LOGGER.info("==============================================="); LOGGER.info(NAME + " is ready to startup ..."); String inf = "Startup processors ...,total processors:" + system.getProcessors() + ",aio thread pool size:" + system.getProcessorExecutor() + " \r\n each process allocated socket buffer pool " + " bytes ,buffer chunk size:" + system.getProcessorBufferChunk() + " buffer pool's capacity(buferPool/bufferChunk) is:" + system.getProcessorBufferPool() / system.getProcessorBufferChunk(); LOGGER.info(inf); LOGGER.info("sysconfig params:" + system.toString()); // startup manager ManagerConnectionFactory mf = new ManagerConnectionFactory(); ServerConnectionFactory sf = new ServerConnectionFactory(); SocketAcceptor manager = null; SocketAcceptor server = null; aio = (system.getUsingAIO() == 1); // startup processors int threadPoolSize = system.getProcessorExecutor(); processors = new NIOProcessor[processorCount]; long processBuferPool = system.getProcessorBufferPool(); int processBufferChunk = system.getProcessorBufferChunk(); int socketBufferLocalPercent = system.getProcessorBufferLocalPercent(); bufferPool = new BufferPool(processBuferPool, processBufferChunk, socketBufferLocalPercent / processorCount); businessExecutor = ExecutorUtil.create("BusinessExecutor", threadPoolSize); timerExecutor = ExecutorUtil.create("Timer", system.getTimerExecutor()); listeningExecutorService = MoreExecutors.listeningDecorator(businessExecutor); for (int i = 0; i < processors.length; i++) { processors[i] = new NIOProcessor("Processor" + i, bufferPool, businessExecutor); } if (aio) { LOGGER.info("using aio network handler "); asyncChannelGroups = new AsynchronousChannelGroup[processorCount]; // startup connector connector = new AIOConnector(); for (int i = 0; i < processors.length; i++) { asyncChannelGroups[i] = AsynchronousChannelGroup .withFixedThreadPool(processorCount, new ThreadFactory() { private int inx = 1; @Override public Thread newThread(Runnable r) { Thread th = new Thread(r); th.setName(BufferPool.LOCAL_BUF_THREAD_PREX + "AIO" + (inx++)); LOGGER.info("created new AIO thread " + th.getName()); return th; } }); } manager = new AIOAcceptor(NAME + "Manager", system.getBindIp(), system.getManagerPort(), mf, this.asyncChannelGroups[0]); // startup server server = new AIOAcceptor(NAME + "Server", system.getBindIp(), system.getServerPort(), sf, this.asyncChannelGroups[0]); } else { LOGGER.info("using nio network handler "); NIOReactorPool reactorPool = new NIOReactorPool( BufferPool.LOCAL_BUF_THREAD_PREX + "NIOREACTOR", processors.length); connector = new NIOConnector(BufferPool.LOCAL_BUF_THREAD_PREX + "NIOConnector", reactorPool); ((NIOConnector) connector).start(); manager = new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Manager", system.getBindIp(), system.getManagerPort(), mf, reactorPool); server = new NIOAcceptor(BufferPool.LOCAL_BUF_THREAD_PREX + NAME + "Server", system.getBindIp(), system.getServerPort(), sf, reactorPool); } // manager start manager.start(); LOGGER.info(manager.getName() + " is started and listening on " + manager.getPort()); server.start(); // server started LOGGER.info(server.getName() + " is started and listening on " + server.getPort()); LOGGER.info("==============================================="); // init datahost Map<String, PhysicalDBPool> dataHosts = config.getDataHosts(); LOGGER.info("Initialize dataHost ..."); for (PhysicalDBPool node : dataHosts.values()) { String index = dnIndexProperties.getProperty(node.getHostName(), "0"); if (!"0".equals(index)) { LOGGER.info("init datahost: " + node.getHostName() + " to use datasource index:" + index); } node.init(Integer.valueOf(index)); node.startHeartbeat(); } long dataNodeIldeCheckPeriod = system.getDataNodeIdleCheckPeriod(); timer.schedule(updateTime(), 0L, TIME_UPDATE_PERIOD); timer.schedule(processorCheck(), 0L, system.getProcessorCheckPeriod()); timer.schedule(dataNodeConHeartBeatCheck(dataNodeIldeCheckPeriod), 0L, dataNodeIldeCheckPeriod); timer.schedule(dataNodeHeartbeat(), 0L, system.getDataNodeHeartbeatPeriod()); timer.schedule(catletClassClear(), 30000); }
從以上代碼中不難看出,connector用於作為客戶端與后端MySQL建立連接,而server和manager則作為服務端接受來自前端應用的連接請求,其中server負責常規業務流程(默認端口8066),而manager負責監控與管理(默認端口9066)。
而datahost的初始化行為由node.init(Integer.valueOf(index));觸發,其目的是每個datahost中writehost會連續創建若干初始連接供使用(該數量由schema.xml中datahost標簽的minCon屬性確定),當后續連接不足時會創建新的連接(最大不超過maxCon)。
由此,MyCat程序啟動完成,等待接受來自應用的連接請求和后續的命令處理。
為尊重原創成果,如需轉載煩請注明本文出處:http://www.cnblogs.com/fernandolee24/p/5193983.html,特此感謝