Ghost-無損DDL


一、什么是DDL?

DDL全稱:Data Definition Language

它包含三個主要的關鍵字:create、drop、alter

操作 statement
創建數據庫 create database
刪除數據庫 drop database
修改數據庫 alter database
創建表 create table
刪除表 drop table
修改表 alter table
創建索引 create index
刪除索引 drop index

二、表級鎖和元數據鎖

MySQL的表級鎖有兩種,一種是表鎖,一種是元數據鎖MDL

2.1、什么是表鎖?

表鎖的語法: lock tables ... read/write

釋放時機:通過unlock tables 主動釋放,當客戶端斷開時也會自動釋放。

例如:線程A執行: lock tables t1 read, t2 write; 那其他線程寫t1和讀寫t2時都會被阻塞, 而且線程A在unlock tables 之前,也只能執行讀t1,讀寫t2,它自己也不被允許寫t1。

image-20200722221231987

2.2、什么是MDL?

元數據鎖也是一種表級鎖:metadata lock。

作用:

我們不需要顯示的使用它,當訪問一個表的時候,它會被自動加上。MDL鎖的作用就是保證讀寫的正確性

說白了,就是實現:當有用戶對表執行DML相關操作時,其他線程不能把表結構給改了(想改表結構也可以,等排在它前面的DML全部執行完)。反之,當有線程在更改表結構時,其他線程需要執行的DML也會被阻塞住。

特性:

1、系統默認添加。

2、讀鎖之間不互斥。

3、讀寫鎖之間互斥。

三、什么是無損DDL?

需求:

一般對公司對業務線來說,總是難免遇到需要修改線上表結構的需求。比如搞個活動,結果發現:現有的表中的列不夠用了,那么就需要對現有的表進行無損DDL操作,添加一列。

有損DDL

為什么直接執行alter table add column有損

如下圖所示:你alter table時是需要獲取元數據鎖的寫鎖的,而所有的DML操作又會被默認的加上元數據讀鎖。如果所有的語句都是DML語句那皆大歡喜,大家都是讀鎖彼此不影響。

image-20200722134354760

但是你看上圖這突然整出來一個alter語句,一旦等他持有寫鎖后,去執行DDL語句時期間,所有的DML語句全部被阻塞,我們稱這種情況對業務來說是有損的。

無損DDL

所謂的無損是相對於業務來說的,如果能做到執行DDL的過程中,對業務無影響,那我們稱這種ddl是無損的。

至於如何無損的解決這個問題,接着看下文。

四、DDL重建表

什么是重建表?為什么要重建表?

當我執行delete語句刪除表A中的數據時,對應Innodb來說其實只是在標記刪除,而是不實實在在的將表空間中的數據刪除,對應innodb來將被標記刪除的位置是可以可重復使用。

那么delete語句多了,表空間上的空洞就多了,磁盤的占用量也只增不減。這時我們就得重建表。縮小表A上的空洞

重建表的方法:

方式1、可以新建一個新的表,然后將原表中的數據按照id生序一次拷貝過去。

方式2、也可以執行alter table A engine=InnoDB 來重建表。

這里的alter table 其實就是DDL語句

Mysql5.5之前重建表

在5.5之前,mysql執行alter table A engine=InnoDB的流程如下圖:

image-20200722225453513

在上面的過程中,MySQL會自動的為我們創建臨時表,拷貝數據,交換表名,以及刪除舊表。

特點:

一、這個過程並不是安全的。因為在往tmp表中寫數據的過程中,如果有業務流量寫入表A,而且寫入的位置是不久前完成往tmp中拷貝的位置,就會導致數據的丟失。

二、即使是MySQL會我們自動的創建臨時表,數據拷貝的過程依然是在MySQL-Server層面做的。

Mysql5.6之后重建表

重建表的過程如下圖:

image-20200722231100008

1、創建一個tmp_file, 掃描表A主鍵的所有數據頁。

2、使用數據頁中的記錄生成B+樹,存儲進tmp_file中。

這一步是對針對數據文件的操作。由innodb直接完成。

3、在生成轉存B+數的過程中,將針對A的寫操作記錄在row_log日志中。

4、完成了B+樹的轉存后,將row_log中記錄的日志在tmp_file中回放。

5、使用臨時文件替換A中的數據文件。

可以看到,這個過程其實已經實現無損了。因為在做數據遷移的過程中,允許對原表進行CRUD

局限性:

這種DDL本質上是在替換表空間中的數據文件,僅僅是用於對原表進行無損DDL瘦身。而不是解決我們開題所說的動態無損加列的情況。

五、ghost工具源碼梳理

5.1、工作模式

image-20200726194251281

5.1.1 主從模式a

ghost會連接上主庫

從主庫中讀取數據rowCopy主庫上的影子表中。

添加對從庫binlog的監聽。將binlog-event轉換成sql應用在主庫上的影子表上。

因為ghost會去解析binlog,所以要求從庫的binlog格式必須上row格式。不對主庫的binlog格式有要求。

cutOver在主庫上完成。

5.1.2 主主模式b

ghost會連接上主庫

從主庫中讀取數據rowCopy主庫上的影子表中。

添加對主庫binlog的監聽。將binlog-event轉換成sql應用在主庫上的影子表上。

要求主庫的binlog格式為row格式。

cutOver在主庫上完成。

5.1.3 migrate/test on relica

一般這中情況是在做預檢查時完成才使用到的,ghost的任何操作都在從庫上完成,主要是驗證整個流程是否可以跑通,相關參數:-test-on-replica

上面所說的主主、主從、並不是MySQL實例的主從關系。說的是 rowCopy和binlog的同步是在誰身上進行。

5.1、前置性檢查

這一步主要是去檢查從庫的基礎信息。比如執行table row count、schema validation、hearbeat 但是當我們有提供 --allow-on-master時,inspector指的是主庫。

  • 校驗alter語句,允許重命名列名,但是不允許重命名表名
validateStatement()
  • 測試DB的連通性: 實現的思路是使用根據用戶輸入的數據庫連接信息獲取到和主/從庫的連接,然后使用db.QueryRow(sqlStr)執行指定的SQL,觀察獲取到的結果是否符合預期。
versionQuery := `select @@global.version`
err := db.QueryRow(versionQuery).Scan(&version); 

extraPortQuery := `select @@global.extra_port`
db.QueryRow(extraPortQuery).Scan(&extraPort);

portQuery := `select @@global.port`
db.QueryRow(portQuery).Scan(&port); 
  • 權限校驗,確保用戶給定sql對相應的庫表有足夠的操作權限:思路獲取db連接,執行如下的sql;將可能的情況枚舉出來,和sql返回的語句比對
`show /* gh-ost */ grants for current_user()`

在控制台執行sql的shili
 mysql> show  grants for current_user();
+-------------------------------------------------------------+
| Grants for root@%                                           |
+-------------------------------------------------------------+
| GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION |
+-------------------------------------------------------------+
1 row in set (0.01 sec)

	err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
		for _, grantData := range rowMap {
			grant := grantData.String
			if strings.Contains(grant, `GRANT ALL PRIVILEGES ON *.*`) {
				foundAll = true
			}
			if strings.Contains(grant, `SUPER`) && strings.Contains(grant, ` ON *.*`) {
				foundSuper = true
			}
      ...
                               
  if foundAll {
		log.Infof("User has ALL privileges")
		return nil
	}                             
  • 校驗binlog的格式

實現的思路同樣是執行下面的sql,查看bin-log是否是row格式。以及binlog_row_image是否是FULL格式。

從庫強制要求:binlog為 ROW模式,還要開啟log_slave_updates(告訴從服務器將其SQL線程執行的更新記入到從服務器自己的binlog中)。

為什么會這么要求binlog為row格式?

rowlevel的日志內容會非常清楚的記錄下每一行數據修改的細節,而ghost有專門的go協程專門負責解析binlog同步增量數據。

相關參數:

--switch-to-rbr 作用:讓gh-ost自動將從庫的binlog_format轉換為ROW格式。(ghost不會將格式還原回之前的狀態)

--assume-rbr 作用: 如果你確定從庫的bin-log格式就是row格式。可以使用這個參數,他可以保證禁止從庫上運行stop slave,start slave

query := `select @@global.log_bin, @@global.binlog_format`
this.db.QueryRow(query).Scan(&hasBinaryLogs, &this.migrationContext.OriginalBinlogFormat); 

query = `select @@global.binlog_row_image`	
this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage);

#正常從控制台執行命令得到的結果如下:
mysql> select @@global.log_bin, @@global.binlog_format;
+------------------+------------------------+
| @@global.log_bin | @@global.binlog_format |
+------------------+------------------------+
|                1 | ROW                    |
+------------------+------------------------+
1 row in set (0.01 sec)

mysql> select @@global.binlog_row_image;
+---------------------------+
| @@global.binlog_row_image |
+---------------------------+
| FULL                      |
+---------------------------+
1 row in set (0.00 sec)

解析一下檢測的兩參數,一個要求binlog為row格式(比如主主模式,就強制主庫的binlog是row格式),另一個參數期望 binlog_row_image為full模式, 原因是因為row格式的binlog記錄sql對這行數據的做了哪些修改,而full + row 格式的binlog記錄了最最詳細的信息,包含sql對哪個庫、哪個表、以及這個表的第一列、第二列...的值是什么 + 現在這條SQL想把這個表中的字段更新成什么,因為增量數據是從binlog里面拷貝出來的,所以記錄最全的binlog 關於binlog格式+binlog_row_image 可參考:https://www.cnblogs.com/gomysql/p/6155160.html

5.2、創建streamer監聽binlog

這一步同樣是在 migrator.go的Migrate()this.initiateInspector(); 函數中。

首先創建一個 eventsStreamer,因為他要求同步bin-log,所以為他初始化一個DB連接。
if err := this.eventsStreamer.InitDBConnections(); err != nil {
		return err
}
// 細節:
// 獲取連接數據庫的url:
this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
// root:root@tcp(127.0.0.1:3307)/lossless_ddl_test?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1&tls=false
// 其中參數:interpolateParams=true用於防止sql注入攻擊
// 其中參數:autocommit=true 表示每一條sql都當做一個事物自動提交,一般推薦這樣做,如果不自動提交的話很容易出現長事物,系統也會因為這個長事物維護很大的readView占用存儲空間。還可能長時間占用鎖資源不釋放,增加死鎖的幾率。

// 獲取DB實例
mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri);

// 校驗連接的可用性
base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext);
	-- 思路還是和5.1小節一致,使用db.QueryRow(versionQuery).Scan(&version); 執行sql,觀察結果
	-- `select @@global.version`
	-- `select @@global.extra_port`
  -- `select @@global.port`

//獲取當前binlog的位點信息
//在ghost啟動的時候會先獲取mysql集群的bin-log狀態,因為ghost的設計哲學是,現有的數據從原表select出來灌進影子表,在同步的過程中增量的數據通過解析重放binlog來實現,那獲取集群中當前的bin-log信息自然也是master中讀取:如下:
query := `show /* gh-ost readCurrentBinlogCoordinates */ master status`
foundMasterStatus := false
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {

//正常從console中執行命令獲取到的結果如下:
  mysql> show  master status;
+------------------+----------+--------------+------------------+---------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                           |
+------------------+----------+--------------+------------------+---------------------------------------------+
| MySQL-bin.000007 |      194 |              |                  | a89eec96-b882-11ea-ade2-0242ac110002:1-8844 |
+------------------+----------+--------------+------------------+---------------------------------------------+


//獲取到當前binlog位點后,ghost會將自己偽裝成一個replica連接上master,同步master的binlog
//具體的實現依賴第三方類庫:	"github.com/siddontang/go-mysql/replication"
//調用 *replication.BinlogSyncer 的如下方法同步bin-log
func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
  

 // 這是你可以嘗試往主庫寫幾條語句,然后flush log,ghost是能感知到的
[info] binlogsyncer.go:723 rotate to (MySQL-bin.000008, 4)
INFO rotate to next log from MySQL-bin.000008:6074 to MySQL-bin.000008
[info] binlogsyncer.go:723 rotate to (MySQL-bin.000008, 4)
INFO rotate to next log from MySQL-bin.000008:0 to MySQL-bin.000008

5.3、創建xxx_ghc表,xxx_gho表。

// 創建applier
NewApplier(this.migrationContext)

// 初始化DB連接(和上面說過的步驟雷同)
this.applier.InitDBConnections();

// 根據配置判斷是否刪除ghost表。相關的配置參數:--initially-drop-old-table
// 所謂的ghost就是xxx_ghc表,xxx_gho表,一個是日志表,一個是影子表,他們是ghost創建的表
// 其中前者中存放ghost打印的日志,后者是未來替換現以后表的影子表。ghost在這里判斷,如果mysql實例中已經存在這兩個表,是不允許進行剩下的任務的,但是可以使用--initially-drop-old-table參數配置,ghost在啟動的過程中碰到這個表就把他刪除。(ghost任務他們都是殘留表)
if this.migrationContext.InitiallyDropGhostTable {
		if err := this.DropGhostTable(); err != nil {
			return err
		}
	}
// 刪除表的語句如下:
drop /* gh-ost */ table if exists `lossless_ddl_test`.`_user_gho`
drop /* gh-ost */ table if exists `lossless_ddl_test`.`_user_del`

// 創建日志表,建表語句如下
create /* gh-ost */ table `lossless_ddl_test`.`_user_ghc` (
			id bigint auto_increment,
			last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
			hint varchar(64) charset ascii not null,
			value varchar(4096) charset ascii not null,
			primary key(id),
			unique key hint_uidx(hint)
		) auto_increment=256
		

// 創建影子表,一開始創建的影子表其實就是原表。
create /* gh-ost */ table `lossless_ddl_test`.`_user_gho` like `lossless_ddl_test`.`user`

// 對影子表執行alter語句,alter語句不在原表上執行也就不會出現所謂的表級鎖或者MDL寫鎖去阻塞業務方的sql
alter /* gh-ost */ table `lossless_ddl_test`.`_user_gho` add column newC6 varchar(24);

// 寫心跳記錄
			insert /* gh-ost */ into `lossless_ddl_test`.`_user_ghc`
				(id, hint, value)
			values
				(NULLIF(?, 0), ?, ?)
			on duplicate key update
				last_update=NOW(),
				value=VALUES(value)
	
			insert /* gh-ost */ into `lossless_ddl_test`.`_user_ghc`
				(id, hint, value)
			values
				(NULLIF(?, 0), ?, ?)
			on duplicate key update
				last_update=NOW(),
				value=VALUES(value)
			
// 這時可以去數據庫中查看ghost
mysql> select * from _user_ghc;
+-----+---------------------+------------------------------+--------------------+
| id  | last_update         | hint                         | value              |
+-----+---------------------+------------------------------+--------------------+
|   2 | 2020-07-25 19:55:11 | state                        | GhostTableMigrated |
| 256 | 2020-07-25 19:55:39 | state at 1595678112426190000 | GhostTableMigrated |
+-----+---------------------+------------------------------+--------------------+


// 接着獲取slave的狀態,主要是獲到slave落后於master到秒數
// 獲取到方法如下:通過show slave status查看從庫的主從同步狀態,其中的Slave_IO_Running和Slave_SQL_Running為YES說明主從同步正常工作,Seconds_Behind_Master為當前的從庫中的數據落后於主庫的秒數
	err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, 
            func(m sqlutils.RowMap) error {
								slaveIORunning := m.GetString("Slave_IO_Running")
								slaveSQLRunning := m.GetString("Slave_SQL_Running")
								secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")

5.4 開始遷移數據

在開始migration數據之前做一些檢查工作

// 校驗master和slave表結構是否相同,具體的實現會分別獲取到他們的列的name然后比較
table structure is not identical between master and replica

// 獲取到原表和ghost表唯一鍵的交集
sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys)


// 校驗唯一鍵。
for i, sharedUniqueKey := range sharedUniqueKeys {
		this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &sharedUniqueKey.Columns)
		uniqueKeyIsValid := true
    // 校驗唯一鍵的類型,ghost不支持FloatColumnType和Json列。
		for _, column := range sharedUniqueKey.Columns.Columns() {
			switch column.Type {
			case sql.FloatColumnType:
				{
					log.Warning("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name)
					uniqueKeyIsValid = false
				}
			case sql.JSONColumnType:
				{
					// Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code
					// will remain in place to potentially handle the future case where JSON is supported in indexes.
					log.Warning("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name)
					uniqueKeyIsValid = false
				}
			}
		}

// 檢驗選出的唯一鍵,如果沒有選出唯一鍵的話報錯求助。
if this.migrationContext.UniqueKey == nil {
		return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
}

  
// 檢查唯一鍵是否可以是空的。默認是不允許唯一鍵為空的,如果沒辦法改變讓他不為空,ghost也提供了參數去適配
  if this.migrationContext.UniqueKey.HasNullable {
		if this.migrationContext.NullableUniqueKeyAllowed {
			log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey)
		} else {
			return fmt.Errorf("Chosen key (%s) has nullable columns. Bailing out. To force this operation to continue, supply --allow-nullable-unique-key flag. Only do so if you are certain there are no actual NULL values in this key. As long as there aren't, migration should be fine. NULL values in columns of this key will corrupt migration's data", this.migrationContext.UniqueKey)
		}
	}

// 獲取原表和影子表的交集列
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap)
//打印日志
INFO Shared columns are id,status,newC1,newC2,newC3,newC5,newC4

//做其他額外的檢查

做完了上面的檢查工作就可以真正的遷移數據了。

數據遷移部分的主要邏輯在如下函數中

  // 統計當前有多少行
  // 具體實現:新開協程,用於統計一共多少行,執行如下如下sql
  // select /* gh-ost */ count(*) as rows from %s.%s`
  if err := this.countTableRows(); err != nil {
		return err
	}
	
	// 添加DML語句監聽器
  // addDMLEventsListener開始監聽原始表上的binlog事件,並為每個此類事件創建一個寫任務並將其排隊。 
  // 由executeWriteFuncs專門負責消費這個隊列。
	if err := this.addDMLEventsListener(); err != nil {
		return err
	}

  // 獲取遷移的范圍
  //  -- 執行sql:獲取最小的id值
  //    select /* gh-ost `lossless_ddl_test`.`user` */ `id`
	//			from
	//				`lossless_ddl_test`.`user`
	//  		order by
	//				`id` asc
	//			limit 1
				
	//   -- 執行sql:獲取最大id值
  //   	select /* gh-ost `lossless_ddl_test`.`user` */ `id`
	//				from
	//					`lossless_ddl_test`.`user`
	//  			order by
	//					`id` desc
	//				limit 1
	if err := this.applier.ReadMigrationRangeValues(); err != nil {
		return err
	}

  // 這兩個協程分別是遷移任務的執行者。
  // 和遷移任務的創建者。
	go this.executeWriteFuncs()
	go this.iterateChunks()

下圖是:上面代碼中最后兩個協程之間是如何配合工作的邏輯圖

通過上圖可以看出其中的executeWrite的主要作用其實是執行任務。

那他要執行的任務有主要有兩種:

  • 數據遷移任務:copyRowsFunc()
  • 同步binlog事件的函數:ApplyDMLEventQueries()

其中copyRowsFunc()如下:

	copyRowsFunc := func() error {
			if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 {
				// Done. There's another such check down the line
				return nil
			}

			// 當hasFurterRange為false時,原始表可能被寫鎖定,
      // CalculateNextIterationRangeEndValues將永遠掛起
			hasFurtherRange := false
			if err := this.retryOperation(func() (e error) {
				hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
				return e
			}); err != nil {
				return terminateRowIteration(err)
			}
			if !hasFurtherRange {
				atomic.StoreInt64(&hasNoFurtherRangeFlag, 1)
				return terminateRowIteration(nil)
			}
			// Copy task:
			applyCopyRowsFunc := func() error {
				if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
					return nil
				}
				_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
				if err != nil {
					return err // wrapping call will retry
				}
				atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
				atomic.AddInt64(&this.migrationContext.Iteration, 1)
				return nil
			}
			if err := this.retryOperation(applyCopyRowsFunc); err != nil {
				return terminateRowIteration(err)
			}
			return nil
		}

如上函數有個需要關注的點:

第一:上面的CalculateNextIterationRangeEndValues()函數用於計算下一個迭代的范圍,他會構建出類似如下的sql,默認的chunkSize是1000

				select  /* gh-ost `lossless_ddl_test`.`user` iteration:1 */
						`id`
					from
						`lossless_ddl_test`.`user`
					where ((`id` > ?)) and ((`id` < ?) or ((`id` = ?)))
					order by
						`id` asc
					limit 1
					offset 999

第二:也是如上函數中的核心邏輯是:this.applier.ApplyIterationInsertQuery()

func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
	startTime := time.Now()
	chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
	// 構建查詢的sql
	query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
		this.migrationContext.DatabaseName,
		this.migrationContext.OriginalTableName,// 原表名
		this.migrationContext.GetGhostTableName(), // 幽靈表名
		this.migrationContext.SharedColumns.Names(),
		this.migrationContext.MappedSharedColumns.Names(),
		this.migrationContext.UniqueKey.Name,
		&this.migrationContext.UniqueKey.Columns,
		this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
		this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
		this.migrationContext.GetIteration() == 0,
		this.migrationContext.IsTransactionalTable(),
	)
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}

	// 在這個匿名函數中執行查詢,返回查詢的結果
	sqlResult, err := func() (gosql.Result, error) {
		tx, err := this.db.Begin()
		if err != nil {
			return nil, err
		}
		defer tx.Rollback()
		sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
		sqlModeAddendum := `,NO_AUTO_VALUE_ON_ZERO`
		if !this.migrationContext.SkipStrictMode {
			sqlModeAddendum = fmt.Sprintf("%s,STRICT_ALL_TABLES", sqlModeAddendum)
		}
		sessionQuery = fmt.Sprintf("%s, sql_mode = CONCAT(@@session.sql_mode, ',%s')", sessionQuery, sqlModeAddendum)

		if _, err := tx.Exec(sessionQuery); err != nil {
			return nil, err
		}
		// 執行查詢
		result, err := tx.Exec(query, explodedArgs...)
		if err != nil {
			return nil, err
		}
		if err := tx.Commit(); err != nil {
			return nil, err
		}
		return result, nil
	}()

	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}
	// 獲取到查詢的影響行數
	rowsAffected, _ = sqlResult.RowsAffected()
	duration = time.Since(startTime)
	log.Debugf(
		"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
		this.migrationContext.MigrationIterationRangeMinValues,
		this.migrationContext.MigrationIterationRangeMaxValues,
		this.migrationContext.GetIteration(),
		chunkSize)
	return chunkSize, rowsAffected, duration, nil

如上函數會構建出兩條SQL,

  • 第一條是用於數據遷移的SQL如下:
insert /* gh-ost `lossless_ddl_test`.`user` */ ignore into `lossless_ddl_test`.`_user_gho` (`id`, `status`, `newC1`, `newC2`, `newC3`, `newC5`, `newC4`)(select `id`, `status`, `newC1`, `newC2`, `newC3`, `newC5`, `newC4` from `lossless_ddl_test`.`user` force index (`PRIMARY`) where (((`id` > ?)) and ((`id` < ?) or ((`id` = ?)))) lock in share mode)

SQL解讀:

  1. 按照(((id > ?)) and ((id < ?) or ((id = ?)))的范圍從原表讀數據。
  2. 從原表中讀取出所有列的數據,強制使用索引 force index (PRIMARY)。
  3. 將數據灌進影子表的操作使用的是 insert ignore into,表示如果影子表已經存在了相同的數據,不再重復寫入
  4. 為了防止遷移數據時數據被改動,每次插入數據對原表的數據持有讀鎖(lock in share mode)。
  • 構建的第二條SQL如下:主要是設置session
// 為這此執行sql的
SET SESSION time_zone = '+08:00',sql_mode = CONCAT(@@session.sql_mode, ', NO_AUTO_VALUE_ON_ZERO,STRICT_ALL_TABLES')

NO_AUTO_VALUE_ON_ZERO表示:讓MySQL中的自增長列可以從0開始。默認情況下自增長列是從1開始的,如果你插入值為0的數據會報錯,設置這個之后就可以正常插入為0的數據了。

STRICT_ALL_TABLES表示語句中有非法或丟失值,則會出現錯誤。語句被放棄並滾動。

第二個重點關注的函數是和 遷移應用增量數據相關的函數ApplyDMLEventQueries()

通過如下的方式可以斷點順利進入到ApplyDMLEventQueries()

首先:看下圖

Snipaste_2020-07-25_23-37-16

所以我們將斷點打在第18行上,再通過控制台寫入一條數據

image-20200725233957457

在18行開啟的協程中就會優先處理這個事件。

於是我們就會順利進入到下面的代碼中:

image-20200725234049366

這段代碼的邏輯如下:

  1. 開啟事物
  2. 對當前session進行會話參數。
SET SESSION time_zone = '+00:00', sql_mode = CONCAT(@@session.sql_mode, ',,NO_AUTO_VALUE_ON_ZERO,STRICT_ALL_TABLES')
  1. 然后遍歷所有的事件,將不同的binlog事件轉換成不同的sql。

    func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
    	switch dmlEvent.DML {
    	case binlog.DeleteDML:
    		{
    			query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
    			return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
    		}
    	case binlog.InsertDML:
    		{
    			query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
    			return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
    		}
    	case binlog.UpdateDML:
    		{
    			if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
    				dmlEvent.DML = binlog.DeleteDML
    				results = append(results, this.buildDMLEventQuery(dmlEvent)...)
    				dmlEvent.DML = binlog.InsertDML
    				results = append(results, this.buildDMLEventQuery(dmlEvent)...)
    				return results
    			}
    			query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
    			args := sqlutils.Args()
    			args = append(args, sharedArgs...)
    			args = append(args, uniqueKeyArgs...)
    			return append(results, newDmlBuildResult(query, args, 0, err))
    		}
    	}
    	return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
    }
    

    剛剛Insert類型的事件被轉換成:注意哦,是replace into ,而不是insert into,也不是insert ignore into

        replace /* gh-ost `lossless_ddl_test`.`_user_gho` */ into
    				`lossless_ddl_test`.`_user_gho`
    					(`id`, `status`, `newC1`, `newC2`, `newC3`, `newC5`, `newC4`)
    				values
    					(?, ?, ?, ?, ?, ?, ?)
    		
    

    delete 類型的事件被轉換成如下 :

    	delete /* gh-ost `lossless_ddl_test`.`_user_gho` */
    				from
    					`lossless_ddl_test`.`_user_gho`
    				where
    					((`id` = ?))
    		
    

    update類型的事件轉換為如下格式sql:

         	 update /* gh-ost `lossless_ddl_test`.`_user_gho` */
     					`lossless_ddl_test`.`_user_gho`
    				set
    					`id`=?, `status`=?, `newC1`=?, `newC2`=?, `newC3`=?, `newC5`=?, `newC4`=?
    				where
     					((`id` = ?))
     		
    

    其中通過這個過程可以看出來,ghost重訪binlog和執行數據遷移其實是同步進行的,甚至重訪binlog的優先級比遷移數據的row copy還高。

5.5 cut-over

cutOver是數據遷移的最后一步,它主要做的工作就是: 改表名

image-20200726172940924

  • 和哨兵表相關的操作:_xxx_del
    • 檢查哨兵表是否存在,如果有的話就干掉它
    • ghost創建新的哨兵表

之所以要檢查 _xxx_del 是否存在是因為,原表現將表名改成 : 原表名_del, 防止因為這個表名原來就有而導致改名出錯。


//在如下函數中執行:show /* gh-ost */ table status from `lossless_ddl_test` like '_user_del'
showTableStatus(tableName string)

//如果哨兵表不存在的話,返回如下
mysql> show  table status from `lossless_ddl_test` like '_user_del';
Empty set (0.00 sec)

//如果存在的話會返回一坨關於當前表的信息
mysql> show  table status from `lossless_ddl_test` like '_user_gho';
+-----------+--------+---------+------------+------+----------------+-------------+-----------------+--------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------+
| Name      | Engine | Version | Row_format | Rows | Avg_row_length | Data_length | Max_data_length | Index_length | Data_free | Auto_increment | Create_time         | Update_time         | Check_time | Collation       | Checksum | Create_options | Comment |
+-----------+--------+---------+------------+------+----------------+-------------+-----------------+--------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------+
| _user_gho | InnoDB |      10 | Dynamic    | 8178 |             34 |      278528 |               0 |            0 |         0 |           8188 | 2020-07-26 17:01:57 | 2020-07-26 17:01:59 | NULL       | utf8_general_ci |     NULL |                |         |
+-----------+--------+---------+------------+------+----------------+-------------+-----------------+--------------+-----------+----------------+---------------------+---------------------+------------+-----------------+----------+----------------+---------+
1 row in set (0.01 sec)

// 如果哨兵表存在的話就會刪除它
this.dropTable(tableName)

//創建哨兵表,執行如下的sql
create /* gh-ost */ table `lossless_ddl_test`.`_user_del` (
			id int auto_increment primary key
		) engine=InnoDB comment='ghost-cut-over-sentry'

// !!! 創建 _user_del表是為了防止cut-over提前執行,導致數據丟失!!!!!

// 如果_user_del 表都創建失敗了,ghost會直接退出,因為ghost通過 _user_del表來控制cutOver在可控的時機執行(當ghost加的寫鎖被釋放時執行)。那現在這個表都創建不成功,所以直接退出也罷。
  • 加鎖
//執行如下sql,我們稱這個會話叫 會話A
lock /* gh-ost */ tables `lossless_ddl_test`.`user` write, `lossless_ddl_test`.`_user_del` write

// 加完write鎖后,在這之后的諸如select 等 dml操作都會被阻塞等寫鎖的釋放。

// 如果加鎖失敗了,ghost程序退出,因為沒有加上任何鎖,所以業務方的SQL不會受到任何影響。

// !!!!注意,在一個會話中,即使先加上了 writelock,依然是可以執行drop的!!!!!

對user表和哨兵表同時添加了寫鎖,當然終究還是看到了ghost也會將原表鎖住,真真切切的加了寫鎖

但是我們依然會說ghost其實是無損的DDL,為啥這么說呢?因為做無損DDL的過程中,最耗時的步驟其實是數據遷移這一步,如果我們在數據遷移時將寫鎖,或者MDL寫鎖添加在原表上,那這遷移過程中業務表不能被訪問,這才是不能被允許的,ghost完美避過了這個耗時的過程,而將寫鎖放在改表名這一步。該表名很快的,幾乎瞬間就完成了。那用寫鎖保證該表名的過程中沒有寫流量打進來,完全是可以接受的。

  • 改表名
// 執行如下sql,獲取當前會話的sessionID
// select connection_id()
// 將sessionID寫入channel
sessionIdChan <- sessionId

// 整個rename的操作seesion的超時時間,防止寫鎖一直存在阻塞業務方的dml
INFO Setting RENAME timeout as 3 seconds

// 如果這時會話A出現異常了,會話A持有的鎖會被自動釋放,保證了業務方的DML語句不被影響。此外ghsot設定的是 哨兵表在會話A沒有任何異常的情況下刪除的,現在會話A有了異常,_user_del就會一直存在,而這個表還存在,下面的rename操作就不會被執行成功。保證數據遷移整體的安全性。

// 執行如下sql,將原表名改為哨兵表名, 影子表改成原表名
// 這個rename操作會因為上面的lock 語句而等待。
// 我們稱這個會話叫做 會話B
rename /* gh-ost */ table `lossless_ddl_test`.`user` to `lossless_ddl_test`.`_user_del`, `lossless_ddl_test`.`_user_gho` to `lossless_ddl_test`.`user`


// 如果在會話B執行rename等待過程中,這時會話A出現異常了,同樣的:會話A持有的鎖會被自動釋放,保證了業務方的DML語句不被影響。此外ghsot設定的是 哨兵表在會話A沒有任何異常的情況下刪除的,現在會話A有了異常,_user_del就會一直存在,而這個表還存在,上面的rename操作就不會被執行成功。保證數據遷移整體的安全性。

// 執行完上面的rename語句后,業務方的sql會因為前面的lock語句和rename語句而等待。

// 會話A 通過如下sql,檢查執行rename的會話B在等待dml鎖。如果會話B異常失敗了,會話A通過下面的sql就檢測不出會話B的存在,會話A繼續運行,釋放寫鎖。
		select id
            from information_schema.processlist
            where
                id != connection_id()
                and 17765 in (0, id)
                and state like concat('%', 'metadata lock', '%')
                and info  like concat('%', 'rename', '%')


// 會話A,執行如下SQL,刪除 _user_del 表,讓cutOver可以正常執行。
drop table `_user_del` 

// 會話A執行如下SQL釋放writeLock
UNLOCK TABLES

// 現在writeLock被釋放了,剩下的問題就是現有的諸多DML SQL和 rename SQL到底誰先執行的問題。
// MySQL有機制保證:無損DML和rename誰先打向MySQL,MySQL都會優先執行rename SQL。

// 所以下面rename SQL會優先於其他的DMLSQL 去改表名。

// 如果rename過程成功結果,ghost工作完成
INFO Tearing down applier
DEBUG Tearing down...
Tearing down streamer
INFO Tearing down throttler
DEBUG Tearing down...
# Done
Exiting.

// 去檢查一下結果
mysql> show tables;
+-----------------------------+
| Tables_in_lossless_ddl_test |
+-----------------------------+
| _user_del                   |
| user                        |
+-----------------------------+
2 rows in set (0.00 sec)

5.6、如何保證數據一致性

在數據遷移的過程中原表和影子表存在三種操作

  1. ghost對原表進行row copy,將數據遷移到影子表。
  2. 業務對原表進行DML操作。
  3. ghost對影子表重放binlog日志。
5.6.1、兩種情況
  • 情況1:rowCopy都進行完了,剩下的增量數據只需要從binlog-event中解析出sql然后在影子表重放就ok,這也是最簡單的情況。因為重放binlog只會出現更新的狀態覆蓋舊狀態的數據。

  • 情況2: rowCopy還在進行的過程中。監聽到了binlog-event。

    上面記錄ghost整個工作流程的時候有提到,對ghost來說,處理binlog-event的優先級比進行rowCopy的優先級還要高。那在未完成rowCopy的情況下,就重放binlog,數據一致性是如何保證的呢?

    ghost會監聽處理的dml類型binlog有 insert,delete,update,他們大概會被轉換處理成下面樣子的sql。

    // insert 類型
    replace /* gh-ost `lossless_ddl_test`.`_user_gho` */ into
    				`lossless_ddl_test`.`_user_gho`
    					(`id`, `status`, `newC1`, `newC2`, `newC3`, `newC5`, `newC4`)
    				values
    					(?, ?, ?, ?, ?, ?, ?)
    					
    					
    // delete 類型
    delete /* gh-ost `lossless_ddl_test`.`_user_gho` */
    				from
    					`lossless_ddl_test`.`_user_gho`
    				where
    					((`id` = ?))
    					
    //update類型的事件 					
    update /* gh-ost `lossless_ddl_test`.`_user_gho` */
     					`lossless_ddl_test`.`_user_gho`
    				set
    					`id`=?, `status`=?, `newC1`=?, `newC2`=?, `newC3`=?, `newC5`=?, `newC4`=?
    				where
     					((`id` = ?))	
    
5.6.2、對於insert

由於binlog-event的優先級更高,所以數據通過 replace into(看我上面列出來的sql) 的方式寫進影子表。

注意這是replace into,表示不存在相同的數據就直接插入,數據已經存在的先把舊數據刪除再將當前最新的數據插入。

而rowCopy時使用的插入語句時 insert ignore。表示,如果已經存在了,那好它肯定是通過重放binlog-event得到的,肯定比我新,那直接忽略當前記錄處理下一個insert ignore。

5.6.3 、對於update

假設現在有1000條數據(id從1~1000)。 row'Copy拷貝完了前300條,這時ghost接受到了binlog-event竟然是對update id = 999的數據。又因為binlog-event對優先級比rowCopy高,所以ghost還不得不先處理這個update事件。可是ghost控制的影子表中還不存在id=999的數據啊~

其實不用差異。如果影子表里面沒有就直接忽略好了,在影子表上執行這個sql又不會報錯。

mysql> update  user_gho set status = 123  where id = 9999;
Query OK, 0 rows affected (0.00 sec)
Rows matched: 0  Changed: 0  Warnings: 0

反正過一會我們通過rowCopy來的數據肯定是最新的。

5.6.4、對於delete

情況1: 完成了rowCopy,然后收到了 delete-binlog-event

這時執行回放 delete-binlog-event 就好了,因為原表中的數據已經被刪除了。所以影子表中的數據自然要被刪除。

情況2: 未完成rowCopy,然后收到了 delete-binlog-event

這時執行也是直接回放 delete-binlog-event 就好了,數據不存在只是說影響結果為空,這時原表中的數據已經被刪除了。過一會的rowCopy也不會把已經刪除的數據拷貝過來。所以還是安全的。

2020-08-13 補充第三種情況
情況3:構造遷移數據的協程生成了一個任務,將id從1000~1999的數據從原表拷貝到影子表,與此同時ghost收到了一個delete-bin-log-event,想刪除id=1555的記錄,又因為回放binlog的優先級比執行rowcopy的優先級高,所以執行從binlog事件中解析出來的函數,這時影子表中沒有要刪除的id=1555的記錄,但是執行也不會報錯只不過影響行數為0,然后執行rowcopy,這樣豈不是沒有刪除掉id=1555的記錄?難道是ghost的bug嗎?

其實不是的~,因為ghost生成的遷移數據的SQL又存在 lock in share mode ,加了一把鎖!因為有這把鎖的存在,上面說的情況三就不可能出現,因為當MySql-Server執行rowcopy時,其他的DML SQL是不能修改這些數據的,因為他們沒有鎖,ghsot也就不會收到上述情況三中的deletebinlog事件。
而當ghost收到delete-binlog事件時,說明主庫已經寫完了redolog和binlog,而且ghost遷移數據的SQL也一定沒有來得及發送到MySql-Server

六、ghost的暫停、繼續、限流、終止

  • 暫停和繼續

ghost可是實現真正的暫停,當我們出發暫停時,ghost不會再有任何行rowCopy,也不會再處理任何事件。不對主庫產生任何壓力

相關參數:throttle-additional-flag-file

//在ghost的main.go中有定義這個參數
//這個文件存在的話操作會停止 保留默認值即可,用於限制多個gh ost操作
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")

//具體可以通過shell命令完成
echo throttle | socat - /tmp/gh-ost.test.sock
//繼續
echo no-throttle | socat - /tmp/gh-ost.test.sock
  • 限流相關

相關參數

--max-lag-millis

--chunk-size

--max-load

//這些參數在main.go中的定義如下
//限制操作的復制延遲, 當主從復制延遲時間超過該值后,gh-ost將采取節流(throttle)措施
maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")

//每次迭代中要處理的行數 范圍從100 - 100000
//就是rowCopy過程中range
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")

//當ghost檢測到超過最大負載 ghost不會退出,為了不給系統負載造成更大的壓力,ghost會等load低於這個值時再工作。
maxLoad := flag.String("max-load", "Threads_running=25", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")

  • 終止

--panic-flag-file

//在main.go中,如下	
//創建此文件時,gh ost將立即終止,而不進行清理
flag.StringVar(&migrationContext.PanicFlagFile, "panic-flag-file", "/tmp/ghost.panic.flag", "when this file is created, gh-ost will immediately terminate, without cleanup")

七、寫有中文注釋的ghost源碼項目

image-20200726193118559

項目使用vender管理依賴,開箱即用

github地址:https://github.com/zhuchangwu/Ghost-source-code-reading-env

todo:GoMySqlReader


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM