摘要:在單進程的系統中,當存在多個線程可以同時改變某個變量時,就需要對變量或代碼塊做同步,使其在修改這種變量時能夠線性執行消除並發修改變量,而同步本質上通過鎖來實現。
本文分享自華為雲社區《還不會使用分布式鎖?從零開始基於 etcd 實現分布式鎖》,原文作者:aoho 。
為什么需要分布式鎖?
在單進程的系統中,當存在多個線程可以同時改變某個變量時,就需要對變量或代碼塊做同步,使其在修改這種變量時能夠線性執行消除並發修改變量。而同步本質上通過鎖來實現。為了實現多個線程在一個時刻同一個代碼塊只能有一個線程可執行,那么需要在某個地方做個標記,這個標記必須每個線程都能看到,當標記不存在時可以設置該標記,其余后續線程發現已經有標記了則等待擁有標記的線程結束同步代碼塊取消標記后再去嘗試設置標記。
而在分布式環境下,數據一致性問題一直是難點。相比於單進程,分布式環境的情況更加復雜。分布式與單機環境最大的不同在於其不是多線程而是多進程。多線程由於可以共享堆內存,因此可以簡單的采取內存作為標記存儲位置。而進程之間甚至可能都不在同一台物理機上,因此需要將標記存儲在一個所有進程都能看到的地方。
常見的是秒殺場景,訂單服務部署了多個服務實例。如秒殺商品有 4 個,第一個用戶購買 3 個,第二個用戶購買 2 個,理想狀態下第一個用戶能購買成功,第二個用戶提示購買失敗,反之亦可。而實際可能出現的情況是,兩個用戶都得到庫存為 4,第一個用戶買到了 3 個,更新庫存之前,第二個用戶下了 2 個商品的訂單,更新庫存為 2,導致業務邏輯出錯。
在上面的場景中,商品的庫存是共享變量,面對高並發情形,需要保證對資源的訪問互斥。在單機環境中,比如 Java 語言中其實提供了很多並發處理相關的 API,但是這些 API 在分布式場景中就無能為力了,由於分布式系統具備多線程和多進程的特點,且分布在不同機器中,synchronized 和 lock 關鍵字將失去原有鎖的效果,。僅依賴這些語言自身提供的 API 並不能實現分布式鎖的功能,因此需要我們想想其它方法實現分布式鎖。
常見的鎖方案如下:
- 基於數據庫實現分布式鎖
- 基於 Zookeeper 實現分布式鎖
- 基於緩存實現分布式鎖,如 redis、etcd 等
下面我們簡單介紹下這幾種鎖的實現,並重點介紹 etcd 實現鎖的方法。
基於數據庫的鎖
基於數據庫的鎖實現也有兩種方式,一是基於數據庫表,另一種是基於數據庫的排他鎖。
基於數據庫表的增刪
基於數據庫表增刪是最簡單的方式,首先創建一張鎖的表主要包含下列字段:方法名,時間戳等字段。
具體使用的方法為:當需要鎖住某個方法時,往該表中插入一條相關的記錄。需要注意的是,方法名有唯一性約束。如果有多個請求同時提交到數據庫的話,數據庫會保證只有一個操作可以成功,那么我們就可以認為操作成功的那個線程獲得了該方法的鎖,可以執行方法體內容。執行完畢,需要刪除該記錄。
對於上述方案可以進行優化,如應用主從數據庫,數據之間雙向同步。一旦主庫掛掉,將應用服務快速切換到從庫上。除此之外還可以記錄當前獲得鎖的機器的主機信息和線程信息,那么下次再獲取鎖的時候先查詢數據庫,如果當前機器的主機信息和線程信息在數據庫可以查到的話,直接把鎖分配給該線程,實現可重入鎖。
基於數據庫排他鎖
我們還可以通過數據庫的排他鎖來實現分布式鎖。基於 Mysql 的 InnoDB 引擎,可以使用以下方法來實現加鎖操作:
public void lock(){ connection.setAutoCommit(false) int count = 0; while(count < 4){ try{ select * from lock where lock_name=xxx for update; if(結果不為空){ //代表獲取到鎖 return; } }catch(Exception e){ } //為空或者拋異常的話都表示沒有獲取到鎖 sleep(1000); count++; } throw new LockException(); }
在查詢語句后面增加 for update,數據庫會在查詢過程中給數據庫表增加排他鎖。當某條記錄被加上排他鎖之后,其他線程無法再在該行記錄上增加排他鎖。其他沒有獲取到鎖的就會阻塞在上述 select 語句上,可能的結果有 2 種,在超時之前獲取到了鎖,在超時之前仍未獲取到鎖。
獲得排它鎖的線程即可獲得分布式鎖,當獲取到鎖之后,可以執行業務邏輯,執行完業務之后釋放鎖。
基於數據庫鎖的總結
上面兩種方式都是依賴數據庫的一張表,一種是通過表中的記錄的存在情況確定當前是否有鎖存在,另外一種是通過數據庫的排他鎖來實現分布式鎖。優點是直接借助現有的關系型數據庫,簡單且容易理解;缺點是操作數據庫需要一定的開銷,性能問題以及 SQL 執行超時的異常需要考慮。
基於 Zookeeper
基於 Zookeeper 的臨時節點和順序特性可以實現分布式鎖。
申請對某個方法加鎖時,在 Zookeeper 上與該方法對應的指定節點的目錄下,生成一個唯一的臨時有序節點。當需要獲取鎖時,只需要判斷有序節點中該節點是否為序號最小的一個。業務邏輯執行完成釋放鎖,只需將這個臨時節點刪除即可。這種方式也可以避免由於服務宕機導致的鎖無法釋放,而產生的死鎖問題。
Netflix 開源了一套 Zookeeper 客戶端框架 curator,你可以自行去看一下具體使用方法。Curator 提供的 InterProcessMutex 是分布式鎖的一種實現。acquire 方法獲取鎖,release 方法釋放鎖。另外,鎖釋放、阻塞鎖、可重入鎖等問題都可以有效解決。
關於阻塞鎖的實現,客戶端可以通過在 Zookeeper 中創建順序節點,並且在節點上綁定監聽器 Watch。一旦節點發生變化,Zookeeper 會通知客戶端,客戶端可以檢查自己創建的節點是不是當前所有節點中序號最小的,如果是就獲取到鎖,便可以執行業務邏輯。
Zookeeper 實現的分布式鎖也存在一些缺陷。在性能上可能不如基於緩存實現的分布式鎖。因為每次在創建鎖和釋放鎖的過程中,都要動態創建、銷毀瞬時節點來實現鎖功能。
此外,Zookeeper 中創建和刪除節點只能通過 Leader 節點來執行,然后將數據同步到集群中的其他節點。分布式環境中難免存在網絡抖動,導致客戶端和 Zookeeper 集群之間的 session 連接中斷,此時 Zookeeper 服務端以為客戶端掛了,就會刪除臨時節點。其他客戶端就可以獲取到分布式鎖了,導致了同時獲取鎖的不一致問題。
基於緩存實現分布式鎖
相對於基於數據庫實現分布式鎖的方案來說,基於緩存來實現在性能方面會表現的更好一點,存取速度快很多。而且很多緩存是可以集群部署的,可以解決單點問題。基於緩存的鎖有好幾種,如memcached、redis、本文下面主要講解基於 etcd 實現分布式鎖。
通過 etcd txn 實現分布式鎖
通過 etcd 實現分布式鎖,同樣需要滿足一致性、互斥性和可靠性等要求。etcd 中的事務 txn、lease 租約以及 watch 監聽特性,能夠使得基於 etcd 實現上述要求的分布式鎖。
思路分析
通過 etcd 的事務特性可以幫助我們實現一致性和互斥性。etcd 的事務特性,使用的 IF-Then-Else 語句,IF 語言判斷 etcd 服務端是否存在指定的 key,即該 key 創建版本號 create_revision 是否為 0 來檢查 key 是否已存在,因為該 key 已存在的話,它的 create_revision 版本號就不是 0。滿足 IF 條件的情況下則使用 then 執行 put 操作,否則 else 語句返回搶鎖失敗的結果。當然,除了使用 key 是否創建成功作為 IF 的判斷依據,還可以創建前綴相同的 key,比較這些 key 的 revision 來判斷分布式鎖應該屬於哪個請求。
客戶端請求在獲取到分布式鎖之后,如果發生異常,需要及時將鎖給釋放掉。因此需要租約,當我們申請分布式鎖的時候需要指定租約時間。超過 lease 租期時間將會自動釋放鎖,保證了業務的可用性。是不是這樣就夠了呢?在執行業務邏輯時,如果客戶端發起的是一個耗時的操作,操作未完成的請情況下,租約時間過期,導致其他請求獲取到分布式鎖,造成不一致。這種情況下則需要續租,即刷新租約,使得客戶端能夠和 etcd 服務端保持心跳。
具體實現
我們基於如上分析的思路,繪制出實現 etcd 分布式鎖的流程圖,如下所示:
基於 Go 語言實現的 etcd 分布式鎖,測試代碼如下所示:
func TestLock(t *testing.T) { // 客戶端配置 config = clientv3.Config{ Endpoints: []string{"localhost:2379"}, DialTimeout: 5 * time.Second, } // 建立連接 if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } // 1. 上鎖並創建租約 lease = clientv3.NewLease(client) if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil { panic(err) } leaseId = leaseGrantResp.ID // 2 自動續約 // 創建一個可取消的租約,主要是為了退出的時候能夠釋放 ctx, cancelFunc = context.WithCancel(context.TODO()) // 3. 釋放租約 defer cancelFunc() defer lease.Revoke(context.TODO(), leaseId) if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { panic(err) } // 續約應答 go func() { for { select { case keepResp = <-keepRespChan: if keepRespChan == nil { fmt.Println("租約已經失效了") goto END } else { // 每秒會續租一次, 所以就會受到一次應答 fmt.Println("收到自動續租應答:", keepResp.ID) } } } END: }() // 1.3 在租約時間內去搶鎖(etcd 里面的鎖就是一個 key) kv = clientv3.NewKV(client) // 創建事務 txn = kv.Txn(context.TODO()) //if 不存在 key,then 設置它,else 搶鎖失敗 txn.If(clientv3.Compare(clientv3.CreateRevision("lock"), "=", 0)). Then(clientv3.OpPut("lock", "g", clientv3.WithLease(leaseId))). Else(clientv3.OpGet("lock")) // 提交事務 if txnResp, err = txn.Commit(); err != nil { panic(err) } if !txnResp.Succeeded { fmt.Println("鎖被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value)) return } // 搶到鎖后執行業務邏輯,沒有搶到退出 fmt.Println("處理任務") time.Sleep(5 * time.Second) }
預期的執行結果如下所示:
=== RUN TestLock 處理任務 收到自動續租應答: 7587848943239472601 收到自動續租應答: 7587848943239472601 收到自動續租應答: 7587848943239472601 --- PASS: TestLock (5.10s) PASS
總得來說,如上關於 etcd 分布式鎖的實現過程分為四個步驟:
- 客戶端初始化與建立連接;
- 創建租約,自動續租;
- 創建事務,獲取鎖;
- 執行業務邏輯,最后釋放鎖。
創建租約的時候,需要創建一個可取消的租約,主要是為了退出的時候能夠釋放。釋放鎖對應的步驟,在上面的 defer 語句中。當 defer 租約關掉的時候,分布式鎖對應的 key 就會被釋放掉了。
小結
本文主要介紹了基於 etcd 實現分布式鎖的案例。首先介紹了分布式鎖產生的背景以及必要性,分布式架構不同於單體架構,涉及到多服務之間多個實例的調用,跨進程的情況下使用編程語言自帶的並發原語沒有辦法實現數據的一致性,因此分布式鎖出現,用來解決分布式環境中的資源互斥操作。接着介紹了基於數據庫實現分布式鎖的兩種方式:數據表增刪和數據庫的排它鎖。基於 Zookeeper 的臨時節點和順序特性也可以實現分布式鎖,這兩種方式或多或少存在性能和穩定性方面的缺陷。
接着本文重點介紹了基於 etcd 實現分布式鎖的方案,根據 etcd 的特點,利用事務 txn、lease 租約以及 watch 監測實現分布式鎖。
在我們上面的案例中,搶鎖失敗,客戶端就直接返回了。那么當該鎖被釋放之后,或者持有鎖的客戶端出現了故障退出了,其他鎖如何快速獲取鎖呢?所以上述代碼可以基於 watch 監測特性進行改進,各位同學可以自行試試。