MyCat源碼分析系列之——SQL下發


更多MyCat源碼分析,請戳MyCat源碼分析系列


 SQL下發

SQL下發指的是MyCat將解析並改造完成的SQL語句依次發送至相應的MySQL節點(datanode)的過程,該執行過程由NonBlockingSession.execute()觸發:

public void execute(RouteResultset rrs, int type) {
        // clear prev execute resources
        clearHandlesResources();
        if (LOGGER.isDebugEnabled()) {
            StringBuilder s = new StringBuilder();
            LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
        }

        // 檢查路由結果是否為空
        RouteResultsetNode[] nodes = rrs.getNodes();
        if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                || nodes[0].getName().equals("")) {
            source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                    "No dataNode found ,please check tables defined in schema:"
                            + source.getSchema());
            return;
        }
        if (nodes.length == 1) {
            singleNodeHandler = new SingleNodeHandler(rrs, this);
            try {
                singleNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        } else {
            boolean autocommit = source.isAutocommit();
            SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                    .getSystem();
            int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
            multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
                    this);

            try {
                multiNodeHandler.execute();
            } catch (Exception e) {
                LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
            }
        }
}

從代碼中可以看到,首先對於路由節點信息RouteResultsetNode[]進行了判斷,如果不存在任何需要派發的節點則直接返回;如果是單節點操作,則創建SingleNodeHandler實例,並調用其execute();如果是多節點操作,則創建MultiNodeQueryHandler實例,並調用其execute()

下面先來看單節點操作的SQL下發過程,以下是SingleNodeHandler的execute()方法:

public void execute() throws Exception {
        startTime=System.currentTimeMillis();
        ServerConnection sc = session.getSource();
        this.isRunning = true;
        this.packetId = 0;
        final BackendConnection conn = session.getTarget(node);
        if (session.tryExistsCon(conn, node)) {
            _execute(conn);
        } else {
            // create new connection

            MycatConfig conf = MycatServer.getInstance().getConfig();
            PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
            dn.getConnection(dn.getDatabase(), sc.isAutocommit(), node, this,
                    node);
        }
}

如果session已經有該datanode關聯的后端連接(session.tryExistsCon(conn, node)返回true),則調用_execute()方法下發SQL指令;反之,則調用dn.getConnection()方法從連接池中獲取一個可用連接或新建一個連接,並且由於第4個參數將this作為ResponseHandler對象傳入,獲取到連接后會在PhysicalDatasource.takeCon()中調用handler.connectionAcquired(conn)完成回調,即SingleNodeHandler.connectionAcquired()

public void connectionAcquired(final BackendConnection conn) {
    session.bindConnection(node, conn); _execute(conn);
}

該方法先將獲取到的后端連接關聯到本session中,隨后同樣調用_execute()方法下發SQL指令。_execute()方法的實現如下:

private void _execute(BackendConnection conn) {
        if (session.closed()) {
            endRunning();
            session.clearResources(true);
            return;
        }
        conn.setResponseHandler(this);
        try {
            conn.execute(node, session.getSource(), session.getSource()
                    .isAutocommit());
        } catch (Exception e1) {
            executeException(conn, e1);
            return;
        }
}

首先,很重要的是通過conn.setResponseHandler(this)將SingleNodeHandler與當前后端連接(MySQLConnection)以及連接中包含的MySQLConnectionHandler實例關聯起來,這樣做的目的是當結果返回的時候可以回調SingleNodeHandler相應的方法處理。隨后調用MySQLConnection.execute()

public void execute(RouteResultsetNode rrn, ServerConnection sc,
            boolean autocommit) throws UnsupportedEncodingException {
        if (!modifiedSQLExecuted && rrn.isModifySQL()) {
            modifiedSQLExecuted = true;
        }
        String xaTXID = sc.getSession2().getXaTXID();
        synAndDoExecute(xaTXID, rrn, sc.getCharsetIndex(), sc.getTxIsolation(),
                autocommit);
    }

    private void synAndDoExecute(String xaTxID, RouteResultsetNode rrn,
            int clientCharSetIndex, int clientTxIsoLation,
            boolean clientAutoCommit) {
        String xaCmd = null;

        boolean conAutoComit = this.autocommit;
        String conSchema = this.schema;
        // never executed modify sql,so auto commit
        boolean expectAutocommit = !modifiedSQLExecuted || isFromSlaveDB()
                || clientAutoCommit;
        if (expectAutocommit == false && xaTxID != null && xaStatus == 0) {
            clientTxIsoLation = Isolations.SERIALIZABLE;
            xaCmd = "XA START " + xaTxID + ';';

        }
        int schemaSyn = conSchema.equals(oldSchema) ? 0 : 1;
        int charsetSyn = (this.charsetIndex == clientCharSetIndex) ? 0 : 1;
        int txIsoLationSyn = (txIsolation == clientTxIsoLation) ? 0 : 1;
        int autoCommitSyn = (conAutoComit == expectAutocommit) ? 0 : 1;
        int synCount = schemaSyn + charsetSyn + txIsoLationSyn + autoCommitSyn;
        if (synCount == 0) {
            // not need syn connection
 sendQueryCmd(rrn.getStatement());
            return;
        }
        CommandPacket schemaCmd = null;
        StringBuilder sb = new StringBuilder();
        if (schemaSyn == 1) {
            schemaCmd = getChangeSchemaCommand(conSchema);
            // getChangeSchemaCommand(sb, conSchema);
        }

        if (charsetSyn == 1) {
            getCharsetCommand(sb, clientCharSetIndex);
        }
        if (txIsoLationSyn == 1) {
            getTxIsolationCommand(sb, clientTxIsoLation);
        }
        if (autoCommitSyn == 1) {
            getAutocommitCommand(sb, expectAutocommit);
        }
        if (xaCmd != null) {
            sb.append(xaCmd);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("con need syn ,total syn cmd " + synCount
                    + " commands " + sb.toString() + "schema change:"
                    + (schemaCmd != null) + " con:" + this);
        }
        metaDataSyned = false;
        statusSync = new StatusSync(xaCmd != null, conSchema,
                clientCharSetIndex, clientTxIsoLation, expectAutocommit,
                synCount);
        // syn schema
        if (schemaCmd != null) {
            schemaCmd.write(this);
        }
        // and our query sql to multi command at last
        sb.append(rrn.getStatement());
        // syn and execute others
        this.sendQueryCmd(sb.toString());
        // waiting syn result...
}

其中又會調用synAndDoExecute()方法,顧名思義是同步並執行,同步的目的在於之前獲取到的后端連接可能在自動提交模式、數據庫名、事務隔離級別和字符集上與當前要求可能不同,因此在真正執行SQL語句之前需要檢查並同步相應如上設置。

如果synCount==0,則說明不需要同步,直接調用sendQuery()發送指令即可;反之,將相應的設置語句依次append到sb中(數據庫切換是個例外,直接發送了COM_INIT_DB包進行設置),並創建一個StatusSync對象,最后添加待執行的SQL語句,隨后調用sendQuery()發送指令。到這里,大家可能會有疑問,在此將需更改的相關設置(數據庫名、字符集等)與SQL語句一起發送(並不等待其設置成功與否),萬一之前的更改失敗怎么辦?MyCat對此就是依靠之前創建的StatusSync對象來處理的,在結果合並的流程介紹中會具體解釋。

到此為止,SingleNodeHandler的SQL語句下發過程就算是結束了,當然底層真正的下發是由負責處理一個連接讀寫事件的NIOSocketWR對象來執行的。

 接下來,看多節點操作SQL語句下發過程,與單節點極其類似,以下是MultiNodeQueryHandler的execute()方法:

public void execute() throws Exception {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            this.reset(rrs.getNodes().length);
            this.fieldsReturned = false;
            this.affectedRows = 0L;
            this.insertId = 0L;
        } finally {
            lock.unlock();
        }
        MycatConfig conf = MycatServer.getInstance().getConfig();
        startTime = System.currentTimeMillis();
        for (final RouteResultsetNode node : rrs.getNodes()) {
            BackendConnection conn = session.getTarget(node);
            if (session.tryExistsCon(conn, node)) {
                _execute(conn, node);
            } else {
                // create new connection
                PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
                dn.getConnection(dn.getDatabase(), autocommit, node, this, node);
            }
        }
}

 不難發現,與單節點的執行過程基本是一致的,無非是打了一層循環,對每個datanode分別進行了同樣的操作而已。

 


為尊重原創成果,如需轉載煩請注明本文出處:

http://www.cnblogs.com/fernandolee24/p/5236237.html,特此感謝


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM