排他鎖(X)
這里主要講講分布式鎖中的排他鎖。排他鎖(Exclusive Locks,簡稱X鎖),又稱為寫鎖或獨占鎖,是一種基本的鎖類型。如果事務T1對數據對象O1加上了排他鎖,那么在整個加鎖期間,只允許T1對O1進行數據的讀取和更新操作,其它任何事務都不能對O1進行任何類型的操作,直道T1釋放了排他鎖。
定義鎖
在ZooKeeper中,可以通過在ZooKeeper中創建一個數據節點來表示一個鎖。比如,/exclusive_lock/lock節點(znode)就可以表示為一個鎖。
獲取鎖
在需要獲取排他鎖時,所有的客戶端都會試圖通過create()接口,在/exclusive_lock節點下創建臨時的子節點/exclusive_lock/lock,但ZooKeeper的強一致性最終只會保證僅有一個客戶單能創建成功,那么就認為該客戶端獲取了鎖。同時,所有沒有獲取鎖的客戶端事務只能處於等待狀態,這些處於等待狀態的客戶端事先可以在/exclusive_lock節點上注冊一個子節點變更的Watcher監聽,以便實時監聽到子節點的變更情況。
釋放鎖
在“定義鎖”部分,我們已經提到/exclusive_lock/lock是一個臨時節點,因此在以下兩種情況下可能釋放鎖。
- 當前獲取鎖的客戶端發生宕機,那么ZooKeeper服務器上保存的臨時性節點就會被刪除;
- 正常執行完業務邏輯后,由客戶端主動來將自己創建的臨時節點刪除。
無論什么情況下,臨時節點/exclusive_lock/lock被移除,ZooKeeper都會通知在/exclusive_lock注冊了子節點變更Watcher監聽的客戶端。這些客戶端在接收到通知以后就會再次發起獲取鎖的操作,即重復“獲取鎖”過程。排他鎖流程如下:
下面的代碼(java)演示了使用Curator框架來實現ZooKeeper分布式鎖
import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.data.Stat; public class ZkLock { @SneakyThrows public static void main(String[] args) { final String connectString = "localhost:2181,localhost:2182,localhost:2183"; // 重試策略,初始化每次重試之間需要等待的時間,基准等待時間為1秒。 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); // 使用默認的會話時間(60秒)和連接超時時間(15秒)來創建 Zookeeper 客戶端 @Cleanup CuratorFramework client = CuratorFrameworkFactory.builder(). connectString(connectString). connectionTimeoutMs(15 * 1000). sessionTimeoutMs(60 * 100). retryPolicy(retryPolicy). build(); // 啟動客戶端 client.start(); final String lockNode = "/lock_node"; InterProcessMutex lock = new InterProcessMutex(client, lockNode); try { // 1. Acquire the mutex - blocking until it's available. lock.acquire(); // OR // 2. Acquire the mutex - blocks until it's available or the given time expires. if (lock.acquire(60, TimeUnit.MINUTES)) { Stat stat = client.checkExists().forPath(lockNode); if (null != stat){ // Dot the transaction } } } finally { if (lock.isAcquiredInThisProcess()) { lock.release(); } } } }
maven引用
<!--curator這個開源項目提供zookeeper分布式鎖實現--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.8.0</version> </dependency>