1. 語言層面的鎖
樂觀鎖:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer,old, new unsafe.Pointer) (swapped bool)
func main() {
var n int32
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
atomic.AddInt32(&n, 1)
wg.Done()
}()
}
wg.Wait()
fmt.Println(atomic.LoadInt32(&n)) // output:1000
}
golang中原子操作CompareAndSwap:

互斥鎖:
golang中互斥鎖的一個經典實現就是sync包下的sync.mutex,下面以並發訪問slice為例:
func main() {
slc := make([]int, 0, 1000)
var wg sync.WaitGroup
var lock sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(a int) {
defer wg.Done()
// 加鎖
lock.Lock()
defer lock.Unlock()
slc = append(slc, a)
}(i)
}
wg.Wait()
fmt.Println(len(slc))
}
缺點:分布式部署環境下鎖會失效
2. mysql數據庫實現鎖
SET AUTOCOMMIT=0; BEGIN WORK; SELECT category_id FROM blog_article WHERE id=3 FOR UPDATE; UPDATE blog_article SET category_id = 3; # 在commit前其它事物無法對此行數據進行修改 COMMIT WORK;
UPDATE blog_article SET category_id = 2 WHERE id = 3;
會發現事物無法立即執行,會等待for update那條事物commit,如果此時長時間未commit則會超時:
[SQL]UPDATE blog_article SET category_id = 2 WHERE id = 3; [Err] 1205 - Lock wait timeout exceeded; try restarting transaction
3. zookeeper、etcd實現分布式鎖
zookeeper實現分布式鎖:

// 創建zookeeper連接,並創建永久父級節點
func NewZkConn(address, parentPath string) *zk.Conn {
hosts := []string{address}
conn, _, err := zk.Connect(hosts, time.Second*5)
if err != nil {
panic(err)
}
ok, _, _ := conn.Exists(parentPath)
if !ok {
// 創建永久節點
nodeName, err := conn.Create(parentPath, nil, zk.FlagSequence, acls)
if err != nil {
panic(err)
}
fmt.Println("create node name :", nodeName)
}
return conn
}
// nodeCreateSuccess 當前節點是否已成功創建
func nodeCreateSuccess(conn *zk.Conn, path string, id int) bool {
ok, _, ch, err := conn.ExistsW(path)
if err != nil {
return false
}
ex := false
// 節點已存在,則監聽狀態變化
if ok {
for {
select {
case c := <-ch:
{
if c.Type == zk.EventNodeDeleted {
ex = true
break
}
}
}
if ex {
break
}
}
}
// 節點不存在則嘗試創建
_, err = conn.Create(path, nil, flags, acls)
if err != nil {
return false
}
fmt.Printf("[%s] 創造節點的id為 [%d] \n", path, id)
return true
}
func main() {
conn := NewZkConn(zkHosts, parentPath)
// 假設臨時節點
path := parentPath + "/001_test_zookeeper_lock"
for i := 0; i < 10; i++ {
go func(conn *zk.Conn, path string, id int) {
// 節點未創建成功則阻塞等待
for {
ok := nodeCreateSuccess(conn, path, id)
// ok=true表示當前節點已成功創建
if ok {
// 釋放當前節點鎖
err := conn.Delete(path, 0)
if err != nil {
fmt.Println(err)
}
fmt.Printf("刪除成功 id為[%d] \n", id)
break
}
}
}(conn, path, i)
}
time.Sleep(time.Second * 10)
}
etcd有個很重要的特性,它的key value是多版本的,當有了一個值之后,再put時它的版本是不斷地往上加的,這里跟zookeeper類似,判斷是否是最小的版本- 利用租約在etcd集群中創建多個key,這個key有兩種形態,存在和不存在,而這兩種形態就是互斥量。
- 通過Prefix前綴機制獲取前綴目錄下所有KV及Revision,通過Revision機制判斷當前線程是否能獲取到鎖。
- 通過Watch監聽機制來監聽前一個Revision的刪除事件。
func main() {
var (
config clientv3.Config
client *clientv3.Client
lease clientv3.Lease
leaseResp *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
leaseRespChan <-chan *clientv3.LeaseKeepAliveResponse
err error
)
//客戶端配置
config = clientv3.Config{
Endpoints: []string{"etcd2.sndu.cn:2379"},
DialTimeout: 5 * time.Second,
}
//建立連接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
//上鎖(創建租約,自動續租)
lease = clientv3.NewLease(client)
//設置1個ctx取消自動續租 執行cancleFunc即執行cancel操作
ctx, cancleFunc := context.WithCancel(context.TODO())
//設置10秒租約(過期時間)
if leaseResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println(err)
return
}
//拿到租約id
leaseId = leaseResp.ID
//自動續租(不停地往管道中扔租約信息)
if leaseRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
}
//啟動多個協程去監聽
go listenLeaseChan(leaseRespChan)
//業務處理
kv := clientv3.NewKV(client)
//創建事務
txn := kv.Txn(context.TODO())
txn.If(clientv3.Compare(clientv3.CreateRevision("/lock/20201029-etcd"), "=", 0)).
Then(clientv3.OpPut("/lock/20201029-etcd", "true",
clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/lock/20201029-etcd")) //否則搶鎖失敗
//提交事務
if txtResp, err := txn.Commit(); err != nil {
fmt.Println(err)
return
} else {
//判斷是否搶鎖
if !txtResp.Succeeded {
fmt.Println("鎖被占用:",
string(txtResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
}
fmt.Println("處理任務")
//釋放鎖(停止續租,終止租約)
defer cancleFunc() //函數退出取消自動續租
defer lease.Revoke(context.TODO(), leaseId) //終止租約(去掉過期時間)
time.Sleep(10 * time.Second)
}
// listenLeaseChan 監聽租約情況
func listenLeaseChan(leaseRespChan <-chan *clientv3.LeaseKeepAliveResponse) {
var leaseKeepResp *clientv3.LeaseKeepAliveResponse
for {
select {
case leaseKeepResp = <-leaseRespChan:
if leaseKeepResp == nil {
fmt.Println("租約失效了")
goto END
} else {
fmt.Println(leaseKeepResp.ID)
}
}
}
END:
}
在etcd官方的實現中其實已經實現了分布式鎖,具體實現代碼在https://github.com/etcd-io/etcd/blob/master/client/v3/concurrency/mutex.go目錄下:
// TryLock 嘗試加鎖 比較revision是否為最小版本
func (m *Mutex) TryLock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// Cannot lock, so delete the key
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return ErrLocked
}
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
resp, err := m.tryAcquire(ctx)
if err != nil {
return err
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
client := m.s.Client()
// wait for deletion revisions prior to myKey
// TODO: early termination if the session key is deleted before other session keys with smaller revisions.
_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
// make sure the session is not expired, and the owner key still exists.
gresp, werr := client.Get(ctx, m.myKey)
if werr != nil {
m.Unlock(client.Ctx())
return werr
}
if len(gresp.Kvs) == 0 { // is the session key lost?
return ErrSessionExpired
}
m.hdr = gresp.Header
return nil
}
// tryAcquire 嘗試釋放鎖
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return nil, err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
return resp, nil
}
// Unlock 釋放鎖 刪除節點信息
func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
}
4. redis實現分布式鎖
Redis分布式鎖控制並發主要是通過在Redis里面創建一個key,當其它進程准備占用的時候只能等待key釋放再占用。Redis里面有一個原子性指令setnx,當key存在時,它返回0,表示當前已有進程占用,當它返回1時可以執行業務邏輯,此時沒有進程占用,等邏輯執行完后,可以刪除key釋放鎖,這樣可以簡單的控制並發:
127.0.0.1:6379> setnx distributedKey aaa (integer) 1 127.0.0.1:6379> setnx distributedKey aaa (integer) 0 127.0.0.1:6379> get distributedKey "aaa" 127.0.0.1:6379>
在業務邏輯執行的過程中如果發生異常,此時key並沒有刪除,這樣就會造成死鎖,死鎖帶來的后果想必大家都很清楚。為了解決這個問題,可以在setnx加鎖后設置key的過期時間,當key到期自動刪除:
127.0.0.1:6379> expire distributedKey 5 (integer) 1 127.0.0.1:6379>
如果在執行setnx后,執行expire前Redis發生宕機了,這樣就不會執行expire,也會造成死鎖。由於setnx與expire是兩條命令,並且expire依賴setnx的執行結果,為了解決這個問題可以使用set key value [expiration EX seconds|PX milliseconds] [NX|XX] ,這是一條原子性的指令,同時包含setnx和expire:
127.0.0.1:6379> set distributedKey aaa ex 5 nx OK 127.0.0.1:6379> set distributedKey aaa ex 5 nx (nil)
key存在時執行會返回nil,只有key過期或不存在時才會返回ok
// DistributedLock 並發鎖
func DistributedLock(key string, expire int, c redis.Conn, value time.Time) (bool, error) {
// 設置原子鎖
defer c.Close()
exists, err := c.Do("set", key, value, "nx", "ex", expire)
if err != nil {
return false, errors.New("執行 set nx ex 失敗")
}
// 鎖已存在,已被占用
if exists != nil {
return false, nil
}
return true, nil
}
// ReleaseLock 釋放鎖
func ReleaseLock(c redis.Conn, key string) (bool, error) {
defer c.Close()
v, err := redis.Bool(c.Do("DEL", key))
return v, err
}
調用:
func DoSomething(c redis.Conn, key string, expire int, value time.Time) {
// 獲取鎖
defer c.Close()
canUse, err := DistributedLock(key, expire, c, value)
if err != nil {
panic(err)
}
// 占用鎖
if canUse {
fmt.Println("start do something ...")
// 釋放鎖
_, err := ReleaseLock(c, key)
if err != nil {
panic(err)
}
}
return
}
redis釋放鎖的問題:

如上圖所示,線程A先獲得鎖,執行超時鎖自動釋放,此時線程B獲取鎖開始執行,A執行完后釋放了B所持有的鎖,這時B繼續執行,並且線程C能獲取鎖,同一時刻線程A和B同時執行鎖,違背了分布式鎖的安全性。
5. redis+lua實現原子性釋放分布式鎖

定義lua腳本釋放鎖:
const (
// ScriptDeleteLock 釋放redis並發鎖 lua腳本 判斷value為本次鎖的value才釋放
ScriptDeleteLock = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
`
)
// ReleaseLockWithLua 釋放鎖 使用lua腳本執行
func ReleaseLockWithLua(c redis.Conn, key string, value time.Time) (int, error) {
// keyCount表示lua腳本中key的個數
defer c.Close()
lua := redis.NewScript(1, ScriptDeleteLock)
// lua腳本中的參數為key和value
res, err := redis.Int(lua.Do(c, key, value))
if err != nil {
return 0, err
}
return res, nil
}
調用:
func DoSomethingWithLua(c redis.Conn, key string, expire int, value time.Time) {
// 獲取鎖
defer c.Close()
canUse, err := DistributedLock(key, expire, c, value)
if err != nil {
panic(err)
}
// 占用鎖
if canUse {
fmt.Println("start do something ...")
// 釋放鎖 lua腳本執行原子性刪除
_, err := ReleaseLockWithLua(c, key, value)
if err != nil {
panic(err)
}
}
return
}
redis sentinel集群下鎖的同步問題:
如上圖所示,線程在master成功創建鎖,此時鎖還未同步到slave,master發生宕機,當slave1成我新master后鎖丟失。
6. redlock算法及相關問題
redlock算法流程

如上圖所示,redlock算法的實現流程,每次加鎖的時候嘗試向redis集群中每個節點申請加鎖,當前節點加鎖失敗則跳過繼續向下一個節點執行加鎖請求,只有大於一半的節點加鎖成功才認為分布式鎖成功;釋放鎖時同樣需配合lua腳本向所有的redis節點發起釋放鎖請求。
redlock算法跳躍時鍾問題

上述redlock算法已經解決了redis集群中master宕機導致鎖失效的問題,但是它是否就是完美的呢?如上圖所示,client1向redis集群申請加鎖,此時節點A、B、C執行成功,client1成功獲取鎖,節點D和E由於網絡原因加鎖失敗;這時節點C所在的服務器由於時鍾向前跳躍導致鎖快速過期了,client2執行加鎖請求,顯然此時是能加鎖成功的;那么相當於在同一時刻兩個進程能持有鎖,這顯然違背了分布式鎖的互斥性的特點。
redlock算法GC停頓問題

同樣,還有一種特殊的情況就是GC停頓導致消息延遲的問題,當client1向redis集群發起加鎖請求並返回加鎖成功的結果,此時消息延遲到達client1導致在這段時間redis集群中的鎖過期了,client2顯然能夠正常獲取鎖,當GC恢復時client1收到結果會認為自己持有鎖,這同樣違背了分布式鎖互斥性的特點。
7. 總結
以上幾種鎖的實現方式並非說明哪種是最優解,具體場景需選擇具體的鎖。如果是單機環境建議直接使用語言層面的鎖來實現,這樣不需要引入額外的第三方依賴;如果是對數據庫的並發更新操作,並且並發量不是太大,可以使用mysql的select for update或者select for update nowait實現,但是注意盡量不要使用表鎖並且不要造成死鎖的問題;如果是對鎖的可靠性要求極高那么建議使用zookeeper、etcd實現;最后如果在開發環境中沒有zookeeper、etcd等第三方組件,並且對鎖的性能要求比較高,可以使用單機的redis配合lua腳本釋放鎖,這里我個人並不推薦使用redlock。
