JUC系列(一)AQS


JUC系列(一) AQS

     AQS是什么

       AQS是java並發包里很多並發工具都使用的基類,全名稱是AbstractQueuedSynchronizer。例如ReentrantLock,Semaphore,其他的諸如SynchronousQueue,FutureTask都是基於AQS實現的,我們也可以基於AQS實現自己的同步器。

  AQS原理

     AQS內部維護一個FIFO隊列來完成獲取鎖線程的排隊工作。

     AQS包含兩個類型的節點,一個指向隊列頭部,一個指向隊列尾部,未獲取到鎖的線程會利用cas操作線程安全的加入到隊列尾部,隊列頭部的線程是獲取到同步狀態(鎖)的線程。隊列頭部的線程在完成任務釋放鎖后會喚醒它的后繼,后繼會在獲取鎖成功后把自己設置為頭節點。

  AQS源碼分析

static final class Node {
       //共享
        static final Node SHARED = new Node();
      //獨占
        static final Node EXCLUSIVE = null;
        //waitStatus=1,表示線程被取消(被中斷或者等待超時)
        static final int CANCELLED =  1;
        //waitStatus=-1,表示后繼線程需要被喚醒(unparking)
        static final int SIGNAL    = -1;
       //waitStatus=-2,表示線程在等待條件
        static final int CONDITION = -2;
        //waitStatus=-3,表示下次共享同步會無條件傳播
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
     //與當前節點關聯的隊列中的線程
        volatile Thread thread;

}
  //aqs指向隊列頭部的節點
    private transient volatile Node head;
  //aqs指向隊列尾部的節點
    private transient volatile Node tail;
  //用於同步線程之間的共享狀態。通過 CAS 和 volatile 保證其原子性和可見性
    private volatile int state;

       獨占式源碼分析:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 1:調用使用者重寫的tryAcquire方法嘗試獲取同步狀態,或獲取成功,則后面邏輯全部結束,方法退出,否則進2.
2:此時獲取同步狀態失敗,構造獨占隊列節點,以線程安全的方式加入到隊列尾部。
3:該獨占節點在隊列中嘗試獲取同步狀態,若獲取失敗,則阻塞節點線程直到被前驅喚醒或者中斷。

addWaiter:cas加入到隊列尾部,如果失敗就enq(node)

//cas加入到隊列尾部,如果失敗就enq(node)

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

 enq:加入隊列

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
        //隊列為空必須初始化
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
          //cas操作設置尾節點,失敗自旋一直嘗試,是一種樂觀的並發操作
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 acquireQueued:在隊列中嘗試獲取同步狀態

//在隊列中嘗試獲取同步狀態
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//初始化獲取同步狀態失敗為true
        try {
            boolean interrupted = false;//初始化中斷標志為false
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//前驅是頭節點並且嘗試獲取同步狀態成功
                    setHead(node);//設置當前節點為頭節點
                    p.next = null; //幫助GC清理垃圾
                    failed = false;//獲取同步狀態失敗標志false
                    return interrupted;//返回中斷標志false
                }
          //如果執行到這里,表示之前獲取同步狀態失敗了,shouldParkAfterFailedAcquire判斷是否需要阻塞,如果需要就會調用parkAndCheckInterrupt阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 shouldParkAfterFailedAcquire判斷是否需要阻塞當前線程

//cancelled=1 signal=-1  CONDITION = -2  PROPAGATE = -3
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//node的前驅節點的等待狀態
        if (ws == Node.SIGNAL)//前驅是喚醒狀態,可以阻塞,安心等待前驅來喚醒自己去競爭同步狀態
            return true;

        if (ws > 0) {//cancelled狀態
       //向前找到第一個狀態<=0的節點並賦值給node的前驅
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //cas設置狀態為signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 共享式源碼分析:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //嘗試獲取共享鎖失敗,則進入doAcquireShared方法
            doAcquireShared(arg);
}

 

 

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //如果新增的節點前任節點是頭節點
                final Node p = node.predecessor();
                if (p == head) {
                    //嘗試獲取共享鎖
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //獲取共享鎖成功則設置自己為頭節點並嘗試喚醒后續節點
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC 回收之前的頭節點
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //如果獲取鎖失敗,前任節點狀態為SIGNAL,則當前節點阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM