AQS-等待队列


  AQS的原理在于,每当有新的线程请求资源时,该线程会进入一个等待队列(Waiter Queue),只有当持有锁的线程释放资源后,该线程才能持有资源。该等待队列的实现方式是双向链表,线程会被包裹在链表节点Node中。Node即队列的节点对象,它封装了各种等待状态(典型的状态机模式),前驱和后继节点信息,以及它对应的线程。

  AQS定义两种资源共享方式:Exclusive(独占,在特定时间内,只有一个线程能够执行,如ReentrantLock)和share(共享,多个线程可以同时执行,如ReadLock、Semaphore、CountDownLatch),可见不同的实现方式征用共享资源的方式不同,由此,自定义同步器在实现时要根据需求来实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经实现好了。

Node节点

 1 //标记节点为共享模式
 2 static final Node SHARED = new Node();  3 //标记节点为独占模式
 4 static final Node EXCLUSIVE = null;  5 //等待状态
 6 volatile int waitStatus;  7 //前驱结点
 8 volatile Node prev;  9 //后继节点
10 volatile Node next; 11 //线程
12 volatile Thread thread;

自定义同步器时主要需要实现以下几种方法: 

  • isHeldExclusively():该线程是否正在独占资源,只有用到condition时才需要去使用它。
  • tryAcquire(int):独占方式,尝试获取资源,返回boolean。
  • tryRelease(int):独占方式,尝试释放资源,返回boolean。
  • tryAcquireShared(int):共享方式,尝试获取资源,返回int。
  • tryReleaseShared(int):共享方式,尝试释放资源,返回boolean。

等待队列节点对象Node有四种不同的状态:

  • CANCELLED(1)已取消
  • SIGNAL(-1)竞争获胜需要唤醒
  • CONDITION(-2)在condition队列中等待
  • PROPAGATE(-3)后续节点传播唤醒操作,共享模式下使用

acquire方法执行流程

1 public final void acquire(int arg) { 2     if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 3  selfInterrupt(); 4  } 5 }

1)      尝试获取锁tryAcquire,返回值表示当前线程是否获取锁。 

2)      如果获取成功,那么说明当前对象已经持有锁,执行中断操作,中断操作会解除线程阻塞。

3)      如果获取失败,那么把当前线程封装为Waiter节点,等待队列没有节点时初始化队列,有则使用compareAndSetTail()添加进Waiter队列尾端。

4)      acquiredQueue自旋获取资源,并且返回Waiter节点持有的线程的应当具备的中断状态。

5)      根据返回结果来确定是否需要执行线程中断操作。

 1 private Node addWaiter(Node mode) {  2     //封装当前节点为node节点
 3     Node node = new Node(Thread.currentThread(), mode);  4     Node pred = tail;  5     if(pred != null) {  6         //将node节点的前驱节点设置为tail
 7         node.prev = pred;  8         //多线程环境下,tail可能已经被其它线程修改了,这里校验pred是否依然是为节点  9         //如果是,那么将node设置为尾结点,原尾结点的后继节点设置为node,返回node
10         if(compareAndSelfTail(pred, node)) { 11             pred.next = node; 12             return node; 13  } 14  } 15     //执行到这里,说明tail为null,或者tail已经发生了变动
16  enq(node); 17     return node; 18 }    
 1 private Node enq(final Node node) {  2     //下面这个死循环用于把node节点插入到队尾,由于多线程环境下,tail节点可能  3     //随时变动,必须不停的尝试,让下面两个操作不会被其它线程干涉。  4     //1,node.prev必须为当前尾结点  5     //2,node设置为新的尾结点
 6     for(;;) {  7         Node t = tail;  8         //tail为空,也说明head为空,此时初始化队列
 9         if(t == null) { 10             //CAS方式初始化队头
11             if(compareAndSetHead(new Node())) 12                 tail = head; 13         } else { 14             //设置node.prev为当前尾结点
15             node.prev = t; 16            //多线程环境下,此时尾结点可能已经被其它访问修改了,需要CAS来进行比较 17             //如果t依然是尾结点,那么node设置为尾结点、
18             if(compareAndSetTail(t, node)) { 19                 t.next = node; 20                 return t; 21  } 22  } 23  } 24 }

  acquiredQueued(Node)方法会接收addWaiter封装好的Node对象,该方法的本质在于以自旋的方式获取资源,即自旋锁。它做了两件事,如果指定节点的前驱节点时头结点,那么再次尝试获取锁,反之,尝试阻塞当前线程。自旋不能构成死循环,否则会浪费大量CPU资源,在AQS中如果p==head&&tryAcquire(arg)条件不足时不会一直循环下去。通常,在p==head之前,必然会有一个线程得到锁,此时tryAcquire()通过,循环结束。如果发生了极端情况,那么node.predecessor()也会在node==head的情况下抛出空指针异常,循环结束。shouldParkAfterFailedAcquire(p,node)检测前驱节点的等待状态,需要阻塞则调用partAndCheckInterrupt()方法会阻塞当前线程,该循环也不会无限制的消耗资源。

 1 final boolean acquireQueued(final Node, int arg) {  2     boolean failed = true;  3     try {  4         boolean interrupted = false;  5         for(;;) {  6             //找到node的前驱节点,如果node已经为head,那么会抛出空指针异常  7             //空指针异常说明整个等待队列都没有能够获取锁的线程。
 8             final Node p = node.predecessor();  9             //前驱节点为头结点时,当前线程尝试获取锁 10             //如果获取成功,那么node会成为新的头结点,这个过程会清空node的线程信息。
11             if(p == head && tryAcquire(arg)) { 12  setHead(node); 13                p.next = null; 14                 failed = false; 15                 return interrupted; 16  } 17             //当前线程不能获取锁,则说明该节点需要阻塞 18             //shouldParkAfterFailedAcquire()用于检查和设置节点阻塞状态 19             //如果为通过检查,那么说明没有阻塞,parkAndCheckInterrupt()用于阻塞当前线程。
20             if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 21                interrupted = true; 22  } 23             finally { 24                 if(failed) cancelAcquire(node); 25  } 26  } 27 }                    
 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  2     //前驱节点的等待状态
 3     int ws = pred.waitStatus;  4     if(ws == Node.SIGNAL) {  5         //SIGNAL表示前驱节点需要被唤醒,此时node是一定可以安全阻塞的,所以返回true
 6         return true;  7  }  8     if(ws > 0) {  9         //大于0的等待状态只有CANCELLED,从队列里移除所有前置的CANCELLED节点。
10         do { 11             node.prev = pred = pred.prev; 12         } while (pred.waitStatus > 0); 13         pred.next = node; 14     } else { 15         //运行到这里,说明前驱节点处于0、CONDITION或者PROPAGATE状态下 16         //此时该节点需要被置为SIGNAL状态,等待被唤醒。
17  compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18  } 19 }
1 private final boolean parkAndCheckInterrupt() { 2     //LockSupport.park()用于阻塞当前线程
3     LockSupport.park(this); 4     return Thread.interruupted(); 5 }

  由此可以得出结论,当一个新的线程节点入队之后,会检查它的前驱节点,只要有一个节点的状态是SIGNAL,就表示当前节点之前的节点正在被等待唤醒,那么当前线程就需要被阻塞,以等待RentrantLock.unlock()唤醒之前的线程。 

 

  在过程2中,node1刚刚入队,没有争抢到锁,此时head状态为初始化的0状态,于是调用了compareAndSetWaitStatus(pred,ws,Node.SIGNAL),这个方法会把head的状态改为SIGNAL。

  在过程3中,acquired()方法里的for循环会在执行一次,此时,node1的前驱节点依然是head,如果它依然没有竞争锁,那么由于head的waitStatus属性的值为SIGNAL,这会导致shouldParkAfterFailedAcquire()方法返回true,当前线程(node1持有的线程)被阻塞,代码不在继续往下执行。这样就达到了让等待队列里的线程阻塞的目的,由此可以类推更多线程入队的过程。由此可以类推更多线程入队的过程:

 

       SIGNAL状态由release()方法进行修改,这个方法首先调用tryRelease()方法尝试释放锁,它返回的是锁是否处于可用状态,如果锁可用,那么该方法也不负责中断等待线程的阻塞,它仅仅把锁的线程持有者设为null;然后,如果成功的释放锁,那么判断队头状态,队头为空则说明队列没有等待线程,不再做其它操作,反之再判断队头的状态waitStatus,只要它不为0,就说明等待队列中有被阻塞的节点。

 1 public final boolean release(int arg) {  2     if(tryRelease(arg)) {  3         Node h = head;  4         if(h != null && h.waitStatus != 0) {  5  unparkSuccessor(h);  6  }  7         return true;  8  }  9     return false; 10 }
 1 private void unparkSuccessor(Node node) {  2     int ws = node.waitStatus;  3     //小于0的状态waitStatus只有SIGNAL和CONDITION
 4     if(ws < 0) {  5         compareAndSetWaitStatus(node, ws, 0);  6  }  7     Node s = node.next;  8     //前驱查找需要唤醒的节点
 9     if(s == null || s.waitStatus > 0) { 10        s = null; 11         for(Node t = tail; t != null && t != node; t = t.prev) { 12             if(t.waitStatus <= 0) s = t; 13  } 14  } 15     if(s != null) { 16  LockSupport.unpark(s.thread); 17  } 18 }

       unparkSuccessor()负责确保中断正确的线程阻塞。在ReentrantLock.unlock()的调用过程中,unparkSuccessor(Node node)的形参node始终为head节点,这个方法执行的主要操作为:

  • 首先把head节点的waitStatus设置为0,表示队列里没有需要中断阻塞的线程。
  • 然后确定需要被唤醒的节点,该节点是队列中第一个waitStatus小于等于0的节点。
  • 最后,调用LockSupport.unlock()方法中断指定线程的阻塞状态。

 

       需要注意的是,node1对应的线程此时已经中断了阻塞,它会开始继续执行AQS的AacquireQueued()方法中for循环的代码final Node p = node.predecessor();显然node1的前驱节点head由于锁已经被释放,队列变化为

 

       这部分代码比较巧妙,可以注意到,在释放的过程中,代码里并没有改变head的waitStatus为SIGNAL,而是直接使用node1替代了原先的head。换言之,原本需要修改head/node2的前驱和后置,并且把head的waitStatus修改为SIGNAL,使用当前的代码,只需要释放node1的持有线程,然后移除head节点,这样可以更快的到达队列规整的目的。

AQS如何阻塞线程和中断阻塞

       在acquired()方法中,当前线程尝试获取锁,如果没有获得,那么会把线程加入等待队列中,加入到队列的线程会被阻塞。

       线程阻塞有三种常见的实现方式:Object.wait()、Thread.join()、或者Thread.sleep()。

       中断阻塞则通过Thread.interrupt()方法来实现,这个方法会发出一个中断信号量从而导致线程抛出中断异常InterruptedException,已达到结束阻塞的目的。需要注意的是Interrupt不会中断用户循环体造成阻塞,它仅仅是抛出信号量,具体处理方式还是由用户处理。Thread.isInterrupted可以得到中断状态。

       对于wait、sleep、join等会造成线程阻塞的方法,由于它们都会抛出Interrupted Exception,处理方式如下

1 try { 2     Thread.currentThread().sleep(500); 3 } catch (InterruptedException e) { 4     //中断后抛出异常,在异常捕获里可以对中断定制处理
5 }

        对循环体处理方式如下表示:

1 //使用Thread.isInterrupted方法获取中断信号量
2 while(!Thread.currentThread().isInterrupted && 用户自定义条件) { 3 }


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM