TiKV事務實現淺析
Percolator事務的理論基礎
Percolator的來源
Percolator事務來源於Google在設計更新網頁索引的系統時提出的論文Large-scale Incremental Processing Using Distributed Transactions and Notifications中,Google用它在支持單行事務的分布式數據庫Bigtable的基礎上實現跨節點的分布式事務。Percolator是一種優化版的2PC,但是與 常見的2PC不同,它並沒有一個單獨的coodinator的角色,而是作為一個庫將所有邏輯放在客戶端實現,只需要下層存儲支持單行事務即可。原始的Percolator事務模型中,下層的存儲節點可以對於上層事務完全無感知。
為了確定事務的先后順序,Percolator還要求一個全局的授時中心,用於獲取全局有序的遞增時間戳(比如TiDB中的pd組件)。
隔離級別
Percolator事務實現了SI隔離級別(TiDB中將它作為RR)。每個事務都從授時中心獲取兩個時間戳:startTS 和 commitTS,startTS 在事務開始時獲取,commitTS在事務結束時獲取,事務之間通過這兩個時間戳來確定先后。例如有兩個事務T1和T2,如果T1的commitTS小於T2的startTS,則認為T1發生在T2之前 ,如果兩個事務的時間戳區間[startTS, commitTS]存在交叉,則兩個事務是並發的。在SI隔離級別下一個事務只應該看到commitTS小於自己的startTS的事務所寫入的數據。
例如上圖,最上面的橫軸代表時間,下面三條橫線分別代表三個事務T1,T2,T3,方框代表startTS,黑點代表commitTS。則T2不能讀取到T1寫入的數據,而T3能讀取到T1和T2寫入的數據。
存儲模型
Percolator的存儲基於Bigtable,其存儲模型有列族的概念(CF),同一個列族的數據存儲在一起。每個邏輯上的行分為多個列族,每個列族可以分為多個列,而其中每一列的數據以時間戳倒序排序。典型的行如下圖所示:
Key | Data | Lock | Write |
---|---|---|---|
Bob | 6: 5:$10 |
6: 5: |
6:data@5 5: |
Joe | 6: 5:$2 |
6: 5: |
6:data@5 5: |
key為整個行的key,data為該行的數據。而Percolator要求額外的兩個CF為:Lock和Write。Lock顧名思義表示該行的鎖,而write的版本號表示寫入這行數據的事務提交的時候時間戳commitTS。以Bob行為例,Key為Bob用於唯一確定該行,此時Bob沒有被加鎖Lock為空,在版本號為6的Write CF中有數據data@5,表示對應的數據在Data CF中版本號為5的地方。寫入這行數據的事務startTS為5,commitTS為6。這里的Write CF盡管看上去額外占了一行,並不會占據額外的整行空間。
基本步驟
總體來說,TiKV 的讀寫事務分為兩個階段:1、Prewrite 階段;2、Commit 階段。
客戶端會緩存本地的寫操作,在客戶端調用 client.Commit() 時,開始進入分布式事務 prewrite 和 commit 流程。
Prewrite 對應傳統 2PC 的第一階段
-
首先在所有行的寫操作中選出一個作為 primary row,其他的為 secondary rows
-
PrewritePrimary: 對 primaryRow 寫入鎖以及數據,鎖中記錄本次事務的開始時間戳。上鎖前會檢查:
- 該行是否已經有別的客戶端已經上鎖 (Locking)
- 是否在本次事務開始時間之后,檢查versions ,是否有更新 [startTs, +Inf) 的寫操作已經提交 (Conflict)
在這兩種種情況下會返回事務沖突。否則,就成功上鎖。將行的內容寫入 row 中,版本設置為 startTs
-
將 primaryRow 的鎖上好了以后,進行 secondaries 的 prewrite 流程:
- 類似 primaryRow 的上鎖流程,只不過鎖的內容為事務開始時間 startTs 及 primaryRow 的信息
- 檢查的事項同 primaryRow 的一致
- 當鎖成功寫入后,寫入 row,時間戳設置為 startTs
以上 Prewrite 流程任何一步發生錯誤,都會進行回滾:刪除 Lock 標記 , 刪除版本為 startTs 的數據。
當 Prewrite 階段完成以后,進入 Commit 階段,當前時間戳為 commitTs,TSO 會保證 commitTs > startTS
Commit 的流程對應 2PC 的第二階段
- commit primary: 寫入 write CF, 添加一個新版本,時間戳為 commitTs,內容為 startTs, 表明數據的最新版本是 startTs 對應的數據
- 刪除 Lock 標記
值得注意的是,如果 primary row 提交失敗的話,全事務回滾,回滾邏輯同 prewrite 失敗的回滾邏輯。
如果 commit primary 成功,則可以異步的 commit secondaries,流程和 commit primary 一致, 失敗了也無所謂。Primary row 提交的成功與否標志着整個事務是否提交成功。
事務中的讀操作
- 檢查該行是否有 Lock 標記,如果有,表示目前有其他事務正占用此行,如果這個鎖已經超時則嘗試清除,否則等待超時或者其他事務主動解鎖。注意此時不能直接返回老版本的數據。
- 讀取至 startTs 時該行最新的數據,找到最近的時間戳小於startTS的write CF,從其中讀取版本號t,讀取為於 t 版本的數據內容。
由於鎖是分兩級的,Primary 和 Seconary row,只要 Primary row 的鎖去掉,就表示該事務已經成功提交,這樣的好處是 Secondary 的 commit 是可以異步進行的,只是在異步提交進行的過程中,如果此時有讀請求,可能會需要做一下鎖的清理工作。因為即使 Secondary row 提交失敗,也可以通過 Secondary row 中的鎖,找到 Primary row,根據檢查 Primary row 的 meta,確定這個事務到底是被客戶端回滾還是已經成功提交。
轉賬示例
下面以論文中轉賬的一個例子來展示大體流程,以上面的Bob和Joe為例,假設Bob要轉賬7元給Joe。
Prewrite
首先需要隨機選擇一行最為primaryRow ,這里選擇Bob。以事務開始時間戳為版本號,寫入Lock與數據
Key | Data | Lock | Write |
---|---|---|---|
Bob | 7:\$3 6: 5:$10 |
7:I am Primary 6: 5: |
7: 6:data@5 5: |
Joe | 6: 5:$2 |
6: 5: |
6:data@5 5: |
從上圖可以看出轉賬事務的startTS為7,所以寫入了版本號為7的Lock與Bob的新數據,Lock中有表示自己是primaryLock的標志。隨后進行secondary rows的上鎖,這里只有Joe。
Key | Data | Lock | Write |
---|---|---|---|
Bob | 7:\$3 6: 5:$10 |
7:I am Primary 6: 5: |
7: 6:data@5 5: |
Joe | 7:\$9 6: 5:$2 |
7:primary@Bob 6: 5: |
7: 6:data@5 5: |
Joe的Lock中保存了primary的信息,用於找到這次提交的primary row Bob。
如果在prewrite的過程中檢測到了沖突,則整個事務需要進行回滾。例如,在此時另一個事務的startTS為8,試圖對Bob進行加鎖,發現已經被startTS為7的事務加鎖,則該事務會檢測到沖突,事務回滾。也有可能發現在自己startTS以后,已經有事務提交了新的數據,出現了大於startTS的write,此時事務也需要回滾。
Commit
首先commit primary row,客戶端通過Bigtable的單行事務,清除primary行的鎖,並且以提交時間戳在write寫入提交標志。
key | data | lock | write |
---|---|---|---|
Bob | 8: 7:\$3 6: 5:$10 |
8: 7: 6: 5: |
8:data@7 7: 6:data@5 5: |
Joe | 7:\$9 6: 5:$2 |
7:primary@Bob 6: 5: |
7: 6:data@5 5: |
primary row的Write CF的寫入是整個事務提交的標志,這個操作的完成就意味着事務已經完成提交了。
write中寫入的數據指向Bob真正存放余額的地方。完成這一步就可以向客戶端返回事務commit成功了。
接下可以異步釋放secondary rows的鎖。如果在commit階段發現primary鎖已經不存在(可能因為超時被其他事務清除),則提交失敗,事務回滾。
key | data | lock | write |
---|---|---|---|
Bob | 8: 7:\$3 6: 5:$10 |
8: 7: 6: 5: |
8:data@7 7: 6:data@5 5: |
Joe | 8: 7:\$9 6: 5:$2 |
8: 7: 6: 5: |
8data@7: 7: 6:data@5 5: |
實際上,即使在執行這一步前,客戶端掛了而沒能處理這些行的鎖也沒有問題。當其他事務讀取到這樣的行的數據的時候,通過鎖可以找出primary行,從而判斷出事務的狀態,如果已經提交,則可以清除鎖寫入提交標志。
偽代碼
論文中用C++風格的偽代碼進行了Percolator事務流程的表達,整個事務被封裝成了一個class,先來看其中需要用到的成員:
class Transaction {
// Write結構體表示一個寫入操作,哪個key下的哪一個列,寫入什么值
struct Write { Row row; Column col; string value; };
// writes 則為在這個事務中緩存的所有寫入的集合
vector<Write> writes ;
// 事務開始的時間戳ST
int start ts ;
// 事務建立的時候獲取開始時間戳
Transaction() : start ts (oracle.GetTimestamp())
//下面是各個實現函數,見下文
...
}
事務中需要用到的數據結構比較少,只保存了事務開始的時間戳和寫入集合。
Perwrite階段的偽代碼:
bool Prewrite(Write w, Write primary) {
// 列族名
Column c = w.col;
// google的Percolator基於bigtable的單行事務,因此這里用bigtable::Txn表示發起單行事務
bigtable::Txn T = bigtable::StartRowTransaction(w.row);
// Abort on writes after our start timestamp . . .
// 在自己事務開啟之后是否有新提交的數據
if (T.Read(w.row, c+"write", [start ts, +Inf])) return false;
// . . . or locks at any timestamp.
// 是否已經被其他事務加鎖
if (T.Read(w.row, c+"lock", [0, 1])) return false;
T.Write(w.row, c+"data", start ts , w.value);
T.Write(w.row, c+"lock", start ts ,
{primary.row, primary.col}); // The primary’s location.
return T.Commit();
}
prewite階段如上文所說,在進行沖突檢測后寫入了Lock和數據。這是對某一行進行prewrite的函數,在整個提交階段被多次調用。偽代碼中的commit代表整個percolator事務的提交:
bool Commit() {
Write primary = writes[0];
vector<Write> secondaries(writes .begin()+1, writes .end());
// 對所有參與事務的行執行Prewrite
// 先對隨機選出的某一個primary行加鎖,再對其他行加鎖。
if (!Prewrite(primary, primary)) return false;
for (Write w : secondaries)
if (!Prewrite(w, primary)) return false;
// 獲取提交時間戳commitTS
int commit ts = oracle .GetTimestamp();
// Commit primary first.
Write p = primary;
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
// 失去了鎖,可能被別人終止了,事務回滾
if (!T.Read(p.row, p.col+"lock", [start ts , start ts ]))
return false; // aborted while working
// 向primary行的Write CF寫入提交標志,時間戳為commitTS
T.Write(p.row, p.col+"write", commit ts,
start ts ); // Pointer to data written at start ts .
//擦除primary的鎖
T.Erase(p.row, p.col+"lock", commit ts);
if (!T.Commit()) return false; // commit point
// Second phase: write out write records for secondary cells.
// 在其他行同樣進行寫入Write CF 並且擦除鎖
for (Write w : secondaries) {
bigtable::Write(w.row, w.col+"write", commit ts, start ts );
bigtable::Erase(w.row, w.col+"lock", commit ts);
}
return true;
}
讀取操作的偽代碼如下:
bool Get(Row row, Column c, string* value) {
while (true) {
bigtable::Txn T = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
if (T.Read(row, c+"lock", [0, start ts ])) {
// There is a pending lock; try to clean it and wait
// 注意,這里如果鎖沒有超時的情況下不能直接返回最近的可見數據
// 必須等待持鎖事務commit或者回滾,直到超時清除它
BackoffAndMaybeCleanupLock(row, c);
continue;
}
// 按照時間戳讀取最新的可見的數據
// Find the latest write below our start timestamp.
latest write = T.Read(row, c+"write", [0, start ts ]);
if (!latest write.found()) return false; // no data
int data ts = latest write.start timestamp();
*value = T.Read(row, c+"data", [data ts, data ts]);
return true;
}
}
在讀取操作的時候需要注意遇到鎖的情況,如果檢查primary row發現事務已經提交則可以由自己清除secondary row的鎖。如果有沒有commit的其他事務持有鎖,不能夠直接返回最新的對自己可見的數據。如上面轉賬的例子中正處於prewrite階段,此時另一個startTS為9的事務來進行對Joe的讀取操作:
Key | Data | Lock | Write |
---|---|---|---|
Bob | 7:\$3 6: 5:$10 |
7:I am Primary 6: 5: |
7: 6:data@5 5: |
Joe | 7:\$9 6: 5:$2 |
7:primary@Bob 6: 5: |
7: 6:data@5 5: |
此時能否直接通過版本號為6的write中的信息,返回版本號為5中的數據呢?不能,因為在Percolator的模型中,事務的先后順序是通過邏輯時間戳來確定的,從Joe和Bob中我們只能夠得到持鎖事務的startTS,無法得知commitTS,而SI隔離級別要求我們應該讀取到commitTS小於9的事務寫入的數據,只有等到持鎖事務提交,才能得知它的commitTS是小於9還是大於9。
TiDB事務與MySQL的區別
TiDB 使用樂觀事務模型,在執行 Update
、Insert
、Delete
等語句時,只有在提交過程中,執行 Update
,Insert
,Delete
等語句時才會檢查寫寫沖突,而不是像 MySQL 一樣使用行鎖來避免寫寫沖突。類似的, SELECT .. FOR UPDATE
之類的語句在 TiDB 和 MySQL 中的執行方式並不相同。TiDB的隔離級別的表現也與MySQL不盡相同,雖然TiDB也有對應MySQL的RR和RC隔離級別,但RR級別實質為SI級別,而RC隔離級別的則違反了線性一致。目前TiDB也在實現悲觀鎖的事務模型,但是官方文檔說還處於試驗階段尚不穩定,不建議開啟。
行為差異
由於tidb的基於的事務模型與mysql有較大區別,所以在實際使用中盡管協議兼容,但是事務的行為依然有比較大的區別。
寫入緩存在客戶端
從Peroclator的模型中可以看出所有的修改操作都先緩存在客戶端,只有在事務提交的時候才會進行沖突檢測。所以許多在mysql中會導致阻塞的操作在tidb中並不會。假設有一張表test96如下:
| test96 | CREATE TABLE `test96` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
並向其中插入數據:
mysql> insert into test96 values(1,1),(2,2),(4,4);
Query OK, 3 rows affected (0.01 sec)
處於RR(SI)隔離級別下(tidb的RC隔離級別不完善,不建議使用,后文會提到),開啟兩個事務t1,t2進行如下操作
t1 | t2 | |
---|---|---|
step1 | begin; | begin; |
step2 | update test96 set b = 100 where a >= 2; | |
step3 | update test96 set b = 98 where a = 4; | |
step4 | commit | |
step5 | commit |
如果在mysql中,t2在step3的更新操作將會被t1阻塞,而當t1在step4 commit后,t2的后續操作也能成功。然而在tidb中,t1,t2的update操作都會返回成功。並且此時去select的話還可以發現數據已經被自己修改。
對於t1
mysql> select * from test96;
+---+------+
| a | b |
+---+------+
| 1 | 1 |
| 2 | 100 |
| 4 | 100 |
+---+------+
3 rows in set (0.00 sec)
對於t2
mysql> select * from test96;
+---+------+
| a | b |
+---+------+
| 1 | 1 |
| 2 | 2 |
| 4 | 98 |
+---+------+
3 rows in set (0.01 sec)
這是因為事務在讀取的過程中發現數據被本事務修改過,所以直接從本地緩存中讀取。
在step4,t1的commit可以成功
mysql> commit;
Query OK, 0 rows affected (0.00 sec)
而step5,t2 commit的時候會報錯
mysql> commit;
ERROR 1105 (HY000): [try again later]: [write conflict] txnStartTS 408979633880694785 is stale
如上文對percolator的描述,在提交的時候,事務進行了沖突檢測,發現事務沖突,因此t2被回滾。
自動重試
執行失敗的事務可以讓 TiDB 自動重試提交,但這可能會導致事務異常。當開啟自動重試的時候前文中的例子t1, t2的提交都會成功,這實際上與SI隔離級別的要求不符。有兩個參數與這個功能相關
tidb_disable_txn_auto_retry
作用域:SESSION | GLOBAL
默認值:1
這個變量用來設置是否禁用顯式事務自動重試,設置為 1 時,不會自動重試,如果遇到事務沖突需要在應用層重試。
這個變量不會影響自動提交的隱式事務和 TiDB 內部執行的事務,它們依舊會根據 tidb_retry_limit 的值來決定最大重試次數。
tidb_retry_limit
作用域:SESSION | GLOBAL
默認值:10
這個變量用來設置最多可重試次數,即在一個事務執行中遇到可重試的錯誤(例如事務沖突、事務提交過慢或表結構變更)時,這個事務可以被重新執行,這個變量值表明最多可重試的次數。
通過設置 tidb_disable_txn_auto_retry
可以控制該項功能,同時要注意 tidb_retry_limit
的值不能為 0,否則,也會禁用自動重試。
大事務
同時,由於受客戶端能緩存的數據量的限制,tidb對大事務的支持有限,在官方手冊中聲明了事務大小的限制:
- 單個事務包含的 SQL 語句不超過 5000 條(默認)
- 每個鍵值對不超過 6MB
- 鍵值對的總數不超過 300,000
- 鍵值對的總大小不超過 100MB
tidb中有個tidb_batch_insert參數可以自動將大事務分隔為一系列小事務執行
tidb_batch_insert
作用域: SESSION
默認值: 0
這個變量用來設置是否自動切分插入數據。僅在 autocommit 開啟時有效。 當插入大量數據時,可以將其設置為 1,這樣插入數據會被自動切分為多個 batch,每個 batch 使用一個單獨的事務進行插入。 該用法破壞了事務的原子性,因此,不建議在生產環境中使用。
小事務延時
由於 TiDB 中的每個事務都需要跟 PD leader 進行兩次 round trip以獲取時間戳,TiDB 中的小事務相比於 MySQL 中的小事務延遲更高。以如下的 query 為例,用顯式事務代替 auto_commit
,可優化該 query 的性能。
# 使用 auto_commit 的原始版本
UPDATE my_table SET a='new_value' WHERE id = 1;
UPDATE my_table SET a='newer_value' WHERE id = 2;
UPDATE my_table SET a='newest_value' WHERE id = 3;
# 優化后的版本
START TRANSACTION;
UPDATE my_table SET a='new_value' WHERE id = 1;
UPDATE my_table SET a='newer_value' WHERE id = 2;
UPDATE my_table SET a='newest_value' WHERE id = 3;
COMMIT;
把合理數量的更新打包到一個事務里有利於減小延時,但不宜過大,不能超過限制或者造成較多的沖突。
Load data
語法基本一致但是分隔符只支持‘//\‘
LOAD DATA LOCAL INFILE 'file_name' INTO TABLE table_name
{FIELDS | COLUMNS} TERMINATED BY 'string' ENCLOSED BY 'char' ESCAPED BY 'char'
LINES STARTING BY 'string' TERMINATED BY 'string'
IGNORE n LINES
(col_name ...);
而且,由於不支持大事務實際上數量大時,是把Load data分隔為了多個事務,默認將每 2 萬行記錄作為一個事務進行持久化存儲。如果一次 LOAD DATA
操作插入的數據超過 2 萬行,那么會分為多個事務進行提交。如果某個事務出錯,這個事務會提交失敗,但它前面的事務仍然會提交成功,在這種情況下,一次 LOAD DATA
操作會有部分數據插入成功,部分數據插入失敗。官方不建議在生產中使用Load data。
隔離級別
tidb實現了SI隔離級別,為了與MySQL保持一致,也稱其RR隔離級別。但其表現有區別。
丟失更新
mysql的RR隔離級別會存在丟失更新的情況,依然以表test96為例
| test96 | CREATE TABLE `test96` (
`a` int(11) NOT NULL,
`b` int(11) DEFAULT NULL,
PRIMARY KEY (`a`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin
mysql> insert into test96 values(1,1),(2,2),(4,4);
Query OK, 3 rows affected (0.01 sec)
執行兩個事務t1與t2,每個事務對a=2這一行數據的b列加1,我們希望b列記錄執行的事務的數量。
t1 | t2 | |
---|---|---|
step1 | begin; | begin; |
step2 | select b from test96 where a = 2; | select b from test96 where a = 2; |
step3 | update test96 set b = 3 where a = 2;(計數器+1) | |
step4 | update test96 set b = 3 where a= 2 (在mysql中這一步將會阻塞,但是tidb中會立即返回成功) | |
step5 | commit | (對於mysql,t1提交后t2此時可以繼續執行) |
step6 | commit (mysql提交成功,tidb會讓t2回滾報錯) |
在mysql中由於t2提交的時候其實t1已經提交寫入,t2用於計算的前提條件其實已經不成立了,t2的更新覆蓋了t1的更新,執行了兩個事務之后,b只加了1。而在tidb中,首先,由於寫入是緩存在本地,所以step4並不會造成阻塞,而在commit的時候,percolator的prewrite階段會檢查到b被更新過,t2提交失敗,回滾報錯。(就上面這種數值計算的特殊情況而言,如果在MySQL中用update test96 set b = b+1最后我們會得到正確的結果,這里只是為了方便舉例才用了讀取-計算-寫入的模式。)
幻讀
tidb的SI級別可以避免經典意義上的幻讀,在前文Percolator的讀取步驟中可以看出,只要數據的commitTS大於本事務的startTS,就不會被讀取到,在著名論文A critique of ANSI SQL isolation levels中對於經典幻讀的定義為
Transaction T1 reads a set of data items satisfying some <search condition>. Transaction T2 then creates data items that satisfy T1’s <search condition> and commits. If T1 then repeats its read with the same <search condition>, it gets a set of data items
different from the first read.
可見tidb不會出現經典意義上的幻讀
那么對於MySQL而言會嗎?一眼看去MySQL使用了MVCC機制,即使別人插入了一條滿足自己select篩選條件的數據,在RR隔離級別下由於ReadView不會更新的原因應該也不可能讀取到,然而並不是所有情況下都是如此,依然以上面的表test96為例
t1 | t2 | |
---|---|---|
step1 | begin; | begin; |
step2 | select b from test96 where a > 1; | |
step3 | insert into test96 values(3, 3); | |
step4 | commit; | |
step5 | select b from test96 where a > 1; | |
step6 | update test96 set b = 0 where a > 1; | |
step7 | select b from test96 where a > 1; |
對於step2與step5的select語句,mysql和tidb的行為一致,結果為
mysql> select * from test96 where a>1;
+---+------+
| a | b |
+---+------+
| 2 | 2 |
| 4 | 4 |
+---+------+
2 rows in set (0.01 sec)
而在step6的update之后,在step7再次進行select,tidb的結果行數依然看到的是a=2與a=4兩行
mysql> select * from test96 where a>1;
+---+------+
| a | b |
+---+------+
| 2 | 0 |
| 4 | 0 |
+---+------+
2 rows in set (0.00 sec)
而對於MySQL,卻能見到出乎意料的結果
mysql> select * from test96 where a>1;
+---+------+
| a | b |
+---+------+
| 2 | 0 |
| 3 | 0 |
| 4 | 0 |
+---+------+
3 rows in set (0.00 sec)
mysql中,t1的讀取操作雖然通過MVCC讀取不到t2的值,但是由於更新操作總是使用最新版本,a=3的這行數據也滿足條件,所以這行數據也被更新了。問題在於隱藏列trx_id的值也被更新了(或者說根本就是一個新的版本),因此進行讀取的時候這行數據會讀取到t1寫入的版本,所以我們可以讀取到一條意外出現的記錄。事實上,我們在mysql中進行step6的update操作的時候,可以看到雖然只能讀取到2行,但update影響行數為3行。
mysql> select * from test96 where a>1;
+---+------+
| a | b |
+---+------+
| 2 | 2 |
| 4 | 4 |
+---+------+
2 rows in set (0.00 sec)
mysql> update test96 set b = 0 where a>1;
Query OK, 3 rows affected (0.00 sec)
Rows matched: 3 Changed: 3 Warnings: 0
而在tidb中只更新了自己讀取到的2行
mysql> select * from test96 where a>1;
+---+------+
| a | b |
+---+------+
| 2 | 2 |
| 4 | 4 |
+---+------+
2 rows in set (0.00 sec)
mysql> update test96 set b=0 where a > 1;
Query OK, 2 rows affected (0.00 sec)
Rows matched: 2 Changed: 2 Warnings: 0
而在t1提交后,從tidb再次select可以查到a=3這條數據
mysql> select * from test96 where a>1;
+---+------+
| a | b |
+---+------+
| 2 | 0 |
| 3 | 3 |
| 4 | 0 |
+---+------+
3 rows in set (0.01 sec)
所以MySQL依然存在經典意義上幻讀
寫偏序
ANSI定義的RR隔離級別下不會發生寫偏序,tidb與mysql的RR隔離級別下都可能存在寫偏序。依然以上面test96表為例,假設我們的約束要求b > 1的記錄數量不能為0。
t1 | t2 | |
---|---|---|
step1 | begin; | begin; |
step2 | select count(*) from test96 where b > 1; | select count(*) from test96 where b > 1; |
step3 | if count > 1 {update test96 set b = 0 where a = 2;} | |
step4 | if count > 1 {update test96 set b = 0 where a = 4;} | |
step5 | commit | |
step6 | commit |
由於t1與t2沒有寫同一份數據,兩者在tidb和mysql中都會直接提交成功。但是最終test96中 b > 1的記錄數變成了0,這違背了我們的約束。
For update
tidb與mysql都支持for update 語法,但是行為表現有一定區別。對於mysql而言,for update是在讀取的時候對記錄實時的加鎖,阻塞其他試圖修改或者帶for update的語句。而對於tidb而言,for update本質上只是在本地緩存一個寫入,只是寫入的只有一個鎖,沒有數據, 相當於對符合條件的記錄進行了一次沒有數據的寫入。同時,tidb目前尚不支持謂詞鎖或類似mysql的gap鎖,所以即使在RR隔離級別下使用for update, 也不能防止其他事務的insert操作。先來看一個test96表上的例子
t1 | t2 | |
---|---|---|
step1 | begin; | begin; |
step2 | select count(*) from test96 where a > 1 for update; | |
step3 | update test96 set b = 0 where a = 4 | |
step4 | commit | |
step5 | commit |
在mysql中t2在step3的update操作會被阻塞,直到t1提交,之后t2的執行會成功。然而在tidb中,t2的update操作會返回成功,而在t2提交的時候會報錯發生沖突,t2回滾。
mysql> update test96 set b = 0 where a = 4;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> commit;
ERROR 1105 (HY000): [try again later]: [write conflict] txnStartTS=409004592597827585, conflictTS=409004587970985985, key={tableID=530, handle=4} primary={tableID=530, handle=4}
同時由於tidb沒有謂詞鎖或gap鎖,不能鎖定范圍,如果把t2 step3的操作修改如下 ,t1與t2都將提交成功。
t1 | t2 | |
---|---|---|
step1 | begin; | begin; |
step2 | select count(*) from test96 where a > 1 for update; | |
step3 | insert into test96 values(3,3); | |
step4 | commit | |
step5 | commit |
Tikv中的事務實現
論文percolator原型中,所有的事務相關操作都在客戶端執行,服務端不需要進行任何特殊處理。但是在tikv中,服務端對於不同的階段做了不同的處理,以優化性能。
Tikv客戶端實現
Tikv本身作為一個分布式的kv存儲供計算層的tidb訪問,同時也提供了單獨client客戶端以直接使用tikv。獨立客戶端的邏輯與tidb中對tikv的訪問邏輯基本相同,許多代碼都是復用的,tidb多了一些生成binlog和保存統計信息等額外操作。下面以golang版本的client為例,介紹tikv客戶端的實現。
客戶端使用
客戶端的使用上非常簡潔,下面兩個官方示例中的函數對tikv進行了set和get操作
// key1 val1 key2 val2 ...
func puts(args ...[]byte) error {
tx, err := client.Begin()
if err != nil {
return err
}
for i := 0; i < len(args); i += 2 {
key, val := args[i], args[i+1]
err := tx.Set(key, val)
if err != nil {
return err
}
}
return tx.Commit(context.Background())
}
func get(k []byte) (KV, error) {
tx, err := client.Begin()
if err != nil {
return KV{}, err
}
v, err := tx.Get(k)
if err != nil {
return KV{}, err
}
return KV{K: k, V: v}, nil
}
從上面可以看出,事務api的使用比較簡潔,只需要Begin開啟一個事務,進行Get,Set等操作,然后Commit即可。
寫入
Transaction
Transaction是所有事務操作的入口,Transaction的結構體如下
// Transaction is a key-value transaction.
type Transaction struct {
// 用於訪問tikv
tikvStore *store.TiKVStore
snapshot *store.TiKVSnapshot
// 本地緩存數據
us kv.UnionStore
// 即percolator事務模型中的開始時間戳startTS,創建事務的時候從TSO獲取
startTS uint64
startTime time.Time // Monotonic timestamp for recording txn time consuming.
// 注意,這個commitTS並不是percolator中的commitTS,實際上在client中並沒有發現有使用這個變量
// percolator真正的commitTS在TxnCommitter中。在tidb中有類似的變量,用於保存提交時間戳供統計。
// 也許是從tidb中復用代碼的時候搬過來忘了刪除?
commitTS uint64
valid bool
lockKeys [][]byte
setCnt int64
}
Transaction的方法提供了Get,Set,Del等多種數據操作,所有的修改都將保存在本地緩存us中,在Commit的時候按照percolator的事務模型進行提交。下面是Commit的簡化代碼。
// Commit commits the transaction operations to KV store.
func (txn *Transaction) Commit(ctx context.Context) error {
···
if len(mutations) == 0 {
return nil
}
···
// committer是percolator事務具體提交的執行者,mutations為所有的修改集合
committer, err := store.NewTxnCommitter(txn.tikvStore, txn.startTS, txn.startTime, mutations)
if err != nil || committer == nil {
return err
}
// latch是一個特性,開啟的話同一個客戶端發起的請求如果有沖突可以先在本地解決,需要先獲取本地latch
// 用於解決沖突過於嚴重的情況
// latches disabled
if txn.tikvStore.GetTxnLatches() == nil {
// 未開啟latch則直接開始2pc提交
err = committer.Execute(ctx)
log.Debug("[kv]", txn.startTS, " txnLatches disabled, 2pc directly:", err)
return err
}
// latches enabled
// for transactions which need to acquire latches
start = time.Now()
lock := txn.tikvStore.GetTxnLatches().Lock(txn.startTS, committer.GetKeys())
localLatchTime := time.Since(start)
if localLatchTime > 0 {
metrics.LocalLatchWaitTimeHistogram.Observe(localLatchTime.Seconds())
}
defer txn.tikvStore.GetTxnLatches().UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
return errors.WithMessage(err, store.TxnRetryableMark)
}
// 開啟了latch則先獲取本地latch后再開始2PC
err = committer.Execute(ctx)
if err == nil {
lock.SetCommitTS(committer.GetCommitTS())
}
log.Debug("[kv]", txn.startTS, " txnLatches enabled while txn retryable:", err)
return err
}
在提交過程中,Transaction收集好所有的修改,將startTS和修改的數據傳遞給TxnCommitter,由TxnCommitter執行具體的提交過程。
TxnCommitter
TxnCommitter負責事務的提交過程,下面是其簡化結構體
// TxnCommitter executes a two-phase commit protocol.
type TxnCommitter struct {
// tikv訪問接口
store *TiKVStore
conf *config.Config
// 開始時間戳
startTS uint64
// 所有修改數據的key
keys [][]byte
// 所有修改操作
mutations map[string]*pb.Mutation
// 鎖的最長生命周期,持鎖時間太長可能會被其他事務清除,本事務回滾。
lockTTL uint64
// 事務的提交時間戳
commitTS uint64
mu struct {
sync.RWMutex
committed bool
// 這個標志位表示是否收到了無法確定事務的狀態的錯誤,例如因網絡原因未收到c事務的commit指令的回應
undeterminedErr error // undeterminedErr saves the rpc error we encounter when commit primary key.
}
cleanWg sync.WaitGroup
}
Txn的Execute函數是實際的提交執行流程,由外部調用(Transaction的Commit),下面是簡化流程
// Execute executes the two-phase commit protocol.
func (c *TxnCommitter) Execute(ctx context.Context) error {
// 如果事務沒有設置提交標志並且沒有發生無法確定事務狀態的錯誤(也就是說可以確定事務回滾了)
// 則清除可能已經prewrite的數據
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
committed := c.mu.committed
undetermined := c.mu.undeterminedErr != nil
c.mu.RUnlock()
// 如果可以確定事務未能提交,則嘗試異步的進行清理
if !committed && !undetermined {
c.cleanWg.Add(1)
go func() {
err := c.cleanupKeys(retry.NewBackoffer(context.Background(), retry.CleanupMaxBackoff), c.keys)
if err != nil {
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.ConnID, err, c.startTS)
} else {
log.Infof("con:%d 2PC clean up done, tid: %d", c.ConnID, c.startTS)
}
c.cleanWg.Done()
}()
}
}()
// Backoffer是用於控制重試的結構體
prewriteBo := retry.NewBackoffer(ctx, retry.PrewriteMaxBackoff)
start := time.Now()
// 執行percolator的第一階段,prewrite
err := c.prewriteKeys(prewriteBo, c.keys)
if err != nil {
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.ConnID, err, c.startTS)
return err
}
// 獲取提交時間戳
commitTS, err := c.store.GetTimestampWithRetry(retry.NewBackoffer(ctx, retry.TsoMaxBackoff))
if err != nil {
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.ConnID, err, c.startTS)
return err
// check commitTS
if commitTS <= c.startTS {
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
c.ConnID, c.startTS, commitTS)
log.Error(err)
return err
}
c.commitTS = commitTS
// 事務消耗的時間不能超過設定的最大時間
if c.store.GetOracle().IsExpired(c.startTS, c.maxTxnTimeUse) {
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.ConnID, c.startTS, c.commitTS)
return errors.WithMessage(err, TxnRetryableMark)
}
start = time.Now()
commitBo := retry.NewBackoffer(ctx, retry.CommitMaxBackoff)
// percolator第二階段,commit
err = c.commitKeys(commitBo, c.keys)
if err != nil {
···
return nil
}
Eexcute的主要邏輯就是對應的percolator的兩個步驟,中間做了一些合法性檢查,在函數的最后如果事務不能正常提交,則需要嘗試清除已經提交的數據。在上面的函數中可以看到prewriteKeys和commitKeys兩個函數,這兩個函數未內部兩個階段的對應。
func (c *TxnCommitter) prewriteKeys(bo *retry.Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionPrewrite, keys)
}
func (c *TxnCommitter) commitKeys(bo *retry.Backoffer, keys [][]byte) error {
return c.doActionOnKeys(bo, actionCommit, keys)
}
由於兩個階段的操作都有較多共性,可以通過batch的方式來提高性能,所以這里采用了同一個函數doActionOnKeys來進行處理。
func (c *TxnCommitter) doActionOnKeys(bo *retry.Backoffer, action commitAction, keys [][]byte) error {
if len(keys) == 0 {
return nil
}
groups, firstRegion, err := c.store.GetRegionCache().GroupKeysByRegion(bo, keys)
if err != nil {
return err
}
var batches []batchKeys
···
// Make sure the group that contains primary key goes first.
// 按照key的region分布和大小打包分組,每個分組的key可以保證在一個region
commitBatchSize := c.conf.Txn.CommitBatchSize
batches = appendBatchBySize(batches, firstRegion, groups[firstRegion], sizeFunc, commitBatchSize)
delete(groups, firstRegion)
for id, g := range groups {
batches = appendBatchBySize(batches, id, g, sizeFunc, commitBatchSize)
}
firstIsPrimary := bytes.Equal(keys[0], c.primary())
// 在commit階段的時候的時候,primaryRow需要先處理,因為primaryRow的提交被視為整個事務提交與否的標志。
if firstIsPrimary && (action == actionCommit || action == actionCleanup) {
// primary should be committed/cleanup first
err = c.doActionOnBatches(bo, action, batches[:1])
if err != nil {
return err
}
batches = batches[1:]
}
if action == actionCommit {
// Commit secondary batches in background goroutine to reduce latency.
// The backoffer instance is created outside of the goroutine to avoid
// potencial data race in unit test since `CommitMaxBackoff` will be updated
// by test suites.
secondaryBo := retry.NewBackoffer(context.Background(), retry.CommitMaxBackoff)
// 這里可以看出secondaryRows是異步提交的,只要primaryRow提交成功就會給客戶端返回成功
go func() {
e := c.doActionOnBatches(secondaryBo, action, batches)
if e != nil {
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.ConnID, action, e)
}
}()
} else {
err = c.doActionOnBatches(bo, action, batches)
}
return err
}
doActionOnKeys將需要修改的key進行打包,交給doActionOnBatches進行處理,如果是commit階段,則需要保證primaryRow先進行處理。在上面還可以看出這里可以看出secondaryRows是異步提交的,只要primaryRow提交成功就會給客戶端返回成功。這造成了一個問題,在RC隔離級別下,tikv在查詢數據的時候並沒有檢查lock的信息,而是直接找一個最近的本地已經提交版本返回給客戶端,所以如果某個事務已經提交但secondaryRows還未釋放,此時另一個事務是無法讀取到這個事務寫入的數據的,這與正常的RC級別的表現不符。
func (c *TxnCommitter) doActionOnBatches(bo *retry.Backoffer, action commitAction, batches []batchKeys) error {
if len(batches) == 0 {
return nil
}
var singleBatchActionFunc func(bo *retry.Backoffer, batch batchKeys) error
// singleBatchActionFunc是對單個batch的處理函數,主要進行grpc數據包的封裝發送和錯誤處理。
switch action {
case actionPrewrite:
singleBatchActionFunc = c.prewriteSingleBatch
case actionCommit:
singleBatchActionFunc = c.commitSingleBatch
case actionCleanup:
singleBatchActionFunc = c.cleanupSingleBatch
}
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
}
return e
}
// For prewrite, stop sending other requests after receiving first error.
// prewrite階段一旦出錯就取消所有后續動作
backoffer := bo
var cancel context.CancelFunc
if action == actionPrewrite {
backoffer, cancel = bo.Fork()
defer cancel()
}
// Concurrently do the work for each batch.
ch := make(chan error, len(batches))
for _, batch1 := range batches {
batch := batch1
go func() {
if action == actionCommit {
// Because the secondary batches of the commit actions are implemented to be
// committed asynchronously in background goroutines, we should not
// fork a child context and call cancel() while the foreground goroutine exits.
// Otherwise the background goroutines will be canceled exceptionally.
// Here we makes a new clone of the original backoffer for this goroutine
// exclusively to avoid the data race when using the same backoffer
// in concurrent goroutines.
// commit階段只要primaryRow提交成功即可
singleBatchBackoffer := backoffer.Clone()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
} else {
singleBatchBackoffer, singleBatchCancel := backoffer.Fork()
defer singleBatchCancel()
ch <- singleBatchActionFunc(singleBatchBackoffer, batch)
}
}()
}
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.ConnID, action, e, c.startTS)
// Cancel other requests and return the first error.
if cancel != nil {
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.ConnID, action, c.startTS)
cancel()
}
if err == nil {
err = e
}
}
}
return err
}
doActionOnBatches對每個batch進行處理,發送rpc請求給tikv服務端。其中,對於prewrite和commit的處理有所不同,prewrite如果其中某一個batch的寫入有錯,需要全部撤銷;而commit階段的時候如果是secondaryRows,則不會取消,因為此時事務被認為已經提交了。
各個xxxxSingleBatch函數是對每個batch的具體處理,根據具體階段的不同分別進行不同的處理。
prewriteSingleBatch
func (c *TxnCommitter) prewriteSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
mutations := make([]*pb.Mutation, len(batch.keys))
for i, k := range batch.keys {
mutations[i] = c.mutations[string(k)]
}
// 組裝rpc請求
req := &rpc.Request{
Type: rpc.CmdPrewrite,
Prewrite: &pb.PrewriteRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
LockTtl: c.lockTTL,
},
Context: pb.Context{
Priority: c.Priority,
SyncLog: c.SyncLog,
},
}
for {
// 發送請求並進行各種錯誤處理
resp, err := c.store.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort)
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
// TikcStore緩存的region信息有誤,連接pd重試提交
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
return c.prewriteKeys(bo, batch.keys)
}
prewriteResp := resp.Prewrite
if prewriteResp == nil {
return errors.WithStack(rpc.ErrBodyMissing)
}
// prewrite成功
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
return nil
}
var locks []*Lock
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
return errors.WithStack(ErrKeyAlreadyExist(alreadyExist.GetKey()))
}
// Extract lock from key error
// 如果被加鎖,需要識別出加鎖的key,然后嘗試解鎖或者等待釋放。
lock, err1 := extractLockFromKeyErr(keyErr, c.conf.Txn.DefaultLockTTL)
if err1 != nil {
return err1
}
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.ConnID, lock)
locks = append(locks, lock)
}
start := time.Now()
ok, err := c.store.GetLockResolver().ResolveLocks(bo, locks)
if err != nil {
return err
}
atomic.AddInt64(&c.detail.ResolveLockTime, int64(time.Since(start)))
if !ok {
err = bo.Backoff(retry.BoTxnLock, errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return err
}
}
}
}
commitSingleBatch:
func (c *TxnCommitter) commitSingleBatch(bo *retry.Backoffer, batch batchKeys) error {
// 組裝rpc請求並且發送
req := &rpc.Request{
Type: rpc.CmdCommit,
Commit: &pb.CommitRequest{
StartVersion: c.startTS,
Keys: batch.keys,
CommitVersion: c.commitTS,
},
Context: pb.Context{
Priority: c.Priority,
SyncLog: c.SyncLog,
},
}
req.Context.Priority = c.Priority
sender := rpc.NewRegionRequestSender(c.store.GetRegionCache(), c.store.GetRPCClient())
resp, err := sender.SendReq(bo, req, batch.region, c.conf.RPC.ReadTimeoutShort)
// If we fail to receive response for the request that commits primary key, it will be undetermined whether this
// transaction has been successfully committed.
// Under this circumstance, we can not declare the commit is complete (may lead to data lost), nor can we throw
// an error (may lead to the duplicated key error when upper level restarts the transaction). Currently the best
// solution is to populate this error and let upper layer drop the connection to the corresponding mysql client.
isPrimary := bytes.Equal(batch.keys[0], c.primary())
// 這就是Execute函數中判斷的UndeterminedErr的值的設置,如果收到的RPC回復錯誤,則事務狀態是未定的,有可能提交了,也可能處於未提交狀態。
if isPrimary && sender.RPCError() != nil {
c.setUndeterminedErr(sender.RPCError())
}
if err != nil {
return err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return err
}
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return err
}
// re-split keys and commit again.
// region錯誤則進行重試
return c.commitKeys(bo, batch.keys)
}
commitResp := resp.Commit
if commitResp == nil {
return errors.WithStack(rpc.ErrBodyMissing)
}
// Here we can make sure tikv has processed the commit primary key request. So
// we can clean undetermined error.
if isPrimary {
// primary key提交成功則可以認為事務已經提交
c.setUndeterminedErr(nil)
}
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = errors.Errorf("con:%d 2PC commit failed: %v", c.ConnID, keyErr.String())
if c.mu.committed {
// No secondary key could be rolled back after it's primary key is committed.
// There must be a serious bug somewhere.
log.Errorf("2PC failed commit key after primary key committed: %v, tid: %d", err, c.startTS)
return err
}
// The transaction maybe rolled back by concurrent transactions.
log.Debugf("2PC failed commit primary key: %v, retry later, tid: %d", err, c.startTS)
return errors.WithMessage(err, TxnRetryableMark)
}
c.mu.Lock()
defer c.mu.Unlock()
// Group that contains primary key is always the first.
// We mark transaction's status committed when we receive the first success response.
// commited設為true,由於secondary的提交一定在primary之后,所以這里secondary的提交也可以設置為true
c.mu.committed = true
return nil
}
讀取
TiKVSnapshot
TiKVSnapshot顧名思義代表一個事務的快照,Transaction的snapshot成員就是該類型。在進行Get操作時,事務首先從本地緩存查詢是否存在該key,如果不存在就調用TiKVSnapshot的get方法獲取
func (s *TiKVSnapshot) get(bo *retry.Backoffer, k key.Key) ([]byte, error) {
sender := rpc.NewRegionRequestSender(s.store.GetRegionCache(), s.store.GetRPCClient())
req := &rpc.Request{
Type: rpc.CmdGet,
Get: &pb.GetRequest{
Key: k,
Version: s.ts,
},
Context: pb.Context{
Priority: s.Priority,
NotFillCache: s.NotFillCache,
},
}
for {
loc, err := s.store.regionCache.LocateKey(bo, k)
if err != nil {
return nil, err
}
resp, err := sender.SendReq(bo, req, loc.Region, s.conf.RPC.ReadTimeoutShort)
if err != nil {
return nil, err
}
regionErr, err := resp.GetRegionError()
if err != nil {
return nil, err
}
// 如果本地緩存的region的信息錯誤,則重新獲取新的region信息
if regionErr != nil {
err = bo.Backoff(retry.BoRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, err
}
continue
}
cmdGetResp := resp.Get
if cmdGetResp == nil {
return nil, errors.WithStack(rpc.ErrBodyMissing)
}
val := cmdGetResp.GetValue()
if keyErr := cmdGetResp.GetError(); keyErr != nil {
lock, err := extractLockFromKeyErr(keyErr, s.conf.Txn.DefaultLockTTL)
if err != nil {
return nil, err
}
// 如果有鎖存在則嘗試解鎖
ok, err := s.store.lockResolver.ResolveLocks(bo, []*Lock{lock})
if err != nil {
return nil, err
}
if !ok {
err = bo.Backoff(retry.BoTxnLockFast, errors.New(keyErr.String()))
if err != nil {
return nil, err
}
}
continue
}
return val, nil
}
}
Tikv服務端實現
在論文的原始Percolator事務模型中,存儲層不需要對分布式事務有任何感知,只需要支持單行事務,但是出於性能考慮Tikv在存儲層做了很多的優化,以減輕開銷。
寫入
當客戶端,經過執行框架的一系列調度,最終會來到寫入操作的入口process_write_impl
fn process_write_impl<S: Snapshot>(
cmd: Command,
snapshot: S,
statistics: &mut Statistics,
) -> Result<(Context, ProcessResult, Vec<Modify>, usize)> {
let (pr, modifies, rows, ctx) = match cmd {
// prewrite入口
Command::Prewrite {
ctx,
mutations,
primary,
start_ts,
options,
..
} => {
// 新建一個本地事務,通過這個事務來進行操作
let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
let mut locks = vec![];
let rows = mutations.len();
for m in mutations {
// 對每行進行prewrite
match txn.prewrite(m, &primary, &options) {
Ok(_) => {}
// 如果已經有其他事務將該行加鎖,應該將加鎖的鎖信息保存后返回
e @ Err(MvccError::KeyIsLocked { .. }) => {
locks.push(e.map_err(Error::from).map_err(StorageError::from));
}
// 有錯誤則直接返回錯誤
Err(e) => return Err(Error::from(e)),
}
}
statistics.add(&txn.take_statistics());
// 返回加鎖信息
if locks.is_empty() {
let pr = ProcessResult::MultiRes { results: vec![] };
// 獲取所有修改操作
let modifies = txn.into_modifies();
(pr, modifies, rows, ctx)
} else {
// Skip write stage if some keys are locked.
let pr = ProcessResult::MultiRes { results: locks };
(pr, vec![], 0, ctx)
}
}
Command::Commit {
ctx,
keys,
lock_ts,
commit_ts,
..
} => {
if commit_ts <= lock_ts {
return Err(Error::InvalidTxnTso {
start_ts: lock_ts,
commit_ts,
});
}
// commit同樣需要新建一個本地事務進行操作
let mut txn = MvccTxn::new(snapshot, lock_ts, !ctx.get_not_fill_cache())?;
let rows = keys.len();
for k in keys {
// 對每一個key調用commit
txn.commit(k, commit_ts)?;
}
statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), rows, ctx)
}
Command::Cleanup {
ctx, key, start_ts, ..
} => {
// 新建本地事務清除廢棄的鎖,比如primary已經提交的secondary鎖
let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
// 使用本地事務的rollback接口
txn.rollback(key)?;
statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), 1, ctx)
}
Command::Rollback {
ctx,
keys,
start_ts,
..
} => {
let mut txn = MvccTxn::new(snapshot, start_ts, !ctx.get_not_fill_cache())?;
let rows = keys.len();
// 調用Rollback清除所有相關的鎖
for k in keys {
txn.rollback(k)?;
}
statistics.add(&txn.take_statistics());
(ProcessResult::Res, txn.into_modifies(), rows, ctx)
}
Command::ResolveLock {
ctx,
txn_status,
mut scan_key,
key_locks,
} => {
let mut scan_key = scan_key.take();
let mut modifies: Vec<Modify> = vec![];
let mut write_size = 0;
let rows = key_locks.len();
for (current_key, current_lock) in key_locks {
// 對每一個lock新建事務釋放它
let mut txn =
MvccTxn::new(snapshot.clone(), current_lock.ts, !ctx.get_not_fill_cache())?;
let status = txn_status.get(¤t_lock.ts);
let commit_ts = match status {
Some(ts) => *ts,
None => panic!("txn status {} not found.", current_lock.ts),
};
if commit_ts > 0 {
if current_lock.ts >= commit_ts {
return Err(Error::InvalidTxnTso {
start_ts: current_lock.ts,
commit_ts,
});
}
txn.commit(current_key.clone(), commit_ts)?;
} else {
txn.rollback(current_key.clone())?;
}
write_size += txn.write_size();
statistics.add(&txn.take_statistics());
modifies.append(&mut txn.into_modifies());
if write_size >= MAX_TXN_WRITE_SIZE {
scan_key = Some(current_key);
break;
}
}
let pr = if scan_key.is_none() {
ProcessResult::Res
} else {
ProcessResult::NextCommand {
cmd: Command::ResolveLock {
ctx: ctx.clone(),
txn_status,
scan_key: scan_key.take(),
key_locks: vec![],
},
}
};
(pr, modifies, rows, ctx)
}
Command::Pause { ctx, duration, .. } => {
thread::sleep(Duration::from_millis(duration));
(ProcessResult::Res, vec![], 0, ctx)
}
_ => panic!("unsupported write command"),
};
Ok((ctx, pr, modifies, rows))
}
這個入口函數比較長,但很明確的分為了數個分支,其中每個分支對於數據的修改都新建了一個本地事務MvccTxn來進行,例如對於prewrite和commit而言,就是對於每一個修改再調用MvccTxn來執行。下面是進行prewrite的代碼
pub fn prewrite(
&mut self,
mutation: Mutation,
primary: &[u8],
options: &Options,
) -> Result<()> {
let lock_type = LockType::from_mutation(&mutation);
let (key, value, should_not_exist) = match mutation {
Mutation::Put((key, value)) => (key, Some(value), false),
Mutation::Delete(key) => (key, None, false),
Mutation::Lock(key) => (key, None, false),
Mutation::Insert((key, value)) => (key, Some(value), true),
};
{
if !options.skip_constraint_check {
if let Some((commit, write)) = self.reader.seek_write(&key, u64::max_value())? {
// Abort on writes after our start timestamp ...
// If exists a commit version whose commit timestamp is larger than or equal to
// current start timestamp, we should abort current prewrite, even if the commit
// type is Rollback.
// 判斷沖突,是否有時間戳大於自己開始時間戳的已提交數據
// 如果有則寫入失敗
if commit >= self.start_ts {
MVCC_CONFLICT_COUNTER.prewrite_write_conflict.inc();
return Err(Error::WriteConflict {
start_ts: self.start_ts,
conflict_start_ts: write.start_ts,
conflict_commit_ts: commit,
key: key.to_raw()?,
primary: primary.to_vec(),
});
}
// 對於插入操作需要先判斷key是否已經存在
if should_not_exist {
if write.write_type == WriteType::Put
|| (write.write_type != WriteType::Delete
&& self.key_exist(&key, write.start_ts - 1)?)
{
return Err(Error::AlreadyExist { key: key.to_raw()? });
}
}
}
}
// ... or locks at any timestamp.
if let Some(lock) = self.reader.load_lock(&key)? {
// 已經被別人加鎖,需要返回錯誤,事務提交失敗
if lock.ts != self.start_ts {
return Err(Error::KeyIsLocked {
key: key.to_raw()?,
primary: lock.primary,
ts: lock.ts,
ttl: lock.ttl,
});
}
// No need to overwrite the lock and data.
// If we use single delete, we can't put a key multiple times.
MVCC_DUPLICATE_CMD_COUNTER_VEC.prewrite.inc();
return Ok(());
}
}
if value.is_none() || is_short_value(value.as_ref().unwrap()) {
self.lock_key(key, lock_type, primary.to_vec(), options.lock_ttl, value);
} else {
// value is long
let ts = self.start_ts;
// 寫入數據和鎖
self.put_value(key.clone(), ts, value.unwrap());
self.lock_key(key, lock_type, primary.to_vec(), options.lock_ttl, None);
}
Ok(())
}
這里對於時間戳以及加鎖的檢查可以完整對應到percolator事務原型中prewrite的檢查邏輯,只是在原本模型中是由客戶端進行檢查,而tikv的實現中是client把對應的key打包好發送給所在的tikv,由tikv來具體執行檢查。如果在檢查中發現沖突則直接返回錯誤由客戶端決定接下來的行動,如果不存在沖突的,則寫入data CF和lock CF。在prewrite完成之后,調用本地事務的commit進行提交。
pub fn commit(&mut self, key: Key, commit_ts: u64) -> Result<()> {
let (lock_type, short_value) = match self.reader.load_lock(&key)? {
// 只有這行存在鎖,並且這個鎖是自己所加的才能進行提交(通過所得時間戳判斷)
Some(ref mut lock) if lock.ts == self.start_ts => {
(lock.lock_type, lock.short_value.take())
}
_ => {
// 本事務不再持有鎖,需要返回錯誤
return match self.reader.get_txn_commit_info(&key, self.start_ts)? {
Some((_, WriteType::Rollback)) | None => {
MVCC_CONFLICT_COUNTER.commit_lock_not_found.inc();
// None: related Rollback has been collapsed.
// Rollback: rollback by concurrent transaction.
info!(
"txn conflict (lock not found)";
"key" => %key,
"start_ts" => self.start_ts,
"commit_ts" => commit_ts,
);
Err(Error::TxnLockNotFound {
start_ts: self.start_ts,
commit_ts,
key: key.as_encoded().to_owned(),
})
}
// Committed by concurrent transaction.
Some((_, WriteType::Put))
| Some((_, WriteType::Delete))
| Some((_, WriteType::Lock)) => {
MVCC_DUPLICATE_CMD_COUNTER_VEC.commit.inc();
Ok(())
}
};
}
};
let write = Write::new(
WriteType::from_lock_type(lock_type),
self.start_ts,
short_value,
);
// 在write列寫入提交信息並且清除持有的鎖
self.put_write(key.clone(), commit_ts, write.to_bytes());
self.unlock_key(key);
Ok(())
}
在提交的時候先進行檢查,如果不再持有鎖(可能因為超時被其他事務清除)則返回錯誤。如果依然持有,則寫入write列並且清除鎖。
讀取
當客戶端進行讀取的時候,如percolator的原型所示,通過startTS和其他事務的commitTS的大小對比來判斷是否對數據可見,其基本邏輯在MvccReader::get:
pub fn get(&mut self, key: &Key, mut ts: u64) -> Result<Option<Value>> {
// Check for locks that signal concurrent writes.
// 在這里可以看到如果是SI隔離級別才需要檢查是否被加鎖
// 如果是RC隔離級別則直接返回找到的最近的一份已提交的數據
match self.isolation_level {
IsolationLevel::SI => ts = self.check_lock(key, ts)?,
IsolationLevel::RC => {}
}
if let Some(mut write) = self.get_write(key, ts)? {
if write.short_value.is_some() {
if self.key_only {
return Ok(Some(vec![]));
}
return Ok(write.short_value.take());
}
match self.load_data(key, write.start_ts)? {
None => {
return Err(default_not_found_error(key.to_raw()?, write, "get"));
}
Some(v) => return Ok(Some(v)),
}
}
Ok(None)
}
可以看出在SI隔離下會進行lock的檢查,如果發現被其他事務鎖定,則返回錯誤由客戶端判斷是否進行清除或者等待。檢查鎖的邏輯如下
fn check_lock(&mut self, key: &Key, ts: u64) -> Result<u64> {
// 如果存在鎖則進行具體的檢查邏輯
if let Some(lock) = self.load_lock(key)? {
return self.check_lock_impl(key, ts, lock);
}
Ok(ts)
}
fn check_lock_impl(&self, key: &Key, ts: u64, lock: Lock) -> Result<u64> {
// 如果lock是時間戳大於自己的時間戳,則表示加鎖的事務開始於當前事務之后,不應該看到這個事務的數據。
if lock.ts > ts || lock.lock_type == LockType::Lock {
// ignore lock when lock.ts > ts or lock's type is Lock
return Ok(ts);
}
if ts == std::u64::MAX && key.to_raw()? == lock.primary {
// when ts==u64::MAX(which means to get latest committed version for
// primary key),and current key is the primary key, returns the latest
// commit version's value
return Ok(lock.ts - 1);
}
// There is a pending lock. Client should wait or clean it.
// 如果有開始於自己之前的事務所加的鎖,則需要客戶端的事務模塊判斷是等待鎖還是清除鎖
Err(Error::KeyIsLocked {
key: key.to_raw()?,
primary: lock.primary,
ts: lock.ts,
ttl: lock.ttl,
})
}
這里的邏輯也可以對應到論文中percolator的讀取檢查鎖的邏輯,如果已經加鎖則返回Error::KeyIsLocked錯誤。
上面的RC級別會有一個問題,由於Percolator是異步釋secondary key的鎖,因此可能某個事務已經提交了,但它的secondary key的鎖還沒釋放。如果此時有另一個讀事務去讀取這行數據,那么在RC級別下讀事務按理是應該能讀取到前一個事務寫入的數據的,但是由於secondary key的鎖沒有釋放無法確定上一個事務已經提交,它直接尋找了更早的已經提交的版本而未能讀取到。甚至有可能出現在同一個client中,一個事務已經提交但是隨后的事務卻無法看到它寫入的數據的情況。顯然,這樣違反了線性一致。向pingCAP官方的開發人員咨詢后確認了這一問題,官方說他們以后將會改變RC隔離級別的邏輯。