JUC系列(一) AQS
AQS是什么
AQS是java并发包里很多并发工具都使用的基类,全名称是AbstractQueuedSynchronizer。例如ReentrantLock,Semaphore,其他的诸如SynchronousQueue,FutureTask都是基于AQS实现的,我们也可以基于AQS实现自己的同步器。
AQS原理
AQS内部维护一个FIFO队列来完成获取锁线程的排队工作。
AQS包含两个类型的节点,一个指向队列头部,一个指向队列尾部,未获取到锁的线程会利用cas操作线程安全的加入到队列尾部,队列头部的线程是获取到同步状态(锁)的线程。队列头部的线程在完成任务释放锁后会唤醒它的后继,后继会在获取锁成功后把自己设置为头节点。
AQS源码分析
static final class Node { //共享 static final Node SHARED = new Node(); //独占 static final Node EXCLUSIVE = null; //waitStatus=1,表示线程被取消(被中断或者等待超时) static final int CANCELLED = 1; //waitStatus=-1,表示后继线程需要被唤醒(unparking) static final int SIGNAL = -1; //waitStatus=-2,表示线程在等待条件 static final int CONDITION = -2; //waitStatus=-3,表示下次共享同步会无条件传播 static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; //与当前节点关联的队列中的线程 volatile Thread thread; } //aqs指向队列头部的节点 private transient volatile Node head; //aqs指向队列尾部的节点 private transient volatile Node tail; //用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性 private volatile int state;
独占式源码分析:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
1:调用使用者重写的tryAcquire方法尝试获取同步状态,或获取成功,则后面逻辑全部结束,方法退出,否则进2.
2:此时获取同步状态失败,构造独占队列节点,以线程安全的方式加入到队列尾部。
3:该独占节点在队列中尝试获取同步状态,若获取失败,则阻塞节点线程直到被前驱唤醒或者中断。
addWaiter:cas加入到队列尾部,如果失败就enq(node)
//cas加入到队列尾部,如果失败就enq(node) private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
enq:加入队列
private Node enq(final Node node) { for (;;) { Node t = tail; //队列为空必须初始化 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; //cas操作设置尾节点,失败自旋一直尝试,是一种乐观的并发操作 if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued:在队列中尝试获取同步状态
//在队列中尝试获取同步状态 final boolean acquireQueued(final Node node, int arg) { boolean failed = true;//初始化获取同步状态失败为true try { boolean interrupted = false;//初始化中断标志为false for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//前驱是头节点并且尝试获取同步状态成功 setHead(node);//设置当前节点为头节点 p.next = null; //帮助GC清理垃圾 failed = false;//获取同步状态失败标志false return interrupted;//返回中断标志false } //如果执行到这里,表示之前获取同步状态失败了,shouldParkAfterFailedAcquire判断是否需要阻塞,如果需要就会调用parkAndCheckInterrupt阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire判断是否需要阻塞当前线程
//cancelled=1 signal=-1 CONDITION = -2 PROPAGATE = -3 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;//node的前驱节点的等待状态 if (ws == Node.SIGNAL)//前驱是唤醒状态,可以阻塞,安心等待前驱来唤醒自己去竞争同步状态 return true; if (ws > 0) {//cancelled状态 //向前找到第一个状态<=0的节点并赋值给node的前驱 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //cas设置状态为signal compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
共享式源码分析:
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) //尝试获取共享锁失败,则进入doAcquireShared方法 doAcquireShared(arg); }
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { //如果新增的节点前任节点是头节点 final Node p = node.predecessor(); if (p == head) { //尝试获取共享锁 int r = tryAcquireShared(arg); if (r >= 0) { //获取共享锁成功则设置自己为头节点并尝试唤醒后续节点 setHeadAndPropagate(node, r); p.next = null; // help GC 回收之前的头节点 if (interrupted) selfInterrupt(); failed = false; return; } } //如果获取锁失败,前任节点状态为SIGNAL,则当前节点阻塞 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }