阻塞隊列 BlockingQueue 詳解


 

轉自:https://mp.weixin.qq.com/s?__biz=MzI4Njc5NjM1NQ==&mid=2247487078&idx=2&sn=315f39b6d53862dcb732390729951628&chksm=ebd6314adca1b85c33db1134fbe98bf7526943b02dfc23021781abff265bb231ad9bcd3e1ad1&mpshare=1&scene=23&srcid=0113Gq7QC98Q8Q59Ez6SkUdC#rd

鏈接:https://www.omgleoo.top

 

BlockingQueues在java.util.concurrent包下,提供了線程安全的隊列訪問方式,當阻塞隊列插入數據時,如果隊列已經滿了,線程則會阻塞,等待隊列中元素被取出后再插入,當從阻塞隊列中取數據時,如果隊列是空的,則線程會阻塞,等待隊列中有新元素。

BlockingQueue的核心方法

package java.util.concurrent;

import java.util.Collection;
import java.util.Queue;

public interface BlockingQueue<E> extends Queue<E> {
    // 添加成功返回true,失敗拋IllegalStateException異常
    boolean add(E e);

    // 成功返回 true,如果此隊列已滿,則返回 false。
    boolean offer(E e);

    // 將元素插入此隊列的尾部,如果該隊列已滿,則一直阻塞
    void put(E e) throws InterruptedException;

    //將指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量) 
    // 將指定的元素插入此隊列的尾部,如果該隊列已滿, 
    //則在到達指定的等待時間之前等待可用的空間,該方法可中斷 
    boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    // 獲取並移除此隊列頭元素,若沒有元素則一直阻塞。
    take() throws InterruptedException;
    // 獲取並移除此隊列的頭元素,若隊列為空,則返回 null
    poll(long timeout, TimeUnit unit)
        throws InterruptedException;

    int remainingCapacity();
    // 移除指定元素,成功返回true,失敗返回false
    boolean remove(Object o);

    public boolean contains(Object o);
    // 從該隊列中移除所有可用元素,並將它們添加到給定集合中
    int drainTo(Collection<? super E> c);
    // 從該隊列中移除最多給定數量的可用元素,並將它們添加到給定集合中。
    int drainTo(Collection<? super E> c, int maxElements);
}

Java並發包中的阻塞隊列一共7個:

  1. ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。

  2. LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。

  3. PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。

  4. DealyQueue:一個使用優先級隊列實現的無界阻塞隊列。

  5. SynchronousQueue:一個不存儲元素的阻塞隊列。

  6. LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。

  7. LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

     

主要關注兩個 ArrayBlockingQueue 和 LinkedBlockingQueue

ArrayBlockingQueu

主要參數:

/** 存儲數據的數組 */
final Object[] items;

/**獲取數據的索引,主要用於take,poll,peek,remove方法 */
int takeIndex;

/**添加數據的索引,主要用於 put, offer, or add 方法*/
int putIndex;

/** 隊列元素的個數 */
int count;

/** 控制並非訪問的鎖 */
final ReentrantLock lock;

/**notEmpty條件對象,用於通知take方法隊列已有元素,可執行獲取操作 */
private final Condition notEmpty;

/**notFull條件對象,用於通知put方法隊列未滿,可執行添加操作 */
private final Condition notFull;

/** 迭代器 */
transient Itrs itrs = null;

public ArrayBlockingQueue(int capacity) { 
  this(capacity, false);//默認構造非公平鎖的阻塞隊列 

public ArrayBlockingQueue(int capacity, boolean fair) { 
  if (capacity <= 0)  
    throw new IllegalArgumentException(); 
  this.items = new Object[capacity]; 
  lock = new ReentrantLock(fair);//初始化ReentrantLock重入鎖,出隊入隊擁有這同一個鎖 
  notEmpty = lock.newCondition;//初始化非空等待隊列
  notFull = lock.newCondition;//初始化非滿等待隊列 

public ArrayBlockingQueue(int capacity, boolean fair, Collecation<? extends E> c) { 
  this(capacity, fair); 
  final ReentrantLock lock = this.lock; 

    // 這個鎖的操作並不是為了互斥操作,而是保證其可見性。
    // 假如線程1是實例化ArrayBlockingQueue對象,線程2是對實例化的ArrayBlockingQueue對象做入隊操作
    // (當然要保證線程1和線程2的執行順序),如果不對它進行加鎖操作(加鎖會保證其可見性,也就是寫回主存)
    // 線程1的集合有可能只存在線程1維護的緩存中,並沒有寫回主存
    // 線程2中實例化的ArrayBlockingQueue維護的緩存以及主存中並沒有集合存在
    // 此時就因為可見性造成數據不一致的情況,引發線程安全問題。 
  lock.lock(); // Lock only for visibility, not mutual exclusion

  try { 
    int i = 0; 
    try { 
      for (E e : c) { 
        checkNotNull(e); 
        item[i++] = e;//將集合添加進數組構成的隊列中 
      } 
    } catch (ArrayIndexOutOfBoundsException ex) { 
      throw new IllegalArgumentException(); 
    } 
    count = i;//隊列中的實際數據數量 
    putIndex = (i == capacity) ? 0 : i; 
  } finally { 
    lock.unlock(); 
  } 
}

ArrayBlockingQueue內部通過數組對象items來存儲所有的數據並通過一個ReentrantLock來同時控制添加線程與移除線程的並發訪問。

notEmpty條件對象用於存放等待或喚醒調用take方法的線程,告訴他們隊列已有元素,可以執行獲取操作。

notFull條件對象是用於等待或喚醒調用put方法的線程,告訴它們,隊列未滿,可以執行添加元素的操作。

takeIndex代表的是下一個方法(take,poll,peek,remove)被調用時獲取數組元素的索引。

putIndex則代表下一個方法(put, offer, or add)被調用時元素添加到數組中的索引。

添加元素時阻塞

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

public boolean offer(E e) {
     checkNotNull(e);//檢查元素是否為null
     final ReentrantLock lock = this.lock;
     lock.lock();//加鎖
     try {
         if (count == items.length)//判斷隊列是否滿
             return false;
         else {
             enqueue(e);//添加元素到隊列
             return true;
         }
     } finally {
         lock.unlock(); // 釋放鎖
     }
 }

//入隊操作
private void enqueue(E x) {
    //獲取當前數組
    final Object[] items = this.items;
    //通過putIndex索引對數組進行賦值
    items[putIndex] = x;
    //索引自增,如果已是最后一個位置,重新設置 putIndex = 0;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;//隊列中元素數量加1
    //喚醒調用take()方法的線程,執行元素獲取操作。
    notEmpty.signal();
}

enqueue方法內部通過putIndex索引直接將元素添加到數組items中。

這里需要注意的是當putIndex索引大小等於數組長度時,需要將putIndex重新設置為0。這是因為當前隊列執行元素獲取時總是從隊列頭部獲取,而添加元素是從隊列尾部添加。所以當隊列索引(從0開始)與數組長度相等時,需要從數組頭部開始添加。

//put方法,阻塞時可中斷
public void put(E e) throws InterruptedException {
   checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//該方法可中斷
    try {
        //當隊列元素個數與數組長度相等時,無法添加元素
        while (count == items.length)
            //將當前調用線程掛起,添加到notFull條件隊列中等待喚醒
            notFull.await();
        enqueue(e);//如果隊列沒有滿直接添加。。
    } finally {
        lock.unlock();
    }
}

put方法是一個阻塞的方法,如果隊列元素已滿,那么當前線程將會被notFull條件對象掛起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。

但如果隊列沒有滿,那么就直接調用enqueue(e)方法將元素加入到數組隊列中。

移除元素時阻塞

移除元素有這些方法:poll、remove、take

poll方法獲取並移除此隊列的頭元素,若隊列為空,則返回 null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
       //判斷隊列是否為null,不為null執行dequeue()方法,否則返回null
       return (count == 0) ? null : dequeue();
    } finally {
       lock.unlock();
    }
}
//刪除隊列頭元素並返回
private E dequeue() {
    //拿到當前數組的數據
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //獲取要刪除的對象
    E x = (E) items[takeIndex];
    將數組中takeIndex索引位置設置為null
    items[takeIndex] = null;
    //takeIndex索引加1並判斷是否與數組長度相等,
    //如果相等說明已到盡頭,恢復為0
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;//隊列個數減1
    if (itrs != null)
      itrs.elementDequeued();//同時更新迭代器中的元素數據
    //刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操作
    notFull.signal();
    return x;
}

remove(Object o)方法的刪除過程相對復雜些,因為該方法並不是直接從隊列頭部刪除元素。

首先線程先獲取鎖,接着判斷隊列count>0,這點是保證並發情況下刪除操作安全執行。

接下來獲取下一個要添加源的索引putIndex以及takeIndex索引 ,作為后續循環的結束判斷,因為只要putIndex與takeIndex不相等就說明隊列沒有結束。

然后通過while循環找到要刪除的元素索引,執行removeAt(i)方法刪除。

在removeAt(i)方法中實際上做了兩件事:

判斷隊列頭部元素是否為刪除元素,如果是直接刪除,並喚醒添加線程。

如果要刪除的元素並不是隊列頭元素,那么執行循環操作,從要刪除元素的索引removeIndex之后的元素都往前移動一個位置,那么要刪除的元素就被removeIndex之后的元素替換,從而也就完成了刪除操作。

public boolean remove(Object o) {
    if (o == null) return false;
    //獲取數組數據
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();//加鎖
    try {
        //如果此時隊列不為null,這里是為了防止並發情況
        if (count > 0) {
            //獲取下一個要添加元素時的索引
            final int putIndex = this.putIndex;
            //獲取當前要被刪除元素的索引
            int i = takeIndex;
            //執行循環查找要刪除的元素
            do {
                //找到要刪除的元素
                if (o.equals(items[i])) {
                    removeAt(i);//執行刪除
                    return true;//刪除成功返回true
                }
                //當前刪除索引執行加1后判斷是否與數組長度相等
                //若為true,說明索引已到數組盡頭,將i設置為0
                if (++i == items.length)
                    i = 0; 
            } while (i != putIndex);//繼承查找
        }
        return false;
    } finally {
        lock.unlock();
    }
}

//根據索引刪除元素,實際上是把刪除索引之后的元素往前移動一個位置
void removeAt(final int removeIndex) {

    final Object[] items = this.items;
    //先判斷要刪除的元素是否為當前隊列頭元素
    if (removeIndex == takeIndex) {
      //如果是直接刪除
      items[takeIndex] = null;
      //當前隊列頭元素加1並判斷是否與數組長度相等,若為true設置為0
      if (++takeIndex == items.length)
          takeIndex = 0;
      count--;//隊列元素減1
      if (itrs != null)
          itrs.elementDequeued();//更新迭代器中的數據
    } else {
    //如果要刪除的元素不在隊列頭部,
    //那么只需循環迭代把刪除元素后面的所有元素往前移動一個位置
      //獲取下一個要被添加的元素的索引,作為循環判斷結束條件
      final int putIndex = this.putIndex;
      //執行循環
      for (int i = removeIndex;;) {
          //獲取要刪除節點索引的下一個索引
          int next = i + 1;
          //判斷是否已為數組長度,如果是從數組頭部(索引為0)開始找
          if (next == items.length)
              next = 0;
           //如果查找的索引不等於要添加元素的索引,說明元素可以再移動
          if (next != putIndex) {
              items[i] = items[next];//把后一個元素前移覆蓋要刪除的元
              i = next;
          } else {
          //在removeIndex索引之后的元素都往前移動完畢后清空最后一個元素
              items[i] = null;
              this.putIndex = i;
              break;//結束循環
          }
      }
      count--;//隊列元素減1
      if (itrs != null)
          itrs.removedAt(removeIndex);//更新迭代器數據
    }
    notFull.signal();//喚醒添加線程
}

take方法其實很簡單,有就刪除沒有就阻塞,只不過這個阻塞是可以中斷的,如果隊列沒有數據那么就加入notEmpty條件隊列等待(有數據就直接取走,方法結束),如果有新的put線程添加了數據,那么put操作將會喚醒take線程,執行take操作。

//從隊列頭部刪除,隊列沒有元素就阻塞,可中斷
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//中斷
    try {
      //如果隊列沒有元素
      while (count == 0)
          //執行阻塞操作
          notEmpty.await();
      return dequeue();//如果隊列有元素執行刪除操作
    } finally {
      lock.unlock();
    }
}

peek方法直接返回當前隊列的頭元素但不刪除任何元素。

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    //直接返回當前隊列的頭元素,但不刪除
      return itemAt(takeIndex); // null when queue is empty
    } finally {
      lock.unlock();
    }
}

final E itemAt(int i) {
    return (E) items[i];
}

LinkedBlockingQueue

LinkedBlockingQueue是一個基於鏈表的阻塞隊列,其內部維持一個基於鏈表的數據隊列,使用兩把鎖(takeLock,putLock)允許讀寫並行,remove和iterator時需要同時獲取兩把鎖。

LinkedBlockingQueue默認為無界隊列,即大小為Integer.MAX_VALUE,如果消費者速度慢於生產者速度,可能造成內存空間不足,建議手動設置隊列大小。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * 節點類,用於存儲數據
     */
    static class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** 阻塞隊列的大小,默認為Integer.MAX_VALUE */
    private final int capacity;

    /** 當前阻塞隊列中的元素個數 */
    private final AtomicInteger count = new AtomicInteger();

    /** 阻塞隊列的頭結點 */
    transient Node<E> head;

    /** 阻塞隊列的尾節點 */
    private transient Node<E> last;

    /** 獲取並移除元素時使用的鎖,如take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 */
    private final Condition notEmpty = takeLock.newCondition();

    /** 添加元素時使用的鎖如 put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 */
    private final Condition notFull = putLock.newCondition();

}

每次插入操作都將動態構造Linked nodes。

每個添加到LinkedBlockingQueue隊列中的數據都將被封裝成Node節點,添加的鏈表隊列中,其中head和last分別指向隊列的頭結點和尾結點。

與ArrayBlockingQueue不同的是,LinkedBlockingQueue內部分別使用了takeLock 和 putLock 對並發進行控制,也就是說,添加和刪除操作並不是互斥操作,可以同時進行,這樣也就可以大大提高吞吐量。

構造函數

1、LinkedBlockingQueue():初始化容量為Integer.MAX_VALUE的隊列;
2、LinkedBlockingQueue(int capacity):指定隊列容量並初始化頭尾節點
3、LinkedBlockingQueue(Collection c):初始化一個容量為Integer.MAX_VALUE且包含集合c所有元素的隊列,且阻塞隊列的迭代順序同集合c。若集合c元素包含null,將throwNullPointerException;若集合c元素個數達到Integer.MAX_VALUE,將throwIllegalStateException("Queue full")。

// 將node鏈接到隊列尾部
private void enqueue(Node<E> node) { // 入隊
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node; // 等價於last.next = node;last = last.next(即node)
}
public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); // Never contended(競爭), but necessary for visibility(可見性)
    try {
        int n = 0;
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            enqueue(new Node<E>(e)); // 執行last = last.next = node;
            ++n;
        }
        count.set(n); // 設置隊列元素個數
    } finally {
        putLock.unlock();
    }
}

添加元素阻塞

添加元素主要有這幾個方法add、offer、put

public boolean add(E e) {
     if (offer(e))
         return true;
     else
         throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    //添加元素為null直接拋出異常
    if (e == null) throw new NullPointerException();
    //獲取隊列的個數
    final AtomicInteger count = this.count;
    //判斷隊列是否已滿
    if (count.get() == capacity)
      return false;
    int c = -1;
    //構建節點
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
      //再次判斷隊列是否已滿,考慮並發情況
      if (count.get() < capacity) {
          enqueue(node);//添加元素
          c = count.getAndIncrement();//拿到當前未添加新元素時的隊列長度
          //如果容量還沒滿
          if (c + 1 < capacity)
              notFull.signal();//喚醒下一個添加線程,執行添加操作
      }
    } finally {
      putLock.unlock();
    }
    // 由於存在添加鎖和消費鎖,而消費鎖和添加鎖都會持續喚醒等到線程,因此count肯定會變化。
    //這里的if條件表示如果隊列中還有1條數據
    if (c == 0) 
    signalNotEmpty();//如果還存在數據那么就喚醒消費鎖
    return c >= 0; // 添加成功返回true,否則返回false
}

//入隊操作
private void enqueue(Node<E> node) {
    //隊列尾節點指向新的node節點
    last = last.next = node;
}

//signalNotEmpty方法
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
      //喚醒獲取並刪除元素的線程
      notEmpty.signal();
    } finally {
      takeLock.unlock();
    }
}

這里的Offer()方法做了兩件事:

判斷隊列是否滿,滿了就直接釋放鎖,沒滿就將節點封裝成Node入隊,然后再次判斷隊列添加完成后是否已滿,不滿就繼續喚醒等到在條件對象notFull上的添加線程。

判斷是否需要喚醒等到在notEmpty條件對象上的消費線程。

為什么添加完成后是繼續喚醒在條件對象notFull上的添加線程而不是像ArrayBlockingQueue那樣直接喚醒notEmpty條件對象上的消費線程?為什么要當if (c == 0)時才去喚醒消費線程呢?

第一個疑問:在添加新元素完成后,會判斷隊列是否已滿,不滿就繼續喚醒在條件對象notFull上的添加線程,這點與前面分析的ArrayBlockingQueue很不相同,在ArrayBlockingQueue內部完成添加操作后,會直接喚醒消費線程對元素進行獲取,這是因為ArrayBlockingQueue只用了一個ReenterLock同時對添加線程和消費線程進行控制,這樣如果在添加完成后再次喚醒添加線程的話,消費線程可能永遠無法執行。

而對於LinkedBlockingQueue來說就不一樣了,其內部對添加線程和消費線程分別使用了各自的ReenterLock鎖對並發進行控制,也就是說添加線程和消費線程是不會互斥的,所以添加鎖只要管好自己的添加線程即可,添加線程自己直接喚醒自己的其他添加線程,如果沒有等待的添加線程,直接結束了。

如果有就直到隊列元素已滿才結束掛起,當然offer方法並不會掛起,而是直接結束,只有put方法才會當隊列滿時才執行掛起操作。注意消費線程的執行過程也是如此。這也是為什么LinkedBlockingQueue的吞吐量要相對大些的原因。

第二個疑問:消費線程一旦被喚醒是一直在消費的(前提是有數據),所以c值是一直在變化的,c值是添加完元素前隊列的大小,此時c只可能是0或c>0,如果是c=0,那么說明之前消費線程已停止,條件對象上可能存在等待的消費線程,添加完數據后應該是c+1,那么有數據就直接喚醒等待消費線程,如果沒有就結束啦,等待下一次的消費操作。如果c>0那么消費線程就不會被喚醒,只能等待下一個消費操作(poll、take、remove)的調用,那為什么不是條件c>0才去喚醒呢?我們要明白的是消費線程一旦被喚醒會和添加線程一樣,一直不斷喚醒其他消費線程,如果添加前c>0,那么很可能上一次調用的消費線程后,數據並沒有被消費完,條件隊列上也就不存在等待的消費線程了,所以c>0喚醒消費線程得意義不是很大,當然如果添加線程一直添加元素,那么一直c>0,消費線程執行的換就要等待下一次調用消費操作了(poll、take、remove)。

根據時間阻塞

//在指定時間內阻塞添加的方法,超時就結束
 public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

    if (e == null) throw new NullPointerException();
    //將時間轉換成納秒
    long nanos = unit.toNanos(timeout);
    int c = -1;
    //獲取鎖
    final ReentrantLock putLock = this.putLock;
    //獲取當前隊列大小
    final AtomicInteger count = this.count;
    //鎖中斷(如果需要)
    putLock.lockInterruptibly();
    try {
        //判斷隊列是否滿
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            //如果隊列滿根據阻塞的等待
            nanos = notFull.awaitNanos(nanos);
        }
        //隊列沒滿直接入隊
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        //喚醒條件對象上等待的線程
        if (c + 1 < capacity)
            notFull.signal();
    } finally { 
        putLock.unlock();
    }
    //喚醒消費線程
    if (c == 0)
        signalNotEmpty();
    return true;
}

//CoditionObject(Codition的實現類)中的awaitNanos方法
 public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //這里是將當前添加線程封裝成NODE節點加入Condition的等待隊列中
    //注意這里的NODE是AQS的內部類Node
    Node node = addConditionWaiter();
    //加入等待,那么就釋放當前線程持有的鎖
    int savedState = fullyRelease(node);
    //計算過期時間
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        //主要看這里!!由於是while 循環,這里會不斷判斷等待時間
        //nanosTimeout 是否超時
        //static final long spinForTimeoutThreshold = 1000L;
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);//掛起線程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //重新計算剩余等待時間,while循環中繼續判斷下列公式
        //nanosTimeout >= spinForTimeoutThreshold
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

據傳遞進來的時間計算超時阻塞nanosTimeout,然后通過while循環中判斷nanosTimeout >= spinForTimeoutThreshold 該公式是否成立,當其為true時則說明超時時間nanosTimeout 還未到期,再次計算nanosTimeout = deadline - System.nanoTime();即nanosTimeout ,持續判斷,直到nanosTimeout 小於spinForTimeoutThreshold結束超時阻塞操作,方法也就結束。

這里的spinForTimeoutThreshold其實更像一個經驗值,因為非常短的超時等待無法做到十分精確,因此采用了spinForTimeoutThreshold這樣一個臨界值。offer(E e, long timeout, TimeUnit unit)方法內部正是利用這樣的Codition的超時等待awaitNanos方法實現添加方法的超時阻塞操作。同樣對於poll(long timeout, TimeUnit unit)方法也是一樣的道理。

移除元素阻塞

移除的方法主要有remove、poll、take

remove方法刪除指定的對象。由於remove方法刪除的數據的位置不確定,為了避免造成並非安全問題,所以需要同時對putLock和takeLock加鎖。

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();//同時對putLock和takeLock加鎖
    try {
        //循環查找要刪除的元素
        for (Node<E> trail = head, p = trail.next;
          p != null;
          trail = p, p = p.next) {
         if (o.equals(p.item)) {//找到要刪除的節點
             unlink(p, trail);//直接刪除
             return true;
         }
        }
        return false;
    } finally {
     fullyUnlock();//解鎖
    }
}

//兩個同時加鎖
void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}

poll方法比較簡單,如果隊列沒有數據就返回null,如果隊列有數據,那么就取出來,如果隊列還有數據那么喚醒等待在條件對象notEmpty上的消費線程。然后判斷if (c == capacity)為true就喚醒添加線程,這點與前面分析if(c==0)是一樣的道理。因為只有可能隊列滿了,notFull條件對象上才可能存在等待的添加線程。

public E poll() {
    //獲取當前隊列的大小
    final AtomicInteger count = this.count;
    if (count.get() == 0)//如果沒有元素直接返回null
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        //判斷隊列是否有數據
        if (count.get() > 0) {
            //如果有,直接刪除並獲取該元素值
            x = dequeue();
            //當前隊列大小減一
            c = count.getAndDecrement();
            //如果隊列未空,繼續喚醒等待在條件對象notEmpty上的消費線程
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    //判斷c是否等於capacity,這是因為如果滿說明NotFull條件對象上
    //可能存在等待的添加線程
    if (c == capacity)
        signalNotFull();
    return x;
}

private E dequeue() {
    Node<E> h = head;//獲取頭結點
    Node<E> first = h.next; 獲取頭結的下一個節點(要刪除的節點)
    h.next = h; // help GC//自己next指向自己,即被刪除
    head = first;//更新頭結點
    E x = first.item;//獲取刪除節點的值
    first.item = null;//清空數據,因為first變成頭結點是不能帶數據的,這樣也就刪除隊列的帶數據的第一個節點
    return x;
}

take方法是一個可阻塞可中斷的移除方法主要做了兩件事

如果隊列沒有數據就掛起當前線程到 notEmpty條件對象的等待隊列中一直等待,如果有數據就刪除節點並返回數據項,同時喚醒后續消費線程,嘗試喚醒條件對象notFull上等待隊列中的添加線程。 

移除方法中只有take方法具備阻塞功能。remove方法是成功返回true失敗返回false,poll方法成功返回被移除的值,失敗或沒數據返回null。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    //獲取當前隊列大小
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();//可中斷
    try {
        //如果隊列沒有數據,掛機當前線程到條件對象的等待隊列中
        while (count.get() == 0) {
            notEmpty.await();
        }
        //如果存在數據直接刪除並返回該數據
        x = dequeue();
        c = count.getAndDecrement();//隊列大小減1
        if (c > 1)
            notEmpty.signal();//還有數據就喚醒后續的消費線程
    } finally {
        takeLock.unlock();
    }
    //滿足條件,喚醒條件對象上等待隊列中的添加線程
    if (c == capacity)
        signalNotFull();
    return x;
}

lock 與 lockInterruptibly的區別

lock優先考慮獲取鎖,待獲取鎖成功后,才響應中斷。

lockInterruptibly 優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。

詳細區別:

ReentrantLock.lockInterruptibly允許在等待時由其它線程調用等待線程的Thread.interrupt方法來中斷等待線程的等待而直接返回,這時不用獲取鎖,而會拋出一個InterruptedException。

ReentrantLock.lock方法不允許Thread.interrupt中斷,即使檢測到Thread.isInterrupted,一樣會繼續嘗試獲取鎖,失敗則繼續休眠。只是在最后獲取鎖成功后再把當前線程置為interrupted狀態,然后再中斷線程。

線程喚醒
對notEmpty和notFull的喚醒操作均使用的是signal()而不是signalAll()。

signalAll() 雖然能喚醒Condition上所有等待的線程,但卻並不見得會節省資源,相反,喚醒操作會帶來上下文切換,且會有鎖的競爭。此外,由於此處獲取的鎖均是同一個(putLock或takeLock),同一時刻被鎖的線程只有一個,也就無從談起喚醒多個線程了。

LinkedBlockingQueue與ArrayBlockingQueue比較
ArrayBlockingQueue底層基於數組,創建時必須指定隊列大小,“有界”LinkedBlockingQueue“無界”,節點動態創建,節點出隊后可被GC,故伸縮性較好;

ArrayBlockingQueue入隊和出隊使用同一個lock(但數據讀寫操作已非常簡潔),讀取和寫入操作無法並行,LinkedBlockingQueue使用雙鎖可並行讀寫,其吞吐量更高。

ArrayBlockingQueue在插入或刪除元素時直接放入數組指定位置(putIndex、takeIndex),不會產生或銷毀任何額外的對象實例;而LinkedBlockingQueue則會生成一個額外的Node對象,在高效並發處理大量數據時,對GC的影響存在一定的區別。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM