JUC系列(一) AQS
AQS是什么
AQS是java并发包里很多并发工具都使用的基类,全名称是AbstractQueuedSynchronizer。例如ReentrantLock,Semaphore,其他的诸如SynchronousQueue,FutureTask都是基于AQS实现的,我们也可以基于AQS实现自己的同步器。
AQS原理
AQS内部维护一个FIFO队列来完成获取锁线程的排队工作。
AQS包含两个类型的节点,一个指向队列头部,一个指向队列尾部,未获取到锁的线程会利用cas操作线程安全的加入到队列尾部,队列头部的线程是获取到同步状态(锁)的线程。队列头部的线程在完成任务释放锁后会唤醒它的后继,后继会在获取锁成功后把自己设置为头节点。
AQS源码分析
static final class Node {
//共享
static final Node SHARED = new Node();
//独占
static final Node EXCLUSIVE = null;
//waitStatus=1,表示线程被取消(被中断或者等待超时)
static final int CANCELLED = 1;
//waitStatus=-1,表示后继线程需要被唤醒(unparking)
static final int SIGNAL = -1;
//waitStatus=-2,表示线程在等待条件
static final int CONDITION = -2;
//waitStatus=-3,表示下次共享同步会无条件传播
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
//与当前节点关联的队列中的线程
volatile Thread thread;
}
//aqs指向队列头部的节点
private transient volatile Node head;
//aqs指向队列尾部的节点
private transient volatile Node tail;
//用于同步线程之间的共享状态。通过 CAS 和 volatile 保证其原子性和可见性
private volatile int state;
独占式源码分析:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1:调用使用者重写的tryAcquire方法尝试获取同步状态,或获取成功,则后面逻辑全部结束,方法退出,否则进2.
2:此时获取同步状态失败,构造独占队列节点,以线程安全的方式加入到队列尾部。
3:该独占节点在队列中尝试获取同步状态,若获取失败,则阻塞节点线程直到被前驱唤醒或者中断。
addWaiter:cas加入到队列尾部,如果失败就enq(node)
//cas加入到队列尾部,如果失败就enq(node)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq:加入队列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//队列为空必须初始化
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//cas操作设置尾节点,失败自旋一直尝试,是一种乐观的并发操作
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued:在队列中尝试获取同步状态
//在队列中尝试获取同步状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//初始化获取同步状态失败为true
try {
boolean interrupted = false;//初始化中断标志为false
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//前驱是头节点并且尝试获取同步状态成功
setHead(node);//设置当前节点为头节点
p.next = null; //帮助GC清理垃圾
failed = false;//获取同步状态失败标志false
return interrupted;//返回中断标志false
}
//如果执行到这里,表示之前获取同步状态失败了,shouldParkAfterFailedAcquire判断是否需要阻塞,如果需要就会调用parkAndCheckInterrupt阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire判断是否需要阻塞当前线程
//cancelled=1 signal=-1 CONDITION = -2 PROPAGATE = -3
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//node的前驱节点的等待状态
if (ws == Node.SIGNAL)//前驱是唤醒状态,可以阻塞,安心等待前驱来唤醒自己去竞争同步状态
return true;
if (ws > 0) {//cancelled状态
//向前找到第一个状态<=0的节点并赋值给node的前驱
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//cas设置状态为signal
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
共享式源码分析:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//尝试获取共享锁失败,则进入doAcquireShared方法
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//如果新增的节点前任节点是头节点
final Node p = node.predecessor();
if (p == head) {
//尝试获取共享锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取共享锁成功则设置自己为头节点并尝试唤醒后续节点
setHeadAndPropagate(node, r);
p.next = null; // help GC 回收之前的头节点
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//如果获取锁失败,前任节点状态为SIGNAL,则当前节点阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
