Curator使用:(五)分布式鎖


分布式鎖介紹##

分布式執行一些不需要同時執行的復雜任務,curator利用zk的特質,實現了這個選舉過程。其實就是利用了多個zk客戶端在同一個位置建節點,只會有一個客戶端建立成功這個特性。來實現同一時間,只會選擇一個客戶端執行任務

代碼###

    //分布式鎖
    InterProcessMutex lock = new InterProcessMutex(cc,"/lock_path");
    CountDownLatch down = new CountDownLatch(1);
    for (int i = 0; i < 30; i++) {
        new Thread(()->{
            try {
                down.await();
                lock.acquire();

            } catch (Exception e) {
                e.printStackTrace();
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS");
            System.out.println(sdf.format(new Date()));
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
    down.countDown();

InterProcessMutex 是一個可重入的排他鎖,獲取鎖的過程是通過往ZK下面成功建立節點來實現的,下面是獲取鎖的過程

    //獲取當前線程
    Thread currentThread = Thread.currentThread();
    //獲取當前線程的鎖數據
        LockData lockData = threadData.get(currentThread);
    //如果不為null,則將鎖的數量+1,實現可重入
        if ( lockData != null )
        {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
    //第一次會到這里,嘗試在我們之前設置的路徑下建立節點
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    //建立成功后就初始化lockdata 並按當前線程進行保存,所以可以通過創建多個thread來模擬鎖競爭,而不需要建多個client。
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;

下面是attemptLock的重要代碼

    while ( !isDone )
        {
            isDone = true;

            try
            {
                //建立節點
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //下面就是獲取鎖和加鎖的循環了
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                //session過期時走這里,按策略處理,允許重試就重試,否則就拋出異常
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
 

下面是internalLockLoop的重要代碼

    while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
            {
                //排序是因為要實現公平鎖,加上maxleases參數限制取首位
                List<String>        children = getSortedChildren();
                //得到子節點名稱,比如 _c_ce2a26cb-9721-4f56-91fd-a6d00b00b12c-lock-0000000030
                String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
                
                //獲取是否獲得鎖的狀態,重要方法,maxLeases為1,后面要通過這個參數進行對比,通過判斷小於這個來實現 公平鎖
                PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if ( predicateResults.getsTheLock() )
                {
                    haveTheLock = true;
                }
                else
                {
                    //如果沒有獲得鎖,就給當前節點加個watcher,繼續等待,一旦被刪掉就調用這個watcher notifyall。
                    String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();

                    synchronized(this)
                    {
                        try 
                        {
                            // 這個watcher只有一個作用就是喚醒其他線程進行競爭 notifyAll();
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            ...
                        }
                        catch ( KeeperException.NoNodeException e ) 
                        {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM