etcd分布式鎖及事務


前言

分布式鎖是控制分布式系統之間同步訪問共享資源的一種方式。在分布式系統中,常常需要協調他們的動作。如果不同的系統或是同一個系統的不同主機之間共享了一個或一組資源,那么訪問這些資源的時候,往往需要互斥來防止彼此干擾來保證一致性,在這種情況下,便需要使用到分布式鎖。

etcd分布式鎖設計

  1. 排他性:任意時刻,只能有一個機器的一個線程能獲取到鎖。

通過在etcd中存入key值來實現上鎖,刪除key實現解鎖,參考下面偽代碼:

func Lock(key string, cli *clientv3.Client) error {
    //獲取key,判斷是否存在鎖
	resp, err := cli.Get(context.Background(), key)
	if err != nil {
		return err
	}
	//鎖存在,返回上鎖失敗
	if len(resp.Kvs) > 0 {
		return errors.New("lock fail")
	}
	_, err = cli.Put(context.Background(), key, "lock")
	if err != nil {
		return err
	}
	return nil
}
//刪除key,解鎖
func UnLock(key string, cli *clientv3.Client) error {
	_, err := cli.Delete(context.Background(), key)
	return err
}

當發現已上鎖時,直接返回lock fail。也可以處理成等待解鎖,解鎖后競爭鎖。

//等待key刪除后再競爭鎖
func waitDelete(key string, cli *clientv3.Client) {
	rch := cli.Watch(context.Background(), key)
	for wresp := range rch {
		for _, ev := range wresp.Events {
			switch ev.Type {
			case mvccpb.DELETE: //刪除
				return
			}
		}
	}
}
  1. 容錯性:只要分布式鎖服務集群節點大部分存活,client就可以進行加鎖解鎖操作。 etcd基於Raft算法,確保集群中數據一致性。

  2. 避免死鎖:分布式鎖一定能得到釋放,即使client在釋放之前崩潰。 上面分布式鎖設計有缺陷,假如client獲取到鎖后程序直接崩了,沒有解鎖,那其他線程也無法拿到鎖,導致死鎖出現。 通過給key設定leases來避免死鎖,但是leases過期時間設多長呢?假如設了30秒,而上鎖后的操作比30秒大,會導致以下問題:

  • 操作沒完成,鎖被別人占用了,不安全

  • 操作完成后,進行解鎖,這時候把別人占用的鎖解開了

解決方案:給key添加過期時間后,以Keep leases alive方式延續leases,當client正常持有鎖時,鎖不會過期;當client程序崩掉后,程序不能執行Keep leases alive,從而讓鎖過期,避免死鎖。看以下偽代碼:

//上鎖
func Lock(key string, cli *clientv3.Client) error {
    //獲取key,判斷是否存在鎖
	resp, err := cli.Get(context.Background(), key)
	if err != nil {
		return err
	}
	//鎖存在,等待解鎖后再競爭鎖
	if len(resp.Kvs) > 0 {
		waitDelete(key, cli)
		return Lock(key)
	}
    //設置key過期時間
	resp, err := cli.Grant(context.TODO(), 30)
	if err != nil {
		return err
	}
	//設置key並綁定過期時間
	_, err = cli.Put(context.Background(), key, "lock", clientv3.WithLease(resp.ID))
	if err != nil {
		return err
	}
	//延續key的過期時間
	_, err = cli.KeepAlive(context.TODO(), resp.ID)
	if err != nil {
		return err
	}
	return nil
}
//通過讓key值過期來解鎖
func UnLock(resp *clientv3.LeaseGrantResponse, cli *clientv3.Client) error {
	_, err := cli.Revoke(context.TODO(), resp.ID)
	return err
}

經過以上步驟,我們初步完成了分布式鎖設計。其實官方已經實現了分布式鎖,它大致原理和上述有出入,接下來我們看下如何使用官方的分布式鎖。

etcd分布式鎖使用

func ExampleMutex_Lock() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// create two separate sessions for lock competition
	s1, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	m1 := concurrency.NewMutex(s1, "/my-lock/")

	s2, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()
	m2 := concurrency.NewMutex(s2, "/my-lock/")

	// acquire lock for s1
	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s1")

	m2Locked := make(chan struct{})
	go func() {
		defer close(m2Locked)
		// wait until s1 is locks /my-lock/
		if err := m2.Lock(context.TODO()); err != nil {
			log.Fatal(err)
		}
	}()

	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s1")

	<-m2Locked
	fmt.Println("acquired lock for s2")

	// Output:
	// acquired lock for s1
	// released lock for s1
	// acquired lock for s2
}

此代碼來源於官方文檔,etcd分布式鎖使用起來很方便。

etcd事務

順便介紹一下etcd事務,先看這段偽代碼:

Txn(context.TODO()).If(//如果以下判斷條件成立
	Compare(Value(k1), "<", v1),
	Compare(Version(k1), "=", 2)
).Then(//則執行Then代碼段
	OpPut(k2,v2), OpPut(k3,v3)
).Else(//否則執行Else代碼段
	OpPut(k4,v4), OpPut(k5,v5)
).Commit()//最后提交事務

使用例子,代碼來自官方文檔

func ExampleKV_txn() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: dialTimeout,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	kvc := clientv3.NewKV(cli)

	_, err = kvc.Put(context.TODO(), "key", "xyz")
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	_, err = kvc.Txn(ctx).
		// txn value comparisons are lexical
		If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
		// the "Then" runs, since "xyz" > "abc"
		Then(clientv3.OpPut("key", "XYZ")).
		// the "Else" does not run
		Else(clientv3.OpPut("key", "ABC")).
		Commit()
	cancel()
	if err != nil {
		log.Fatal(err)
	}

	gresp, err := kvc.Get(context.TODO(), "key")
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, ev := range gresp.Kvs {
		fmt.Printf("%s : %s\n", ev.Key, ev.Value)
	}
	// Output: key : XYZ
}

總結

如果發展到分布式服務階段,且對數據的可靠性要求很高,選etcd實現分布式鎖不會錯。介於對ZooKeeper好感度不強,這里就不介紹ZooKeeper分布式鎖了。一般的Redis分布式鎖,可能出現鎖丟失的情況(如果你是Java開發者,可以使用Redisson客戶端實現分布式鎖,據說不會出現鎖丟失的情況)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM