更多MyCat源碼分析,請戳MyCat源碼分析系列
結果合並
在SQL下發流程和前后端驗證流程中介紹過,通過用戶驗證的后端連接綁定的NIOHandler是MySQLConnectionHandler實例,在MySQL服務端返回執行結果時會調用到MySQLConnecionHandler.handleData(),用於不同類型的處理派發:
protected void handleData(byte[] data) { switch (resultStatus) { case RESULT_STATUS_INIT: switch (data[4]) { case OkPacket.FIELD_COUNT: handleOkPacket(data); break; case ErrorPacket.FIELD_COUNT: handleErrorPacket(data); break; case RequestFilePacket.FIELD_COUNT: handleRequestPacket(data); break; default: resultStatus = RESULT_STATUS_HEADER; header = data; fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data, 4)); } break; case RESULT_STATUS_HEADER: switch (data[4]) { case ErrorPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_INIT; handleErrorPacket(data); break; case EOFPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_FIELD_EOF; handleFieldEofPacket(data); break; default: fields.add(data); } break; case RESULT_STATUS_FIELD_EOF: switch (data[4]) { case ErrorPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_INIT; handleErrorPacket(data); break; case EOFPacket.FIELD_COUNT: resultStatus = RESULT_STATUS_INIT; handleRowEofPacket(data); break; default: handleRowPacket(data); } break; default: throw new RuntimeException("unknown status!"); } }
上述代碼片段中用紅色標注的幾個方法是最為核心的,其中handleOkPacket()主要用於insert/update/delete和其余返回OK包的語句返回的執行結果,而handleFieldEofPacket()、handleRowPacket()和handleRowEofPacket()用於select語句返回的執行結果。這幾個方法的內部的流程其實就是分別調用了其上綁定的ResponseHandler(SingleNodeHandler或MultiNodeQueryHandler)實例對應的這幾個方法。
1. 先來看單節點操作的情況,SingleNodeHandler包含的這幾個方法實現如下:
public void okResponse(byte[] data, BackendConnection conn) { boolean executeResponse = conn.syncAndExcute(); if (executeResponse) { session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false); endRunning(); ServerConnection source = session.getSource(); OkPacket ok = new OkPacket(); ok.read(data); if (rrs.isLoadData()) { byte lastPackId = source.getLoadDataInfileHandler() .getLastPackId(); ok.packetId = ++lastPackId;// OK_PACKET source.getLoadDataInfileHandler().clear(); } else { ok.packetId = ++packetId;// OK_PACKET } ok.serverStatus = source.isAutocommit() ? 2 : 1; recycleResources(); source.setLastInsertId(ok.insertId); ok.write(source); //TODO: add by zhuam //查詢結果派發 QueryResult queryResult = new QueryResult(session.getSource().getUser(), rrs.getSqlType(), rrs.getStatement(), startTime); QueryResultDispatcher.dispatchQuery( queryResult ); } } public void fieldEofResponse(byte[] header, List<byte[]> fields, byte[] eof, BackendConnection conn) { //TODO: add by zhuam //查詢結果派發 QueryResult queryResult = new QueryResult(session.getSource().getUser(), rrs.getSqlType(), rrs.getStatement(), startTime); QueryResultDispatcher.dispatchQuery( queryResult ); header[3] = ++packetId; ServerConnection source = session.getSource(); buffer = source.writeToBuffer(header, allocBuffer()); for (int i = 0, len = fields.size(); i < len; ++i) { byte[] field = fields.get(i); field[3] = ++packetId; buffer = source.writeToBuffer(field, buffer); } eof[3] = ++packetId; buffer = source.writeToBuffer(eof, buffer); if (isDefaultNodeShowTable) { for (String name : shardingTablesSet) { RowDataPacket row = new RowDataPacket(1); row.add(StringUtil.encode(name.toLowerCase(), source.getCharset())); row.packetId = ++packetId; buffer = row.write(buffer, source, true); } } } public void rowResponse(byte[] row, BackendConnection conn) { if(isDefaultNodeShowTable) { RowDataPacket rowDataPacket =new RowDataPacket(1); rowDataPacket.read(row); String table= StringUtil.decode(rowDataPacket.fieldValues.get(0),conn.getCharset()); if(shardingTablesSet.contains(table.toUpperCase())) return; } row[3] = ++packetId; buffer = session.getSource().writeToBuffer(row, allocBuffer()); } public void rowEofResponse(byte[] eof, BackendConnection conn) { ServerConnection source = session.getSource(); conn.recordSql(source.getHost(), source.getSchema(), node.getStatement()); // 判斷是調用存儲過程的話不能在這里釋放鏈接 if (!rrs.isCallStatement()) { session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false); endRunning(); } eof[3] = ++packetId; buffer = source.writeToBuffer(eof, allocBuffer()); source.write(buffer); }
在okResponse()方法中,首先調用了conn.syncAndExcute(),這個過程就解釋了之前在SQL下發過程中提及的當某個連接現有的設置需要修改時並未等待這些修改成功返回,這兒才對此做出了判斷:
public boolean syncAndExcute() { StatusSync sync = this.statusSync; if (sync == null) { return true; } else { boolean executed = sync.synAndExecuted(this); if (executed) { statusSync = null; } return executed; } }
這里面又進一步依次調用了StatusSync.synAndExecuted()和updateConnectionInfo()方法:
public boolean synAndExecuted(MySQLConnection conn) { int remains = synCmdCount.decrementAndGet(); if (remains == 0) {// syn command finished this.updateConnectionInfo(conn); conn.metaDataSyned = true; return false; } else if (remains < 0) { return true; } return false; } private void updateConnectionInfo(MySQLConnection conn) { conn.xaStatus = (xaStarted == true) ? 1 : 0; if (schema != null) { conn.schema = schema; conn.oldSchema = conn.schema; } if (charsetIndex != null) { conn.setCharset(CharsetUtil.getCharset(charsetIndex)); } if (txtIsolation != null) { conn.txIsolation = txtIsolation; } if (autocommit != null) { conn.autocommit = autocommit; } }
假如當前的連接與所需連接在數據庫名和字符集上存在不同,那需同步的數量為2,如果這兩個修改都成功,那應該分別返回2個OK包(即觸發兩次SingleNodeHandler.okResponse()),在synAndExecuted()中通過對於收到的OK包數量synCmdCount進行判斷,若全部收到則調用updateConnectionInfo(),將該連接的這些設置都設置為新值。該同步過程完成之后,才會真正進入到SQL語句執行返回的結果處理階段(select/insert/update/delete)。
1.1 insert/update/delete
- okResponse():讀取data字節數組,組成一個OKPacket,並調用ok.write(source)將結果寫入前端連接FrontendConnection的寫緩沖隊列writeQueue中,真正發送給應用是由對應的NIOSocketWR從寫隊列中讀取ByteBuffer並返回的。
1.2 select
- fieldEofResponse():元數據返回時觸發,將header和元數據內容依次寫入緩沖區中;
- rowResponse():行數據返回時觸發,將行數據寫入緩沖區中;
- rowEofResponse():行結束標志返回時觸發,將EOF標志寫入緩沖區,最后調用source.write(buffer)將緩沖區放入前端連接的寫緩沖隊列中,等待NIOSocketWR將其發送給應用。
2. 再來看多節點操作的結果合並和返回過程,MultiNodeQueryHandler負責這一過程的執行。
多節點操作和單節點操作的不同之處就在於:
1)接收來自多個MySQL節點各自發送的結果數據,可能需要對所有得到的結果進行簡單的合並(順序是不確定的,滿足FIFO);
2)如果本次操作涉及聚合函數、group by、order by和limit,還需要對所有結果進行一系列歸並
針對第一種情況,insert/update/delete和select的區別如下:
- insert/update/delete:這三類語句都會返回一個OK包,里面包含了最為核心的affectedRows,因此每得到一個MySQL節點發送回的affectedRows,就將其累加,當收到最后一個節點的數據后(通過decrementOkCountBy()方法判斷),將結果返回給前端;
- select:每一個MySQL節點都會依次返回元數據、行數據1、行數據2...、行數據n、行結束標志位(EOF),其中元數據和行結束標志位每個節點返回上來都是一樣的,但MultiNodeQueryHandler負責合並所有數據,因此實際只需要一份元數據、所有節點的行數據以及一份行結束標志位,所以在fieldEofResponse()方法中通過boolean類型的fieldsReturned判斷獲取第一份收到的元數據包,后續的統統丟棄,並在rowEofResponse()方法中通過decrementCountBy()方法判斷是否收到了所有節點的EOF包,將最后一份的EOF寫入緩沖區返回前端。
針對第二種情況,insert/update/delete與第一種情況完全一致,但select的處理由MultiNodeQueryHandler上包含的DataMergeService實例負責歸並結果集(limit其實是在MultiNodeQueryHandler中實現的),我們先來看DataMergeService中包含的核心變量:
private int fieldCount; // 字段數 private RouteResultset rrs; // 路由信息 private RowDataSorter sorter; // 排序器 private RowDataPacketGrouper grouper; // 分組器 private volatile boolean hasOrderBy = false; private MultiNodeQueryHandler multiQueryHandler; public PackWraper END_FLAG_PACK = new PackWraper(); // 結束包 private AtomicInteger areadyAdd = new AtomicInteger(); private List<RowDataPacket> result = new Vector<RowDataPacket>(); // 歸並結果隊列 private static Logger LOGGER = Logger.getLogger(DataMergeService.class); private BlockingQueue<PackWraper> packs = new LinkedBlockingQueue<PackWraper>(); // 原始數據封裝隊列 private ConcurrentHashMap<String, Boolean> canDiscard = new ConcurrentHashMap<String, Boolean>(); // 是否可丟棄標志位map
其中,最為重要的就是用於排序的sorter和用於分組的grouper,以及最后存放歸並結果的行數據包隊列result,DataMergeService中核心的方法實現如下:
public void onRowMetaData(Map<String, ColMeta> columToIndx, int fieldCount) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("field metadata inf:" + columToIndx.entrySet()); } int[] groupColumnIndexs = null; this.fieldCount = fieldCount; if (rrs.getGroupByCols() != null) { groupColumnIndexs = toColumnIndex(rrs.getGroupByCols(), columToIndx); } if (rrs.getHavingCols() != null) { ColMeta colMeta = columToIndx.get(rrs.getHavingCols().getLeft() .toUpperCase()); if (colMeta != null) { rrs.getHavingCols().setColMeta(colMeta); } } if (rrs.isHasAggrColumn()) { List<MergeCol> mergCols = new LinkedList<MergeCol>(); Map<String, Integer> mergeColsMap = rrs.getMergeCols(); if (mergeColsMap != null) { for (Map.Entry<String, Integer> mergEntry : mergeColsMap .entrySet()) { String colName = mergEntry.getKey().toUpperCase(); int type = mergEntry.getValue(); if (MergeCol.MERGE_AVG == type) { ColMeta sumColMeta = columToIndx.get(colName + "SUM"); ColMeta countColMeta = columToIndx.get(colName + "COUNT"); if (sumColMeta != null && countColMeta != null) { ColMeta colMeta = new ColMeta(sumColMeta.colIndex, countColMeta.colIndex, sumColMeta.getColType()); mergCols.add(new MergeCol(colMeta, mergEntry .getValue())); } } else { ColMeta colMeta = columToIndx.get(colName); mergCols.add(new MergeCol(colMeta, mergEntry.getValue())); } } } // add no alias merg column for (Map.Entry<String, ColMeta> fieldEntry : columToIndx.entrySet()) { String colName = fieldEntry.getKey(); int result = MergeCol.tryParseAggCol(colName); if (result != MergeCol.MERGE_UNSUPPORT && result != MergeCol.MERGE_NOMERGE) { mergCols.add(new MergeCol(fieldEntry.getValue(), result)); } } grouper = new RowDataPacketGrouper(groupColumnIndexs, mergCols.toArray(new MergeCol[mergCols.size()]), rrs.getHavingCols()); } if (rrs.getOrderByCols() != null) { LinkedHashMap<String, Integer> orders = rrs.getOrderByCols(); OrderCol[] orderCols = new OrderCol[orders.size()]; int i = 0; for (Map.Entry<String, Integer> entry : orders.entrySet()) { String key = StringUtil.removeBackquote(entry.getKey() .toUpperCase()); ColMeta colMeta = columToIndx.get(key); if (colMeta == null) { throw new java.lang.IllegalArgumentException( "all columns in order by clause should be in the selected column list!" + entry.getKey()); } orderCols[i++] = new OrderCol(colMeta, entry.getValue()); } // sorter = new RowDataPacketSorter(orderCols); RowDataSorter tmp = new RowDataSorter(orderCols); tmp.setLimit(rrs.getLimitStart(), rrs.getLimitSize()); hasOrderBy = true; sorter = tmp; } else { hasOrderBy = false; } MycatServer.getInstance().getBusinessExecutor().execute(this); } public boolean onNewRecord(String dataNode, byte[] rowData) { // 對於需要排序的數據,由於mysql傳遞過來的數據是有序的, // 如果某個節點的當前數據已經不會進入,后續的數據也不會入堆 if (canDiscard.size() == rrs.getNodes().length) { LOGGER.error("now we output to client"); packs.add(END_FLAG_PACK); return true; } if (canDiscard.get(dataNode) != null) { return true; } PackWraper data = new PackWraper(); data.node = dataNode; data.data = rowData; packs.add(data); areadyAdd.getAndIncrement(); return false; } public void run() { int warningCount = 0; EOFPacket eofp = new EOFPacket(); ByteBuffer eof = ByteBuffer.allocate(9); BufferUtil.writeUB3(eof, eofp.calcPacketSize()); eof.put(eofp.packetId); eof.put(eofp.fieldCount); BufferUtil.writeUB2(eof, warningCount); BufferUtil.writeUB2(eof, eofp.status); ServerConnection source = multiQueryHandler.getSession().getSource(); while (!Thread.interrupted()) { try { PackWraper pack = packs.take(); if (pack == END_FLAG_PACK) { break; } RowDataPacket row = new RowDataPacket(fieldCount); row.read(pack.data); if (grouper != null) { grouper.addRow(row); } else if (sorter != null) { if (!sorter.addRow(row)) { canDiscard.put(pack.node, true); } } else { result.add(row); } } catch (Exception e) { LOGGER.error("Merge multi data error", e); } } byte[] array = eof.array(); multiQueryHandler.outputMergeResult(source, array, getResults(array)); } private List<RowDataPacket> getResults(byte[] eof) { List<RowDataPacket> tmpResult = result; if (this.grouper != null) { tmpResult = grouper.getResult(); grouper = null; } if (sorter != null) { // 處理grouper處理后的數據 if (tmpResult != null) { Iterator<RowDataPacket> itor = tmpResult.iterator(); while (itor.hasNext()) { sorter.addRow(itor.next()); itor.remove(); } } tmpResult = sorter.getSortedResult(); sorter = null; } if (LOGGER.isDebugEnabled()) { LOGGER.debug("prepare mpp merge result for " + rrs.getStatement()); } return tmpResult; }
接下來對這幾個方法逐一解釋:
- onRowMetaData():在MultiNodeQueryHandler.fieldEofResponse()中被調用,初始化grouper和sorter,隨后利用線程池調用run()方法;
- onNewRecord():在MultiNodeQueryHandler.rowResponse()中被調用,首先判斷canDiscard的長度是否等於下發的節點個數,如果是說明后續所有節點的數據都會被丟棄,往packs中放入END_FLAG_PACK終止run()中的循環(另外一處更為常規的終止循環是由MultiNodeQueryHandler.rowEofResponse()中收到全部節點的EOF包后觸發的),如果當前節點在canDiscard隊列中也同樣忽略該節點的后續數據,隨后將節點名和行數據封裝成一個PackWraper實例,放入packs中;
- run():核心是一個循環,每次阻塞式地從packs中讀取一個PackWraper實例並生成RowDataPacket實例,如果發現是END_FLAG_PACK就退出,接下來進行if-else判斷:
1)如果需要分組則調用grouper.addRow()添加該行,由於分組是優先於排序的,因此一旦有分組需求,那排序就必須等到所有分組行為完成后才能開始(getResults()中);
2)反之,如果需要排序則調用sorter.addRow()嘗試添加該行,若加入不成功說明該節點后續的數據都不可能加入成功(因為這里的排序是通過構建最大堆MaxHeap實現的,堆一旦滿了就會執行元素淘汰,並且每個節點返回的數據又滿足內部有序),將該節點放入canDiscard中忽略后續數據;
3)反之,直接將該行加入result中。
當循環退出時,調用MultiNodeQueryHandler.outputMergeResult(),其中會先調用getResults()獲取分組數據/分組排序數據/排序數據/普通數據,MultiNodeQueryHandler.outputMergeResult()就是為了執行limit,隨后將處理好的結果集依次寫入緩沖區,最后返回前端,具體實現如下:
public void outputMergeResult(final ServerConnection source, final byte[] eof, List<RowDataPacket> results) { try { lock.lock(); ByteBuffer buffer = session.getSource().allocate(); final RouteResultset rrs = this.dataMergeSvr.getRrs(); // 處理limit語句 int start = rrs.getLimitStart(); int end = start + rrs.getLimitSize(); if (start < 0) start = 0; if (rrs.getLimitSize() < 0) end = results.size(); if (end > results.size()) end = results.size(); for (int i = start; i < end; i++) { RowDataPacket row = results.get(i); row.packetId = ++packetId; buffer = row.write(buffer, source, true); } eof[3] = ++packetId; if (LOGGER.isDebugEnabled()) { LOGGER.debug("last packet id:" + packetId); } source.write(source.writeToBuffer(eof, buffer)); } catch (Exception e) { handleDataProcessException(e); } finally { lock.unlock(); dataMergeSvr.clear(); } }
為尊重原創成果,如需轉載煩請注明本文出處: