本文:https://chai2010.cn/advanced-go-programming-book/ch6-cloud/ch6-02-lock.html
分布式鎖
在單機程序並發或並行修改全局變量時,需要對修改行為加鎖以創造臨界區。為什么需要加鎖呢?我們看看在不加鎖的情況下並發計數會發生什么情況:
package main import ( "sync" ) // 全局變量 var counter int func main() { var wg sync.WaitGroup for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() counter++ }() } wg.Wait() println(counter) }
多次運行會得到不同的結果:
❯❯❯ go run local_lock.go 945 ❯❯❯ go run local_lock.go 937 ❯❯❯ go run local_lock.go 959
進程內加鎖
想要得到正確的結果的話,要把對計數器(counter)的操作代碼部分加上鎖:
// ... 省略之前部分 var wg sync.WaitGroup var l sync.Mutex for i := 0; i < 1000; i++ { wg.Add(1) go func() { defer wg.Done() l.Lock() counter++ l.Unlock() }() } wg.Wait() println(counter) // ... 省略之后部分
這樣就可以穩定地得到計算結果了:
❯❯❯ go run local_lock.go 1000
trylock
在某些場景,我們只是希望一個任務有單一的執行者。而不像計數器場景一樣,所有goroutine都執行成功。后來的goroutine在搶鎖失敗后,需要放棄其流程。這時候就需要trylock了。
trylock顧名思義,嘗試加鎖,加鎖成功執行后續流程,如果加鎖失敗的話也不會阻塞,而會直接返回加鎖的結果。在Go語言中我們可以用大小為1的Channel來模擬trylock:
package main import ( "sync" ) // Lock try lock type Lock struct { c chan struct{} } // NewLock generate a try lock func NewLock() Lock { var l Lock l.c = make(chan struct{}, 1) l.c <- struct{}{} return l } // Lock try lock, return lock result func (l Lock) Lock() bool { lockResult := false select { case <-l.c: lockResult = true default: } return lockResult } // Unlock , Unlock the try lock func (l Lock) Unlock() { l.c <- struct{}{} } var counter int func main() { var l = NewLock() var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() if !l.Lock() { // log error println("lock failed") return } counter++ println("current counter", counter) l.Unlock() }() } wg.Wait() }
因為我們的邏輯限定每個goroutine只有成功執行了Lock
才會繼續執行后續邏輯,因此在Unlock
時可以保證Lock結構體中的channel一定是空,從而不會阻塞,也不會失敗。上面的代碼使用了大小為1的channel來模擬trylock,理論上還可以使用標准庫中的CAS來實現相同的功能且成本更低,讀者可以自行嘗試。
在單機系統中,trylock並不是一個好選擇。因為大量的goroutine搶鎖可能會導致CPU無意義的資源浪費。有一個專有名詞用來描述這種搶鎖的場景:活鎖。
活鎖指的是程序看起來在正常執行,但CPU周期被浪費在搶鎖,而非執行任務上,從而程序整體的執行效率低下。活鎖的問題定位起來要麻煩很多。所以在單機場景下,不建議使用這種鎖。
基於Redis的setnx
在分布式場景下,我們也需要這種“搶占”的邏輯,這時候怎么辦呢?我們可以使用Redis提供的setnx
命令:
package main import ( "fmt" "sync" "time" "github.com/go-redis/redis" ) func incr() { client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", // no password set DB: 0, // use default DB }) var lockKey = "counter_lock" var counterKey = "counter" // lock resp := client.SetNX(lockKey, 1, time.Second*5) lockSuccess, err := resp.Result() if err != nil || !lockSuccess { fmt.Println(err, "lock result: ", lockSuccess) return } // counter ++ getResp := client.Get(counterKey) cntValue, err := getResp.Int64() if err == nil || err == redis.Nil { cntValue++ resp := client.Set(counterKey, cntValue, 0) _, err := resp.Result() if err != nil { // log err println("set value error!") } } println("current counter is ", cntValue) delResp := client.Del(lockKey) unlockSuccess, err := delResp.Result() if err == nil && unlockSuccess > 0 { println("unlock success!") } else { println("unlock failed", err) } } func main() { var wg sync.WaitGroup for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() incr() }() } wg.Wait() }
看看運行結果:
❯❯❯ go run redis_setnx.go <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false <nil> lock result: false current counter is 2028 unlock success!
通過代碼和執行結果可以看到,我們遠程調用setnx
運行流程上和單機的trylock非常相似,如果獲取鎖失敗,那么相關的任務邏輯就不應該繼續向前執行。
setnx
很適合在高並發場景下,用來爭搶一些“唯一”的資源。比如交易撮合系統中賣家發起訂單,而多個買家會對其進行並發爭搶。這種場景我們沒有辦法依賴具體的時間來判斷先后,因為不管是用戶設備的時間,還是分布式場景下的各台機器的時間,都是沒有辦法在合並后保證正確的時序的。哪怕是我們同一個機房的集群,不同的機器的系統時間可能也會有細微的差別。
所以,我們需要依賴於這些請求到達Redis節點的順序來做正確的搶鎖操作。如果用戶的網絡環境比較差,那也只能自求多福了。
基於ZooKeeper
package main import ( "time" "github.com/samuel/go-zookeeper/zk" ) func main() { c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10) if err != nil { panic(err) } l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll)) err = l.Lock() if err != nil { panic(err) } println("lock succ, do your business logic") time.Sleep(time.Second * 10) // do some thing l.Unlock() println("unlock succ, finish business logic") }
基於ZooKeeper的鎖與基於Redis的鎖的不同之處在於Lock成功之前會一直阻塞,這與我們單機場景中的mutex.Lock
很相似。
其原理也是基於臨時Sequence節點和watch API,例如我們這里使用的是/lock
節點。Lock會在該節點下的節點列表中插入自己的值,只要節點下的子節點發生變化,就會通知所有watch該節點的程序。這時候程序會檢查當前節點下最小的子節點的id是否與自己的一致。如果一致,說明加鎖成功了。
這種分布式的阻塞鎖比較適合分布式任務調度場景,但不適合高頻次持鎖時間短的搶鎖場景。按照Google的Chubby論文里的闡述,基於強一致協議的鎖適用於粗粒度
的加鎖操作。這里的粗粒度指鎖占用時間較長。我們在使用時也應思考在自己的業務場景中使用是否合適。
基於etcd
etcd是分布式系統中,功能上與ZooKeeper類似的組件,這兩年越來越火了。上面基於ZooKeeper我們實現了分布式阻塞鎖,基於etcd,也可以實現類似的功能:
package main import ( "log" "github.com/zieckey/etcdsync" ) func main() { m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"}) if m == nil || err != nil { log.Printf("etcdsync.New failed") return } err = m.Lock() if err != nil { log.Printf("etcdsync.Lock failed") return } log.Printf("etcdsync.Lock OK") log.Printf("Get the lock. Do something here.") err = m.Unlock() if err != nil { log.Printf("etcdsync.Unlock failed") } else { log.Printf("etcdsync.Unlock OK") } }
etcd中沒有像ZooKeeper那樣的Sequence節點。所以其鎖實現和基於ZooKeeper實現的有所不同。在上述示例代碼中使用的etcdsync的Lock流程是:
- 先檢查
/lock
路徑下是否有值,如果有值,說明鎖已經被別人搶了 - 如果沒有值,那么寫入自己的值。寫入成功返回,說明加鎖成功。寫入時如果節點被其它節點寫入過了,那么會導致加鎖失敗,這時候到 3
- watch
/lock
下的事件,此時陷入阻塞 - 當
/lock
路徑下發生事件時,當前進程被喚醒。檢查發生的事件是否是刪除事件(說明鎖被持有者主動unlock),或者過期事件(說明鎖過期失效)。如果是的話,那么回到 1,走搶鎖流程。
值得一提的是,在etcdv3的API中官方已經提供了可以直接使用的鎖API,讀者可以查閱etcd的文檔做進一步的學習。
如何選擇合適的鎖
業務還在單機就可以搞定的量級時,那么按照需求使用任意的單機鎖方案就可以。
如果發展到了分布式服務階段,但業務規模不大,qps很小的情況下,使用哪種鎖方案都差不多。如果公司內已有可以使用的ZooKeeper、etcd或者Redis集群,那么就盡量在不引入新的技術棧的情況下滿足業務需求。
業務發展到一定量級的話,就需要從多方面來考慮了。首先是你的鎖是否在任何惡劣的條件下都不允許數據丟失,如果不允許,那么就不要使用Redis的setnx
的簡單鎖。
對鎖數據的可靠性要求極高的話,那只能使用etcd或者ZooKeeper這種通過一致性協議保證數據可靠性的鎖方案。但可靠的背面往往都是較低的吞吐量和較高的延遲。需要根據業務的量級對其進行壓力測試,以確保分布式鎖所使用的etcd或ZooKeeper集群可以承受得住實際的業務請求壓力。需要注意的是,etcd和Zookeeper集群是沒有辦法通過增加節點來提高其性能的。要對其進行橫向擴展,只能增加搭建多個集群來支持更多的請求。這會進一步提高對運維和監控的要求。多個集群可能需要引入proxy,沒有proxy那就需要業務去根據某個業務id來做分片。如果業務已經上線的情況下做擴展,還要考慮數據的動態遷移。這些都不是容易的事情。
在選擇具體的方案時,還是需要多加思考,對風險早做預估。