一:AQS概念
AQS是java.util.concurrent包的一個同步器,它實現了鎖的基本抽象功能,支持獨占鎖與共享鎖兩張方式,
獨占鎖:同一時刻只允許一個線程方法加鎖資源,例如:ReentrantLock
共享鎖:同一時刻允許多個線程方法資源,例如:countDownLatch
二:數據結構
AQS 隊列內部維護的是一個 FIFO 的雙向鏈表,這種結構的特點是每個數據結構都有兩個指針,分別指向直接的后繼節點和直接前驅節點。所以雙向鏈表可以從任
ReentrantLock lock = new ReentrantLock(); lock.lock(); try { System.out.println("do something......"); } finally { lock.unlock(); }
看一下new ReentrantLock()
默認創建了一個非公平鎖,ReentrantLock內部維護了一個Sync同步器,
public ReentrantLock() { sync = new NonfairSync(); }
看一下lock方法:
首先是比較狀態,同步器類AbstractQueuedSynchronizer維護了一個同步狀態的字段
private volatile int state;
當狀態為0時,表示資源沒有被占用,大於0則表示被占用
來一個線程首先判斷當前狀態是否是0,如果是則把state設置為1,然后把當前線程設置為資源擁有者
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }

這里的acquire都是同步器實現的:
如果當前的狀態是0,則說明此時沒有線程占用鎖,那么設置同步器狀態,並把當前線程設置為資源擁有者
如果當前狀態不是0,則判斷當前線程是否是資源擁有者(因為支持可重入,所以可能大於0),如果是則累加狀態值
如果不是則返回失敗
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
如果獲取鎖失敗,則入隊列
先看一下addWaiter方法:
CLH隊列底層維護的是一個雙向鏈表結構,每一個節點Node維護當前線程引用,前一個節點和后一個節點的引用,Node節點
還具有狀態
static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } }
看一下addWaiter方法:
第一次進來tail為null,所以會調用enq這個方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
來看一下enq方法:
如果tail為null,則新建一個node節點,並設置為head,然后將head引用賦值給tail,這樣head和tail都指向一個空節點Node
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
初始化完成,新加進來的Node會被設置為tail尾部節點,然后之前最后一個節點建立pre、next連接
下面框里tail不為null,就不是第一個被放進來的node節點,直接把node設置到tail尾部。
看一下獲取對列方法acquireQueued:
前面已經將獲取鎖失敗的線程以node節點的形式放到了CLH對列的tail尾部,這里的node就是維護當前線程的node,
如果node的前驅節點為head(head為正在執行的線程的節點),那么會再次嘗試獲取鎖。
獲取鎖成功:把讓對列的head指向node,然后將node節點維護的前驅和線程置位null,在這里拿到鎖,其實並不需要前驅節點的線程喚醒,
因為當前線程並沒有阻塞。
獲取鎖失敗:如果前驅節點不是head或者是head獲取鎖失敗,那么就會park當前線程。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
看一下shouldParkAfterFailedAcquire:
如果前驅節點的狀態為signal,則可以安全的park阻塞當前線程,因為前驅為signal狀態,說明當前驅節點維護的線程釋放鎖后,會通知當前線程。
如果狀態大於0,就是已取消,則向前遍歷,直到找到一個未取消的,已取消狀態,可能該任務已經中斷或者超時。
如果不是signal狀態,也沒有取消,那么就把前驅節點的waitStatus設置為signal,然后該節點就可以安全的park了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
這里很簡單,就是把當前線程park阻塞。然后當前線程就會在這里阻塞,直到被前驅節點的線程喚醒。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
如果此時前一個線程執行完畢,執行unpark,那么該線程就會被喚醒。
喚醒之后,又會進入for死循環,爭搶鎖資源(因為是非公平鎖),獲取到鎖則執行代碼,獲取不到,還會執行
到park方法阻塞,如果該線程在阻塞過程中被中斷,那么喚醒后會執行中斷方法。
上面是非公平鎖的實現,下面來看一下公平鎖的實現,公平鎖的實現應該更簡單一些,
就是獲取鎖失敗后,直接進入對列等待,不會在獲取鎖,進入對列的時候獲取鎖,非公平鎖,在阻塞之前
有三次獲取鎖的機會。
ReentrantLock的構造方法是有參數,可以設置是否采用公平鎖:
看lock方法:
公平鎖與非公平鎖的區別:
非公平鎖獲取失敗后,會再次嘗試獲取,而公平鎖直接獲取。
公平鎖:
非公平鎖:
如果當前狀態為0,當前沒有線程占有鎖,它會先判斷對列中是否有前驅節點,如果有則不會獲取鎖,然后進入對列中
等待。
鎖的釋放:
如果線程的狀態為0,則把排他線程設置為null,如果重入次數過多,那么就需要多次unlock才可以,到最后一次unlock才會
釋放鎖
喚醒下一個線程:
unparkSuccessor(h);
把head節點設置為0狀態,然后下一個節點不為null,則喚醒下一個節點,如果為null,
則從tail往前遍歷,找到node下面不能為null且最近的沒有被取消的節點,然后喚醒。
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }