Apache Curator之InterProcessMutex源碼分析(四)


上篇文章通過秒購的例子對InterProcessMutex鎖有了初步認識,本文將通過對源碼進行分析帶你進入分布式鎖的世界。

老規矩先上圖,為了更清晰地了解獲取鎖、釋放鎖的過程,下圖簡化了一些細節,使整個流程更加通暢。

下面將逐個方法去分析。

InterProcessMutex.acquire()

 

 1     @Override
//獲得分布式鎖,阻塞
2 public void acquire() throws Exception { 3 if (!internalLock(-1, null)) { 4 throw new IOException("Lost connection while trying to acquire lock: " + basePath); 5 } 6 } 7
//獲得分布式鎖,在指定的時間內阻塞,推薦使用 8 @Override 9 public boolean acquire(long time, TimeUnit unit) throws Exception { 10 return internalLock(time, unit); 11 }

 

InterProcessMutex.internalLock(long time, TimeUnit unit)

 1     private boolean internalLock(long time, TimeUnit unit) throws Exception {
 2 
 3         Thread currentThread = Thread.currentThread();
 4         //獲得當前線程的鎖對象
 5         LockData lockData = threadData.get(currentThread);
 6         if (lockData != null) {
 7             // re-entering
//如果鎖不為空,當前線程已經獲得鎖,可重入鎖,lockCount++ 8 lockData.lockCount.incrementAndGet(); 9 return true; 10 } 11
//獲取鎖,返回鎖的節點路徑 12 String lockPath = internals.attemptLock(time, unit, getLockNodeBytes()); 13 if (lockPath != null) {
//將當前線程的鎖對象信息保存起來
14 LockData newLockData = new LockData(currentThread, lockPath); 15 threadData.put(currentThread, newLockData); 16 return true; 17 } 18 19 return false; 20 }

LockInternals.attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes)

 1     String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
 2         final long startMillis = System.currentTimeMillis();
 3         final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;//等待時間,毫秒
 4         final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
 5         int retryCount = 0;
 6 
 7         String ourPath = null;
 8         boolean hasTheLock = false;
 9         boolean isDone = false;
10         while (!isDone) {
11             isDone = true;
12 
13             try {
//在當前path下創建臨時有序節點
14 ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//判斷是不是序號最小的節點,如果是返回true,否則阻塞等待
15 hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath); 16 } catch (KeeperException.NoNodeException e) { 17 if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { 18 isDone = false; 19 } else { 20 throw e; 21 } 22 } 23 } 24 25 if (hasTheLock) { 26 return ourPath; 27 } 28 29 return null; 30 }

StandardLockInternalsDriver.createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes)

 1     @Override
 2     public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
 3         String ourPath;
//在zookeeper的指定路徑上,創建一個臨時序列節點。只是純粹的創建了一個節點,並不是說線程已經持有了鎖。  
4 if (lockNodeBytes != null) { 5 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes); 6 } else { 7 ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path); 8 } 9 return ourPath; 10 }

LockInternals.internalLockLoop(long startMillis, Long millisToWait, String ourPath)

 1     private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
 2         boolean haveTheLock = false;
 3         boolean doDelete = false;
 4         try {
 5             if (revocable.get() != null) {
 6                 client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
 7             }
 8 
 9             while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //獲得所有子節點
10 List<String> children = getSortedChildren(); 11 String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash 12 13 PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
//判斷是否是最小節點
14 if (predicateResults.getsTheLock()) { 15 haveTheLock = true; 16 } else { 17 String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch(); 18             //同步,是因為wait()必須在持有對象監視器時才能調用;鎖的先後順序由有序節點的序號保證 19 synchronized (this) { 20 try {
//給排在自己前面的那個節點(即predicateResults.getPathToWatch()返回的節點)設置監聽器
21 client.getData().usingWatcher(watcher).forPath(previousSequencePath); 22 if (millisToWait != null) { 23 millisToWait -= (System.currentTimeMillis() - startMillis); 24 startMillis = System.currentTimeMillis();
//等待超時,超時刪除臨時節點,超時時間是acquire方法傳入的參數
25 if (millisToWait <= 0) { 26 doDelete = true; // timed out - delete our node 27 break; 28 } 29 //沒有超時,繼續等待 30 wait(millisToWait); 31 } else {
//如果等待時間==null,一直阻塞等待
32 wait(); 33 } 34 } catch (KeeperException.NoNodeException e) { 35 // it has been deleted (i.e. lock released). Try to acquire again 36 } 37 } 38 } 39 } 40 } catch (Exception e) { 41 ThreadUtils.checkInterrupted(e); 42 doDelete = true; 43 throw e; 44 } finally { 45 if (doDelete) { 46 deleteOurPath(ourPath);//如果拋出異常或超時,都會刪除臨時節點 47 } 48 } 49 return haveTheLock; 50 }

LockInternals.getSortedChildren()

 1     List<String> getSortedChildren() throws Exception {
 2         return getSortedChildren(client, basePath, lockName, driver);
 3     }
 4 
 5     public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
 6         try {
//獲得basePath下所有子節點,進行排序
7 List<String> children = client.getChildren().forPath(basePath); 8 List<String> sortedList = Lists.newArrayList(children); 9 Collections.sort(sortedList, new Comparator<String>() { 10 @Override 11 public int compare(String lhs, String rhs) { 12 return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName)); 13 } 14 }); 15 return sortedList; 16 } catch (KeeperException.NoNodeException ignore) { 17 return Collections.emptyList(); 18 } 19 }

StandardLockInternalsDriver.getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases)

 1     @Override
 2     public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
 3         int ourIndex = children.indexOf(sequenceNodeName);
 4         validateOurIndex(sequenceNodeName, ourIndex);
 5 
 6         boolean getsTheLock = ourIndex < maxLeases;
 7         String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
 8 
 9         return new PredicateResults(pathToWatch, getsTheLock);
10     }

LockInternals.deleteOurPath(String ourPath)

1     private void deleteOurPath(String ourPath) throws Exception {
2         try {
3             client.delete().guaranteed().forPath(ourPath);
4         } catch (KeeperException.NoNodeException e) {
5             // ignore - already deleted (possibly expired session, etc.)
6         }
7     }

InterProcessMutex.release()

 1     @Override
 2     public void release() throws Exception {
 3         /*
 4             Note on concurrency: a given lockData instance
 5             can be only acted on by a single thread so locking isn't necessary
 6          */
 7 
 8         Thread currentThread = Thread.currentThread();
 9         LockData lockData = threadData.get(currentThread);
//如果當前線程沒有持有鎖,不能釋放
10 if (lockData == null) { 11 throw new IllegalMonitorStateException("You do not own the lock: " + basePath); 12 } 13
//重入鎖計數減一,如果還大於0,不能釋放。直到所有重入業務完成,計數為0才能釋放 14 int newLockCount = lockData.lockCount.decrementAndGet(); 15 if (newLockCount > 0) { 16 return; 17 } 18 if (newLockCount < 0) { 19 throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath); 20 } 21 try { 22 internals.releaseLock(lockData.lockPath); 23 } finally { 24 threadData.remove(currentThread); 25 } 26 }

LockInternals.releaseLock(String lockPath)

1     final void releaseLock(String lockPath) throws Exception {
2         client.removeWatchers();//刪除監聽
3         revocable.set(null);
4         deleteOurPath(lockPath);//刪除Path
5     }

 

 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM