Curator框架實現ZooKeeper分布式鎖


 排他鎖(X)

這里主要講講分布式鎖中的排他鎖。排他鎖(Exclusive Locks,簡稱X鎖),又稱為寫鎖或獨占鎖,是一種基本的鎖類型。如果事務T1對數據對象O1加上了排他鎖,那么在整個加鎖期間,只允許T1對O1進行數據的讀取和更新操作,其它任何事務都不能對O1進行任何類型的操作,直道T1釋放了排他鎖。

定義鎖

在ZooKeeper中,可以通過在ZooKeeper中創建一個數據節點來表示一個鎖。比如,/exclusive_lock/lock節點(znode)就可以表示為一個鎖。

獲取鎖

在需要獲取排他鎖時,所有的客戶端都會試圖通過create()接口,在/exclusive_lock節點下創建臨時的子節點/exclusive_lock/lock,但ZooKeeper的強一致性最終只會保證僅有一個客戶單能創建成功,那么就認為該客戶端獲取了鎖。同時,所有沒有獲取鎖的客戶端事務只能處於等待狀態,這些處於等待狀態的客戶端事先可以在/exclusive_lock節點上注冊一個子節點變更的Watcher監聽,以便實時監聽到子節點的變更情況。

釋放鎖

 在“定義鎖”部分,我們已經提到/exclusive_lock/lock是一個臨時節點,因此在以下兩種情況下可能釋放鎖。

  • 當前獲取鎖的客戶端發生宕機,那么ZooKeeper服務器上保存的臨時性節點就會被刪除;
  • 正常執行完業務邏輯后,由客戶端主動來將自己創建的臨時節點刪除。

無論什么情況下,臨時節點/exclusive_lock/lock被移除,ZooKeeper都會通知在/exclusive_lock注冊了子節點變更Watcher監聽的客戶端。這些客戶端在接收到通知以后就會再次發起獲取鎖的操作,即重復“獲取鎖”過程。排他鎖流程如下:

下面的代碼(java)演示了使用Curator框架來實現ZooKeeper分布式鎖

import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.SneakyThrows;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;


public class ZkLock {

  @SneakyThrows
  public static void main(String[] args) {

    final String connectString = "localhost:2181,localhost:2182,localhost:2183";

    // 重試策略,初始化每次重試之間需要等待的時間,基准等待時間為1秒。
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

    // 使用默認的會話時間(60秒)和連接超時時間(15秒)來創建 Zookeeper 客戶端
    @Cleanup CuratorFramework client = CuratorFrameworkFactory.builder().
        connectString(connectString).
        connectionTimeoutMs(15 * 1000).
        sessionTimeoutMs(60 * 100).
        retryPolicy(retryPolicy).
        build();

    // 啟動客戶端
    client.start();

    final String lockNode = "/lock_node";
    InterProcessMutex lock = new InterProcessMutex(client, lockNode);
    try {
      // 1. Acquire the mutex - blocking until it's available.
      lock.acquire();

      // OR

      // 2. Acquire the mutex - blocks until it's available or the given time expires.
      if (lock.acquire(60, TimeUnit.MINUTES)) {
        Stat stat = client.checkExists().forPath(lockNode);
        if (null != stat){
          // Dot the transaction
        }
      }
    } finally {
      if (lock.isAcquiredInThisProcess()) {
        lock.release();
      }
    }
  }

}

 maven引用

<!--curator這個開源項目提供zookeeper分布式鎖實現-->
<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>2.8.0</version>
</dependency>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM