MyCat源碼分析系列之——前后端驗證


更多MyCat源碼分析,請戳MyCat源碼分析系列


 MyCat前端驗證

MyCat的前端驗證指的是應用連接MyCat時進行的用戶驗證過程,如使用MySQL客戶端時,$ mysql -uroot -proot -P8066 db_test觸發的一系列行為。

驗證的過程分為幾個步驟:

1)應用與MyCat建立TCP連接;

2)MyCat發送握手包,其中帶有為密碼加密的種子(seed);

3)應用接收握手包,使用種子進行密碼加密,隨后將包含用戶名、加密密碼、字符集和需連接的數據庫(可選)等的驗證包發送給MyCat;

4)MyCat依次驗證信息,並將驗證結果回發給應用

MyCat中,前端應用與MyCat之間建立的連接稱為FrontendConnection(其子類為ServerConnection和ManagerConnection,分別由ServerConnectionFactory和ManagerConnectionFactory負責創建),在構造函數的時候綁定了一個NIOHandler類型的FrontendAuthenticator用於后續的驗證

public FrontendConnection(NetworkChannel channel) throws IOException {
        super(channel);
        InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
        InetSocketAddress remoteAddr = null;
        if (channel instanceof SocketChannel) {
            remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();    
            
        } else if (channel instanceof AsynchronousSocketChannel) {
            remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
        }
        
        this.host = remoteAddr.getHostString();
        this.port = localAddr.getPort();
        this.localPort = remoteAddr.getPort();
        this.handler = new FrontendAuthenticator(this);
}

首先,應用與MyCat的TCP連接建立過程由NIOAcceptor負責完成,當之前注冊的OP_ACCEPT事件得到響應時調用accept()方法:

private void accept() {
        SocketChannel channel = null;
        try {
            channel = serverChannel.accept();
            channel.configureBlocking(false);
            FrontendConnection c = factory.make(channel);
            c.setAccepted(true);
            c.setId(ID_GENERATOR.getId());
            NIOProcessor processor = (NIOProcessor) MycatServer.getInstance()
                    .nextProcessor();
            c.setProcessor(processor);
            
            NIOReactor reactor = reactorPool.getNextReactor();
            reactor.postRegister(c);

        } catch (Exception e) {
            LOGGER.warn(getName(), e);
            closeChannel(channel);
        }
}

該TCP連接建立完成后創建一個FrontendConnection實例,並交由NIOReactor負責后續的讀寫工作(reactor.postRegister(c);),而NIOReactor中的內部類RW會調用其register()方法:

private void register(Selector selector) {
            AbstractConnection c = null;
            if (registerQueue.isEmpty()) {
                return;
            }
            while ((c = registerQueue.poll()) != null) {
                try {
                    ((NIOSocketWR) c.getSocketWR()).register(selector); c.register();
                } catch (Exception e) {
                    c.close("register err" + e.toString());
                }
            }
}

其中,NIOSocketWR會調用其register()方法注冊OP_READ事件,隨后FrontendConnection調用其register()方法發送握手包HandshakePacket,等待應用回發驗證包:

public void register() throws IOException {
        if (!isClosed.get()) {

            // 生成認證數據
            byte[] rand1 = RandomUtil.randomBytes(8);
            byte[] rand2 = RandomUtil.randomBytes(12);

            // 保存認證數據
            byte[] seed = new byte[rand1.length + rand2.length];
            System.arraycopy(rand1, 0, seed, 0, rand1.length);
            System.arraycopy(rand2, 0, seed, rand1.length, rand2.length);
            this.seed = seed;

            // 發送握手數據包
            HandshakePacket hs = new HandshakePacket();
            hs.packetId = 0;
            hs.protocolVersion = Versions.PROTOCOL_VERSION;
            hs.serverVersion = Versions.SERVER_VERSION;
            hs.threadId = id;
            hs.seed = rand1;
            hs.serverCapabilities = getServerCapabilities();
            hs.serverCharsetIndex = (byte) (charsetIndex & 0xff);
            hs.serverStatus = 2;
            hs.restOfScrambleBuff = rand2;
            hs.write(this);

            // asynread response
            this.asynRead();
        }
}

當收到應用的驗證包AuthPacket時,之前綁定在FrontendConnection上的FrontendAuthenticator會調用其handle()方法:

public void handle(byte[] data) {
        // check quit packet
        if (data.length == QuitPacket.QUIT.length && data[4] == MySQLPacket.COM_QUIT) {
            source.close("quit packet");
            return;
        }

        AuthPacket auth = new AuthPacket();
        auth.read(data);

        // check user
        if (!checkUser(auth.user, source.getHost())) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "' with host '" + source.getHost()+ "'");
            return;
        }

        // check password
        if (!checkPassword(auth.password, auth.user)) {
            failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because password is error ");
            return;
        }
        
        // check degrade
        if ( isDegrade( auth.user ) ) {
             failure(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + auth.user + "', because service be degraded ");
             return;
        }
        
        // check schema
        switch (checkSchema(auth.database, auth.user)) {
        case ErrorCode.ER_BAD_DB_ERROR:
            failure(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + auth.database + "'");
            break;
        case ErrorCode.ER_DBACCESS_DENIED_ERROR:
            String s = "Access denied for user '" + auth.user + "' to database '" + auth.database + "'";
            failure(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
            break;
        default:
            success(auth);
        }
}

依次驗證用戶名、密碼、用戶負載、數據庫權限,驗證成功后調用success()方法向應用發送OK包:

protected void success(AuthPacket auth) {
        source.setAuthenticated(true);
        source.setUser(auth.user);
        source.setSchema(auth.database);
        source.setCharsetIndex(auth.charsetIndex);
        source.setHandler(new FrontendCommandHandler(source));

        if (LOGGER.isInfoEnabled()) {
            StringBuilder s = new StringBuilder();
            s.append(source).append('\'').append(auth.user).append("' login success");
            byte[] extra = auth.extra;
            if (extra != null && extra.length > 0) {
                s.append(",extra:").append(new String(extra));
            }
            LOGGER.info(s.toString());
        }

        ByteBuffer buffer = source.allocate();
        source.write(source.writeToBuffer(AUTH_OK, buffer)); boolean clientCompress = Capabilities.CLIENT_COMPRESS==(Capabilities.CLIENT_COMPRESS & auth.clientFlags);
        boolean usingCompress= MycatServer.getInstance().getConfig().getSystem().getUseCompression()==1 ;
        if(clientCompress&&usingCompress)
        {
            source.setSupportCompress(true);
        }
}

這里最重要的就是將FrontendConnection的handler重新綁定為FrontendCommandHandler對象,用於后續各類命令的接收和處理分發。

 


后端驗證

后端驗證過程類似於前端驗證過程,區別就在於此時MyCat是作為客戶端,向后端MySQL數據庫發起連接與驗證請求。

 MyCat中,MyCat與后端MySQL的連接稱為MySQLConnection,由MySQLConnectionFactory創建,並綁定了一個NIOHandler類型的MySQLConnectionAuthenticator,用於完成與MySQL的驗證過程:

public MySQLConnection make(MySQLDataSource pool, ResponseHandler handler,
            String schema) throws IOException {

        DBHostConfig dsc = pool.getConfig();
        NetworkChannel channel = openSocketChannel(MycatServer.getInstance()
                .isAIO());

        MySQLConnection c = new MySQLConnection(channel, pool.isReadNode());
        MycatServer.getInstance().getConfig().setSocketParams(c, false);
        c.setHost(dsc.getIp());
        c.setPort(dsc.getPort());
        c.setUser(dsc.getUser());
        c.setPassword(dsc.getPassword());
        c.setSchema(schema);
        c.setHandler(new MySQLConnectionAuthenticator(c, handler));
        c.setPool(pool);
        c.setIdleTimeout(pool.getConfig().getIdleTimeout());
        if (channel instanceof AsynchronousSocketChannel) {
            ((AsynchronousSocketChannel) channel).connect(
                    new InetSocketAddress(dsc.getIp(), dsc.getPort()), c,
                    (CompletionHandler) MycatServer.getInstance()
                            .getConnector());
        } else {
            ((NIOConnector) MycatServer.getInstance().getConnector()) .postConnect(c);
        }
        return c;
}

MySQLConnection創建完成后,交由NIOConnector完成TCP連接,它會調用其connect()方法注冊OP_CONNECT事件,並發送連接請求至MySQL:

private void connect(Selector selector) {
        AbstractConnection c = null;
        while ((c = connectQueue.poll()) != null) {
            try {
                SocketChannel channel = (SocketChannel) c.getChannel();
                channel.register(selector, SelectionKey.OP_CONNECT, c); channel.connect(new InetSocketAddress(c.host, c.port));
            } catch (Exception e) {
                c.close(e.toString());
            }
        }
}

TCP連接建立后同樣交由NIOReactor負責后續的讀寫工作,最終之前綁定在MySQLConnection上的MySQLConnectionAuthenticator會調用其handle()方法:

public void handle(byte[] data) {
        try {
            switch (data[4]) {
            case OkPacket.FIELD_COUNT:
                HandshakePacket packet = source.getHandshake();
                if (packet == null) {
                    processHandShakePacket(data);
                    // 發送認證數據包
                    source.authenticate();
                    break;
                }
                // 處理認證結果
                source.setHandler(new MySQLConnectionHandler(source));
                source.setAuthenticated(true);
                boolean clientCompress = Capabilities.CLIENT_COMPRESS==(Capabilities.CLIENT_COMPRESS & packet.serverCapabilities);
                boolean usingCompress= MycatServer.getInstance().getConfig().getSystem().getUseCompression()==1 ;
                if(clientCompress&&usingCompress)
                {
                    source.setSupportCompress(true);
                }
                if (listener != null) {
                    listener.connectionAcquired(source);
                }
                break;
            case ErrorPacket.FIELD_COUNT:
                ErrorPacket err = new ErrorPacket();
                err.read(data);
                String errMsg = new String(err.message);
                LOGGER.warn("can't connect to mysql server ,errmsg:"+errMsg+" "+source);
                //source.close(errMsg);
                throw new ConnectionException(err.errno, errMsg);

            case EOFPacket.FIELD_COUNT:
                auth323(data[3]);
                break;
            default:
                packet = source.getHandshake();
                if (packet == null) {
                    processHandShakePacket(data); // 發送認證數據包
                    source.authenticate();
                    break;
                } else {
                    throw new RuntimeException("Unknown Packet!");
                }

            }

        } catch (RuntimeException e) {
            if (listener != null) {
                listener.connectionError(e, source);
                return;
            }
            throw e;
        }
}

與前端驗證一樣,MySQL數據庫作為服務端會在TCP連接建立完成后向應用發送一個握手包HandshakePacket,在此MySQLConnectionAuthenticator負責讀取此包,並通過source.authenticate();調用MySQLConnection的authenticate()方法向MySQL發送驗證包AuthPacket:

public void authenticate() {
        AuthPacket packet = new AuthPacket();
        packet.packetId = 1;
        packet.clientFlags = clientFlags;
        packet.maxPacketSize = maxPacketSize;
        packet.charsetIndex = this.charsetIndex;
        packet.user = user;
        try {
            packet.password = passwd(password, handshake);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e.getMessage());
        }
        packet.database = schema;
        packet.write(this);
}

驗證通過后,將MySQLConnection的handler重新綁定為MySQLConnectionHandler對象,用於后續接收MySQL數據庫發送過來的各類數據和處理分發


 

參考資料

[1] http://sonymoon.iteye.com/blog/2245141

 


為尊重原創成果,如需轉載煩請注明本文出處:http://www.cnblogs.com/fernandolee24/p/5196332.html,特此感謝


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM