TiKV事務實現淺析


TiKV事務實現淺析

Percolator事務的理論基礎

Percolator的來源

Percolator事務來源於Google在設計更新網頁索引的系統時提出的論文Large-scale Incremental Processing Using Distributed Transactions and Notifications中,Google用它在支持單行事務的分布式數據庫Bigtable的基礎上實現跨節點的分布式事務。Percolator是一種優化版的2PC,但是與 常見的2PC不同,它並沒有一個單獨的coodinator的角色,而是作為一個庫將所有邏輯放在客戶端實現,只需要下層存儲支持單行事務即可。原始的Percolator事務模型中,下層的存儲節點可以對於上層事務完全無感知。

1559041182155

為了確定事務的先后順序,Percolator還要求一個全局的授時中心,用於獲取全局有序的遞增時間戳(比如TiDB中的pd組件)。

隔離級別

Percolator事務實現了SI隔離級別(TiDB中將它作為RR)。每個事務都從授時中心獲取兩個時間戳:startTS 和 commitTS,startTS 在事務開始時獲取,commitTS在事務結束時獲取,事務之間通過這兩個時間戳來確定先后。例如有兩個事務T1和T2,如果T1的commitTS小於T2的startTS,則認為T1發生在T2之前 ,如果兩個事務的時間戳區間[startTS, commitTS]存在交叉,則兩個事務是並發的。在SI隔離級別下一個事務只應該看到commitTS小於自己的startTS的事務所寫入的數據。

1559042153126

例如上圖,最上面的橫軸代表時間,下面三條橫線分別代表三個事務T1,T2,T3,方框代表startTS,黑點代表commitTS。則T2不能讀取到T1寫入的數據,而T3能讀取到T1和T2寫入的數據。

存儲模型

Percolator的存儲基於Bigtable,其存儲模型有列族的概念(CF),同一個列族的數據存儲在一起。每個邏輯上的行分為多個列族,每個列族可以分為多個列,而其中每一列的數據以時間戳倒序排序。典型的行如下圖所示:

Key Data Lock Write
Bob 6:
5:$10
6:
5:
6:data@5
5:
Joe 6:
5:$2
6:
5:
6:data@5
5:

key為整個行的key,data為該行的數據。而Percolator要求額外的兩個CF為:Lock和Write。Lock顧名思義表示該行的鎖,而write的版本號表示寫入這行數據的事務提交的時候時間戳commitTS。以Bob行為例,Key為Bob用於唯一確定該行,此時Bob沒有被加鎖Lock為空,在版本號為6的Write CF中有數據data@5,表示對應的數據在Data CF中版本號為5的地方。寫入這行數據的事務startTS為5,commitTS為6。這里的Write CF盡管看上去額外占了一行,並不會占據額外的整行空間。

基本步驟

總體來說,TiKV 的讀寫事務分為兩個階段:1、Prewrite 階段;2、Commit 階段。

客戶端會緩存本地的寫操作,在客戶端調用 client.Commit() 時,開始進入分布式事務 prewrite 和 commit 流程。

Prewrite 對應傳統 2PC 的第一階段

  1. 首先在所有行的寫操作中選出一個作為 primary row,其他的為 secondary rows

  2. PrewritePrimary: 對 primaryRow 寫入鎖以及數據,鎖中記錄本次事務的開始時間戳。上鎖前會檢查:

    • 該行是否已經有別的客戶端已經上鎖 (Locking)
    • 是否在本次事務開始時間之后,檢查versions ,是否有更新 [startTs, +Inf) 的寫操作已經提交 (Conflict)

    在這兩種種情況下會返回事務沖突。否則,就成功上鎖。將行的內容寫入 row 中,版本設置為 startTs

  3. 將 primaryRow 的鎖上好了以后,進行 secondaries 的 prewrite 流程:

    • 類似 primaryRow 的上鎖流程,只不過鎖的內容為事務開始時間 startTs 及 primaryRow 的信息
    • 檢查的事項同 primaryRow 的一致
    • 當鎖成功寫入后,寫入 row,時間戳設置為 startTs

以上 Prewrite 流程任何一步發生錯誤,都會進行回滾:刪除 Lock 標記 , 刪除版本為 startTs 的數據。

當 Prewrite 階段完成以后,進入 Commit 階段,當前時間戳為 commitTs,TSO 會保證 commitTs > startTS

Commit 的流程對應 2PC 的第二階段

  1. commit primary: 寫入 write CF, 添加一個新版本,時間戳為 commitTs,內容為 startTs, 表明數據的最新版本是 startTs 對應的數據
  2. 刪除 Lock 標記

值得注意的是,如果 primary row 提交失敗的話,全事務回滾,回滾邏輯同 prewrite 失敗的回滾邏輯。

如果 commit primary 成功,則可以異步的 commit secondaries,流程和 commit primary 一致, 失敗了也無所謂。Primary row 提交的成功與否標志着整個事務是否提交成功。

事務中的讀操作

  1. 檢查該行是否有 Lock 標記,如果有,表示目前有其他事務正占用此行,如果這個鎖已經超時則嘗試清除,否則等待超時或者其他事務主動解鎖。注意此時不能直接返回老版本的數據。
  2. 讀取至 startTs 時該行最新的數據,找到最近的時間戳小於startTS的write CF,從其中讀取版本號t,讀取為於 t 版本的數據內容。

由於鎖是分兩級的,Primary 和 Seconary row,只要 Primary row 的鎖去掉,就表示該事務已經成功提交,這樣的好處是 Secondary 的 commit 是可以異步進行的,只是在異步提交進行的過程中,如果此時有讀請求,可能會需要做一下鎖的清理工作。因為即使 Secondary row 提交失敗,也可以通過 Secondary row 中的鎖,找到 Primary row,根據檢查 Primary row 的 meta,確定這個事務到底是被客戶端回滾還是已經成功提交。

轉賬示例

下面以論文中轉賬的一個例子來展示大體流程,以上面的Bob和Joe為例,假設Bob要轉賬7元給Joe。

Prewrite

首先需要隨機選擇一行最為primaryRow ,這里選擇Bob。以事務開始時間戳為版本號,寫入Lock與數據

Key Data Lock Write
Bob 7:\$3
6:
5:$10
7:I am Primary
6:
5:
7:
6:data@5
5:
Joe 6:
5:$2
6:
5:
6:data@5
5:

從上圖可以看出轉賬事務的startTS為7,所以寫入了版本號為7的Lock與Bob的新數據,Lock中有表示自己是primaryLock的標志。隨后進行secondary rows的上鎖,這里只有Joe。

Key Data Lock Write
Bob 7:\$3
6:
5:$10
7:I am Primary
6:
5:
7:
6:data@5
5:
Joe 7:\$9
6:
5:$2
7:primary@Bob
6:
5:
7:
6:data@5
5:

Joe的Lock中保存了primary的信息,用於找到這次提交的primary row Bob。

如果在prewrite的過程中檢測到了沖突,則整個事務需要進行回滾。例如,在此時另一個事務的startTS為8,試圖對Bob進行加鎖,發現已經被startTS為7的事務加鎖,則該事務會檢測到沖突,事務回滾。也有可能發現在自己startTS以后,已經有事務提交了新的數據,出現了大於startTS的write,此時事務也需要回滾。

Commit

首先commit primary row,客戶端通過Bigtable的單行事務,清除primary行的鎖,並且以提交時間戳在write寫入提交標志。

key data lock write
Bob 8:
7:\$3
6:
5:$10
8:
7:
6:
5:
8:data@7
7:
6:data@5
5:
Joe 7:\$9
6:
5:$2
7:primary@Bob
6:
5:
7:
6:data@5
5:

primary row的Write CF的寫入是整個事務提交的標志,這個操作的完成就意味着事務已經完成提交了。
write中寫入的數據指向Bob真正存放余額的地方。完成這一步就可以向客戶端返回事務commit成功了。

接下可以異步釋放secondary rows的鎖。如果在commit階段發現primary鎖已經不存在(可能因為超時被其他事務清除),則提交失敗,事務回滾。

key data lock write
Bob 8:
7:\$3
6:
5:$10
8:
7:
6:
5:
8:data@7
7:
6:data@5
5:
Joe 8:
7:\$9
6:
5:$2
8:
7:
6:
5:
8data@7:
7:
6:data@5
5:

實際上,即使在執行這一步前,客戶端掛了而沒能處理這些行的鎖也沒有問題。當其他事務讀取到這樣的行的數據的時候,通過鎖可以找出primary行,從而判斷出事務的狀態,如果已經提交,則可以清除鎖寫入提交標志。

偽代碼

論文中用C++風格的偽代碼進行了Percolator事務流程的表達,整個事務被封裝成了一個class,先來看其中需要用到的成員:

class Transaction {
   // Write結構體表示一個寫入操作,哪個key下的哪一個列,寫入什么值
    struct Write { Row row; Column col; string value; };
    // writes 則為在這個事務中緩存的所有寫入的集合
    vector<Write> writes ;
    // 事務開始的時間戳ST
    int start ts ;
    // 事務建立的時候獲取開始時間戳
    Transaction() : start ts (oracle.GetTimestamp()) 
    //下面是各個實現函數,見下文
    ...
}

事務中需要用到的數據結構比較少,只保存了事務開始的時間戳和寫入集合。

Perwrite階段的偽代碼:

bool Prewrite(Write w, Write primary) {
    // 列族名
    Column c = w.col;
    // google的Percolator基於bigtable的單行事務,因此這里用bigtable::Txn表示發起單行事務
    bigtable::Txn T = bigtable::StartRowTransaction(w.row);

    // Abort on writes after our start timestamp . . .
    // 在自己事務開啟之后是否有新提交的數據
     if (T.Read(w.row, c+"write", [start ts, +Inf])) return false;
     // . . . or locks at any timestamp.
     // 是否已經被其他事務加鎖
    if (T.Read(w.row, c+"lock", [0, 1])) return false;

    T.Write(w.row, c+"data", start ts , w.value);
    T.Write(w.row, c+"lock", start ts ,
        {primary.row, primary.col}); // The primary’s location.
    return T.Commit();
}

prewite階段如上文所說,在進行沖突檢測后寫入了Lock和數據。這是對某一行進行prewrite的函數,在整個提交階段被多次調用。偽代碼中的commit代表整個percolator事務的提交:

bool Commit() {
    Write primary = writes[0];
    vector<Write> secondaries(writes .begin()+1, writes .end());
    // 對所有參與事務的行執行Prewrite
    // 先對隨機選出的某一個primary行加鎖,再對其他行加鎖。
    if (!Prewrite(primary, primary)) return false;
    for (Write w : secondaries)
        if (!Prewrite(w, primary)) return false;
    
    // 獲取提交時間戳commitTS
    int commit ts = oracle .GetTimestamp();

    // Commit primary first.
    Write p = primary;
    bigtable::Txn T = bigtable::StartRowTransaction(p.row);
    // 失去了鎖,可能被別人終止了,事務回滾
    if (!T.Read(p.row, p.col+"lock", [start ts , start ts ])) 
        return false; // aborted while working
    // 向primary行的Write CF寫入提交標志,時間戳為commitTS
    T.Write(p.row, p.col+"write", commit ts,
        start ts ); // Pointer to data written at start ts .
    //擦除primary的鎖
    T.Erase(p.row, p.col+"lock", commit ts);
    if (!T.Commit()) return false; // commit point

    // Second phase: write out write records for secondary cells.
    // 在其他行同樣進行寫入Write CF 並且擦除鎖
    for (Write w : secondaries) {
        bigtable::Write(w.row, w.col+"write", commit ts, start ts );
        bigtable::Erase(w.row, w.col+"lock", commit ts);
    }
    return true;
}

讀取操作的偽代碼如下:

bool Get(Row row, Column c, string* value) {
    while (true) {
        bigtable::Txn T = bigtable::StartRowTransaction(row);
        // Check for locks that signal concurrent writes.
        if (T.Read(row, c+"lock", [0, start ts ])) {
            // There is a pending lock; try to clean it and wait
            // 注意,這里如果鎖沒有超時的情況下不能直接返回最近的可見數據
            // 必須等待持鎖事務commit或者回滾,直到超時清除它
            BackoffAndMaybeCleanupLock(row, c);
        	continue;
   		 }
		// 按照時間戳讀取最新的可見的數據
   		// Find the latest write below our start timestamp.
   		latest write = T.Read(row, c+"write", [0, start ts ]);
   		if (!latest write.found()) return false; // no data
   		int data ts = latest write.start timestamp();
   		*value = T.Read(row, c+"data", [data ts, data ts]);
  		 return true;
	}
}

在讀取操作的時候需要注意遇到鎖的情況,如果檢查primary row發現事務已經提交則可以由自己清除secondary row的鎖。如果有沒有commit的其他事務持有鎖,不能夠直接返回最新的對自己可見的數據。如上面轉賬的例子中正處於prewrite階段,此時另一個startTS為9的事務來進行對Joe的讀取操作:

Key Data Lock Write
Bob 7:\$3
6:
5:$10
7:I am Primary
6:
5:
7:
6:data@5
5:
Joe 7:\$9
6:
5:$2
7:primary@Bob
6:
5:
7:
6:data@5
5:

此時能否直接通過版本號為6的write中的信息,返回版本號為5中的數據呢?不能,因為在Percolator的模型中,事務的先后順序是通過邏輯時間戳來確定的,從Joe和Bob中我們只能夠得到持鎖事務的startTS,無法得知commitTS,而SI隔離級別要求我們應該讀取到commitTS小於9的事務寫入的數據,只有等到持鎖事務提交,才能得知它的commitTS是小於9還是大於9。

TiDB事務與MySQL的區別

TiDB 使用樂觀事務模型,在執行 UpdateInsertDelete 等語句時,只有在提交過程中,執行 UpdateInsertDelete 等語句時才會檢查寫寫沖突,而不是像 MySQL 一樣使用行鎖來避免寫寫沖突。類似的, SELECT .. FOR UPDATE 之類的語句在 TiDB 和 MySQL 中的執行方式並不相同。TiDB的隔離級別的表現也與MySQL不盡相同,雖然TiDB也有對應MySQL的RR和RC隔離級別,但RR級別實質為SI級別,而RC隔離級別的則違反了線性一致。目前TiDB也在實現悲觀鎖的事務模型,但是官方文檔說還處於試驗階段尚不穩定,不建議開啟。

行為差異

由於tidb的基於的事務模型與mysql有較大區別,所以在實際使用中盡管協議兼容,但是事務的行為依然有比較大的區別。

寫入緩存在客戶端

從Peroclator的模型中可以看出所有的修改操作都先緩存在客戶端,只有在事務提交的時候才會進行沖突檢測。所以許多在mysql中會導致阻塞的操作在tidb中並不會。假設有一張表test96如下:

| test96 | CREATE TABLE `test96` (
  `a` int(11) NOT NULL,
  `b` int(11) DEFAULT NULL,
  PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin

並向其中插入數據:

mysql> insert into test96 values(1,1),(2,2),(4,4);
Query OK, 3 rows affected (0.01 sec)

處於RR(SI)隔離級別下(tidb的RC隔離級別不完善,不建議使用,后文會提到),開啟兩個事務t1,t2進行如下操作

t1 t2
step1 begin; begin;
step2 update test96 set b = 100 where a >= 2;
step3 update test96 set b = 98 where a = 4;
step4 commit
step5 commit

如果在mysql中,t2在step3的更新操作將會被t1阻塞,而當t1在step4 commit后,t2的后續操作也能成功。然而在tidb中,t1,t2的update操作都會返回成功。並且此時去select的話還可以發現數據已經被自己修改。

對於t1

mysql>    select * from test96;
+---+------+
| a | b    |
+---+------+
| 1 |    1 |
| 2 |  100 |
| 4 |  100 |
+---+------+
3 rows in set (0.00 sec)

對於t2

mysql>   select * from test96;
+---+------+
| a | b    |
+---+------+
| 1 |    1 |
| 2 |    2 |
| 4 |   98 |
+---+------+
3 rows in set (0.01 sec)

這是因為事務在讀取的過程中發現數據被本事務修改過,所以直接從本地緩存中讀取。

在step4,t1的commit可以成功

mysql>  commit;
Query OK, 0 rows affected (0.00 sec)

而step5,t2 commit的時候會報錯

mysql>  commit;
ERROR 1105 (HY000): [try again later]: [write conflict] txnStartTS 408979633880694785 is stale

如上文對percolator的描述,在提交的時候,事務進行了沖突檢測,發現事務沖突,因此t2被回滾。

自動重試

執行失敗的事務可以讓 TiDB 自動重試提交,但這可能會導致事務異常。當開啟自動重試的時候前文中的例子t1, t2的提交都會成功,這實際上與SI隔離級別的要求不符。有兩個參數與這個功能相關

tidb_disable_txn_auto_retry

作用域:SESSION | GLOBAL

默認值:1

這個變量用來設置是否禁用顯式事務自動重試,設置為 1 時,不會自動重試,如果遇到事務沖突需要在應用層重試。

這個變量不會影響自動提交的隱式事務和 TiDB 內部執行的事務,它們依舊會根據 tidb_retry_limit 的值來決定最大重試次數。

tidb_retry_limit

作用域:SESSION | GLOBAL

默認值:10

這個變量用來設置最多可重試次數,即在一個事務執行中遇到可重試的錯誤(例如事務沖突、事務提交過慢或表結構變更)時,這個事務可以被重新執行,這個變量值表明最多可重試的次數。

通過設置 tidb_disable_txn_auto_retry可以控制該項功能,同時要注意 tidb_retry_limit 的值不能為 0,否則,也會禁用自動重試。

大事務

同時,由於受客戶端能緩存的數據量的限制,tidb對大事務的支持有限,在官方手冊中聲明了事務大小的限制:

  • 單個事務包含的 SQL 語句不超過 5000 條(默認)
  • 每個鍵值對不超過 6MB
  • 鍵值對的總數不超過 300,000
  • 鍵值對的總大小不超過 100MB

tidb中有個tidb_batch_insert參數可以自動將大事務分隔為一系列小事務執行

tidb_batch_insert

作用域: SESSION

默認值: 0

這個變量用來設置是否自動切分插入數據。僅在 autocommit 開啟時有效。 當插入大量數據時,可以將其設置為 1,這樣插入數據會被自動切分為多個 batch,每個 batch 使用一個單獨的事務進行插入。 該用法破壞了事務的原子性,因此,不建議在生產環境中使用。

小事務延時

由於 TiDB 中的每個事務都需要跟 PD leader 進行兩次 round trip以獲取時間戳,TiDB 中的小事務相比於 MySQL 中的小事務延遲更高。以如下的 query 為例,用顯式事務代替 auto_commit,可優化該 query 的性能。

# 使用 auto_commit 的原始版本
UPDATE my_table SET a='new_value' WHERE id = 1;
UPDATE my_table SET a='newer_value' WHERE id = 2;
UPDATE my_table SET a='newest_value' WHERE id = 3;

# 優化后的版本
START TRANSACTION;
UPDATE my_table SET a='new_value' WHERE id = 1;
UPDATE my_table SET a='newer_value' WHERE id = 2;
UPDATE my_table SET a='newest_value' WHERE id = 3;
COMMIT;

把合理數量的更新打包到一個事務里有利於減小延時,但不宜過大,不能超過限制或者造成較多的沖突。

Load data

語法基本一致但是分隔符只支持‘//\‘

LOAD DATA LOCAL INFILE 'file_name' INTO TABLE table_name
    {FIELDS | COLUMNS} TERMINATED BY 'string' ENCLOSED BY 'char' ESCAPED BY 'char'
    LINES STARTING BY 'string' TERMINATED BY 'string'
    IGNORE n LINES
    (col_name ...);

而且,由於不支持大事務實際上數量大時,是把Load data分隔為了多個事務,默認將每 2 萬行記錄作為一個事務進行持久化存儲。如果一次 LOAD DATA 操作插入的數據超過 2 萬行,那么會分為多個事務進行提交。如果某個事務出錯,這個事務會提交失敗,但它前面的事務仍然會提交成功,在這種情況下,一次 LOAD DATA 操作會有部分數據插入成功,部分數據插入失敗。官方不建議在生產中使用Load data。

隔離級別

tidb實現了SI隔離級別,為了與MySQL保持一致,也稱其RR隔離級別。但其表現有區別。

丟失更新

mysql的RR隔離級別會存在丟失更新的情況,依然以表test96為例

| test96 | CREATE TABLE `test96` (
  `a` int(11) NOT NULL,
  `b` int(11) DEFAULT NULL,
  PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
mysql> insert into test96 values(1,1),(2,2),(4,4);
Query OK, 3 rows affected (0.01 sec)

執行兩個事務t1與t2,每個事務對a=2這一行數據的b列加1,我們希望b列記錄執行的事務的數量。

t1 t2
step1 begin; begin;
step2 select b from test96 where a = 2; select b from test96 where a = 2;
step3 update test96 set b = 3 where a = 2;(計數器+1)
step4 update test96 set b = 3 where a= 2 (在mysql中這一步將會阻塞,但是tidb中會立即返回成功)
step5 commit (對於mysql,t1提交后t2此時可以繼續執行)
step6 commit (mysql提交成功,tidb會讓t2回滾報錯)

在mysql中由於t2提交的時候其實t1已經提交寫入,t2用於計算的前提條件其實已經不成立了,t2的更新覆蓋了t1的更新,執行了兩個事務之后,b只加了1。而在tidb中,首先,由於寫入是緩存在本地,所以step4並不會造成阻塞,而在commit的時候,percolator的prewrite階段會檢查到b被更新過,t2提交失敗,回滾報錯。(就上面這種數值計算的特殊情況而言,如果在MySQL中用update test96 set b = b+1最后我們會得到正確的結果,這里只是為了方便舉例才用了讀取-計算-寫入的模式。)

幻讀

tidb的SI級別可以避免經典意義上的幻讀,在前文Percolator的讀取步驟中可以看出,只要數據的commitTS大於本事務的startTS,就不會被讀取到,在著名論文A critique of ANSI SQL isolation levels中對於經典幻讀的定義為

Transaction T1 reads a set of data items satisfying some <search condition>. Transaction T2 then creates data items that satisfy T1’s <search condition> and commits. If T1 then repeats its read with the same <search condition>, it gets a set of data items
different from the first read.

可見tidb不會出現經典意義上的幻讀

那么對於MySQL而言會嗎?一眼看去MySQL使用了MVCC機制,即使別人插入了一條滿足自己select篩選條件的數據,在RR隔離級別下由於ReadView不會更新的原因應該也不可能讀取到,然而並不是所有情況下都是如此,依然以上面的表test96為例

t1 t2
step1 begin; begin;
step2 select b from test96 where a > 1;
step3 insert into test96 values(3, 3);
step4 commit;
step5 select b from test96 where a > 1;
step6 update test96 set b = 0 where a > 1;
step7 select b from test96 where a > 1;

對於step2與step5的select語句,mysql和tidb的行為一致,結果為

mysql> select * from test96 where a>1;
+---+------+
| a | b    |
+---+------+
| 2 |    2 |
| 4 |    4 |
+---+------+
2 rows in set (0.01 sec)

而在step6的update之后,在step7再次進行select,tidb的結果行數依然看到的是a=2與a=4兩行

mysql> select * from test96 where a>1;
+---+------+
| a | b    |
+---+------+
| 2 |    0 |
| 4 |    0 |
+---+------+
2 rows in set (0.00 sec)

而對於MySQL,卻能見到出乎意料的結果

mysql> select * from test96 where a>1;
+---+------+
| a | b    |
+---+------+
| 2 |    0 |
| 3 |    0 |
| 4 |    0 |
+---+------+
3 rows in set (0.00 sec)

mysql中,t1的讀取操作雖然通過MVCC讀取不到t2的值,但是由於更新操作總是使用最新版本,a=3的這行數據也滿足條件,所以這行數據也被更新了。問題在於隱藏列trx_id的值也被更新了(或者說根本就是一個新的版本),因此進行讀取的時候這行數據會讀取到t1寫入的版本,所以我們可以讀取到一條意外出現的記錄。事實上,我們在mysql中進行step6的update操作的時候,可以看到雖然只能讀取到2行,但update影響行數為3行。

mysql> select * from test96 where a>1;
+---+------+
| a | b    |
+---+------+
| 2 |    2 |
| 4 |    4 |
+---+------+
2 rows in set (0.00 sec)

mysql> update test96 set b = 0 where a>1;
Query OK, 3 rows affected (0.00 sec)
Rows matched: 3  Changed: 3  Warnings: 0

而在tidb中只更新了自己讀取到的2行

mysql> select * from test96 where a>1;
+---+------+
| a | b    |
+---+------+
| 2 |    2 |
| 4 |    4 |
+---+------+
2 rows in set (0.00 sec)

mysql> update test96 set b=0 where a > 1;
Query OK, 2 rows affected (0.00 sec)
Rows matched: 2  Changed: 2  Warnings: 0

而在t1提交后,從tidb再次select可以查到a=3這條數據

mysql> select * from test96 where a>1;
+---+------+
| a | b    |
+---+------+
| 2 |    0 |
| 3 |    3 |
| 4 |    0 |
+---+------+
3 rows in set (0.01 sec)

所以MySQL依然存在經典意義上幻讀

寫偏序

ANSI定義的RR隔離級別下不會發生寫偏序,tidb與mysql的RR隔離級別下都可能存在寫偏序。依然以上面test96表為例,假設我們的約束要求b > 1的記錄數量不能為0。

t1 t2
step1 begin; begin;
step2 select count(*) from test96 where b > 1; select count(*) from test96 where b > 1;
step3 if count > 1 {update test96 set b = 0 where a = 2;}
step4 if count > 1 {update test96 set b = 0 where a = 4;}
step5 commit
step6 commit

由於t1與t2沒有寫同一份數據,兩者在tidb和mysql中都會直接提交成功。但是最終test96中 b > 1的記錄數變成了0,這違背了我們的約束。

For update

tidb與mysql都支持for update 語法,但是行為表現有一定區別。對於mysql而言,for update是在讀取的時候對記錄實時的加鎖,阻塞其他試圖修改或者帶for update的語句。而對於tidb而言,for update本質上只是在本地緩存一個寫入,只是寫入的只有一個鎖,沒有數據, 相當於對符合條件的記錄進行了一次沒有數據的寫入。同時,tidb目前尚不支持謂詞鎖或類似mysql的gap鎖,所以即使在RR隔離級別下使用for update, 也不能防止其他事務的insert操作。先來看一個test96表上的例子

t1 t2
step1 begin; begin;
step2 select count(*) from test96 where a > 1 for update;
step3 update test96 set b = 0 where a = 4
step4 commit
step5 commit

在mysql中t2在step3的update操作會被阻塞,直到t1提交,之后t2的執行會成功。然而在tidb中,t2的update操作會返回成功,而在t2提交的時候會報錯發生沖突,t2回滾。

mysql> update test96 set b = 0 where a = 4;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> commit;
ERROR 1105 (HY000): [try again later]: [write conflict] txnStartTS=409004592597827585, conflictTS=409004587970985985, key={tableID=530, handle=4} primary={tableID=530, handle=4}

同時由於tidb沒有謂詞鎖或gap鎖,不能鎖定范圍,如果把t2 step3的操作修改如下 ,t1與t2都將提交成功。

t1 t2
step1 begin; begin;
step2 select count(*) from test96 where a > 1 for update;
step3 insert into test96 values(3,3);
step4 commit
step5 commit

Tikv中的事務實現

論文percolator原型中,所有的事務相關操作都在客戶端執行,服務端不需要進行任何特殊處理。但是在tikv中,服務端對於不同的階段做了不同的處理,以優化性能。

Tikv客戶端實現

Tikv本身作為一個分布式的kv存儲供計算層的tidb訪問,同時也提供了單獨client客戶端以直接使用tikv。獨立客戶端的邏輯與tidb中對tikv的訪問邏輯基本相同,許多代碼都是復用的,tidb多了一些生成binlog和保存統計信息等額外操作。下面以golang版本的client為例,介紹tikv客戶端的實現。

客戶端使用

客戶端的使用上非常簡潔,下面兩個官方示例中的函數對tikv進行了set和get操作

// key1 val1 key2 val2 ...
func puts(args ...[]byte) error {
	tx, err := client.Begin()
	if err != nil {
		return err
	}

	for i := 0; i < len(args); i += 2 {
		key, val := args[i], args[i+1]
		err := tx.Set(key, val)
		if err != nil {
			return err
		}
	}
	return tx.Commit(context.Background())
}
func get(k []byte) (KV, error) {
	tx, err := client.Begin()
	if err != nil {
		return KV{}, err
	}
	v, err := tx.Get(k)
	if err != nil {
		return KV{}, err
	}
	return KV{K: k, V: v}, nil
}

從上面可以看出,事務api的使用比較簡潔,只需要Begin開啟一個事務,進行Get,Set等操作,然后Commit即可。

寫入

Transaction

Transaction是所有事務操作的入口,Transaction的結構體如下

// Transaction is a key-value transaction.
type Transaction struct {
    // 用於訪問tikv
	tikvStore *store.TiKVStore
	snapshot  *store.TiKVSnapshot
    // 本地緩存數據
	us        kv.UnionStore
	// 即percolator事務模型中的開始時間戳startTS,創建事務的時候從TSO獲取
	startTS   uint64
	startTime time.Time // Monotonic timestamp for recording txn time consuming.
    // 注意,這個commitTS並不是percolator中的commitTS,實際上在client中並沒有發現有使用這個變量
    // percolator真正的commitTS在TxnCommitter中。在tidb中有類似的變量,用於保存提交時間戳供統計。
    // 也許是從tidb中復用代碼的時候搬過來忘了刪除?
	commitTS  uint64
	valid     bool
	lockKeys  [][]byte

	setCnt int64
}

Transaction的方法提供了Get,Set,Del等多種數據操作,所有的修改都將保存在本地緩存us中,在Commit的時候按照percolator的事務模型進行提交。下面是Commit的簡化代碼。

// Commit commits the transaction operations to KV store.
func (txn *Transaction) Commit(ctx context.Context) error {
	···
	if len(mutations) == 0 {
		return nil
	}
	···
    // committer是percolator事務具體提交的執行者,mutations為所有的修改集合
	committer, err := store.NewTxnCommitter(txn.tikvStore, txn.startTS, txn.startTime, mutations)
	if err != nil || committer == nil {
		return err
	}
	// latch是一個特性,開啟的話同一個客戶端發起的請求如果有沖突可以先在本地解決,需要先獲取本地latch
    // 用於解決沖突過於嚴重的情況
	// latches disabled
	if txn.tikvStore.GetTxnLatches() == nil {
        // 未開啟latch則直接開始2pc提交
		err = committer.Execute(ctx)
		log.Debug("[kv]", txn.startTS, " txnLatches disabled, 2pc directly:", err)
		return err
	}

	// latches enabled
	// for transactions which need to acquire latches
	start = time.Now()
	lock := txn.tikvStore.GetTxnLatches().Lock(txn.startTS, committer.GetKeys())
	localLatchTime := time.Since(start)
	if localLatchTime > 0 {
		metrics.LocalLatchWaitTimeHistogram.Observe(localLatchTime.Seconds())
	}
	defer txn.tikvStore.GetTxnLatches().UnLock(lock)
	if lock.IsStale() {
		err = errors.Errorf("startTS %d is stale", txn.startTS)
		return errors.WithMessage(err, store.TxnRetryableMark)
	}
  	// 開啟了latch則先獲取本地latch后再開始2PC
	err = committer.Execute(ctx)
	if err == nil {
		lock.SetCommitTS(committer.GetCommitTS())
	}
	log.Debug("[kv]", txn.startTS, " txnLatches enabled while txn retryable:", err)
	return err
}

在提交過程中,Transaction收集好所有的修改,將startTS和修改的數據傳遞給TxnCommitter,由TxnCommitter執行具體的提交過程。

TxnCommitter

TxnCommitter負責事務的提交過程,下面是其簡化結構體

// TxnCommitter executes a two-phase commit protocol.
type TxnCommitter struct {
    // tikv訪問接口
	store     *TiKVStore
	conf      *config.Config
	// 開始時間戳
	startTS   uint64
	// 所有修改數據的key
	keys      [][]byte
	// 所有修改操作
	mutations map[string]*pb.Mutation
	// 鎖的最長生命周期,持鎖時間太長可能會被其他事務清除,本事務回滾。
	lockTTL   uint64
	// 事務的提交時間戳
	commitTS  uint64
	mu        struct {
		sync.RWMutex
		committed       bool
		// 這個標志位表示是否收到了無法確定事務的狀態的錯誤,例如因網絡原因未收到c事務的commit指令的回應
		undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
	}
	cleanWg sync.WaitGroup
}

Txn的Execute函數是實際的提交執行流程,由外部調用(Transaction的Commit),下面是簡化流程

// Execute executes the two-phase commit protocol.
func (c *TxnCommitter) Execute(ctx context.Context) error {
    // 如果事務沒有設置提交標志並且沒有發生無法確定事務狀態的錯誤(也就是說可以確定事務回滾了)
    // 則清除可能已經prewrite的數據
	defer func() {
		// Always clean up all written keys if the txn does not commit.
		c.mu.RLock()
		committed := c.mu.committed
		undetermined := c.mu.undeterminedErr != nil
		c.mu.RUnlock()
        // 如果可以確定事務未能提交,則嘗試異步的進行清理
		if !committed && !undetermined {
			c.cleanWg.Add(1)
			go func() {
				err := c.cleanupKeys(retry.NewBackoffer(context.Background(), retry.CleanupMaxBackoff), c.keys)
				if err != nil {
					log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.ConnID, err, c.startTS)
				} else {
					log.Infof("con:%d 2PC clean up done, tid: %d", c.ConnID, c.startTS)
				}
				c.cleanWg.Done()
			}()
		}
	}()
	// Backoffer是用於控制重試的結構體
	prewriteBo := retry.NewBackoffer(ctx, retry.PrewriteMaxBackoff)
	start := time.Now()
	// 執行percolator的第一階段,prewrite
	err := c.prewriteKeys(prewriteBo, c.keys)

	if err != nil {
		log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.ConnID, err, c.startTS)
		return err
	}
    
	// 獲取提交時間戳
	commitTS, err := c.store.GetTimestampWithRetry(retry.NewBackoffer(ctx, retry.TsoMaxBackoff))
	if err != nil {
		log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.ConnID, err, c.startTS)
		return err

	// check commitTS
	if commitTS <= c.startTS {
		err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
			c.ConnID, c.startTS, commitTS)
		log.Error(err)
		return err
	}
	c.commitTS = commitTS
    // 事務消耗的時間不能超過設定的最大時間
	if c.store.GetOracle().IsExpired(c.startTS, c.maxTxnTimeUse) {
		err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.ConnID, c.startTS, c.commitTS)
		return errors.WithMessage(err, TxnRetryableMark)
	}

	start = time.Now()
	commitBo := retry.NewBackoffer(ctx, retry.CommitMaxBackoff)
	// percolator第二階段,commit
	err = c.commitKeys(commitBo, c.keys)

	if err != nil {
		···
	return nil
}

Eexcute的主要邏輯就是對應的percolator的兩個步驟,中間做了一些合法性檢查,在函數的最后如果事務不能正常提交,則需要嘗試清除已經提交的數據。在上面的函數中可以看到prewriteKeys和commitKeys兩個函數,這兩個函數未內部兩個階段的對應。

func (c *TxnCommitter) prewriteKeys(bo *retry.Backoffer, keys [][]byte) error {
	return c.doActionOnKeys(bo, actionPrewrite, keys)
}

func (c *TxnCommitter) commitKeys(bo *retry.Backoffer, keys [][]byte) error {
	return c.doActionOnKeys(bo, actionCommit, keys)
}

由於兩個階段的操作都有較多共性,可以通過batch的方式來提高性能,所以這里采用了同一個函數doActionOnKeys來進行處理。

func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction, keys [][]byte) error {
	if len(keys) == 0 {
		return nil
	}
	groups, firstRegion, err := c.store.GetRegionCache().GroupKeysByRegion(bo, keys)
	if err != nil {
		return err
	}

	var batches []batchKeys
    ···
	// Make sure the group that contains primary key goes first.
    // 按照key的region分布和大小打包分組,每個分組的key可以保證在一個region
	commitBatchSize := c.conf.Txn.CommitBatchSize
	batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, commitBatchSize)
	delete(groups, firstRegion)
	for id, g := range groups {
		batches = appendBatchBySize(batches, id, g, sizeFunc, commitBatchSize)
	}
	
	firstIsPrimary := bytes.Equal(keys[0], c.primary())
    // 在commit階段的時候的時候,primaryRow需要先處理,因為primaryRow的提交被視為整個事務提交與否的標志。
	if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
		// primary should be committed/cleanup first
		err = c.doActionOnBatches(bo, action, batches[:1])
		if err != nil {
			return err
		}
		batches = batches[1:]
	}
	if action == actionCommit {
		// Commit secondary batches in background goroutine to reduce latency.
		// The backoffer instance is created outside of the goroutine to avoid
		// potencial data race in unit test since `CommitMaxBackoff` will be updated
		// by test suites.
		secondaryBo := retry.NewBackoffer(context.Background(), retry.CommitMaxBackoff)
        // 這里可以看出secondaryRows是異步提交的,只要primaryRow提交成功就會給客戶端返回成功
		go func() {
			e := c.doActionOnBatches(secondaryBo, action, batches)
			if e != nil {
				log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.ConnID, action, e)
			}
		}()
	} else {
		err = c.doActionOnBatches(bo, action, batches)
	}
	return err
}

doActionOnKeys將需要修改的key進行打包,交給doActionOnBatches進行處理,如果是commit階段,則需要保證primaryRow先進行處理。在上面還可以看出這里可以看出secondaryRows是異步提交的,只要primaryRow提交成功就會給客戶端返回成功。這造成了一個問題,在RC隔離級別下,tikv在查詢數據的時候並沒有檢查lock的信息,而是直接找一個最近的本地已經提交版本返回給客戶端,所以如果某個事務已經提交但secondaryRows還未釋放,此時另一個事務是無法讀取到這個事務寫入的數據的,這與正常的RC級別的表現不符。

func (c *TxnCommitter) doActionOnBatches(bo *retry.Backoffer, action commitAction, batches []batchKeys) error {
	if len(batches) == 0 {
		return nil
	}
	var singleBatchActionFunc func(bo *retry.Backoffer, batch batchKeys) error
	// singleBatchActionFunc是對單個batch的處理函數,主要進行grpc數據包的封裝發送和錯誤處理。
	switch action {
	case actionPrewrite:
		singleBatchActionFunc = c.prewriteSingleBatch
	case actionCommit:
		singleBatchActionFunc = c.commitSingleBatch
	case actionCleanup:
		singleBatchActionFunc = c.cleanupSingleBatch
	}
	if len(batches) == 1 {
		e := singleBatchActionFunc(bo, batches[0])
		if e != nil {
			log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
		}
		return e
	}

	// For prewrite, stop sending other requests after receiving first error.
    // prewrite階段一旦出錯就取消所有后續動作
	backoffer := bo
	var cancel context.CancelFunc
	if action == actionPrewrite {
		backoffer, cancel = bo.Fork()
		defer cancel()
	}

	// Concurrently do the work for each batch.
	ch := make(chan error, len(batches))
	for _, batch1 := range batches {
		batch := batch1
		go func() {
			if action == actionCommit {
				// Because the secondary batches of the commit actions are implemented to be
				// committed asynchronously in background goroutines, we should not
				// fork a child context and call cancel() while the foreground goroutine exits.
				// Otherwise the background goroutines will be canceled exceptionally.
				// Here we makes a new clone of the original backoffer for this goroutine
				// exclusively to avoid the data race when using the same backoffer
				// in concurrent goroutines.
                // commit階段只要primaryRow提交成功即可
				singleBatchBackoffer := backoffer.Clone()
				ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
			} else {
				singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
				defer singleBatchCancel()
				ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
			}
		}()
	}
	var err error
	for i := 0; i < len(batches); i++ {
		if e := <-ch; e != nil {
			log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
			// Cancel other requests and return the first error.
			if cancel != nil {
				log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.ConnID, action, c.startTS)
				cancel()
			}
			if err == nil {
				err = e
			}
		}
	}
	return err
}

doActionOnBatches對每個batch進行處理,發送rpc請求給tikv服務端。其中,對於prewrite和commit的處理有所不同,prewrite如果其中某一個batch的寫入有錯,需要全部撤銷;而commit階段的時候如果是secondaryRows,則不會取消,因為此時事務被認為已經提交了。

各個xxxxSingleBatch函數是對每個batch的具體處理,根據具體階段的不同分別進行不同的處理。

prewriteSingleBatch

func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
	mutations := make([]*pb.Mutation, len(batch.keys))
	for i, k := range batch.keys {
		mutations[i] = c.mutations[string(k)]
	}
    // 組裝rpc請求
	req := &rpc.Request{
		Type: rpc.CmdPrewrite,
		Prewrite: &pb.PrewriteRequest{
			Mutations:    mutations,
			PrimaryLock:  c.primary(),
			StartVersion: c.startTS,
			LockTtl:      c.lockTTL,
		},
		Context: pb.Context{
			Priority: c.Priority,
			SyncLog:  c.SyncLog,
		},
	}
	for {
	    // 發送請求並進行各種錯誤處理
		resp, err := c.store.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort)
		if err != nil {
			return err
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return err
		}
		// TikcStore緩存的region信息有誤,連接pd重試提交
		if regionErr != nil {
			err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
			if err != nil {
				return err
			}
			return c.prewriteKeys(bo, batch.keys)
		}
		prewriteResp := resp.Prewrite
		if prewriteResp == nil {
			return errors.WithStack(rpc.ErrBodyMissing)
		}
        // prewrite成功
		keyErrs := prewriteResp.GetErrors()
		if len(keyErrs) == 0 {
			return nil
		}
		var locks []*Lock
		for _, keyErr := range keyErrs {
			// Check already exists error
			if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
				return errors.WithStack(ErrKeyAlreadyExist(alreadyExist.GetKey()))
			}

			// Extract lock from key error
            // 如果被加鎖,需要識別出加鎖的key,然后嘗試解鎖或者等待釋放。
			lock, err1 := extractLockFromKeyErr(keyErr, c.conf.Txn.DefaultLockTTL)
			if err1 != nil {
				return err1
			}
			log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.ConnID, lock)
			locks = append(locks, lock)
		}
		start := time.Now()
		ok, err := c.store.GetLockResolver().ResolveLocks(bo, locks)
		if err != nil {
			return err
		}
		atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
		if !ok {
			err = bo.Backoff(retry.BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
			if err != nil {
				return err
			}
		}
	}
}

commitSingleBatch:

func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
    // 組裝rpc請求並且發送
	req := &rpc.Request{
		Type: rpc.CmdCommit,
		Commit: &pb.CommitRequest{
			StartVersion:  c.startTS,
			Keys:          batch.keys,
			CommitVersion: c.commitTS,
		},
		Context: pb.Context{
			Priority: c.Priority,
			SyncLog:  c.SyncLog,
		},
	}
	req.Context.Priority = c.Priority

	sender := rpc.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetRPCClient())
	resp, err := sender.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort)

	// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
	// transaction has been successfully committed.
	// Under this circumstance,  we can not declare the commit is complete (may lead to data lost), nor can we throw
	// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
	// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
	isPrimary := bytes.Equal(batch.keys[0], c.primary())
	// 這就是Execute函數中判斷的UndeterminedErr的值的設置,如果收到的RPC回復錯誤,則事務狀態是未定的,有可能提交了,也可能處於未提交狀態。
	if isPrimary && sender.RPCError() != nil {
		c.setUndeterminedErr(sender.RPCError())
	}

	if err != nil {
		return err
	}
	regionErr, err := resp.GetRegionError()
	if err != nil {
		return err
	}
	if regionErr != nil {
		err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
		if err != nil {
			return err
		}
		// re-split keys and commit again.
		// region錯誤則進行重試
		return c.commitKeys(bo, batch.keys)
	}
	commitResp := resp.Commit
	if commitResp == nil {
		return errors.WithStack(rpc.ErrBodyMissing)
	}
	// Here we can make sure tikv has processed the commit primary key request. So
	// we can clean undetermined error.
	if isPrimary {
	    // primary key提交成功則可以認為事務已經提交
		c.setUndeterminedErr(nil)
	}
	if keyErr := commitResp.GetError(); keyErr != nil {
		c.mu.RLock()
		defer c.mu.RUnlock()
		err = errors.Errorf("con:%d 2PC commit failed: %v", c.ConnID, keyErr.String())
		if c.mu.committed {
			// No secondary key could be rolled back after it's primary key is committed.
			// There must be a serious bug somewhere.
			log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
			return err
		}
		// The transaction maybe rolled back by concurrent transactions.
		log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
		return errors.WithMessage(err, TxnRetryableMark)
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// Group that contains primary key is always the first.
	// We mark transaction's status committed when we receive the first success response.
	// commited設為true,由於secondary的提交一定在primary之后,所以這里secondary的提交也可以設置為true
	c.mu.committed = true
	return nil
}

讀取

TiKVSnapshot

TiKVSnapshot顧名思義代表一個事務的快照,Transaction的snapshot成員就是該類型。在進行Get操作時,事務首先從本地緩存查詢是否存在該key,如果不存在就調用TiKVSnapshot的get方法獲取

func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
	sender := rpc.NewRegionRequestSender(s.store.GetRegionCache(), s.store.GetRPCClient())

	req := &rpc.Request{
		Type: rpc.CmdGet,
		Get: &pb.GetRequest{
			Key:     k,
			Version: s.ts,
		},
		Context: pb.Context{
			Priority:     s.Priority,
			NotFillCache: s.NotFillCache,
		},
	}
	for {
		loc, err := s.store.regionCache.LocateKey(bo, k)
		if err != nil {
			return nil, err
		}
		resp, err := sender.SendReq(bo, req, loc.Region, s.conf.RPC.ReadTimeoutShort)
		if err != nil {
			return nil, err
		}
		regionErr, err := resp.GetRegionError()
		if err != nil {
			return nil, err
		}
		// 如果本地緩存的region的信息錯誤,則重新獲取新的region信息
		if regionErr != nil {
			err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
			if err != nil {
				return nil, err
			}
			continue
		}
		cmdGetResp := resp.Get
		if cmdGetResp == nil {
			return nil, errors.WithStack(rpc.ErrBodyMissing)
		}
		val := cmdGetResp.GetValue()
		if keyErr := cmdGetResp.GetError(); keyErr != nil {
			lock, err := extractLockFromKeyErr(keyErr, s.conf.Txn.DefaultLockTTL)
			if err != nil {
				return nil, err
			}
			// 如果有鎖存在則嘗試解鎖
			ok, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
			if err != nil {
				return nil, err
			}
			if !ok {
				err = bo.Backoff(retry.BoTxnLockFast, errors.New(keyErr.String()))
				if err != nil {
					return nil, err
				}
			}
			continue
		}
		return val, nil
	}
}

Tikv服務端實現

在論文的原始Percolator事務模型中,存儲層不需要對分布式事務有任何感知,只需要支持單行事務,但是出於性能考慮Tikv在存儲層做了很多的優化,以減輕開銷。

寫入

當客戶端,經過執行框架的一系列調度,最終會來到寫入操作的入口process_write_impl

fn process_write_impl<S: Snapshot>(
    cmd: Command,
    snapshot: S,
    statistics: &mut Statistics,
) -> Result<(Context, ProcessResult, Vec<Modify>, usize)> {
    let (pr, modifies, rows, ctx) = match cmd {
    // prewrite入口
        Command::Prewrite {
            ctx,
            mutations,
            primary,
            start_ts,
            options,
            ..
        } => {
            // 新建一個本地事務,通過這個事務來進行操作
            let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
            let mut locks = vec![];
            let rows = mutations.len();
            for m in mutations {
                // 對每行進行prewrite
                match txn.prewrite(m, &primary, &options) {
                    Ok(_) => {}
                    // 如果已經有其他事務將該行加鎖,應該將加鎖的鎖信息保存后返回
                    e @ Err(MvccError::KeyIsLocked { .. }) => {
                        locks.push(e.map_err(Error::from).map_err(StorageError::from));
                    }
                    // 有錯誤則直接返回錯誤
                    Err(e) => return Err(Error::from(e)),
                }
            }

            statistics.add(&txn.take_statistics());
            // 返回加鎖信息
            if locks.is_empty() {
                let pr = ProcessResult::MultiRes { results: vec![] };
                // 獲取所有修改操作
                let modifies = txn.into_modifies();
                (pr, modifies, rows, ctx)
            } else {
                // Skip write stage if some keys are locked.
                let pr = ProcessResult::MultiRes { results: locks };
                (pr, vec![], 0, ctx)
            }
        }
        Command::Commit {
            ctx,
            keys,
            lock_ts,
            commit_ts,
            ..
        } => {
            if commit_ts <= lock_ts {
                return Err(Error::InvalidTxnTso {
                    start_ts: lock_ts,
                    commit_ts,
                });
            }
            // commit同樣需要新建一個本地事務進行操作
            let mut txn = MvccTxn::new(snapshot, lock_ts, !ctx.get_not_fill_cache())?;
            let rows = keys.len();
            for k in keys {
                // 對每一個key調用commit
                txn.commit(k, commit_ts)?;
            }

            statistics.add(&txn.take_statistics());
            (ProcessResult::Res, txn.into_modifies(), rows, ctx)
        }
        Command::Cleanup {
            ctx, key, start_ts, ..
        } => {
            // 新建本地事務清除廢棄的鎖,比如primary已經提交的secondary鎖
            let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
            // 使用本地事務的rollback接口
            txn.rollback(key)?;

            statistics.add(&txn.take_statistics());
            (ProcessResult::Res, txn.into_modifies(), 1, ctx)
        }
        Command::Rollback {
            ctx,
            keys,
            start_ts,
            ..
        } => {
            let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
            let rows = keys.len();
            // 調用Rollback清除所有相關的鎖
            for k in keys {
                txn.rollback(k)?;
            }

            statistics.add(&txn.take_statistics());
            (ProcessResult::Res, txn.into_modifies(), rows, ctx)
        }
        Command::ResolveLock {
            ctx,
            txn_status,
            mut scan_key,
            key_locks,
        } => {
            let mut scan_key = scan_key.take();
            let mut modifies: Vec<Modify> = vec![];
            let mut write_size = 0;
            let rows = key_locks.len();
            for (current_key, current_lock) in key_locks {
                // 對每一個lock新建事務釋放它
                let mut txn =
                    MvccTxn::new(snapshot.clone(), current_lock.ts, !ctx.get_not_fill_cache())?;
                let status = txn_status.get(&current_lock.ts);
                let commit_ts = match status {
                    Some(ts) => *ts,
                    None => panic!("txn status {} not found.", current_lock.ts),
                };
                if commit_ts > 0 {
                    if current_lock.ts >= commit_ts {
                        return Err(Error::InvalidTxnTso {
                            start_ts: current_lock.ts,
                            commit_ts,
                        });
                    }
                    txn.commit(current_key.clone(), commit_ts)?;
                } else {
                    txn.rollback(current_key.clone())?;
                }
                write_size += txn.write_size();

                statistics.add(&txn.take_statistics());
                modifies.append(&mut txn.into_modifies());

                if write_size >= MAX_TXN_WRITE_SIZE {
                    scan_key = Some(current_key);
                    break;
                }
            }
            let pr = if scan_key.is_none() {
                ProcessResult::Res
            } else {
                ProcessResult::NextCommand {
                    cmd: Command::ResolveLock {
                        ctx: ctx.clone(),
                        txn_status,
                        scan_key: scan_key.take(),
                        key_locks: vec![],
                    },
                }
            };
            (pr, modifies, rows, ctx)
        }
        Command::Pause { ctx, duration, .. } => {
            thread::sleep(Duration::from_millis(duration));
            (ProcessResult::Res, vec![], 0, ctx)
        }
        _ => panic!("unsupported write command"),
    };

    Ok((ctx, pr, modifies, rows))
}

這個入口函數比較長,但很明確的分為了數個分支,其中每個分支對於數據的修改都新建了一個本地事務MvccTxn來進行,例如對於prewrite和commit而言,就是對於每一個修改再調用MvccTxn來執行。下面是進行prewrite的代碼

    pub fn prewrite(
        &mut self,
        mutation: Mutation,
        primary: &[u8],
        options: &Options,
    ) -> Result<()> {
        let lock_type = LockType::from_mutation(&mutation);
        let (key, value, should_not_exist) = match mutation {
            Mutation::Put((key, value)) => (key, Some(value), false),
            Mutation::Delete(key) => (key, None, false),
            Mutation::Lock(key) => (key, None, false),
            Mutation::Insert((key, value)) => (key, Some(value), true),
        };

        {
            if !options.skip_constraint_check {
                if let Some((commit, write)) = self.reader.seek_write(&key, u64::max_value())? {
                    // Abort on writes after our start timestamp ...
                    // If exists a commit version whose commit timestamp is larger than or equal to
                    // current start timestamp, we should abort current prewrite, even if the commit
                    // type is Rollback.
                    // 判斷沖突,是否有時間戳大於自己開始時間戳的已提交數據
                    // 如果有則寫入失敗
                    if commit >= self.start_ts {
                        MVCC_CONFLICT_COUNTER.prewrite_write_conflict.inc();
                        return Err(Error::WriteConflict {
                            start_ts: self.start_ts,
                            conflict_start_ts: write.start_ts,
                            conflict_commit_ts: commit,
                            key: key.to_raw()?,
                            primary: primary.to_vec(),
                        });
                    }
                    // 對於插入操作需要先判斷key是否已經存在
                    if should_not_exist {
                        if write.write_type == WriteType::Put
                            || (write.write_type != WriteType::Delete
                                && self.key_exist(&key, write.start_ts - 1)?)
                        {
                            return Err(Error::AlreadyExist { key: key.to_raw()? });
                        }
                    }
                }
            }
            // ... or locks at any timestamp.
            if let Some(lock) = self.reader.load_lock(&key)? {
                // 已經被別人加鎖,需要返回錯誤,事務提交失敗
                if lock.ts != self.start_ts {
                    return Err(Error::KeyIsLocked {
                        key: key.to_raw()?,
                        primary: lock.primary,
                        ts: lock.ts,
                        ttl: lock.ttl,
                    });
                }
                // No need to overwrite the lock and data.
                // If we use single delete, we can't put a key multiple times.
                MVCC_DUPLICATE_CMD_COUNTER_VEC.prewrite.inc();
                return Ok(());
            }
        }

        if value.is_none() || is_short_value(value.as_ref().unwrap()) {
            self.lock_key(key, lock_type, primary.to_vec(), options.lock_ttl, value);
        } else {
            // value is long
            let ts = self.start_ts;
            // 寫入數據和鎖
            self.put_value(key.clone(), ts, value.unwrap());

            self.lock_key(key, lock_type, primary.to_vec(), options.lock_ttl, None);
        }

        Ok(())
    }

這里對於時間戳以及加鎖的檢查可以完整對應到percolator事務原型中prewrite的檢查邏輯,只是在原本模型中是由客戶端進行檢查,而tikv的實現中是client把對應的key打包好發送給所在的tikv,由tikv來具體執行檢查。如果在檢查中發現沖突則直接返回錯誤由客戶端決定接下來的行動,如果不存在沖突的,則寫入data CF和lock CF。在prewrite完成之后,調用本地事務的commit進行提交。

    pub fn commit(&mut self, key: Key, commit_ts: u64) -> Result<()> {
        let (lock_type, short_value) = match self.reader.load_lock(&key)? {
           // 只有這行存在鎖,並且這個鎖是自己所加的才能進行提交(通過所得時間戳判斷)
            Some(ref mut lock) if lock.ts == self.start_ts => {
                (lock.lock_type, lock.short_value.take())
            }
            _ => {
                // 本事務不再持有鎖,需要返回錯誤
                return match self.reader.get_txn_commit_info(&key, self.start_ts)? {
                    Some((_, WriteType::Rollback)) | None => {
                        MVCC_CONFLICT_COUNTER.commit_lock_not_found.inc();
                        // None: related Rollback has been collapsed.
                        // Rollback: rollback by concurrent transaction.
                        info!(
                            "txn conflict (lock not found)";
                            "key" => %key,
                            "start_ts" => self.start_ts,
                            "commit_ts" => commit_ts,
                        );
                        Err(Error::TxnLockNotFound {
                            start_ts: self.start_ts,
                            commit_ts,
                            key: key.as_encoded().to_owned(),
                        })
                    }
                    // Committed by concurrent transaction.
                    Some((_, WriteType::Put))
                    | Some((_, WriteType::Delete))
                    | Some((_, WriteType::Lock)) => {
                        MVCC_DUPLICATE_CMD_COUNTER_VEC.commit.inc();
                        Ok(())
                    }
                };
            }
        };
        let write = Write::new(
            WriteType::from_lock_type(lock_type),
            self.start_ts,
            short_value,
        );
        // 在write列寫入提交信息並且清除持有的鎖
        self.put_write(key.clone(), commit_ts, write.to_bytes());
        self.unlock_key(key);
        Ok(())
    }

在提交的時候先進行檢查,如果不再持有鎖(可能因為超時被其他事務清除)則返回錯誤。如果依然持有,則寫入write列並且清除鎖。

讀取

當客戶端進行讀取的時候,如percolator的原型所示,通過startTS和其他事務的commitTS的大小對比來判斷是否對數據可見,其基本邏輯在MvccReader::get:

    pub fn get(&mut self, key: &Key, mut ts: u64) -> Result<Option<Value>> {
        // Check for locks that signal concurrent writes.
        // 在這里可以看到如果是SI隔離級別才需要檢查是否被加鎖
        // 如果是RC隔離級別則直接返回找到的最近的一份已提交的數據
        match self.isolation_level {
            IsolationLevel::SI => ts = self.check_lock(key, ts)?,
            IsolationLevel::RC => {}
        }
        if let Some(mut write) = self.get_write(key, ts)? {
            if write.short_value.is_some() {
                if self.key_only {
                    return Ok(Some(vec![]));
                }
                return Ok(write.short_value.take());
            }
            match self.load_data(key, write.start_ts)? {
                None => {
                    return Err(default_not_found_error(key.to_raw()?, write, "get"));
                }
                Some(v) => return Ok(Some(v)),
            }
        }
        Ok(None)
    }

可以看出在SI隔離下會進行lock的檢查,如果發現被其他事務鎖定,則返回錯誤由客戶端判斷是否進行清除或者等待。檢查鎖的邏輯如下

    fn check_lock(&mut self, key: &Key, ts: u64) -> Result<u64> {
    // 如果存在鎖則進行具體的檢查邏輯
        if let Some(lock) = self.load_lock(key)? {
            return self.check_lock_impl(key, ts, lock);
        }
        Ok(ts)
    }

    fn check_lock_impl(&self, key: &Key, ts: u64, lock: Lock) -> Result<u64> {
        // 如果lock是時間戳大於自己的時間戳,則表示加鎖的事務開始於當前事務之后,不應該看到這個事務的數據。
        if lock.ts > ts || lock.lock_type == LockType::Lock {
            // ignore lock when lock.ts > ts or lock's type is Lock
            return Ok(ts);
        }

        if ts == std::u64::MAX && key.to_raw()? == lock.primary {
            // when ts==u64::MAX(which means to get latest committed version for
            // primary key),and current key is the primary key, returns the latest
            // commit version's value
            return Ok(lock.ts - 1);
        }

        // There is a pending lock. Client should wait or clean it.
        // 如果有開始於自己之前的事務所加的鎖,則需要客戶端的事務模塊判斷是等待鎖還是清除鎖
        Err(Error::KeyIsLocked {
            key: key.to_raw()?,
            primary: lock.primary,
            ts: lock.ts,
            ttl: lock.ttl,
        })
    }

這里的邏輯也可以對應到論文中percolator的讀取檢查鎖的邏輯,如果已經加鎖則返回Error::KeyIsLocked錯誤。

上面的RC級別會有一個問題,由於Percolator是異步釋secondary key的鎖,因此可能某個事務已經提交了,但它的secondary key的鎖還沒釋放。如果此時有另一個讀事務去讀取這行數據,那么在RC級別下讀事務按理是應該能讀取到前一個事務寫入的數據的,但是由於secondary key的鎖沒有釋放無法確定上一個事務已經提交,它直接尋找了更早的已經提交的版本而未能讀取到。甚至有可能出現在同一個client中,一個事務已經提交但是隨后的事務卻無法看到它寫入的數據的情況。顯然,這樣違反了線性一致。向pingCAP官方的開發人員咨詢后確認了這一問題,官方說他們以后將會改變RC隔離級別的邏輯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM