【JUC】JDK1.8源碼分析之Semaphore(六)


一、前言

  分析了CountDownLatch源碼后,下面接着分析Semaphore的源碼。Semaphore稱為計數信號量,它允許n個任務同時訪問某個資源,可以將信號量看做是在向外分發使用資源的許可證,只有成功獲取許可證,才能使用資源。下面開始分析Semaphore的源碼。

二、Semaphore的數據結構

  分析源碼可以知道,Semaphore底層是基於AbstractQueuedSynchronizer來實現的,所以,Semaphore的數據結構也依托於AQS的數據結構,在前面對AQS的分析中已經指出了其數據結構,在這里不再累贅。

三、Semaphore源碼分析

  3.1 類的繼承關系 

public class Semaphore implements java.io.Serializable {}

  說明:Semaphore實現了Serializable接口,即可以進行序列化。

  3.2 類的內部類

  Semaphore總共有三個內部類,並且三個內部類是緊密相關的,下面先看三個類的關系。

  說明:Semaphore與ReentrantLock的內部類的結構相同,類內部總共存在Sync、NonfairSync、FairSync三個類,NonfairSync與FairSync類繼承自Sync類,Sync類繼承自AbstractQueuedSynchronizer抽象類。下面逐個進行分析。

  1. Sync類   

  Sync類的源碼如下。 

    // 內部類,繼承自AQS
    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 版本號
        private static final long serialVersionUID = 1192457210091910933L;
        
        // 構造函數
        Sync(int permits) {
            // 設置狀態數
            setState(permits);
        }
        
        // 獲取許可
        final int getPermits() {
            return getState();
        }

        // 共享模式下非公平策略獲取
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) { // 無限循環
                // 獲取許可數
                int available = getState();
                // 剩余的許可
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) // 許可小於0或者比較並且設置狀態成功
                    return remaining;
            }
        }
        
        // 共享模式下進行釋放
        protected final boolean tryReleaseShared(int releases) {
            for (;;) { // 無限循環
                // 獲取許可
                int current = getState();
                // 可用的許可
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next)) // 比較並進行設置成功
                    return true;
            }
        }

        // 根據指定的縮減量減小可用許可的數目
        final void reducePermits(int reductions) {
            for (;;) { // 無限循環
                // 獲取許可
                int current = getState();
                // 可用的許可
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next)) // 比較並進行設置成功
                    return;
            }
        }

        // 獲取並返回立即可用的所有許可
        final int drainPermits() {
            for (;;) { // 無限循環
                // 獲取許可
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0)) // 許可為0或者比較並設置成功
                    return current;
            }
        }
    }
View Code

  說明:Sync類的屬性相對簡單,只有一個版本號,Sync類存在如下方法和作用如下。

  2. NonfairSync類

  NonfairSync類繼承了Sync類,表示采用非公平策略獲取資源,其只有一個tryAcquireShared方法,重寫了AQS的該方法,其源碼如下。 

    static final class NonfairSync extends Sync {
        // 版本號
        private static final long serialVersionUID = -2694183684443567898L;
        
        // 構造函數
        NonfairSync(int permits) {
            super(permits);
        }
        // 共享模式下獲取
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
View Code

  說明:從tryAcquireShared方法的源碼可知,其會調用父類Sync的nonfairTryAcquireShared方法,表示按照非公平策略進行資源的獲取。

  3. FairSync類

  FairSync類繼承了Sync類,表示采用公平策略獲取資源,其只有一個tryAcquireShared方法,重寫了AQS的該方法,其源碼如下。  

        protected int tryAcquireShared(int acquires) {
            for (;;) { // 無限循環
                if (hasQueuedPredecessors()) // 同步隊列中存在其他節點
                    return -1;
                // 獲取許可
                int available = getState();
                // 剩余的許可
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining)) // 剩余的許可小於0或者比較設置成功
                    return remaining;
            }
        }
View Code

  說明:從tryAcquireShared方法的源碼可知,它使用公平策略來獲取資源,它會判斷同步隊列中是否存在其他的等待節點。

  3.3 類的屬性   

public class Semaphore implements java.io.Serializable {
    // 版本號
    private static final long serialVersionUID = -3222578661600680210L;
    // 屬性
    private final Sync sync;
}

  說明:Semaphore自身只有兩個屬性,最重要的是sync屬性,基於Semaphore對象的操作絕大多數都轉移到了對sync的操作。

  3.4 類的構造函數

  1. Semaphore(int)型構造函數  

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
View Code

  說明:該構造函數會創建具有給定的許可數和非公平的公平設置的Semaphore。

  2. Semaphore(int, boolean)型構造函數 

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
View Code

  說明:該構造函數會創建具有給定的許可數和給定的公平設置的Semaphore。

  3.5 核心函數分析

  1. acquire函數

  此方法從信號量獲取一個(多個)許可,在提供一個許可前一直將線程阻塞,或者線程被中斷,其源碼如下  

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
View Code

  說明:該方法中將會調用Sync對象的acquireSharedInterruptibly(從AQS繼承而來的方法)方法,而acquireSharedInterruptibly方法在上一篇CountDownLatch中已經進行了分析,在此不再累贅。

  最終可以獲取大致的方法調用序列(假設使用非公平策略)。如下圖所示。

  說明:上圖只是給出了大體會調用到的方法,和具體的示例可能會有些差別,之后會根據具體的示例進行分析。

  2. release函數

  此方法釋放一個(多個)許可,將其返回給信號量,源碼如下。  

    public void release() {
        sync.releaseShared(1);
    }
View Code

  說明:該方法中將會調用Sync對象的releaseShared(從AQS繼承而來的方法)方法,而releaseShared方法在上一篇CountDownLatch中已經進行了分析,在此不再累贅。

  最終可以獲取大致的方法調用序列(假設使用非公平策略)。如下圖所示。

  說明:上圖只是給出了大體會調用到的方法,和具體的示例可能會有些差別,之后會根據具體的示例進行分析。

四、示例

  下面給出了一個使用Semaphore的示例。 

package com.hust.grid.leesf.semaphore;

import java.util.concurrent.Semaphore;

class MyThread extends Thread {
    private Semaphore semaphore;
    
    public MyThread(String name, Semaphore semaphore) {
        super(name);
        this.semaphore = semaphore;
    }
    
    public void run() {        
        int count = 3;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(count);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release(count);
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
    }
}

public class SemaphoreDemo {
    public final static int SEM_SIZE = 10;
    
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(SEM_SIZE);
        MyThread t1 = new MyThread("t1", semaphore);
        MyThread t2 = new MyThread("t2", semaphore);
        t1.start();
        t2.start();
        int permits = 5;
        System.out.println(Thread.currentThread().getName() + " trying to acquire");
        try {
            semaphore.acquire(permits);
            System.out.println(Thread.currentThread().getName() + " acquire successfully");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " release successfully");
        }
        
                
    }
}

  運行結果(某一次): 

main trying to acquire
main acquire successfully
t1 trying to acquire
t1 acquire successfully
t2 trying to acquire
t1 release successfully
main release successfully
t2 acquire successfully
t2 release successfully

  說明:首先,生成一個信號量,信號量有10個許可,然后,main,t1,t2三個線程獲取許可運行,根據結果,可能存在如下的一種時序。

  說明:如上圖所示,首先,main線程執行acquire操作,並且成功獲得許可,之后t1線程執行acquire操作,成功獲得許可,之后t2執行acquire操作,由於此時許可數量不夠,t2線程將會阻塞,直到許可可用。之后t1線程釋放許可,main線程釋放許可,此時的許可數量可以滿足t2線程的要求,所以,此時t2線程會成功獲得許可運行,t2運行完成后釋放許可。下面進行詳細分析。

  ① main線程執行semaphore.acquire操作。主要的函數調用如下圖所示。

  說明:此時,可以看到只是AQS的state變為了5,main線程並沒有被阻塞,可以繼續運行。

  ② t1線程執行semaphore.acquire操作。主要的函數調用如下圖所示。

  說明:此時,可以看到只是AQS的state變為了2,t1線程並沒有被阻塞,可以繼續運行。

  ③ t2線程執行semaphore.acquire操作。主要的函數調用如下圖所示。

  說明:此時,t2線程獲取許可不會成功,之后會導致其被禁止運行,值得注意的是,AQS的state還是為2。

  ④ t1執行semaphore.release操作。主要的函數調用如下圖所示。

  說明:此時,t2線程將會被unpark,並且AQS的state為5,t2獲取cpu資源后可以繼續運行。

  ⑤ main線程執行semaphore.release操作。主要的函數調用如下圖所示。

  說明:此時,t2線程還會被unpark,但是不會產生影響,此時,只要t2線程獲得CPU資源就可以運行了。此時,AQS的state為10。

  ⑥ t2獲取CPU資源,繼續運行,此時t2需要恢復現場,回到parkAndCheckInterrupt函數中,也是在should繼續運行。主要的函數調用如下圖所示。

  說明:此時,可以看到,Sync queue中只有一個結點,頭結點與尾節點都指向該結點,在setHeadAndPropagate的函數中會設置頭結點並且會unpark隊列中的其他結點。

  ⑦ t2線程執行semaphore.release操作。主要的函數調用如下圖所示。

  說明:t2線程經過release后,此時信號量的許可又變為10個了,此時Sync queue中的結點還是沒有變化。

五、總結

  經過分析可知Semaphore的內部工作流程也是基於AQS,並且不同於CyclicBarrier和ReentrantLock,單獨使用Semaphore是不會使用到AQS的條件隊列的,其實,只有進行await操作才會進入條件隊列,其他的都是在同步隊列中,只是當前線程會被park。謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM