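In concurrent Java programming, locks are indispensable. The JDK's concurrency package (java.util.concurrent) provides locks for a wide range of scenarios, and most of them have one thing in common: they are built on AbstractQueuedSynchronizer (AQS). That shows how important AQS is!
So let's build a lock of our own on top of AQS!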
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

import org.junit.Test; // assumes JUnit 4; with JUnit 5 import org.junit.jupiter.api.Test instead

public class TwinsLockTest {

    @Test
    public void testTwinsLock() {
        final Lock lock = new TwinsLock();

        class Worker extends Thread {
            @Override
            public void run() {
                while (true) {
                    // Acquire the lock
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        // Release the lock
                        lock.unlock();
                    }
                }
            }
        }

        // Start 10 worker threads. Without the lock they would all print at
        // almost the same moment, but TwinsLock lets only two threads hold
        // the lock at any one time.
        for (int i = 0; i < 10; i++) {
            Worker w = new Worker();
            w.setDaemon(true);
            w.start();
        }

        // Print a blank line every second
        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
}

/**
 * A lock with two permits: at most two threads can hold it at once.
 */
class TwinsLock implements Lock {

    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -8540764104913403569L;

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("The permit count must be positive");
            }
            // Store the total number of permits in the AQS state
            setState(count);
        }

        @Override
        public int tryAcquireShared(int reduceCount) {
            // Acquire with a CAS loop; a negative result means failure.
            // Called from AQS by acquireShared() and doAcquireShared().
            for (; ; ) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        public boolean tryReleaseShared(int returnCount) {
            // Release with a CAS loop.
            // Called by AQS's releaseShared(), which then runs doReleaseShared().
            for (; ; ) {
                int current = getState();
                int newState = current + returnCount;
                if (compareAndSetState(current, newState)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    // The remaining methods are not needed for the test; where possible
    // they simply delegate to AQS.
    @Override
    public boolean tryLock() {
        // A single non-blocking attempt
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public Condition newCondition() {
        // AQS's ConditionObject is meant for exclusive-mode synchronizers only
        throw new UnsupportedOperationException();
    }
}

// Sleep helper
class SleepUtils {
    public static void second(int sec) {
        try {
            Thread.sleep(sec * 1000L);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the interrupt
            Thread.currentThread().interrupt();
        }
    }
}
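The output shows the threads printing in pairs: out of the ten threads, only two hold the lock at any moment. In this test each worker holds the lock for about two seconds, so a new pair of lines appears roughly every two seconds. By the same logic, ten one-second tasks would need about five seconds instead of one. That is exactly the resource limit we were after.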
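Reading timestamps by eye is a fairly weak check. As a rough sketch (not part of the original test), one could instead count how many threads are inside the lock at the same time. `TwoPermitSync` is a hypothetical stand-in that repeats the CAS logic of `TwinsLock.Sync` so the snippet compiles on its own; `ConcurrencyProbe` and `maxConcurrentHolders` are also names made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical stand-in for TwinsLock.Sync: two permits, CAS-based. */
class TwoPermitSync extends AbstractQueuedSynchronizer {
    TwoPermitSync() {
        setState(2);
    }

    @Override
    protected int tryAcquireShared(int n) {
        for (;;) {
            int current = getState();
            int remaining = current - n;
            if (remaining < 0 || compareAndSetState(current, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int n) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + n)) {
                return true;
            }
        }
    }
}

class ConcurrencyProbe {
    /** Runs each worker once and returns the largest number of threads seen holding the lock together. */
    static int maxConcurrentHolders(int threads) {
        TwoPermitSync sync = new TwoPermitSync();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                sync.acquireShared(1);
                try {
                    // Record how many threads are currently between acquire and release
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    sync.releaseShared(1);
                    done.countDown();
                }
            }).start();
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }
}
```

Calling `ConcurrencyProbe.maxConcurrentHolders(10)` should never return more than 2, because the counter is only touched between acquire and release.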
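Now let's analyze how lock works!
First we call lock.lock() to acquire the lock. The method returns void, so how does the caller know it got the lock? Simple: if the lock is not available, the call blocks until it is.
lock() delegates to acquireShared() in AQS, so the real work happens there: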
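public final void acquireShared(int arg) {
    // First try to take the shared lock; tryAcquireShared() is supplied by the subclass.
    // A negative result means the attempt failed, so the thread goes on to queue and wait.
    // A result >= 0 means the lock was acquired and the caller can proceed right away.
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}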
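Our TwinsLock implements the acquisition attempt with a CAS loop. The method itself never blocks; it only reports whether the attempt succeeded: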
@Override
public int tryAcquireShared(int reduceCount) {
    // Acquire with a CAS loop; a negative result means failure.
    // Called by AQS's acquireShared().
    for (; ; ) {
        int current = getState();
        int newCount = current - reduceCount;
        if (newCount < 0 || compareAndSetState(current, newCount)) {
            return newCount;
        }
    }
}
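To make the return values concrete, here is a small sketch that calls the same CAS loop three times against two permits. `CountingSync`, `tryOnce` and `permits` are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical sync with the same acquire logic as TwinsLock.Sync. */
class CountingSync extends AbstractQueuedSynchronizer {
    CountingSync(int permits) {
        setState(permits);
    }

    @Override
    protected int tryAcquireShared(int n) {
        for (;;) {
            int current = getState();
            int remaining = current - n;
            // Either there is nothing left (return the negative value untouched),
            // or the CAS succeeded and the permit is ours.
            if (remaining < 0 || compareAndSetState(current, remaining)) {
                return remaining;
            }
        }
    }

    /** One non-blocking attempt to take a single permit. */
    int tryOnce() {
        return tryAcquireShared(1);
    }

    /** Permits currently left in the AQS state. */
    int permits() {
        return getState();
    }

    public static void main(String[] args) {
        CountingSync sync = new CountingSync(2);
        System.out.println(sync.tryOnce()); // 1  (acquired, one permit left)
        System.out.println(sync.tryOnce()); // 0  (acquired, none left)
        System.out.println(sync.tryOnce()); // -1 (failed, state unchanged)
        System.out.println(sync.permits()); // 0
    }
}
```

The third call fails without touching the state, which is why the failed attempt can be retried later without any cleanup.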
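However, when the remaining count would be negative, the method simply returns. So how does the thread end up blocking? The answer is that AQS does the blocking itself, in doAcquireShared():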
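/**
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    // First add the thread to the wait queue as a SHARED node
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // Get the node queued immediately ahead of this one
            final Node p = node.predecessor();
            // Only the node right behind head may try to acquire, so queued
            // threads are served in FIFO order. (A newly arriving thread can
            // still barge in through the initial tryAcquireShared() call in
            // acquireShared(), so the lock is not strictly fair.)
            if (p == head) {
                // Call the subclass implementation to take the shared lock
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // Acquired: make this node the new head and propagate the wake-up if needed
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // If an interrupt was recorded while waiting, re-assert it now
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            // Acquisition failed: park if appropriate, and record any interrupt seen on wake-up
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // If we leave without having acquired (e.g. tryAcquireShared threw), cancel the node
        if (failed)
            cancelAcquire(node);
    }
}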
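The queueing done by doAcquireShared() can be observed from the outside through AQS's public `getQueueLength()` method. The sketch below is only an illustration under stated assumptions: `PermitSync` is a hypothetical stand-in for `TwinsLock.Sync`, and `QueueDemo` is a name invented here. It takes both permits, lets a third thread try to acquire, and checks that the thread shows up in the wait queue until a permit is released.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical stand-in repeating the CAS logic of TwinsLock.Sync. */
class PermitSync extends AbstractQueuedSynchronizer {
    PermitSync(int permits) {
        setState(permits);
    }

    @Override
    protected int tryAcquireShared(int n) {
        for (;;) {
            int current = getState();
            int remaining = current - n;
            if (remaining < 0 || compareAndSetState(current, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int n) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + n)) {
                return true;
            }
        }
    }
}

class QueueDemo {
    /** Returns true if a third acquirer was seen in the wait queue and finished after a release. */
    static boolean thirdThreadQueuesThenProceeds() {
        PermitSync sync = new PermitSync(2);
        sync.acquireShared(1); // permit 1 of 2
        sync.acquireShared(1); // permit 2 of 2

        Thread waiter = new Thread(() -> {
            sync.acquireShared(1); // no permit left: enqueued and parked inside AQS
            sync.releaseShared(1);
        });
        waiter.setDaemon(true);
        waiter.start();

        // Poll until AQS reports the waiter in its queue (give up after 5 seconds)
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (sync.getQueueLength() == 0 && System.nanoTime() < deadline) {
            Thread.yield();
        }
        boolean queued = sync.getQueueLength() == 1;

        sync.releaseShared(1); // hand back one permit, which unparks the waiter
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return queued && !waiter.isAlive();
    }
}
```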
The code below shows exactly how a thread is added to the queue and how interruption is checked:
/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

/**
 * CAS tail field. Used only by enq.
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * Checks and updates status for a node that failed to acquire.
 * Returns true if thread should block. This is the main signal
 * control in all acquire loops. Requires that pred == node.prev.
 *
 * @param pred node's predecessor holding status
 * @param node the node
 * @return {@code true} if thread should block
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE. Indicate that we
         * need a signal, but don't park yet. Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

/**
 * Sets head of queue, and checks if successor may be waiting
 * in shared mode, if so propagating if either propagate > 0 or
 * PROPAGATE status was set.
 *
 * @param node the node
 * @param propagate the return value from a tryAcquireShared
 */
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * Try to signal next queued node if:
     *   Propagation was indicated by caller,
     *     or was recorded (as h.waitStatus either before
     *     or after setHead) by a previous operation
     *     (note: this uses sign-check of waitStatus because
     *      PROPAGATE status may transition to SIGNAL.)
     * and
     *   The next node is waiting in shared mode,
     *     or we don't know, because it appears null
     *
     * The conservatism in both of these checks may cause
     * unnecessary wake-ups, but only when there are multiple
     * racing acquires/releases, so most need signals now or soon
     * anyway.
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases. This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

/**
 * Cancels an ongoing attempt to acquire.
 *
 * @param node the node
 */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
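One consequence of parkAndCheckInterrupt() plus selfInterrupt() is that acquireShared() does not give up when the waiting thread is interrupted; it only remembers the interrupt and re-asserts it once the lock has been obtained. The sketch below tries to show this behavior. It is an illustration only: `SinglePermitSync` and `InterruptDemo` are hypothetical names, and the 200 ms wait is an arbitrary choice.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical one-permit sync using the same CAS logic as TwinsLock.Sync. */
class SinglePermitSync extends AbstractQueuedSynchronizer {
    SinglePermitSync() {
        setState(1);
    }

    @Override
    protected int tryAcquireShared(int n) {
        for (;;) {
            int current = getState();
            int remaining = current - n;
            if (remaining < 0 || compareAndSetState(current, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int n) {
        for (;;) {
            int current = getState();
            if (compareAndSetState(current, current + n)) {
                return true;
            }
        }
    }
}

class InterruptDemo {
    /** Returns true if the waiter stayed blocked after interrupt() and saw its interrupt flag set after acquiring. */
    static boolean interruptIsDeferredUntilAcquired() {
        SinglePermitSync sync = new SinglePermitSync();
        sync.acquireShared(1); // take the only permit

        AtomicBoolean flagAfterAcquire = new AtomicBoolean();
        Thread waiter = new Thread(() -> {
            sync.acquireShared(1); // blocks in doAcquireShared()
            flagAfterAcquire.set(Thread.currentThread().isInterrupted());
            sync.releaseShared(1);
        });
        waiter.setDaemon(true);
        waiter.start();

        // Wait until the waiter is enqueued (give up after 5 seconds)
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (sync.getQueueLength() == 0 && System.nanoTime() < deadline) {
            Thread.yield();
        }

        waiter.interrupt();
        try {
            waiter.join(200); // the waiter should not finish: no permit is available
            boolean stillBlocked = waiter.isAlive();
            sync.releaseShared(1); // now let it through
            waiter.join(5000);
            return stillBlocked && !waiter.isAlive() && flagAfterAcquire.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

If the waiter needs to abort on interrupt instead, AQS offers `acquireSharedInterruptibly(int)`, which throws InterruptedException.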
That covers acquiring the lock. Once a thread holds it, it can carry on with its work. Finally it releases the lock: unlock() calls releaseShared() in AQS.
/**
 * Releases in shared mode. Implemented by unblocking one or more
 * threads if {@link #tryReleaseShared} returns true.
 *
 * @param arg the release argument. This value is conveyed to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted
 *        and can represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    // Call the subclass implementation; only if it succeeds do we run the AQS logic
    if (tryReleaseShared(arg)) {
        // AQS wakes up queued waiters
        doReleaseShared();
        return true;
    }
    return false;
}
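Note that the tryReleaseShared() in TwinsLock adds to the state unconditionally, so calling unlock() without a matching lock() would raise the count to 3 and let a third thread in. One possible hardening, sketched here as an assumption rather than as part of the original design, is to remember the maximum and refuse to go above it. `BoundedSync` and `available` are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical variant of TwinsLock.Sync that rejects releases beyond the initial permit count. */
class BoundedSync extends AbstractQueuedSynchronizer {
    private final int max;

    BoundedSync(int max) {
        if (max <= 0) {
            throw new IllegalArgumentException("The permit count must be positive");
        }
        this.max = max;
        setState(max);
    }

    @Override
    protected int tryAcquireShared(int n) {
        for (;;) {
            int current = getState();
            int remaining = current - n;
            if (remaining < 0 || compareAndSetState(current, remaining)) {
                return remaining;
            }
        }
    }

    @Override
    protected boolean tryReleaseShared(int n) {
        for (;;) {
            int current = getState();
            int next = current + n;
            if (next > max) {
                // More permits returned than were ever handed out
                throw new IllegalMonitorStateException("release without matching acquire");
            }
            if (compareAndSetState(current, next)) {
                return true;
            }
        }
    }

    /** Permits currently available. */
    int available() {
        return getState();
    }
}
```

With this variant, `new BoundedSync(2).releaseShared(1)` throws IllegalMonitorStateException instead of silently creating a third permit.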
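As you can see, AQS already provides a convenient foundation for building locks; to implement a custom lock, we only need to override a few specific methods.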
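For comparison, the exclusive-mode counterparts of those methods are tryAcquire(), tryRelease() and isHeldExclusively(). A minimal non-reentrant mutex might look like the sketch below; `SimpleMutex` is a hypothetical class written for this illustration, in the spirit of the example in the AQS Javadoc, not a production lock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex: state 0 = free, 1 = held. */
class SimpleMutex {

    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            // Flip 0 -> 1 atomically; only one thread can win
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException("mutex is not held");
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1 && getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1); // blocks via the AQS queue when tryAcquire fails
    }

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1); // calls tryRelease, then wakes the next waiter
    }

    boolean isLocked() {
        return sync.isLocked();
    }
}
```

All queueing, parking and waking is inherited from AQS; the subclass only decides what "acquired" and "released" mean in terms of the state.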
In the JDK, the synchronizers built on AQS include ReentrantLock (a reentrant lock), ReentrantReadWriteLock (a read-write lock), Semaphore, and CountDownLatch. Each has its own use!
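Of these, Semaphore is the closest relative of TwinsLock: a semaphore with two permits gives the same "at most two holders" behavior. A rough sketch, where `SemaphoreAsTwinsLock` is a hypothetical wrapper name used only for this illustration:

```java
import java.util.concurrent.Semaphore;

/** Hypothetical wrapper showing a two-permit Semaphore used like TwinsLock. */
class SemaphoreAsTwinsLock {
    private final Semaphore permits = new Semaphore(2);

    void lock() {
        // Blocks until a permit is free, ignoring interrupts like Lock.lock()
        permits.acquireUninterruptibly();
    }

    boolean tryLock() {
        return permits.tryAcquire();
    }

    void unlock() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        SemaphoreAsTwinsLock lock = new SemaphoreAsTwinsLock();
        System.out.println(lock.tryLock()); // true
        System.out.println(lock.tryLock()); // true
        System.out.println(lock.tryLock()); // false: both permits are taken
        lock.unlock();
        System.out.println(lock.available()); // 1
    }
}
```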
