semaphore 信號標,旗語。
Semaphore 一般譯作 信號量
,它也是一種線程同步工具,主要用於多個線程對共享資源進行並行操作的一種工具類。它代表了一種許可
的概念,是否允許多線程對同一資源進行操作的許可,使用 Semaphore 可以控制並發訪問資源的線程個數。
其作用就是停車場的顯示牌,如果剩余車位為0,那么你只能在車桿前等待或者離去。
上圖是semaphore的流程圖。
內部的同步工具類還是sync,繼承自AQS。所以學並發AQS很重要。
semaphore的公平鎖和非公平鎖:
/** * Synchronization implementation for semaphore. Uses AQS state * to represent permits. Subclassed into fair and nonfair * versions. */ abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) {
/**
* 設置AQS中的state值,AQS中state的值就是同步狀態的值,而semaphore中的permits代表了許可的數量
**/ setState(permits); }
/**
* 調用了父類的getstate方法獲取一下線程同步狀態值,線程同步值就是state
**/ final int getPermits() { return getState(); }
/**
* 這個方法是為了semaphore中的非公平鎖提供方法
**/ final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 ||
/**
* 這個方法調用了AQS中的cas方法設置state值,就是semaphore中的信號量
**/
compareAndSetState(available, remaining))
return remaining;
} } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
/** * NonFair version */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
/** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
上面是AQS中的CAS設置state方法。
/** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) {
// 此處調用了AQS類中的方法,此方法是用來查看是否有線程正處於阻塞隊列中阻塞,這個方法是公平鎖和非公平鎖差異的關鍵,前者判斷的是處於阻塞隊列中的線程數量,后者是直接判斷是否滿足條件 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
對比發現公平鎖和非公平鎖的區別就是非公鎖是FairSync是先判斷是否存在待阻塞線程,這就是后到后出, 而NonfairSync是直接判斷state狀態值是否滿足條件,如果滿足條件直接返回,這就是先到先得。
到目前為之,我們看的都是tryAcquire,而tryAcquire方法是不阻塞,即使獲取不到state也不會阻塞而是返回失敗。而semaphore還有阻塞獲取的方法:
/** * Acquires a permit from this semaphore, blocking until one is * available, or the thread is {@linkplain Thread#interrupt interrupted}. * * <p>Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. * * <p>If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * one of two things happens: * <ul> * <li>Some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit; or * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread. * </ul> * * <p>If the current thread: * <ul> * <li>has its interrupted status set on entry to this method; or * <li>is {@linkplain Thread#interrupt interrupted} while waiting * for a permit, * </ul> * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * * @throws InterruptedException if the current thread is interrupted */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
/** * Acquires in shared mode, aborting if interrupted. Implemented * by first checking interrupt status, then invoking at least once * {@link #tryAcquireShared}, returning on success. Otherwise the * thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread * is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is * otherwise uninterpreted and can represent anything * you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
可以看到阻塞的設置state狀態值的方法和非阻塞的區別在於首先調用非阻塞的設置狀態值的方法,這個方法是:
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
這個方法必須在子類中實現,前面公平鎖和非公平鎖都已經實現過了,接下來是doAcquireInterruptibly方法
/** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */ private void doAcquireInterruptibly(int arg) throws InterruptedException {
//這個地方把當前線程加載阻塞隊列中 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) {
//predecessor方法是獲取前驅節點 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//這里判斷當前線程前面只有一個前驅節點,就是說當前線程是老二,再此請求獲取設置state,如果成功,則把自己設置為頭,然后把前一個線程節點置空。 setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) &&//此方法是檢查更新當前線程的節點狀態 parkAndCheckInterrupt())//如果檢查到被打斷則拋出被打斷的異常 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
更新當前節點的狀態(請求失敗的線程),這個方法是控制整個阻塞隊列的關鍵方法,前提條件是當前節點的前驅節點是pred節點。提供判斷前驅節點的狀態
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL)//當前節點需要阻塞 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev;//跳過已經取消的節點 } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3;
另外semaphore還提供了忽略中斷的阻塞請求方法:
/** * Acquires a permit from this semaphore, blocking until one is * available. * * <p>Acquires a permit, if one is available and returns immediately, * reducing the number of available permits by one. * * <p>If no permit is available then the current thread becomes * disabled for thread scheduling purposes and lies dormant until * some other thread invokes the {@link #release} method for this * semaphore and the current thread is next to be assigned a permit. * * <p>If the current thread is {@linkplain Thread#interrupt interrupted} * while waiting for a permit then it will continue to wait, but the * time at which the thread is assigned a permit may change compared to * the time it would have received the permit had no interruption * occurred. When the thread does return from this method its interrupt * status will be set. */ public void acquireUninterruptibly() { sync.acquireShared(1); }
釋放一個許可證的方法:
/** * Releases a permit, returning it to the semaphore. * * <p>Releases a permit, increasing the number of available permits by * one. If any threads are trying to acquire a permit, then one is * selected and given the permit that was just released. That thread * is (re)enabled for thread scheduling purposes. * * <p>There is no requirement that a thread that releases a permit must * have acquired that permit by calling {@link #acquire}. * Correct usage of a semaphore is established by programming convention * in the application. */ public void release() { sync.releaseShared(1);//這個方法調用的是semaphore中的sync的tryReleaseShared方法 }
其他 Semaphore 方法
除了上面基本的 acquire 和 release 相關方法外,我們也要了解一下 Semaphore 的其他方法。Semaphore 的其他方法比較少,只有下面這幾個
drainPermits : 獲取並退還所有立即可用的許可,其實相當於使用 CAS 方法把內存值置為 0
reducePermits:和 nonfairTryAcquireShared
方法類似,只不過 nonfairTryAcquireShared 是使用 CAS 使內存值 + 1,而 reducePermits 是使內存值 - 1 。
isFair:對 Semaphore 許可的爭奪是采用公平還是非公平的方式,對應到內部的實現就是 FairSync 和 NonfairSync。
hasQueuedThreads:當前是否有線程由於要獲取 Semaphore 許可而進入阻塞。
getQueuedThreads:返回一個包含了等待獲取許可的線程集合。
getQueueLength:獲取正在排隊而進入阻塞狀態的線程個數