JUC系列(一)AQS


JUC系列(一) AQS

     AQS是什么

       AQS是java并发包里很多并发工具都使用的基类,全名称是AbstractQueuedSynchronizer。例如ReentrantLock,Semaphore,其他的诸如SynchronousQueue,FutureTask都是基于AQS实现的,我们也可以基于AQS实现自己的同步器。

  AQS原理

     AQS内部维护一个FIFO队列来完成获取锁线程的排队工作。

     AQS包含两个类型的节点,一个指向队列头部,一个指向队列尾部,未获取到锁的线程会利用cas操作线程安全的加入到队列尾部,队列头部的线程是获取到同步状态(锁)的线程。队列头部的线程在完成任务释放锁后会唤醒它的后继,后继会在获取锁成功后把自己设置为头节点。

  AQS源码分析

static final class Node {
       //共享
        static final Node SHARED = new Node();
      //独占
        static final Node EXCLUSIVE = null;
        //waitStatus=1,表示线程被取消(被中断或者等待超时)
        static final int CANCELLED =  1;
        //waitStatus=-1,表示后继线程需要被唤醒(unparking)
        static final int SIGNAL    = -1;
       //waitStatus=-2,表示线程在等待条件
        static final int CONDITION = -2;
        //waitStatus=-3,表示下次共享同步会无条件传播
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
     //与当前节点关联的队列中的线程
        volatile Thread thread;

}
  //aqs指向队列头部的节点
    private transient volatile Node head;
  //aqs指向队列尾部的节点
    private transient volatile Node tail;
  //用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性
    private volatile int state;

       独占式源码分析:

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

 1:调用使用者重写的tryAcquire方法尝试获取同步状态,或获取成功,则后面逻辑全部结束,方法退出,否则进2.
2:此时获取同步状态失败,构造独占队列节点,以线程安全的方式加入到队列尾部。
3:该独占节点在队列中尝试获取同步状态,若获取失败,则阻塞节点线程直到被前驱唤醒或者中断。

addWaiter:cas加入到队列尾部,如果失败就enq(node)

//cas加入到队列尾部,如果失败就enq(node)

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

 enq:加入队列

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
        //队列为空必须初始化
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
          //cas操作设置尾节点,失败自旋一直尝试,是一种乐观的并发操作
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

 acquireQueued:在队列中尝试获取同步状态

//在队列中尝试获取同步状态
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//初始化获取同步状态失败为true
        try {
            boolean interrupted = false;//初始化中断标志为false
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {//前驱是头节点并且尝试获取同步状态成功
                    setHead(node);//设置当前节点为头节点
                    p.next = null; //帮助GC清理垃圾
                    failed = false;//获取同步状态失败标志false
                    return interrupted;//返回中断标志false
                }
          //如果执行到这里,表示之前获取同步状态失败了,shouldParkAfterFailedAcquire判断是否需要阻塞,如果需要就会调用parkAndCheckInterrupt阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 shouldParkAfterFailedAcquire判断是否需要阻塞当前线程

//cancelled=1 signal=-1  CONDITION = -2  PROPAGATE = -3
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//node的前驱节点的等待状态
        if (ws == Node.SIGNAL)//前驱是唤醒状态,可以阻塞,安心等待前驱来唤醒自己去竞争同步状态
            return true;

        if (ws > 0) {//cancelled状态
       //向前找到第一个状态<=0的节点并赋值给node的前驱
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //cas设置状态为signal
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 共享式源码分析:

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            //尝试获取共享锁失败,则进入doAcquireShared方法
            doAcquireShared(arg);
}

 

 

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                //如果新增的节点前任节点是头节点
                final Node p = node.predecessor();
                if (p == head) {
                    //尝试获取共享锁
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //获取共享锁成功则设置自己为头节点并尝试唤醒后续节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC 回收之前的头节点
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //如果获取锁失败,前任节点状态为SIGNAL,则当前节点阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM