要理解Lock首先要理解AQS,而要理解並發類最好的方法是先理解其並發控制量不同值的含義以及該類運作流程,然后配合一步步看源碼。
該類有一個重要的控制量是WaitStates,節點的狀態值。
/** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; //該節點被取消了 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; //該節點后續節點需要被喚醒 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; //該節點進入了等待隊列,即Condition的隊列里 /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; //共享節點,該節點進鎖后會釋放鎖,。
AQS流程圖:
Condition與Lock配合:
源碼分析:核心方法 aquaire和release及他們方法體里使用到的方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && //如果tryacquire失敗 且 隊列里獲取節點成功 且被中斷過 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();//當前線程中斷 interrupt() //說下中斷的事,1、如果在acquireQueued過程線程被interrupt,如果線程沒進入等待狀態,並不會中斷線程。只是改變interrupt變量 // 且被傳回到這里(因為是用interrupted,所以返回true之后又把線程的interrupt變量設為false)。然后selfinterrupt,將interrupt變量設為true。 // 2、如果線程被park了然后被interrupt,則被喚醒,循環一次發現還是阻塞又進入park等待狀態。直到被unpark,interrupt參數為true傳回到這里。 //然后interrupt參數又變回false(受interrupted影響),selfinterrupt則又把它設為true。 }
private Node addWaiter(Node mode) {//mode表示該節點的共享/排他性,null為排他,不為null為共享 Node node = new Node(Thread.currentThread(), mode);//將線程加入創建一個新節點 // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) {//尾節點不為空,新節點連到隊列尾部 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//上一步失敗通過enq入隊 return node; }
final boolean acquireQueued(final Node node, int arg) {//從隊列里嘗試獲取鎖 boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor();//前節點 if (p == head && tryAcquire(arg)) {//前節點為頭節點,且嘗試獲取鎖獲取到了。則清理前節點, setHead(node); //head指向現節點 p.next = null; // help GC failed = false; return interrupted; } //shouldParkAfterFailedAcquire如果前節點為-1則返回true,如果0(初始),-3(共享)則設為-1,如果1則 // 找到前面<-0的節點連他后面 //parkAndCheckInterrupt 阻塞當前線程,並返回interrupted狀態(阻塞則設置失敗),並清除中斷狀態 if (shouldParkAfterFailedAcquire(p, node) &&//前節點狀態為-1 return true parkAndCheckInterrupt()) //中斷狀態為true return true interrupted = true; } } finally { if (failed) cancelAcquire(node);//節點取消獲取,隊列中刪除節點, } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//獲取失敗后判斷是否暫停該線程 int ws = pred.waitStatus; if (ws == Node.SIGNAL)//前節點處於當前節點后面的節點需要被喚醒狀態 /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) {//前節點處於取消狀態 /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do {//當前節點找到前面狀態<=0的節點連他后面。 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {//如果前節點不處於取消狀態,則設為signal -1.(0或-3設為-1) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//阻塞該線程,至此該線程進入等待狀態,等着unpark和interrupt叫醒他 return Thread.interrupted();//叫醒之后返回該線程是否在中斷狀態, 並會清除中斷記號。 }
private void cancelAcquire(Node node) { // Ignore if node doesn't exist if (node == null) return; node.thread = null;//thread指向null // Skip cancelled predecessors Node pred = node.prev;//當前節點找到前面status<=0的節點連它后面 while (pred.waitStatus > 0) node.prev = pred = pred.prev; // predNext is the apparent node to unsplice. CASes below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. Node predNext = pred.next; // Can use unconditional write instead of CAS here. // After this atomic step, other Nodes can skip past us. // Before, we are free of interference from other threads. node.waitStatus = Node.CANCELLED;//status狀態設為1 取消狀態 // If we are the tail, remove ourselves. if (node == tail && compareAndSetTail(node, pred)) {//節點為末尾,把找到的status<0節點后面節點都切掉 compareAndSetNext(pred, predNext, null); } else {//節點不為末尾,前節點連上當前節點后節點 // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; //節點不為末尾,找到status<=0的前節點不是頭節點且該節點線程不是null、且status為<=0的前節點狀態為-1(不是(-3,0)則設為-1) if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else {//節點不為末尾,status<=0的前節點是頭節點 或status<=0的前節點線程為null, unparkSuccessor(node);//專門給頭節點用的啟動繼任者函數,只有前status<=0節點是頭節點,且 //現節點后有節點才需unpark后續節點。(因為前節點可能喚醒的是當前線程,如果你刪除當前節點 //不unpark后面節點可能就停止工作。如果你是尾節點,那無所謂,反正你后面也沒線程需要unpark) } node.next = node; // help GC } }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//釋放操作,啟動繼任者 return true; } return false; }
private void unparkSuccessor(Node node) {//啟動繼任者線程 /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0)//-2,-3等待隊列或共享鎖線程改為0 空白狀態 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) {//如果后節點為null或waitstatus>0(線程取消狀態) s = null; for (Node t = tail; t != null && t != node; t = t.prev)//找到節點后面status<=0的節點啟動它 if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }