ETCD 分布式鎖實現邏輯


https://github.com/coreos/etcd/blob/master/Documentation/api.md

 

Atomic Compare-and-Swap

 

etcd can be used as a centralized coordination service in a cluster, and CompareAndSwap (CAS) is the most basic operation used to build a distributed lock service.

This command will set the value of a key only if the client-provided conditions are equal to the current conditions.

The current comparable conditions are:

  1. prevValue - checks the previous value of the key.

  2. prevIndex - checks the previous modifiedIndex of the key.

  3. prevExist - checks existence of the key: if prevExist is true, it is an update request; if prevExist is false, it is acreate request.

Here is a simple example. Let's create a key-value pair first: foo=one.

curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=one

Now let's try some invalid CompareAndSwap commands.

Trying to set this existing key with prevExist=false fails as expected:

curl http://127.0.0.1:2379/v2/keys/foo?prevExist=false -XPUT -d value=three

The error code explains the problem:

{
    "cause": "/foo",
    "errorCode": 105,
    "index": 39776,
    "message": "Key already exists"
}

Now let's provide a prevValue parameter:

curl http://127.0.0.1:2379/v2/keys/foo?prevValue=two -XPUT -d value=three

This will try to compare the previous value of the key and the previous value we provided. If they are equal, the value of the key will change to three.

{
    "cause": "[two != one]",
    "errorCode": 101,
    "index": 8,
    "message": "Compare failed"
}

which means CompareAndSwap failed. cause explains why the test failed. Note: the condition prevIndex=0 always passes.

Let's try a valid condition:

curl http://127.0.0.1:2379/v2/keys/foo?prevValue=one -XPUT -d value=two

The response should be:

{
    "action": "compareAndSwap",
    "node": {
        "createdIndex": 8,
        "key": "/foo",
        "modifiedIndex": 9,
        "value": "two"
    },
    "prevNode": {
        "createdIndex": 8,
        "key": "/foo",
        "modifiedIndex": 8,
        "value": "one"
    }
}

We successfully changed the value from "one" to "two" since we gave the correct previous value.

上面這個API主要用來在分布式環境中做集中式的協商,CAS操作的基本用途就是創建分布式的鎖服務,即選主。
 
此命令的基本作用在於僅當客戶端提供的條件等於當前etcd的條件時,才會修改一個key的值。當前提供的可以比較的條件有:
 
1- prevValue 檢查key以前的值
2- prevIndex 檢查key以前的modifiedIndex
3- prevExist - 檢查key的存在性,如果prevExist為true, 則這是一個更新請求,如果prevExist的值是false, 這是一個創建請求
 
下面是一個例子,首先創建一個key-value對:foo=one
 
# curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=one
{"action":"set","node":{"key":"/foo","value":"one","modifiedIndex":27,"createdIndex":27}}
 
接下來,試一些非法的CompareAndSwap命令,首先是當prevExist=false的情況下設置已存在的key,命令如下:
 
# curl http://127.0.0.1:2379/v2/keys/foo?prevExist=false -XPUT -d value=three
{"errorCode":105,"message":"Key already exists","cause":"/foo","index":27}
 
此時可以看到,返回了錯誤代碼,告訴我們這個key已經存在。
 
接下來,我們使用prevValue參數,命令如下:
 
# curl http://127.0.0.1:2379/v2/keys/foo?prevValue=two -XPUT -d value=three
{"errorCode":101,"message":"Compare failed","cause":"[two != one]","index":27}
 
上面命令的意思是如果指定的key以前的值等於two,則會將此key的值改成我們提供的three
 
最后,試一下合法的條件,命令如下:
 
curl http://127.0.0.1:2379/v2/keys/foo?prevValue=one -XPUT -d value=two
{"action":"compareAndSwap","node":{"key":"/foo","value":"two","modifiedIndex":28,"createdIndex":27},"prevNode":{"key":"/foo","value":"one","modifiedIndex":27,"createdIndex":27}}
 
這里可以看到,我們成功地將foo這個key的值改成了two,因為提供了正確的prevValue
 
這樣,如果要實現分布式鎖,則我們為每一個鎖提供一個唯一的key,這樣,大家都會來競爭,通過CompareAndSwap的操作,設置prevExist,這樣當多個節點嘗試去創建一個目錄時,只有一個能夠成功,而創建成功的用戶即認為是獲得了鎖。
 
整個方案要解決幾個問題:
 
1- 為每個鎖設置共同的名稱或者目錄,以便通過后面的SDK可以迅速地在Etcd集群中定位
2- 需要判斷某個鎖是否已經存在,當存在的話,則不再處理(通過curl來驗證,后面再通過Etcd Java API來驗證)
3- 如果某個節點失效,則其他節點需要得到通知,以便能夠迅速地接管
 
第一個問題很容易解決,假如我們創建一個/locks的目錄,每個不同的鎖都是/locks目錄下的節點。例如,我們要為SCEC的訂單報表服務需要做選主,則創建的節點是:/locks/scec/report/order。
 
要判斷某個鎖是否存在,比較簡單,通過prevExist就可以達成。然而,如果服務當機,則沒有這么簡單,因為他自己需要知道當前是誰,決定如何修改其值。
 
如何判斷是否有改變?能否主動通知?
 
使用etcdctl創建一個時間的節點,然后查看節點的內容
 
1- 創建/locks目錄(如果目錄已存在,如何處理?)
2- 創建/locks/scec/report目錄,如何級聯創建(需要寫工具類,即createDirRecursive)
3- 僅當不存在order節點的時候,去創建order節點,內容就是創建的時間和自身的IP,指定TTL為30秒
4- 定期更新TTL的值
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl mkdir /locks/scec/report

[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl ls / --recursive
/message
/workers
/workers/00000000000000000021
/workers/00000000000000000022
/workers/00000000000000000023
/foo
/locks
/locks/scec
/locks/scec/report
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report
/locks/scec/report: is a directory
 
上面創建了/locks/scec/repor這個節點,接下來就是創建相應的節點了,我們要創建一個order節點,表示是訂單報表的任務,TTL為30秒,而其值為一個JSON,分別是IP,端口,和創建的時間。
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl set /locks/scec/report/order "192.168.1.10" --ttl 10
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
Error:  100: Key not found (/locks/scec/report/order) [34]
 
可以看到,當修改后,這個TTL是會失效的,接着我們來測試,如果沒有失效的時候,一直去更新,結果會如何?
 
測試發現,一旦這樣設置后,節點會一直存在,即生命周期不斷地延長。
 
# ./etcdctl set /locks/scec/report/order "192.168.1.10" --ttl 10
192.168.1.10
 
首先設置一下有TTL值的情況,然后,要監測其改變,需要監測上層目錄的改變,這樣才能夠進行協調,所以每個監聽器都是一個單獨的線程,否則無法進行回調操作。
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl get /locks/scec/report/order
Error:  100: Key not found (/locks/scec/report/order) [52]
 
此時,原來watch的節點會有變化,會退出,並且顯示下面的內容:
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl watch --recursive /locks/scec/report
[expire] /locks/scec/report/order

除此之外,只要下層節點有變化的,都會watch退出,例如:
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl watch --recursive /locks/scec/report
[set] /locks/scec/report/order
192.168.1.10
 
如上所示,一旦有節點的變化,則會退出,因為有事件產生。所以,在watch的時候,如果節點會變化,需要做的事情是設置上級目錄的watch。
 
如果節點是永久性的節點,或者說不是節點的刪除事件,則了會觸發節點上的事件。例如:
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl set /locks/scec/report/order "192.168.1.10"
192.168.1.10
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl set /locks/scec/report/order "192.168.1.11"
192.168.1.11
 
如上所示,中間我們修改了節點的值,此時,我們的節點上的watch會生效:
 
[root@ansible01 etcd-v2.2.0-linux-amd64]# ./etcdctl watch --recursive /locks/scec/report/order
[set] /locks/scec/report/order
192.168.1.11
整個的原理驗證沒有問題后,就可以使用boon的etcd java client library來完成SDK的封裝了。先從用戶角度進行分析。
 
首先,對於Java程序員來說,使用spring IoC來注入功能是最常見的,而且,而在分布式鎖情況下,watch是最常用的功能,所以,一旦watch后,就觸發一個線程,來進行控制,因為API自行完成了這個功能,所以,
 
雖然boon的etcd java client library是非常強大的,但是對於使用者而言,是需要屏蔽這些細節的,以后改成使用zookeeper或者其他的分布式協調器才能夠成為可能,因此我們需要首先抽象中相應的接口。
 
對於分布式協調器來說,假如我們這個叫LeaderElection,即選主算法。主要行為有兩個,一個是試圖成為master,即try,即試圖成為Master, 提供的就是目錄和節點的名稱,其中要實現分級式處理,即如果上級目錄不存在,則需要依次創建。已存在的,則忽略繼續。這樣,如果當前沒有主存在,則會注冊自己,要提供的有IP和端口,因此需要提供IP和端口信息,這也是接口的部分,參考InetAddress的構造函數。其實現上,需要知道,自己是某一個的master,所以要提供一個方法,isMaster,表示自己是否是某個的主,參數就是節點全路徑。當完成后,是需要通知做一些事情的,這是一個觸發器,即回調Listener, 所以需要提供一個回調的方法,因此try這里面需要指定接口的實現,這樣可以將SDK與外部分離開來。回調的接口實例就毫無疑問地是由外部提供的。並且,定期地,當成為master后,需要周期性地發心跳的信號,這顯然需要有一個線程來做這樣的事情。要注意調度的不准確性,例如TTL是30秒,則心跳的時間就需要設置為10秒,這樣就可以避免心跳調度不及時,導致出現問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM