Zookeeper實戰-分布式鎖


1. 簡介

我們在之前的博文中講解了如何使用redis實現分布式鎖,其實除了 redis 還有 zookeeper 也能實現分布式鎖。

廢話不多說,直接上圖。

從整個流程中可以看出,zk實現分布式鎖,主要是靠zk的臨時順序節點和watch機制實現的。

2. quick start

Curator 是 Netflix 公司開源的一套 zookeeper 客戶端框架,解決了很多 Zookeeper 客戶端非常底層的細節開發工作,包括連接重連、反復注冊 Watcher 和 NodeExistsException 異常等。

curator-recipes:封裝了一些高級特性,如:Cache 事件監聽、選舉、分布式鎖、分布式計數器、分布式 Barrier 等。

2.1 引入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>

curator-recipes中已經依賴了zookeepercurator-framework jar,所以這里不用額外的依賴其他jar。

2.2 測試代碼

測試代碼其實很簡單,只需要幾行代碼而已,初始化客戶端,創建鎖對象,加鎖 和 釋放鎖。

這里先把加鎖的代碼注釋掉,試下不加鎖的情況。

package com.ldx.zookeeper.controller;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;

/**
 * 分布式鎖demo
 *
 * @author ludangxin
 * @date 2021/9/4
 */
@Slf4j
@RestController
@RequestMapping("lock")
@RequiredArgsConstructor
public class LockDemoController {
   /**
    * 庫存數
    */
   private Integer stock = 30;
   /**
    * zk client 
    */
   private static CuratorFramework CLIENT;

   /**
    * 初始化連接信息
    */
   @PostConstruct
   private void init() {
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
      CLIENT = CuratorFrameworkFactory.builder().connectString("localhost:2181").retryPolicy(retryPolicy).build();
      CLIENT.start();
   }

   @GetMapping("buy")
   public String buy() {
      // 可重入鎖
      InterProcessMutex mutexLock = new InterProcessMutex(CLIENT, "/lock");
      try {
         // 加鎖
//         mutexLock.acquire();
         if(this.stock > 0) {
            Thread.sleep(500);
            this.stock--;
         }
         log.info("剩余庫存==={}", this.stock);
      } catch(Exception e) {
         log.error(e.getMessage());
         return "no";
      }
      finally {
         try {
            // 釋放鎖
//            mutexLock.release();
         } catch(Exception e) {
            log.error(e.getMessage());
         }
      }
      return "ok";
   }
}

2.3 啟動測試

這里我們使用jemter進行模擬並發請求,當然我們這里只啟動了一個server,主要是為了節約文章篇幅(啟動多個server還得連接db...),能說明問題即可。

同一時刻發送一百個請求。

測試結果部分日志如下:

很明顯出現了超賣了現象,並且請求是無序的(請求是非公平的)。

此時我們將注釋的加鎖代碼打開,再進行測試。

測試結果部分日志如下:

很明顯沒有出現超賣的現象。

通過zk 客戶端工具查看創建的部分臨時節點如下:

3. 源碼解析

3.1 加鎖邏輯

我們再通過查看Curator加鎖源碼來驗證下我們的加鎖邏輯。

首先我們查看InterProcessMutex::acquire()方法,並且我們通過注釋可以得知該方法加的鎖是可重入鎖。

/**
 * Acquire the mutex - blocking until it's available. Note: the same thread
 * can call acquire re-entrantly. Each call to acquire must be balanced by a call
 * to {@link #release()}
 *
 * @throws Exception ZK errors, connection interruptions
 */
@Override
public void acquire() throws Exception
{
    if ( !internalLock(-1, null) )
    {
        throw new IOException("Lost connection while trying to acquire lock: " + basePath);
    }
}

查看internalLock方法如下。

private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

private boolean internalLock(long time, TimeUnit unit) throws Exception {
    // 獲取當前線程
    Thread currentThread = Thread.currentThread();
    // 在map中查看當前線程有沒有請求過
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null) {
        // 請求過 則 +1 , 實現了鎖的重入邏輯
        lockData.lockCount.incrementAndGet();
        return true;
    }
    // 嘗試獲取鎖
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null) {
        // 創建鎖對象
        LockData newLockData = new LockData(currentThread, lockPath);
        // 添加到map中
        threadData.put(currentThread, newLockData);
        return true;
    }

    return false;
}

我們繼續查看LockInternals::attemptLock()嘗試獲取鎖邏輯如下。

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
    final long      startMillis = System.currentTimeMillis();
    final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
    final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
    int             retryCount = 0;
    String          ourPath = null;
    boolean         hasTheLock = false;
    boolean         isDone = false;
    while(!isDone) {
        // 成功標識
        isDone = true;

        try {
            // 創建鎖
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 判斷是否加鎖成功
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        } catch( KeeperException.NoNodeException e ) {
            // 當StandardLockInternalsDriver 找不到鎖定節點時,它會拋出會話過期等情況。因此,如果重試允許,則繼續循環
            if( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) ) {
                isDone = false;
            } else {
                throw e;
            }
        }
    }

    if(hasTheLock) {
        return ourPath;
    }

    return null;
}

在這里先查看下創建鎖的邏輯StandardLockInternalsDriver::createsTheLock(),如下。

@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
    String ourPath;
    // 判斷有沒有傳znode data 我們這里為null
    if(lockNodeBytes != null) {
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    } else {
      // 創建Container父節點且創建臨時的順序節點
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

鎖創建成功后我們再查看下程序是如何加鎖的LockInternals::internalLockLoop()

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try {
        if(revocable.get() != null) {
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
	// 當客戶端初始化好后 且 還沒有獲取到鎖
        while((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            // 獲取所有的子節點 且 遞增排序
            List<String>        children = getSortedChildren();
            // 獲取當前節點 path
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1);
	    // 獲取當前鎖
            // 1. 先判斷當前節點是不是下標為0的節點,即是不是序列值最小的節點。
            // 2. 如果是則獲取鎖成功,返回成功標識。
            // 3. 如果不是則返回比它小的元素作為被監聽的節點
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if(predicateResults.getsTheLock()) {
                // 獲取鎖成功 返回成功標識
                haveTheLock = true;
            } else {
                // 索取鎖失敗,則獲取比它小的上一個節點元素
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                synchronized(this) {
                    try {
                        // 監聽比它小的上一個節點元素
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        // 如果設置了超時,則繼續判斷是否超時
                        if(millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if(millisToWait <= 0) {
                                doDelete = true;    
                                break;
                            }
			    // 沒有超時則 等待
                            wait(millisToWait);
                        } else {
			    // 沒有超時則 等待
                            wait();
                        }
                    } catch(KeeperException.NoNodeException e) {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    } catch(Exception e) {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    } finally {
        // 報錯即刪除該節點
        if(doDelete) {
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}

最后 我們再看下上段代碼中提到的很關鍵的方法driver.getsTheLock() 即 StandardLockInternalsDriver::getsTheLock()

@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
    // 獲取當前節點的下標 
    int             ourIndex = children.indexOf(sequenceNodeName);
    validateOurIndex(sequenceNodeName, ourIndex);
    // 這里的maxLeases == 1,即當前節點的下標是不是0
    boolean         getsTheLock = ourIndex < maxLeases;
    // 如果當前節點的下標為0,則不返回被監聽的節點(因為自己已經是最小的節點了),如果不是則返回比自己小的節點作為被監聽的節點。
    String          pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
    // 構造返回結果
    return new PredicateResults(pathToWatch, getsTheLock);
}

3.2 小節

其實加鎖的源碼還是比較清晰和易懂的,我們在這里再總結下。

  1. 執行InterProcessMutex::acquire()加鎖方法。
  2. InterProcessMutex::internalLock()判斷當前線程是加過鎖,如果加過則加鎖次數+1實現鎖的重入,如果沒有加過鎖,則調用LockInternals::attemptLock()嘗試獲取鎖。
  3. LockInternals::attemptLock()首先創建Container父節點再創建臨時的順序節點,然后執行加鎖方法LockInternals::internalLockLoop()
  4. LockInternals::internalLockLoop()
    1. 先獲取當前Container下的所有順序子節點並且按照從小到大排序。
    2. 調用StandardLockInternalsDriver::getsTheLock()方法加鎖,先判斷當前節點是不是最小的順序節點,如果是則加鎖成功,如果不是則返回上一個比他小的節點,作為被監聽的節點。
    3. 上一步加鎖成功則返回true,如果失敗則執行監聽邏輯。

3.3 釋放鎖邏輯

@Override
public void release() throws Exception {
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
     */
    // 獲取當前線程
    Thread currentThread = Thread.currentThread();
    // 查看當前線程有沒有鎖
    LockData lockData = threadData.get(currentThread);
    if(lockData == null) {
        // 沒有鎖 還釋放,報錯
        throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
    }
    // 有鎖則 鎖次數 -1
    int newLockCount = lockData.lockCount.decrementAndGet();
    // 如果鎖的次數還大於0,說明還不能釋放鎖,因為重入的方法還未執行完
    if (newLockCount > 0) {
        return;
    }
    if (newLockCount < 0) {
        // 鎖的次數小於0,報錯
        throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
    }
    try {
        // 刪除節點
        internals.releaseLock(lockData.lockPath);
    }
    finally {
        // 從當前的map中移除
        threadData.remove(currentThread);
    }
}

final void releaseLock(String lockPath) throws Exception{
    client.removeWatchers();
    revocable.set(null);
    deleteOurPath(lockPath);
}

4. redis 和 zookeeper

Zookeeper采用臨時節點和事件監聽機制可以實現分布式鎖,Redis主要是通過setnx命令實現分布式鎖。
Redis需要不斷的去嘗試獲取鎖,比較消耗性能,Zookeeper是可以通過對鎖的監聽,自動獲取到鎖,所以性能開銷較小。
另外如果獲取鎖的jvm出現bug或者掛了,那么只能redis過期刪除key或者超時刪除key,Zookeeper則不存在這種情況,連接斷開節點則會自動刪除,這樣會即時釋放鎖。

這樣一聽感覺zk的優勢還是很大的。

但是要考慮一個情況在鎖並發不高的情況下 zk沒有問題 如果在並發很高的情況下 zk的數據同步 可能造成鎖時延較長,在選舉過程中需要接受一段時間zk不可用(因為ZK 是 CP 而 redis集群是AP)。

所以說沒有哪個技術是適用於任何場景的,具體用哪個技術,還是要結合當前的技術架構和業務場景做選型和取舍。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM