一、背景
在測試過程中,對於不同的測試團隊,出於不同的測試目的,我們可能會有多套測試環境。在產品版本迭代過程中,根據業務需求,會對數據庫的結構進行一些修改,如:新增表、字段、索引,修改表、字段索引等操作,在一些流程不規范的公司,開發人員不按照規范操作,不及時將這些修改數據庫的 SQL 提交到 SVN/Git,當修改后的業務代碼部署到新環境時就會引起錯誤,從而影響測試效率。換個角度再說,就算流程規范的大公司,核心業務都采取分庫分表的架構,上千張表難道我們都采用手工執行 SQL 的方式去添加和修改字段嗎?這樣當然不妥,也許會有同學想到,我們可以采取使用腳本語言的方式批量更新和修改對應數據庫,這樣也是一種方式,但這種情況的前提是執行人員非常清楚兩個數據庫的差異,如果執行人員自己也不清楚兩個數據庫之間的差異呢?可能有的同學又想到可以把源數據庫的結構和數據都導入到目標數據庫當中,這樣就解決了。這樣看似可行,但實際不妥。前面我們說了,有多套測試環境,他們的作用可能不一樣,舉個例子:測試環境用於內部測試,聯調環境用於和外部系統的聯調,如果我們把測試環境的數據庫結構和所有數據都導入聯調環境,那么聯調環境原有的數據不存在了,無法再和外部進行聯調了,所以這也不是一種好的方式。
基於以上種種原因,一個數據庫結構同步工具貌似是一個比較好的解決方案。
二、實現功能
基於以上的分析,該工具需要實現以下三個功能
- 分析(diff):分析源數據庫和目標數據庫結構上的差異,在執行同步和拷貝前建議先執行分析來確定源數據庫和目標數據庫的差異;
- 同步(sync):只同步數據庫的結構,不同步數據;
- 拷貝(copy): 對於數據沒有要求的情況,可以直接使用拷貝將源數據庫的數據庫結構和數據全部導入目標數據庫;
三、實現思路
具體流程如下:
- 對傳入指令進行解析,包括:源數據庫和目標數據庫的 IP、端口、用戶名、密碼、數據庫名以及執行動作(diff、sync、copy);
- 分析 db,執行 SQL;
- 分析 db 下的表,執行 SQL;
- 分析表的字段和索引,執行 SQL;
四、分析過程
我們要對數據庫的結構進行比對和分析,包括:數據庫、數據庫下面表、表中的字段和索引,那具體我們應該如何來進行分析和比對呢?
既然我們要做的是MySQL數據庫的同步工具,那么我們對 MySQL 數據庫就需要有深入一點的了解。在MySQL中,把 INFORMATION_SCHEMA 看作是一個數據庫,確切說是信息數據庫。其中保存着關於當前 MySQL 服務器所維護的所有其他數據庫的信息。如數據庫名,數據庫的表、表的字段與索引以及訪問權限等等。所以我們應該關注的是 INFORMATION_SCHEMA 中的以下幾張表:
- SCHEMATA:提供了當前mysql實例中所有數據庫的信息。SHOW DATABASES 的結果取之此表;
- TABLES:提供了關於數據庫中的表的信息(包括視圖)。詳細表述了某個表屬於哪個 schema,表類型,表引擎,創建時間等信息。SHOW TABLES FROM SCHEMANAME的結果取之此表;
- COLUMNS:提供了表中的列信息。詳細表述了某張表的所有列以及每個列的信息。SHOW COLUMNS FROM SCHEMANAME.TABLENAME 的結果取之此表;
- STATISTICS:提供了關於表索引的信息。SHOW INDEX FROM SCHEMANAME.TABLENAME 的結果取之此表;
關鍵 SQL:
- SELECT * FROM SCHEMATA WHERE SCHEMA_NAME='XXX';
- SELECT * FROM TABLES WHERE TABLE_SCHEMA='XXX' AND TABLE_NAME='XX';
- SELECT * FROM COLUMNS WHERE TABLE_SCHEMA='XXX' AND TABLE_NAME='XX';
- SELECT * FROM STATISTICS WHERE TABLE_SCHEMA='XXX' AND TABLE_NAME='XX';
五、代碼實現
這里只對 sync(同步)做簡單介紹:

@Slf4j public class App { /** * java -jar xxxx.jar src dst action * * src: host:port:username:passwd * * dst: host:port:username:passwd * * action: sync(同步)|diff(比對)|copy(復制) * * eg. java -jar day09-1.0.0.jar 127.0.0.1:3366:root:123456 127.0.0.1:3377:root:123456 sync * * @param args src dst action */ public static void main(String[] args) { log.info("db schema sync start, args={}", Arrays.toString(args)); start(args); } public static void start(String[] args){ // 1.校驗,參數個數,類型,格式不對校驗 calibration(args); // 2.解析,將args 解析成 SyncActionDTO SchemaActionDTO actionDTO = parse(args); // System.out.println(actionDTO); // 3.執行同步/比對/復制 SchemaHander.doAction(actionDTO); } }
入口類包含三個步驟:校驗參數、解析參數、執行操作(doAction)

public class SchemaHander { public static void doAction(SchemaActionDTO actionDTO){ ConnectDTO src = actionDTO.getSrc(); ConnectDTO dst = actionDTO.getDst(); Action action = actionDTO.getAction(); if (Action.SYNC.equals(action)){ SyncHander.doSync(src,dst); }else if (Action.DIFF.equals(action)){ DiffHander.doDiff(src,dst); }else if (Action.COPY.equals(action)){ CopyHander.doCopy(src,dst); }else { throw new IllegalStateException("do not supprt this action"); } } }
根據接收到的指令的第三個參數從而做對應的操作(diff、sync、copy)

public class SyncHander { /** * 分析src和dst兩個數據庫實例 * @param src * @param dst */ public static void doSync(ConnectDTO src,ConnectDTO dst){ // 1.解析src和dst中的db差異,相同的數據庫名和不同的數據庫名 Pair<Set<String>, Set<String>> dbPair = parseDb(src, dst); System.out.println("dbPair = " + dbPair); // 2.src有,dst無 DbHander.copyDb(src, dst, dbPair.getLeft()); // 3.src有,dst有 DbHander.diffDb(src, dst, dbPair.getRight()); } }
解析源數據庫和目標數據庫的差異,相同的數據庫和不同的數據庫(不同的指的是src中有二dst中沒有)。
src 中有而 dst 中沒有的數據庫,直接在 dst 中創建數據庫、表和索引。
src 中和 dst 中都有的數據庫,則進一步分析該數據庫中的表的情況。

public class DbHander { /** * 分析db,src有,dst有 * @param src * @param dst * @param target */ public static void diffDb(ConnectDTO src, ConnectDTO dst, Set<String> target){ for (String db : target) { // 解析src和dst中的同名的數據庫的差異,返回該數據庫中表的差異,相同的表名和不同的表名 Pair<Set<String>, Set<String>> tablePair = parseTable(src, dst, db); // 復制差異表 TableHandler.copyTable(src, dst, db, tablePair.getLeft()); // 對比相同表 TableHandler.diffTable(src, dst, db, tablePair.getRight()); } } }
套路和分析數據庫一樣
src 中有而 dst 中沒有的表,直接在 dst 中創建。
src 中和 dst 中都有的表,則進一步分析該表的所有字段和字段屬性。

public class TableHandler { /** * 分析相同表的字段和索引 * @param src 源 * @param dst 目標 * @param db 數據庫 * @param targetTables 分析的目標表 */ public static void diffTable(ConnectDTO src, ConnectDTO dst, String db, Set<String> targetTables) { for (String table : targetTables) { // 1.分析差異字段 Pair<Set<String>, Set<String>> columnPair = parseColumn(src, dst, db, table); // 2.復制src有,dst無 ColumnHandler.copyColumn(src, dst, db, table, columnPair.getLeft()); // 3.分析src有,dst有 ColumnHandler.diffColumn(src, dst, db, table, columnPair.getRight()); // 1.分析差異索引 Pair<Set<String>, Set<String>> indexPair = parseIndex(src, dst, db, table); // 2.復制src有,dst無 IndexHander.copyIndex(src, dst, db, table, indexPair.getLeft()); // 3.分析src有,dst有 IndexHander.diffIndex(src, dst, db, table, indexPair.getRight()); } } }
src 中有而 dst 中沒有的字段和索引,直接在 dst 中創建。
src 中和 dst 中都有的字段和索引,則進一步分析。
需要注意的是索引,由於索引分為普通索引、唯一索引、主鍵索引和組合索引幾種類型,所以在生成修改 SQL 時會比較復雜。

public class ColumnHandler { 長度、是否可為空、默認值、注釋 * @param src 源數據庫實例 * @param dst 目標數據庫實例 * @param db 數據庫 * @param table 表 * @param targetColumns 分析的目標列 */ public static void diffColumn(ConnectDTO src, ConnectDTO dst, String db, String table, Set<String> targetColumns) { for (String column : targetColumns) { String queryColumnInfoSql = String.format( "select * from COLUMNS where TABLE_SCHEMA='%s' and TABLE_NAME='%s' AND COLUMN_NAME='%s'", db, table, column); // 1.取出src中的column的幾個我們關注的屬性,COLUMN_TYPE,COLUMN_COMMENT,IS_NULLABLE,COLUMN_DEFAULT Set<ColumnInfoDTO> srcColumnSet = JdbcUtils .read(src, ConnectConsts.INFO_SCHEMA_DB_NAME, queryColumnInfoSql) .stream() .map(entity -> ColumnInfoDTO.builder() .columnComment(entity.get("COLUMN_COMMENT").toString()) .columnDefault(StringUtils.defaultString(String.valueOf(entity.get("COLUMN_DEFAULT")), "")) .columnType(entity.get("COLUMN_TYPE").toString()) .isNullable(entity.get("IS_NULLABLE").toString()) .build()).collect(Collectors.toSet()); // 2.取出dst中的column的幾個我們關注的屬性,COLUMN_TYPE,COLUMN_COMMENT,IS_NULLABLE,COLUMN_DEFAULT Set<ColumnInfoDTO> dstColumnSet = JdbcUtils .read(dst, ConnectConsts.INFO_SCHEMA_DB_NAME, queryColumnInfoSql) .stream() .map(entity -> ColumnInfoDTO.builder() .columnComment(entity.get("COLUMN_COMMENT").toString()) .columnDefault(StringUtils.defaultString(String.valueOf(entity.get("COLUMN_DEFAULT")), "")) .columnType(entity.get("COLUMN_TYPE").toString()) .isNullable(entity.get("IS_NULLABLE").toString()) .build()).collect(Collectors.toSet()); // 3.逐個去對比,如果不一樣,就生成修改SQL,如果一樣,就什么都不做 // 3.1 這個differenceColumn是需要去修改到dst中的 Set<ColumnInfoDTO> differenceColumn = Sets.difference(srcColumnSet, dstColumnSet) .immutableCopy(); for (ColumnInfoDTO infoDTO : differenceColumn) { String sql = String.format("alter table %s modify column %s %s %s %s comment '%s'", table, column, infoDTO.getColumnType(), isNullableSet(infoDTO.getIsNullable()), isDefaultSet(infoDTO.getColumnDefault()), infoDTO.getColumnComment() ); JdbcUtils.write(dst, db, sql); } } } }

public class IndexHander { /** * 分析相同表相同索引的屬性,並修改dst中索引的屬性 * 屬性包括索引類型,是否唯一,單索引還是組合索引 * @param src 源 * @param dst 目標 * @param db db * @param table 表 * @param targeIndexs 分析的目標索引 */ public static void diffIndex(ConnectDTO src, ConnectDTO dst, String db, String table, Set<String> targeIndexs){ for (String index : targeIndexs) { String queryIndexInfoSql = String.format( "select * from STATISTICS where TABLE_SCHEMA='%s' and TABLE_NAME='%s' and INDEX_NAME='%s'", db, table, index); // 查出該index信息的返回結果,如果是組合索引,一個索引名對應多條記錄 List<Map<String, Object>> entities = JdbcUtils .read(src, ConnectConsts.INFO_SCHEMA_DB_NAME, queryIndexInfoSql); // 1.取出src中的index的幾個我們關注的屬性,COLUMN_NAME,NON_UNIQUE,SEQ_IN_INDEX Set<IndexInfoDTO> srcIndexSet = JdbcUtils .read(src, ConnectConsts.INFO_SCHEMA_DB_NAME, queryIndexInfoSql) .stream() .map(entity-> IndexInfoDTO.builder() .columnName(entity.get("COLUMN_NAME").toString()) .nonUnique(entity.get("NON_UNIQUE").toString()) .seqInIndex(entity.get("SEQ_IN_INDEX").toString()) .build()).collect(Collectors.toSet()); // 2.取出dst中的index的幾個我們關注的屬性,COLUMN_NAME,NON_UNIQUE,SEQ_IN_INDEX Set<IndexInfoDTO> dstIndexSet = JdbcUtils .read(dst, ConnectConsts.INFO_SCHEMA_DB_NAME, queryIndexInfoSql) .stream() .map(entity-> IndexInfoDTO.builder() .columnName(entity.get("COLUMN_NAME").toString()) .nonUnique(entity.get("NON_UNIQUE").toString()) .seqInIndex(entity.get("SEQ_IN_INDEX").toString()) .build()).collect(Collectors.toSet()); // 對比,找出名稱一樣,但是屬性不一樣的索引。組合索引的比對有問題 Set<IndexInfoDTO> differenctIndex = Sets.difference(srcIndexSet, dstIndexSet).immutableCopy(); System.out.println("differenctIndex.size() = " + differenctIndex.size()); for (IndexInfoDTO infoDTO : differenctIndex) { System.out.println("infoDTO = " + infoDTO); } String sql = null; // 單列索引 if (differenctIndex.size() == 1) { // 先刪除dst中的索引 deleteIndex(dst,db,table,index); // 再在dst中創建索引 for (IndexInfoDTO indexDTO : differenctIndex) { if ("PRIMARY".equals(index)) { sql = String.format("ALTER TABLE %s ADD PRIMARY KEY(%s);", table, indexDTO.getColumnName()); }else { sql = String.format("ALTER TABLE %s ADD %s %s(%s)", table, isNonUnique(indexDTO.getNonUnique()), index, indexDTO.getColumnName()); } JdbcUtils.write(dst,db,sql); } // 組合索引 }else if (differenctIndex.size() > 1) { // 先刪除dst中的索引 deleteIndex(dst,db,table,index); // 再在dst中創建索引 String[] arrs = getPair(entities).getLeft(); String nonUnique = getPair(entities).getRight(); if ("PRIMARY".equals(index)){ String baseSql = "alter table %s add primary key("; String formatSql = formatSql(arrs, baseSql); sql = String.format(formatSql, table); }else { String baseSql = "alter table %s add %s %s("; String formatSql = formatSql(arrs, baseSql); sql = String.format(formatSql, table, isNonUnique(nonUnique), index); } JdbcUtils.write(dst,db,sql); } } } }
這里是分析字段和索引的過程。
以上所有代碼,復制數據庫、表、字段、索引的代碼都沒有貼出來,大家可以自己來實現。
另外,最后索引的分析有一個 bug,希望大家可以發現。
六、問題
上面,我們基本實現了這個工具的框架,但是還存在一些問題:
Connection
- 使用連接池,並且基於連接信息做了一個Map<ConnectDTO,DruidDatasource>
SQL 執行
- 使用批量執行SQL;
多線程執行
- 任務分割去從線程池中申請線程,然后去執行;