mycat啟動的時候啟動了三個模塊
1:NIOConnector(負責鏈接mysql數據庫,連接池以數據庫為准不以鏈接字符串為准),
1:NIOAcceptor,ManagerConnectionFactory(管理模塊,默認端口9066)
2:NIOAcceptor,ServerConnectionFactory(mysql服務模塊,默認端口8066)
這里介紹下管理模塊的啟動流程
順序圖

NIO和AIO
mycat分別實現了NIO和AIO,由於linux當前沒有真正實現AIO這里主要介紹NIO的流程。
NIO的Reactor與AIO的Proactor兩種模式的場景區別:
下面是Reactor的做法:
1. 等待事件響應 (Reactor job)
2. 分發 “Ready-to-Read” 事件給用戶句柄 ( Reactor job)
3. 讀數據 (user handler job)
4. 處理數據( user handler job)
下面再來看看真正意義的異步模式Proactor是如何做的:
1. 等待事件響應 (Proactor job)
2. 讀數據 (Proactor job)
3. 分發 “Read-Completed” 事件給用戶句柄 (Proactor job)
4. 處理數據(user handler job)
mycat的NIO實現
Selector(選擇器)是Java NIO中能夠檢測一到多個NIO通道,並能夠知曉通道是否為諸如讀寫事件做好准備的組件。這樣,一個單獨的線程可以管理多個channel,從而管理多個網絡連接。
Selector可以監聽四種不同類型的事件:
- Connect
- Accept
- Read
- Write
這四種事件用SelectionKey的四個常量來表示:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
前面已經說了,NIO采用的Reactor模式:例如汽車是乘客訪問的主體(Reactor),乘客上車后,到售票員(acceptor)處登記,之后乘客便可以休息睡覺去了,當到達乘客所要到達的目的地后,售票員將其喚醒即可。
核心順序

mycat管理端的啟動流程
1:new ManagerConnectionFactory extends FrontendConnectionFactory
2:new NIOReactorPool,new NIOReactor,new RW中new ConcurrentLinkedQueue<AbstractConnection>()而AbstractConnection中new NIOSocketWR
3:new NIOAcceptor中向反應堆中注冊了OP_ACCEPT,該類繼承了Thread然后start啟動
accept
channel = serverChannel.accept();
channel.configureBlocking(false);
FrontendConnection c = factory.make(channel);
c.setAccepted(true);
c.setId(ID_GENERATOR.getId());
NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
.nextProcessor();
c.setProcessor(processor);
LOGGER.info("accept");
NIOReactor reactor = reactorPool.getNextReactor();
reactor.postRegister(c);
factory.make(channel):最終構造了ManagerQueryHandler(管理命令解析器)和FrontendAuthenticator(mycat權限解析器)
reactor.postRegister(c):把當前鏈接添加到reactor的registerQueue中並喚醒reactor的selector
read
在NIOReactor的registerQueue為空的時候run循環空運轉,當上一步把accept的鏈接放到隊列的時候則
for (;;) {
++reactCount;
try {
selector.select(500L);
register(selector);
keys = selector.selectedKeys();
for (SelectionKey key : keys) {
AbstractConnection con = null;
try {
Object att = key.attachment();
if (att != null) {
con = (AbstractConnection) att;
if (key.isValid() && key.isReadable()) {
try {
con.asynRead();
} catch (IOException e) {
con.close("program err:" + e.toString());
continue;
} catch (Exception e) {
LOGGER.debug("caught err:", e);
con.close("program err:" + e.toString());
continue;
}
}
if (key.isValid() && key.isWritable()) {
con.doNextWriteCheck();
}
} else {
key.cancel();
}
} catch (CancelledKeyException e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(con + " socket key canceled");
}
} catch (Exception e) {
LOGGER.warn(con + " " + e);
}
}
} catch (Exception e) {
LOGGER.warn(name, e);
} finally {
if (keys != null) {
keys.clear();
}
}
register(selector);也即
((NIOSocketWR) c.getSocketWR()).register(selector); 注冊OP_READ事件
c.register();即FrontendConnection的register發送握手數據包
con.asynRead();即NIOSocketWR的asynRead即
public void asynRead() throws IOException {
LOGGER.info("asynRead");
ByteBuffer theBuffer = con.readBuffer;
if (theBuffer == null) {
theBuffer = con.processor.getBufferPool().allocate();
con.readBuffer = theBuffer;
}
int got = channel.read(theBuffer);
con.onReadData(got);
}
con.onReadData(got);即AbstractConnection的onReadData這里拆包得到完成的數據包后調用
handler.handle(data);也即FrontendAuthenticator的handle在這里check user;check password;check schema如果失敗則將失敗信息寫入緩沖區,如果成功
則把AbstractConnection的默認hander從FrontendAuthenticator換成FrontendCommandHandler等待接下來的處理(比如show命令等,
以上的處理是發生在輸入mysql -utest -ptest -h10.97.177.83 -P9066時)
認證完成后下一次的handler.handle(data)則使用FrontendCommandHandler的handle來處理也即
public void handle(byte[] data)
{
if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
{
MySQLMessage mm = new MySQLMessage(data);
int packetLength = mm.readUB3();
if(packetLength+4==data.length)
{
source.loadDataInfileData(data);
}
return;
}
switch (data[4])
{
case MySQLPacket.COM_INIT_DB:
commands.doInitDB();
source.initDB(data);
break;
case MySQLPacket.COM_QUERY:
commands.doQuery();
source.query(data);
break;
case MySQLPacket.COM_PING:
commands.doPing();
source.ping();
break;
case MySQLPacket.COM_QUIT:
commands.doQuit();
source.close("quit cmd");
break;
case MySQLPacket.COM_PROCESS_KILL:
commands.doKill();
source.kill(data);
break;
case MySQLPacket.COM_STMT_PREPARE:
commands.doStmtPrepare();
source.stmtPrepare(data);
break;
case MySQLPacket.COM_STMT_EXECUTE:
commands.doStmtExecute();
source.stmtExecute(data);
break;
case MySQLPacket.COM_STMT_CLOSE:
commands.doStmtClose();
source.stmtClose(data);
break;
case MySQLPacket.COM_HEARTBEAT:
commands.doHeartbeat();
source.heartbeat(data);
break;
default:
commands.doOther();
source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
"Unknown command");
}
}
source.query(data);即queryHandler.query(sql);這里的queryHandler是ManagerQueryHandler即
public void query(String sql) {
ManagerConnection c = this.source;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
}
int rs = ManagerParse.parse(sql);
switch (rs & 0xff) {
case ManagerParse.SELECT:
SelectHandler.handle(sql, c, rs >>> SHIFT);
break;
case ManagerParse.SET:
c.write(c.writeToBuffer(OkPacket.OK, c.allocate()));
break;
case ManagerParse.SHOW:
ShowHandler.handle(sql, c, rs >>> SHIFT);
break;
case ManagerParse.SWITCH:
SwitchHandler.handler(sql, c, rs >>> SHIFT);
break;
case ManagerParse.KILL_CONN:
KillConnection.response(sql, rs >>> SHIFT, c);
break;
case ManagerParse.OFFLINE:
Offline.execute(sql, c);
break;
case ManagerParse.ONLINE:
Online.execute(sql, c);
break;
case ManagerParse.STOP:
StopHandler.handle(sql, c, rs >>> SHIFT);
break;
case ManagerParse.RELOAD:
ReloadHandler.handle(sql, c, rs >>> SHIFT);
break;
case ManagerParse.ROLLBACK:
RollbackHandler.handle(sql, c, rs >>> SHIFT);
break;
case ManagerParse.CLEAR:
ClearHandler.handle(sql, c, rs >>> SHIFT);
break;
case ManagerParse.CONFIGFILE:
ConfFileHandler.handle(sql, c);
break;
case ManagerParse.LOGFILE:
ShowServerLog.handle(sql, c);
break;
default:
c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
}
}
總結
mycat的網絡處理邏輯上是通過隊列加上后台線程來實現了accept和read的解耦從而實現了高性能,但是代碼寫的就不敢恭維。
