For more MyCat source code analysis, see the MyCat Source Code Analysis series.
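MyCat Frontend Authentication
Frontend authentication in MyCat is the user-authentication process that runs when an application connects to MyCat. With the MySQL command-line client, for example, it is the sequence of actions triggered by $ mysql -uroot -proot -P8066 db_test.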
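Authentication proceeds in four steps:
1) The application establishes a TCP connection to MyCat;
2) MyCat sends a handshake packet that carries the seed used to scramble the password;
3) The application receives the handshake packet, scrambles the password with the seed, and then sends MyCat an authentication packet containing the user name, the scrambled password, the character set, and (optionally) the database to connect to;
4) MyCat validates each item in turn and sends the result back to the application.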
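Step 3 does not spell out how the client scrambles the password. As a minimal sketch, assuming the mysql_native_password scheme (an assumption, since the steps above do not name the scheme), the token is SHA1(password) XOR SHA1(seed + SHA1(SHA1(password))). The class and method names below are illustrative and are not MyCat's own:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Illustrative sketch of the mysql_native_password scramble (hypothetical class name).
final class NativePasswordSketch {

    // SHA-1 over the concatenation of all given byte arrays.
    static byte[] sha1(byte[]... parts) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-1");
            for (byte[] part : parts) {
                md.update(part);
            }
            return md.digest();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    // Client side: token = SHA1(password) XOR SHA1(seed + SHA1(SHA1(password))).
    static byte[] scramble(String password, byte[] seed) {
        byte[] stage1 = sha1(password.getBytes(StandardCharsets.UTF_8));
        byte[] stage2 = sha1(stage1);
        byte[] mask = sha1(seed, stage2);
        byte[] token = new byte[stage1.length];
        for (int i = 0; i < token.length; i++) {
            token[i] = (byte) (stage1[i] ^ mask[i]);
        }
        return token;
    }

    // One possible server-side check when the plaintext password is known:
    // recompute the token from the same seed and compare.
    static boolean verify(String knownPassword, byte[] seed, byte[] token) {
        return MessageDigest.isEqual(scramble(knownPassword, seed), token);
    }
}
```

Because the seed is generated per connection, a token computed for one seed does not verify against a different seed. verify() shows only one way a server holding the plaintext password could check the token.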
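In MyCat, a connection between a frontend application and MyCat is called a FrontendConnection (its subclasses are ServerConnection and ManagerConnection, created by ServerConnectionFactory and ManagerConnectionFactory respectively). Its constructor binds a FrontendAuthenticator, which is a NIOHandler, to handle the authentication that follows: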
    public FrontendConnection(NetworkChannel channel) throws IOException {
        super(channel);
        InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
        InetSocketAddress remoteAddr = null;
        if (channel instanceof SocketChannel) {
            remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();
        } else if (channel instanceof AsynchronousSocketChannel) {
            remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
        }
        this.host = remoteAddr.getHostString();
        this.port = localAddr.getPort();
        this.localPort = remoteAddr.getPort();
        this.handler = new FrontendAuthenticator(this);
    }
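First, the TCP connection between the application and MyCat is established by NIOAcceptor. When the previously registered OP_ACCEPT event fires, it calls its accept() method: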
    private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            channel.configureBlocking(false);
            FrontendConnection c = factory.make(channel);
            c.setAccepted(true);
            c.setId(ID_GENERATOR.getId());
            NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
                    .nextProcessor();
            c.setProcessor(processor);

            NIOReactor reactor = reactorPool.getNextReactor();
            reactor.postRegister(c);
        } catch (Exception e) {
            LOGGER.warn(getName(), e);
            closeChannel(channel);
        }
    }
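Once the TCP connection is established, a FrontendConnection instance is created and handed to a NIOReactor, which takes care of subsequent reads and writes (reactor.postRegister(c);). The reactor's inner class RW then calls its register() method: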
    private void register(Selector selector) {
        AbstractConnection c = null;
        if (registerQueue.isEmpty()) {
            return;
        }
        while ((c = registerQueue.poll()) != null) {
            try {
                ((NIOSocketWR) c.getSocketWR()).register(selector);
                c.register();
            } catch (Exception e) {
                c.close("register err" + e.toString());
            }
        }
    }
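Here, NIOSocketWR calls its register() method to register the OP_READ event. FrontendConnection then calls its own register() method to send the handshake packet (HandshakePacket) and waits for the application to reply with an authentication packet: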
    public void register() throws IOException {
        if (!isClosed.get()) {

            // generate the authentication data
            byte[] rand1 = RandomUtil.randomBytes(8);
            byte[] rand2 = RandomUtil.randomBytes(12);

            // save the authentication data
            byte[] seed = new byte[rand1.length + rand2.length];
            System.arraycopy(rand1, 0, seed, 0, rand1.length);
            System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
            this.seed = seed;

            // send the handshake packet
            HandshakePacket hs = new HandshakePacket();
            hs.packetId = 0;
            hs.protocolVersion = Versions.PROTOCOL_VERSION;
            hs.serverVersion = Versions.SERVER_VERSION;
            hs.threadId = id;
            hs.seed = rand1;
            hs.serverCapabilities = getServerCapabilities();
            hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
            hs.serverStatus = 2;
            hs.restOfScrambleBuff = rand2;
            hs.write(this);

            // asynread response
            this.asynRead();
        }
    }
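The register() method above sends the 20-byte seed in two fields: the first 8 bytes in seed and the remaining 12 in restOfScrambleBuff. A client has to concatenate the two parts again before it scrambles the password. The following sketch illustrates that split and rejoin. SeedSplitSketch and its methods are hypothetical names, and SecureRandom stands in for MyCat's RandomUtil, whose implementation is not shown in this article:

```java
import java.security.SecureRandom;
import java.util.Arrays;

// Hypothetical helper showing how a 20-byte seed is split into 8 + 12 bytes and rejoined.
final class SeedSplitSketch {
    static final int PART1_LEN = 8;   // bytes carried in the handshake's seed field
    static final int PART2_LEN = 12;  // bytes carried in restOfScrambleBuff

    // Server side: generate the full seed that is kept for later verification.
    static byte[] newSeed() {
        byte[] seed = new byte[PART1_LEN + PART2_LEN];
        new SecureRandom().nextBytes(seed);
        return seed;
    }

    // Server side: cut the seed into the two parts that travel in the handshake packet.
    static byte[][] split(byte[] seed) {
        return new byte[][] {
            Arrays.copyOfRange(seed, 0, PART1_LEN),
            Arrays.copyOfRange(seed, PART1_LEN, seed.length)
        };
    }

    // Client side: put the two parts back together before scrambling the password.
    static byte[] join(byte[] part1, byte[] part2) {
        byte[] seed = Arrays.copyOf(part1, part1.length + part2.length);
        System.arraycopy(part2, 0, seed, part1.length, part2.length);
        return seed;
    }
}
```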
When the application's authentication packet (AuthPacket) arrives, the FrontendAuthenticator previously bound to the FrontendConnection calls its handle() method:
    public void handle(byte[] data) {
        // check quit packet
        if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) {
            source.close("quit packet");
            return;
        }

        AuthPacket auth = new AuthPacket();
        auth.read(data);

        // check user
        if (!checkUser(auth.user, source.getHost())) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "' with host '" + source.getHost() + "'");
            return;
        }

        // check password
        if (!checkPassword(auth.password, auth.user)) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because password is error ");
            return;
        }

        // check degrade
        if (isDegrade(auth.user)) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because service be degraded ");
            return;
        }

        // check schema
        switch (checkSchema(auth.database, auth.user)) {
        case ErrorCode.ER_BAD_DB_ERROR:
            failure(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'");
            break;
        case ErrorCode.ER_DBACCESS_DENIED_ERROR:
            String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'";
            failure(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
            break;
        default:
            success(auth);
        }
    }
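The quit check in handle() reads data[4] because, in the MySQL client/server protocol, every packet begins with a 4-byte header: a 3-byte little-endian payload length followed by a 1-byte sequence number. The payload therefore starts at index 4. A minimal sketch of reading that header follows. The class and method names are made up for illustration, and COM_QUIT is assumed to have the protocol's value 0x01:

```java
// Hypothetical helper for reading the 4-byte MySQL packet header.
final class PacketHeaderSketch {
    static final int HEADER_SIZE = 4;
    static final byte COM_QUIT = 0x01; // assumed to match MySQLPacket.COM_QUIT

    // Bytes 0..2: payload length, least significant byte first.
    static int payloadLength(byte[] packet) {
        return (packet[0] & 0xff)
                | ((packet[1] & 0xff) << 8)
                | ((packet[2] & 0xff) << 16);
    }

    // Byte 3: sequence number of the packet within the current exchange.
    static int sequenceId(byte[] packet) {
        return packet[3] & 0xff;
    }

    // A quit packet is a header plus a one-byte payload holding COM_QUIT.
    static boolean isQuit(byte[] packet) {
        return packet.length == HEADER_SIZE + 1
                && payloadLength(packet) == 1
                && packet[HEADER_SIZE] == COM_QUIT;
    }
}
```

Under these assumptions a quit packet is the five bytes 01 00 00 00 01, which is consistent with the length comparison against QuitPacket.QUIT.length in the code above.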
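handle() checks, in order, the user name, the password, whether service is degraded for the user (isDegrade), and the database permissions. If every check passes, it calls success() to send an OK packet to the application: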
    protected void success(AuthPacket auth) {
        source.setAuthenticated(true);
        source.setUser(auth.user);
        source.setSchema(auth.database);
        source.setCharsetIndex(auth.charsetIndex);
        source.setHandler(new FrontendCommandHandler(source));

        if (LOGGER.isInfoEnabled()) {
            StringBuilder s = new StringBuilder();
            s.append(source).append('\'').append(auth.user).append("' login success");
            byte[] extra = auth.extra;
            if (extra != null && extra.length > 0) {
                s.append(",extra:").append(new String(extra));
            }
            LOGGER.info(s.toString());
        }

        ByteBuffer buffer = source.allocate();
        source.write(source.writeToBuffer(AUTH_OK, buffer));
        boolean clientCompress = Capabilities.CLIENT_COMPRESS == (Capabilities.CLIENT_COMPRESS & auth.clientFlags);
        boolean usingCompress = MycatServer.getInstance().getConfig().getSystem().getUseCompression() == 1;
        if (clientCompress && usingCompress) {
            source.setSupportCompress(true);
        }
    }
The most important step here is rebinding the FrontendConnection's handler to a FrontendCommandHandler object, which receives and dispatches all subsequent commands.
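Swapping the handler is what moves the connection from the authentication phase to the command phase, without phase flags or if/else chains in the read path. The sketch below shows the idea in isolation. All types in it (PacketHandler, ConnectionSketch, AuthHandler, CommandHandler) are simplified stand-ins invented for this example, not MyCat classes:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a handler interface: one entry point per incoming packet.
interface PacketHandler {
    void handle(byte[] data);
}

// Stand-in for a connection that delegates every packet to its current handler.
final class ConnectionSketch {
    private volatile PacketHandler handler;
    final List<String> log = new ArrayList<>(); // records which handler saw each packet

    ConnectionSketch() {
        this.handler = new AuthHandler(this); // a new connection starts in the auth phase
    }

    void setHandler(PacketHandler handler) {
        this.handler = handler;
    }

    void onPacket(byte[] data) {
        handler.handle(data);
    }
}

// Handles the first packet, then hands the connection over to the command phase.
final class AuthHandler implements PacketHandler {
    private final ConnectionSketch source;

    AuthHandler(ConnectionSketch source) {
        this.source = source;
    }

    @Override
    public void handle(byte[] data) {
        source.log.add("auth");
        // Real code would validate the packet here before switching phases.
        source.setHandler(new CommandHandler(source));
    }
}

// Handles every packet after authentication has succeeded.
final class CommandHandler implements PacketHandler {
    private final ConnectionSketch source;

    CommandHandler(ConnectionSketch source) {
        this.source = source;
    }

    @Override
    public void handle(byte[] data) {
        source.log.add("command");
    }
}
```

Feeding three packets into a fresh ConnectionSketch sends the first to AuthHandler and the next two to CommandHandler.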
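Backend Authentication
Backend authentication is similar to frontend authentication. The difference is that MyCat now acts as the client, initiating the connection and the authentication request to the backend MySQL database.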
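In MyCat, a connection between MyCat and a backend MySQL server is called a MySQLConnection. It is created by MySQLConnectionFactory, which binds a MySQLConnectionAuthenticator (a NIOHandler) to it to carry out the authentication with MySQL: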
    public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
            String schema) throws IOException {

        DBHostConfig dsc = pool.getConfig();
        NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
                .isAIO());

        MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());
        MycatServer.getInstance().getConfig().setSocketParams(c, false);
        c.setHost(dsc.getIp());
        c.setPort(dsc.getPort());
        c.setUser(dsc.getUser());
        c.setPassword(dsc.getPassword());
        c.setSchema(schema);
        c.setHandler(new MySQLConnectionAuthenticator(c, handler));
        c.setPool(pool);
        c.setIdleTimeout(pool.getConfig().getIdleTimeout());
        if (channel instanceof AsynchronousSocketChannel) {
            ((AsynchronousSocketChannel) channel).connect(
                    new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
                    (CompletionHandler) MycatServer.getInstance()
                            .getConnector());
        } else {
            ((NIOConnector) MycatServer.getInstance().getConnector())
                    .postConnect(c);
        }
        return c;
    }
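After the MySQLConnection is created, it is handed to NIOConnector to establish the TCP connection. NIOConnector calls its connect() method, which registers the OP_CONNECT event and sends the connection request to MySQL: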
    private void connect(Selector selector) {
        AbstractConnection c = null;
        while ((c = connectQueue.poll()) != null) {
            try {
                SocketChannel channel = (SocketChannel) c.getChannel();
                channel.register(selector, SelectionKey.OP_CONNECT, c);
                channel.connect(new InetSocketAddress(c.host, c.port));
            } catch (Exception e) {
                c.close(e.toString());
            }
        }
    }
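For readers unfamiliar with non-blocking connects in Java NIO, here is a small self-contained sketch of the same pattern: register the channel for OP_CONNECT, call connect(), then complete the connection with finishConnect() once the selector reports the key as connectable. It is not MyCat code. NonBlockingConnectSketch, its timeout parameter, and the loopback demo server are assumptions made so that the example can run on its own:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical, stand-alone illustration of an OP_CONNECT based non-blocking connect.
final class NonBlockingConnectSketch {

    // Returns true once the connection is established, false if the timeout expires first.
    static boolean connect(InetSocketAddress target, long timeoutMillis) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_CONNECT);
            if (channel.connect(target)) {
                return true; // the connection completed immediately
            }
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (true) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                selector.select(remaining);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable()) {
                        // Completes the connect; throws IOException if it failed.
                        return channel.finishConnect();
                    }
                }
                selector.selectedKeys().clear();
            }
        }
    }

    // Demo: connect to a throwaway listener on the loopback interface.
    static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return connect((InetSocketAddress) server.getLocalAddress(), 2000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A reactor design such as the one described here would typically keep one long-lived selector and attach the connection object to the key rather than open a selector per connection. The sketch uses a private selector only to stay short.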
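Once the TCP connection is established, it is likewise handed to a NIOReactor for subsequent reads and writes. Eventually the MySQLConnectionAuthenticator previously bound to the MySQLConnection calls its handle() method: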
    public void handle(byte[] data) {
        try {
            switch (data[4]) {
            case OkPacket.FIELD_COUNT:
                HandshakePacket packet = source.getHandshake();
                if (packet == null) {
                    processHandShakePacket(data);
                    // send the authentication packet
                    source.authenticate();
                    break;
                }
                // process the authentication result
                source.setHandler(new MySQLConnectionHandler(source));
                source.setAuthenticated(true);
                boolean clientCompress = Capabilities.CLIENT_COMPRESS == (Capabilities.CLIENT_COMPRESS & packet.serverCapabilities);
                boolean usingCompress = MycatServer.getInstance().getConfig().getSystem().getUseCompression() == 1;
                if (clientCompress && usingCompress) {
                    source.setSupportCompress(true);
                }
                if (listener != null) {
                    listener.connectionAcquired(source);
                }
                break;
            case ErrorPacket.FIELD_COUNT:
                ErrorPacket err = new ErrorPacket();
                err.read(data);
                String errMsg = new String(err.message);
                LOGGER.warn("can't connect to mysql server ,errmsg:" + errMsg + " " + source);
                //source.close(errMsg);
                throw new ConnectionException(err.errno, errMsg);

            case EOFPacket.FIELD_COUNT:
                auth323(data[3]);
                break;
            default:
                packet = source.getHandshake();
                if (packet == null) {
                    processHandShakePacket(data);
                    // send the authentication packet
                    source.authenticate();
                    break;
                } else {
                    throw new RuntimeException("Unknown Packet!");
                }

            }

        } catch (RuntimeException e) {
            if (listener != null) {
                listener.connectionError(e, source);
                return;
            }
            throw e;
        }
    }
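The switch above dispatches on data[4], the first byte of the packet payload. In the MySQL protocol, 0x00 marks an OK packet, 0xff an error packet, and 0xfe an EOF packet; the sketch below assumes the three FIELD_COUNT constants carry those values. The initial handshake packet starts with the protocol version byte (10 in the current handshake format), so it matches none of the three, which is presumably why the default branch also processes the handshake. ResponseKindSketch is a hypothetical name:

```java
// Hypothetical classifier for the first payload byte of a server response.
final class ResponseKindSketch {

    enum Kind { OK, ERROR, EOF, OTHER }

    static Kind classify(byte[] packet) {
        if (packet.length < 5) {
            throw new IllegalArgumentException("need a 4-byte header plus at least 1 payload byte");
        }
        // Mask to an unsigned value so that 0xfe and 0xff compare as expected.
        switch (packet[4] & 0xff) {
            case 0x00: return Kind.OK;     // assumed value of OkPacket.FIELD_COUNT
            case 0xff: return Kind.ERROR;  // assumed value of ErrorPacket.FIELD_COUNT
            case 0xfe: return Kind.EOF;    // assumed value of EOFPacket.FIELD_COUNT
            default:   return Kind.OTHER;  // for example a handshake, whose first byte is the protocol version
        }
    }
}
```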
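As in frontend authentication, the MySQL database, acting as the server, sends a handshake packet (HandshakePacket) once the TCP connection is established. The recipient here is MyCat, which plays the application's role. MySQLConnectionAuthenticator reads this packet and, via source.authenticate();, calls MySQLConnection's authenticate() method to send an authentication packet (AuthPacket) to MySQL: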
    public void authenticate() {
        AuthPacket packet = new AuthPacket();
        packet.packetId = 1;
        packet.clientFlags = clientFlags;
        packet.maxPacketSize = maxPacketSize;
        packet.charsetIndex = this.charsetIndex;
        packet.user = user;
        try {
            packet.password = passwd(password, handshake);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e.getMessage());
        }
        packet.database = schema;
        packet.write(this);
    }
After authentication succeeds, the MySQLConnection's handler is rebound to a MySQLConnectionHandler object, which receives and dispatches the data that MySQL sends afterwards.
Out of respect for original work, please cite the source of this article when reposting: http://www.cnblogs.com/fernandolee24/p/5196332.html. Thank you.