信號量Semaphore實現原理


  Semaphore用於管理信號量,在並發編程中,可以控制返訪問同步代碼的線程數量。Semaphore在實例化時傳入一個int值,也就是指明信號數量。主要方法有兩個:acquire()和release()。acquire()用於請求信號,每調用一次,信號量便少一個。release()用於釋放信號,調用一次信號量加一個。信號量用完以后,后續使用acquire()方法請求信號的線程便會加入阻塞隊列掛起。本篇簡單分析Semaphore的源碼,說明其實現原理。

  Semaphore對於信號量的控制是基於AQS(AbstractQueuedSynchronizer)來做的。Semaphore有一個內部類Sync繼承了AQS。而且Semaphore中還有兩個內部類FairSync和NonfairSync繼承Sync,也就是說Semaphore有公平鎖和非公平鎖之分。以下是Semaphore中內部類的結構:

    

  看一下Semaphore的兩個構造函數:

public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

  默認是非公平鎖。兩個構造方法都必須傳int permits值。

  

  這個int值在實例化內部類時,被設置為AQS中的state。

Sync(int permits) {
            setState(permits);
        }

 

一、acquire()獲取信號

  內部類Sync調用AQS中的acquireSharedInterruptibly()方法

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
  • 調用tryAcquireShared()方法嘗試獲取信號。
  • 如果沒有可用信號,將當前線程加入等待隊列並掛起

  tryAcquireShared()方法被Semaphore的內部類NonfairSync和FairSync重寫,實現有一些區別。

  NonfairSync.tryAcquireShared()

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

  可以看到,非公平鎖對於信號的獲取是直接使用CAS進行嘗試的。

 

  FairSync.tryAcquireShared()

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
  • 先調用hasQueuedPredecessors()方法,判斷隊列中是否有等待線程。如果有,直接返回-1,表示沒有可用信號
  • 隊列中沒有等待線程,再使用CAS嘗試更新state,獲取信號

  再看看acquireSharedInterruptibly()方法中,如果沒有可用信號加入隊列的方法doAcquireSharedInterruptibly()

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);   // 1
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();   
                if (p == head) {      // 2
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&     // 3
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);   
        }
    }
  1. 封裝一個Node節點,加入隊列尾部
  2. 在無限循環中,如果當前節點是頭節點,就嘗試獲取信號
  3. 不是頭節點,在經過節點狀態判斷后,掛起當前線程

二、release()釋放信號  

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {    // 1
            doReleaseShared();  // 2
            return true;
        }
        return false;
 }
  1. 更新state加一
  2. 喚醒等待隊列頭節點線程

  tryReleaseShared()方法在內部類Sync中被重寫

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

  這里也就是直接使用CAS算法,將state也就是可用信號,加1。

  

看看Semaphore具體的使用示例

public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 10,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10));
        //信號總數為5
        Semaphore semaphore = new Semaphore(5);
        //運行10個線程
        for (int i = 0; i < 10; i++) {
            threadPool.execute(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        //獲取信號
                        semaphore.acquire();   
                        System.out.println(Thread.currentThread().getName() + "獲得了信號量,時間為" + System.currentTimeMillis());
                        //阻塞2秒,測試效果
                        Thread.sleep(2000);
                        System.out.println(Thread.currentThread().getName() + "釋放了信號量,時間為" + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        //釋放信號
                        semaphore.release();
                    }
                
                }
            });
        }
        threadPool.shutdown();
    }

  代碼結果為:

pool-1-thread-2獲得了信號量,時間為1550584196125
pool-1-thread-1獲得了信號量,時間為1550584196125
pool-1-thread-3獲得了信號量,時間為1550584196125
pool-1-thread-4獲得了信號量,時間為1550584196126
pool-1-thread-5獲得了信號量,時間為1550584196127
pool-1-thread-2釋放了信號量,時間為1550584198126
pool-1-thread-3釋放了信號量,時間為1550584198126
pool-1-thread-4釋放了信號量,時間為1550584198126
pool-1-thread-6獲得了信號量,時間為1550584198126
pool-1-thread-9獲得了信號量,時間為1550584198126
pool-1-thread-8獲得了信號量,時間為1550584198126
pool-1-thread-1釋放了信號量,時間為1550584198126
pool-1-thread-10獲得了信號量,時間為1550584198126
pool-1-thread-5釋放了信號量,時間為1550584198127
pool-1-thread-7獲得了信號量,時間為1550584198127
pool-1-thread-6釋放了信號量,時間為1550584200126
pool-1-thread-8釋放了信號量,時間為1550584200126
pool-1-thread-10釋放了信號量,時間為1550584200126
pool-1-thread-9釋放了信號量,時間為1550584200126
pool-1-thread-7釋放了信號量,時間為1550584200127

  可以看到,最多5個線程獲得信號,其它線程必須等待獲得信號的線程釋放信號。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM