AbstractQueuedSynchronizer:抽象同步队列,简称AQS主要依赖一个int成员变量来表示同步状态state,以及一个CLH等待队列
AQS的等待队列是一个CLH(Craig, Landin, and Hagersten lock queue)队列:竞争资源同一时间只能被一个线程访问,CLH为管理等待锁的线程的队列
AQS的子类被推荐定义为自定义同步组件的静态内部类
AQS自身不实现任何同步接口
AQS既支持独占式获取同步状态,也可以支持共享室获取同步状态,这样以方便的实现不同类型同步组件
AQS是实现锁(或者CountDownLatch,Semaphore)等同步组件的关键,在这些同步组件中聚合AQS
AQS面向同步组件的实现者,简化了同步组件的实现方式,屏蔽了同步状态state的管理,线程排队,等待、唤醒等底层操作
同步组件面向使用者,定义了使用者和同步组件交互的基本接口
AQS持有的成员变量有head和tail,分别指向队列头结点和尾结点,都为node类型,
即通过持有等待队列的头尾指针来管理等待队列,如下图所示,节点左侧为prev指针,右侧为next指针
头结点无前驱节点
尾结点无后继节点.
AQS的独占和共享
对于 AQS来说,线程同步关键是对状态值state进行操作,根据state判断是否属于同一个线程,操作state方式分为独占方式,共享方式
独占方式入队
public final void acquire(int arg) { //获取成功直接返回 //获取失败则将当前线程封装为Node.EXCLUSIVE的Node节点插入AQS阻塞队列的尾部 //,并调用LockSupport.park(this)方式阻塞自己 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire方法失败,则进行入队操作
//等待队列添加节点 private Node addWaiter(Node mode) { //(1)构建当前线程为Node类型 Node node = new Node(Thread.currentThread(), mode); //(2)用临时变量取的尾结点的引用 Node pred = tail; //(3)当前尾结点是否存在 if (pred != null) { //(3.1)将当前节点在等待队列尾部插入 node.prev = pred; //(3.1.1)CAS更新尾结点 if (compareAndSetTail(pred, node)) { //(3.1.2)更新成功将旧的尾结点的next指针指向新尾结点 pred.next = node; return node; } } //(3.2)当前尾结点不存在,说明当前线程是第一个加入等待队列进行等待的线程 enq(node); return node; }
分析上述代码,注释已经写的很明确,有个问题就是,3.1.1过程如果失败了,而尾结点不为空,则同样会执行无尾结点走的enq方法,因此,猜测enq()有两个作用
- 当前等待队列尾结点不存在的情况下的入队操作
- CAS尾结点后插入节点失败后的自旋(死循环)重试
下面来看看enq方法
private Node enq(final Node node) { //(2.3)失败自旋 for (;;) { //(1)用临时变量取的尾结点的引用 Node t = tail; if (t == null) { //(2.1)如果尾结点不存在,则初始化头结点 if (compareAndSetHead(new Node())) //(2.1.1)让尾结点也指向头结点的引用 tail = head; } else { //(2.2)尾结点存在,将当前节点在等待队列尾部插入 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
从上述源码可以看到,
- 如果尾结点不存在,入队操作前会创建一个头结点,然后将尾指针也指向这个节点,在下次尝试的时候,发现有了尾结点,走2.2步骤,在2.1步骤产生的节点后面插
- 如果一开始尾结点存在,说明addWaiter这一步CAS失败,然后进入enq方法自旋尝试尾部插入直到成功
在AQS等待队列上的线程获取独占锁
//等待队列中的线程获取锁 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //(1)自旋 for (;;) { //(2)获得当前节点的前驱节点(node相当于当前线程,上一个节点可以理解为上一个等待锁的线程) final Node p = node.predecessor(); //(2.1)如果当前节点的前驱节点是头节点(说明当前节点是顺位第一位需要获取锁的线程)并且成功获取同步状态,即获取到独占锁 // (tryAcquire在具体的同步组件中实现,比如ReentrantLock中的tryAcquire) if (p == head && tryAcquire(arg)) { //(2.1.1)将当前节点设置为新的头节点(清除prev和thread信息) setHead(node); //(2.1.2)释放当前节点的前驱节点,方便GC将该内存回收 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
当前线程获取到锁后,会成为新的头节点,旧的头节点将被GC回收
shouldParkAfterFailedAcquire通过以下规则,判断当前线程是否需要被阻塞
- 如果前驱节点状态为SIGNAL,表示当前节点需要被unpack(唤醒),此时返回true
- 如果前驱节点状态为CANCELLED(ws>0),说明前驱节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false
- 如果前驱节点为非SIGNAL,非CANCELLED,则设置前驱节点状态为SIGNAL,并返回false
默认状况下,节点的状态都是0,为INITIAL,此时通过上述CAS将pred前驱节点的等待状态改为Node.SIGNAL,返回false,进入下次自旋,直到shouldParkAfterFailedAcquire返回true,开始执行parkAndCheckInterrupt().
private final boolean parkAndCheckInterrupt() { //挂起当前线程 LockSupport.park(this); //返回线程中断标志 return Thread.interrupted(); }