JUC系列(一) AQS
AQS是什么
AQS是java並發包里很多並發工具都使用的基類,全名稱是AbstractQueuedSynchronizer。例如ReentrantLock,Semaphore,其他的諸如SynchronousQueue,FutureTask都是基於AQS實現的,我們也可以基於AQS實現自己的同步器。
AQS原理
AQS內部維護一個FIFO隊列來完成獲取鎖線程的排隊工作。
AQS包含兩個類型的節點,一個指向隊列頭部,一個指向隊列尾部,未獲取到鎖的線程會利用cas操作線程安全的加入到隊列尾部,隊列頭部的線程是獲取到同步狀態(鎖)的線程。隊列頭部的線程在完成任務釋放鎖后會喚醒它的后繼,后繼會在獲取鎖成功后把自己設置為頭節點。
AQS源碼分析
static final class Node { //共享 static final Node SHARED = new Node(); //獨占 static final Node EXCLUSIVE = null; //waitStatus=1,表示線程被取消(被中斷或者等待超時) static final int CANCELLED = 1; //waitStatus=-1,表示后繼線程需要被喚醒(unparking) static final int SIGNAL = -1; //waitStatus=-2,表示線程在等待條件 static final int CONDITION = -2; //waitStatus=-3,表示下次共享同步會無條件傳播 static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; //與當前節點關聯的隊列中的線程 volatile Thread thread; } //aqs指向隊列頭部的節點 private transient volatile Node head; //aqs指向隊列尾部的節點 private transient volatile Node tail; //用於同步線程之間的共享狀態。通過 CAS 和 volatile 保證其原子性和可見性 private volatile int state;
獨占式源碼分析:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1:調用使用者重寫的tryAcquire方法嘗試獲取同步狀態,或獲取成功,則后面邏輯全部結束,方法退出,否則進2.
2:此時獲取同步狀態失敗,構造獨占隊列節點,以線程安全的方式加入到隊列尾部。
3:該獨占節點在隊列中嘗試獲取同步狀態,若獲取失敗,則阻塞節點線程直到被前驅喚醒或者中斷。
addWaiter:cas加入到隊列尾部,如果失敗就enq(node)
//cas加入到隊列尾部,如果失敗就enq(node) private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
enq:加入隊列
private Node enq(final Node node) { for (;;) { Node t = tail; //隊列為空必須初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //cas操作設置尾節點,失敗自旋一直嘗試,是一種樂觀的並發操作 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued:在隊列中嘗試獲取同步狀態
//在隊列中嘗試獲取同步狀態 final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//初始化獲取同步狀態失敗為true try { boolean interrupted = false;//初始化中斷標志為false for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//前驅是頭節點並且嘗試獲取同步狀態成功 setHead(node);//設置當前節點為頭節點 p.next = null; //幫助GC清理垃圾 failed = false;//獲取同步狀態失敗標志false return interrupted;//返回中斷標志false } //如果執行到這里,表示之前獲取同步狀態失敗了,shouldParkAfterFailedAcquire判斷是否需要阻塞,如果需要就會調用parkAndCheckInterrupt阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire判斷是否需要阻塞當前線程
//cancelled=1 signal=-1 CONDITION = -2 PROPAGATE = -3 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//node的前驅節點的等待狀態 if (ws == Node.SIGNAL)//前驅是喚醒狀態,可以阻塞,安心等待前驅來喚醒自己去競爭同步狀態 return true; if (ws > 0) {//cancelled狀態 //向前找到第一個狀態<=0的節點並賦值給node的前驅 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //cas設置狀態為signal compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
共享式源碼分析:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //嘗試獲取共享鎖失敗,則進入doAcquireShared方法 doAcquireShared(arg); }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //如果新增的節點前任節點是頭節點 final Node p = node.predecessor(); if (p == head) { //嘗試獲取共享鎖 int r = tryAcquireShared(arg); if (r >= 0) { //獲取共享鎖成功則設置自己為頭節點並嘗試喚醒后續節點 setHeadAndPropagate(node, r); p.next = null; // help GC 回收之前的頭節點 if (interrupted) selfInterrupt(); failed = false; return; } } //如果獲取鎖失敗,前任節點狀態為SIGNAL,則當前節點阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }