淺析PriorityBlockingQueue優先級隊列原理


介紹

當你看本文時,需要具備以下知識點

二叉樹、完全二叉樹、二叉堆二叉樹的表示方法

如果上述內容不懂也沒關系可以先看概念。

PriorityBlockingQueue是一個無界的基於數組的優先級阻塞隊列,數組的默認長度是11,雖然指定了數組的長度,但是可以無限的擴充,直到資源消耗盡為止,每次出隊都返回優先級別最高的或者最低的元素。其實內部是由平衡二叉樹堆來進行排序的,先進行構造二叉樹堆,二叉樹堆排序出來的數每次第一個元素和最后一個元素進行交換,這樣最大的或最小的數就到了最后面,然后最后一個不變,重新構造前面的數組元素以此類推進行堆排序。默認比較器是null,也就是使用隊列中元素的compareTo方法進行比較,意味着隊列元素要實現Comparable接口。

  • PriorityBlockingQueue是一個無界隊列,隊列滿時沒有進行阻塞限制,也就是沒有notFull進行阻塞,所以put是非阻塞的。
  • lock鎖獨占鎖控制只有一個線程進行入隊、出隊操作。

平衡二叉樹堆

二叉堆的本質其實是一個完全二叉樹,它分為兩種類型:

  • 最大堆:最大堆的任何一個父節點的值都大於或等於它的左、右孩子節點的值。
  • 最小堆:最小堆的任何一個父節點的值都小於或等於它的左、右孩子節點的值。

二叉堆的根節點叫做堆頂

最大堆最小堆的特點:最大堆的堆頂是整個堆中最大元素,最小堆的堆頂是整個堆中最小元素。

我們知道二叉堆內部實現其實是基於數組來實現的,為什么二叉堆又能使用數組來實現呢?因為二叉堆是完全二叉樹,並不會浪費空間資源,對於稀疏二叉樹如果使用數組來實現會有很多左右結點為空的情況,數組中需要進行占位處理,占位處理就會浪費很多空間,得不償失,但是二叉堆是一個完全二叉樹,所以不會有資源的浪費。

數組下標表示方法:

左節點:2*parent+1

右節點:2*parent+2

n坐標節點父節點:n/2

PriorityBlockingQueue類圖結構

14

源碼分析

通過類圖可以清晰的發現它其實也是繼承自BlockingQueue接口以及Queue接口,說明也是阻塞隊列。在構造函數中默認隊列的容量是11,由於上面我們已經提到了,優先隊列使用的是二叉堆來實現的,二叉堆實現是根據數組來實現的,所以默認構造器中初始化容量為11,如下代碼所示:

/**
 * 默認數組長度。
 */
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
 * 最大數組允許的長度。
 * The maximum size of array to allocate.
 * Some VMs reserve some header words in an array.
 * Attempts to allocate larger arrays may result in
 * OutOfMemoryError: Requested array size exceeds VM limit
 */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

上面講述了二叉堆的原理,二叉堆原理肯定是要進行比較大小,默認比較器是null,也就是使用元素的compareTo方法進行比較來確定元素的優先級,就意味着隊列元素必須實現Comparable接口,如下是構造函數:

/**
 * 創建一個默認長度為11的隊列,默認比較器為null。
 * Creates a {@code PriorityBlockingQueue} with the default
 * initial capacity (11) that orders its elements according to
 * their {@linkplain Comparable natural ordering}.
 */
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}

/**
 * 創建一個指定數組初始化長度的隊列,默認比較器為null。
 * Creates a {@code PriorityBlockingQueue} with the specified
 * initial capacity that orders its elements according to their
 * {@linkplain Comparable natural ordering}.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity) {
    this(initialCapacity, null);
}

/**
 * 創建一個指定數組初始化長度的隊列,比較器可以自己指定。
 * Creates a {@code PriorityBlockingQueue} with the specified initial
 * capacity that orders its elements according to the specified
 * comparator.
 *
 * @param initialCapacity the initial capacity for this priority queue
 * @param  comparator the comparator that will be used to order this
 *         priority queue.  If {@code null}, the {@linkplain Comparable
 *         natural ordering} of the elements will be used.
 * @throws IllegalArgumentException if {@code initialCapacity} is less
 *         than 1
 */
public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    this.queue = new Object[initialCapacity];
}

/**
 * 傳入一個集合,如果集合是SortedSet或PriorityBlockingQueue的話不需要進行堆化,直接使用原有的排序即可,如果不是則需要調用
 * heapify方法進行堆初始化操作。
 * Creates a {@code PriorityBlockingQueue} containing the elements
 * in the specified collection.  If the specified collection is a
 * {@link SortedSet} or a {@link PriorityQueue}, this
 * priority queue will be ordered according to the same ordering.
 * Otherwise, this priority queue will be ordered according to the
 * {@linkplain Comparable natural ordering} of its elements.
 *
 * @param  c the collection whose elements are to be placed
 *         into this priority queue
 * @throws ClassCastException if elements of the specified collection
 *         cannot be compared to one another according to the priority
 *         queue's ordering
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public PriorityBlockingQueue(Collection<? extends E> c) {
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    boolean heapify = true; // true表示需要進行堆化也就是初始化基於現有集合初始化一個二叉堆,使用下沉方式。
    boolean screen = true;  // true表示需要篩選空值
    if (c instanceof Sort3edSet<?>) {
      	// 如果是SortedSet不需要進行堆初始化操作。
        SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
        this.comparator = (Comparator<? super E>) ss.comparator();
      	// 不需要進行堆初始化。
        heapify = false;
    }
    else if (c instanceof PriorityBlockingQueue<?>) {
      	// 如果是PriorityBlockingQueue不需要進行堆初始化操作。
        PriorityBlockingQueue<? extends E> pq =
            (PriorityBlockingQueue<? extends E>) c;
        this.comparator = (Comparator<? super E>) pq.comparator();
      	// 不需要篩選空值。
        screen = false;
        if (pq.getClass() == PriorityBlockingQueue.class) // exact match
          	// 不需要進行堆初始化操作。
            heapify = false;
    }
  	// 將集合轉換成數組類型。
    Object[] a = c.toArray();
  	// 獲取數組的長度大小
    int n = a.length;
    // If c.toArray incorrectly doesn't return Object[], copy it.
    if (a.getClass() != Object[].class)
        a = Arrays.copyOf(a, n, Object[].class);
    if (screen && (n == 1 || this.comparator != null)) {
        for (int i = 0; i < n; ++i)
            if (a[i] == null)
                throw new NullPointerException();
    }
  	// 將轉化數組的對象賦值給隊列,以及實際存儲的長度。
    this.queue = a;
    this.size = n;
    if (heapify)
      	// 調整堆大小。
        heapify();
}

構造函數中第四個構造函數傳遞的是一個集合,集合如果是SortSet和PriorityBlockingQueue是不需要進行堆初始化操作,如果是其他集合類型則需要進行堆排序。

private void heapify() {
    Object[] array = queue;
    int n = size;
  	// 這里其實就是尋找完全二叉樹中最后一個非葉子節點值,由於數組元素是從0開始的下標,長度是從1開始的所以需要減掉1相等於是數組元素下標最后一個元素下標/2得到的值,也就是最后一個元素的父節點。
    int half = (n >>> 1) - 1;
    Comparator<? super E> cmp = comparator;
    if (cmp == null) {
      	// 循環遍歷非葉子節點的值。
        for (int i = half; i >= 0; i--)
            siftDownComparable(i, (E) array[i], array, n);
    }
    else {
        for (int i = half; i >= 0; i--)
            siftDownUsingComparator(i, (E) array[i], array, n, cmp);
    }
}

例如我們初始化數組為a=[7,1,3,10,5,2,8,9,6],完全二叉樹表示為:
14

首先half=9/2-1=4-1=3,a[3]=10,剛好是完全二叉樹中最后一個非葉子節點,再往上非葉子節點是3,1,7剛好數組下標是遞減的,然后針對元素10進行下沉操作,發現左右子樹中最小元素是6,則6和10元素進行交換。
14

然后再下沉元素3,i--操作,得到如下完全二叉樹
14

繼續遍歷下一個非葉子節點,i=2減少1得到i=1,此時a[1]=1左右子節點6和5都比1大所以不需要進行變動,然后i進行減少,則到了i=0,此時a[7]=0,發現左右節點中元素1是最小的,所以先下沉到元素1的位置。
14
下沉后發現還可以繼續下沉,因為左右節點中右節點元素5是最小的所以還需要進行下沉操作。
14

至此下沉結束,二叉堆構建完成。

offer操作

接下來看一下隊列是如何進行構建成一個二叉堆的,其實這里面構建二叉堆以及二叉堆獲取數據時,會采用上浮下沉的操作來進行處理整個二叉堆的平衡,詳細來看一下offer方法,代碼如下所示:

/**
 * Inserts the specified element into this priority queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws ClassCastException if the specified element cannot be compared
 *         with elements currently in the priority queue according to the
 *         priority queue's ordering
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
  	// 首先獲取鎖對象。
    final ReentrantLock lock = this.lock;
  	// 只有一個線程操作入隊和出隊動作。
    lock.lock();
  	// n代表數組的實際存儲內容的大小
  	// cap代表隊列的整體大小,也就是數組的長度。
    int n, cap;
    Object[] array;
  	// 如果數組實際長度大於等於數組的長度時,需要進行擴容操作。
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
      	// 如果用戶指定比較器,則使用用戶指定的比較器來進行比較,如果沒有則使用默認比較器。
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
          	// 進行上浮操作。
            siftUpComparable(n, e, array);
        else
          	// 進行上浮操作。
            siftUpUsingComparator(n, e, array, cmp);
      	// 實際長度增加1,由於有且僅有一個線程操作隊列,所以這里並沒有使用原子性操作。
        size = n + 1;
      	// 通知等待的線程,隊列已經有數據,可以獲取數據。
        notEmpty.signal();
    } finally {
      	// 解鎖操作。
        lock.unlock();
    }
  	// 返回操作成功。
    return true;
}
  1. 首先獲取鎖對象,控制有且僅有一個線程操作隊列。
  2. 如果數組實際存儲的內容大小大於等於數組長度時進行擴容操作,調用tryGrow方法進行擴容。
  3. 使用比較器進行比較,進行上浮操作來創建二叉堆。

tryGrow是如何進行擴容的呢?

/**
 * Tries to grow array to accommodate at least one more element
 * (but normally expand by about 50%), giving up (allowing retry)
 * on contention (which we expect to be rare). Call only while
 * holding lock.
 *
 * @param array the heap array
 * @param oldCap the length of the array
 */
private void tryGrow(Object[] array, int oldCap) {
  	// 這里先釋放了鎖,為什么要釋放鎖呢?詳細請見下面。
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
  	// 這里allocationSpinLock默認是0,通過CAS來講該值修改為1,也就是同時只有一個線程進行擴容操作。
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
          	// 如果原容量小於64,就執行原容量+2,如果原容量大於64,擴大一倍容量。
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
          	// 新容量超過了最大容量,將容量調整為最大值。
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
          	// 創建新的數組對象。
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
          	// 擴容成功,則將值修改為0。
            allocationSpinLock = 0;
        }
    }
  	// 這里是為了其他線程進入后優先讓擴容的線程進行操作。
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
  	// 加鎖操作。
    lock.lock();
  	// 將原數據拷貝到新數組中。
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

看似擴容還是很容易理解的,開始的時候為什么要先釋放鎖呢,然后用CAS控制只有一個線程可以擴容呢?其實可以不釋放鎖,也能控制只允許一個線程進行擴容操作,但是會產生性能問題,因為當我擴容的時候,一直獲得鎖會不允許其他線程進行入隊和出隊操作,擴容的時候先釋放鎖,也就是可以其他線程進行處理出隊和入隊操作,例如第一個線程先CAS成功,並且進行擴容中,此時第二個線程進入后allocationSpinLock此時已經等於1了,也就是不會進行擴容操作,而是會直接進入到判斷newArray==null,此時線程二發現newArray為null,線程而會調用Thread.yield來讓出CPU,目的是讓擴容的線程擴容后優先調用lock.lock重新獲得鎖,但是這又不能完全保證,有可能yield退出了擴容還沒有結束,此時線程二獲得鎖,如果當前數組擴容沒有完畢,則線程二會再次調用offer的tryGrow進行擴容操作,再次給擴容線程讓出了鎖,再次調用yield讓出CPU。當擴容線程進行擴容時,其他線程自旋的檢測當前線程是否成功,成功了才會進行入隊的操作。

擴容成功后就需要構建二叉堆,構建二叉堆調用的是siftUpComparable方法,也就是上浮操作,接下來詳細講解一下上浮操作是如何進行的?

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
      	// 找到節點k的父節點。
        int parent = (k - 1) >>> 1;
      	// 獲取父節點的值。
        Object e = array[parent];
      	// 比較父節點和插入值的大小,如果插入值大於父節點直接插入到末尾。
        if (key.compareTo((T) e) >= 0)
            break;
      	// 將父節點設置到子節點,小數據進行上浮操作。
        array[k] = e;
      	// 將父節點的下標設置給k。
        k = parent;
    }
  	// 最后key上浮到k的位置。
    array[k] = key;
}

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

為了能夠演示算法的整個過程,這里舉例來說明一下:

為了演示擴容的過程我們先初始化隊列長度為2,后面會進行擴容操作。

  1. 當我們調用offer(3)時,此時k=n=size=0,x是我們要插入的內容,array是二叉堆數組,當我們插入第一個元素是,發現k>0是不成立的,所以直接運行 array[k] = key,此時二叉堆數組只有一個元素3,如下圖所示:
    14

  2. 第二次調用offer(5),此時k=n=size=1實際存儲長度為1,運行(k-1)>>>1尋找父節點,k-1=0,無符號向右側移動一位,就相當於是$(k-1)/2$,和開始介紹二叉堆特點的時候尋找父節點是一樣的,此時要插入的節點5的父節點是3,需要進行比較3和5的大小,發現父節點小於要插入的節點,所以執行break退出循環,執行array[k] = key操作,將節點5插入到父節點3下面,並且size進行增加1,此時size=2,如下所示:
    14

  3. 當第三次調用offer(9)時,此時k=n=size=2,發現(n = size) >= (cap = (array = queue).length)實際長度與數組的長度相等,此時進行擴容操作,通過上述源碼分析得到,oldCap+(oldCap+2)=2+(2+2)=6,數組的長度從長度為2擴容到長度為6,將原有數組內容賦值到新的數組中,擴容之后進入到上浮操作進行入隊操作,其實怎么理解呢?可以理解為我們將元素9插入到數組最后一個位置,也就是隊列的最后一個位置。
    14

    然后9這個元素需要找到它的父節點,那就是3,也即是(k-1)>>>1=(2-1)>>>1得到下標為0,array[0]=3,元素9的父節點是3,比較父節點和子節點大小,發現3<9位置不需要進行變動,則二叉堆就變成如下內容,size進行加1,size=3。
    14

  4. 第四次調用offer(2)時,此時k=n=size=3,還是按照上面意思將元素2暫時插入到數組的末尾,然后進行上浮操作,虛線的意思就是告訴你我現在還不一定在不在這里,我要和我的父親比較下到底誰大,如果父節點大那我只能乖乖在這里嘍,如果父節點小,不好意思我得往上浮動了,接下來看一上浮動的過程。
    14

    k=3,元素2的父節點=(k-1)>>>1=2>>>1等於1,也就是array[1]的元素5是元素2的父節點,通過上面的二叉圖也能夠清晰看到元素5是父節點,比較發現2<5,執行array[k] = e,array[3]=5,也就是說將5的位置進行下沉操作,這里源代碼並沒發現2上浮操作,但是在下一次比較中又用到了我們的元素2進行比較,其實現在樹的結構相當於如下所示:
    14
    發現剛開始元素2在最后節點,現在被替換成了元素5,這就意味着元素2的位置變到了之前元素5的位置,源碼中的k = parent,此時元素2的下標已經變到k=1,但是數組里面的內容並沒有變,只是元素下標上浮操作,上浮到父節點,相當於元素2就在元素5的位置,只是下標1的位置元素內容並沒有直接替換成元素2僅此而已。接下來還會進行循環,數組下標1的元素的父節點是元素3,這里就不計算了,因為上面第二步計算過,此時發現2<3,需要將元素3的內容進行下沉操作,元素2的下標進行上浮操作。
    14

    此時下標2的元素移動到父節點下標0的位置,發現k<0,則本次循環結束,將array[0]替換成元素2,本次上浮操作結束。
    14

    由此可見,次堆為最小二叉堆,當出隊操作時彈出的是最小的元素。

poll操作

poll操作就是將隊列內部二叉堆的堆頂元素出隊操作,如果隊列為空則返回null。如下是poll的源碼:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return dequeue();
    } finally {
        lock.unlock();
    }
}
  1. 首先獲取鎖對象,進行上鎖操作,有且僅有一個線程出隊和入隊操作
  2. 獲得到鎖對象之后進行出隊操作,調用dequeue方法
  3. 最后釋放鎖對象

接下來看一下出隊操作是如何進行的,

/**
 * Mechanics for poll().  Call only while holding lock.
 */
private E dequeue() {
  	// 數組的元素的個數。
    int n = size - 1;
  	// 如果數組中不存在元素則直接返回null。
    if (n < 0)
        return null;
    else {
      	// 獲取隊列數組。
        Object[] array = queue;
      	// 將第一個元素也就是二叉堆的根結點堆頂元素作為返回結果。
        E result = (E) array[0];
      	// 獲取數組中最后一個元素。
        E x = (E) array[n];
      	// 將最后一個元素設置為null。
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
          	// 進行下沉操作。
            siftDownComparable(0, x, array, n);
        else
          	// 進行下沉操作。
            siftDownUsingComparator(0, x, array, n, cmp);
      	// 實際元素大小減少1.
        size = n;
      	// 返回結果。
        return result;
    }
}

通過源碼可以看到,出隊的內容就是二叉堆的堆頂元素arrra[0],而后面它有取到的完全二叉樹的最后一個節點,將最后一個節點設置為null,然后在調用了siftDownComparable對堆進行了調整動作,看一下這個方法的具體實現內容,然后再結合上面入隊的內容進行講解出隊是如何保證二叉堆平衡的。

private static <T> void siftDownComparable(int k, T x, Object[] array,                                           int n) {    if (n > 0) {        Comparable<? super T> key = (Comparable<? super T>)x;      	// 最后一個節點的父節點,也就是代表到這里之后就結束了。        int half = n >>> 1;           // loop while a non-leaf        while (k < half) {          	// child=2n+1代表leftChild左節點。            int child = (k << 1) + 1; // assume left child is least          	// 獲取左節點的值。            Object c = array[child];          	// 獲取右節點=2n+1+1=2n+2的坐標            int right = child + 1;          	// 如果右側節點坐標小於數組中最有一個元素,並且右節點值小於左側節點值,則將c設置為右側節點值。            if (right < n &&                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)                c = array[child = right];          	// 對比c和當前最后一個元素的大小。            if (key.compareTo((T) c) <= 0)                break;          	// 將坐標k位置設置為比較的后的值。            array[k] = c;          	// 將光標移動到替換的節點上。            k = child;        }      	// 最后將元素最后一個值賦值到k的位置。        array[k] = key;    }}private static <T> void siftDownUsingComparator(int k, T x, Object[] array,                                                int n,                                                Comparator<? super T> cmp) {    if (n > 0) {        int half = n >>> 1;        while (k < half) {            int child = (k << 1) + 1;            Object c = array[child];            int right = child + 1;            if (right < n && cmp.compare((T) c, (T) array[right]) > 0)                c = array[child = right];            if (cmp.compare(x, (T) c) <= 0)                break;            array[k] = c;            k = child;        }        array[k] = x;    }}

看了源碼其實還是有點蒙的,因為根本沒有辦法想象到是如何實現的,接下來跟着思路一步一步的分析,上面offer時最后二叉堆的情況如下圖所示:
14

  1. 先調用一次poll操作,此時size=4,result=array[0]=2,n=size-1=3,x=array[n]=array[3]=5,將array[3]位置設置為null,就變成如下的二叉堆
    14

    數組中最后一個元素相當於直接被設置為空了,從二叉堆中移除掉了,移除掉了放在那里呢?其實這里大家可以理解為放在堆頂,原因是什么呢?我們想一下,堆頂元素相當於是要被出隊操作,元素2已經出隊了,但是沒有堆頂元素了,此時需要怎么操作呢?直接從數組中找到完全二叉樹葉子節點最后一個節點,將最后一個節點設置到堆頂,然后將堆頂元素進行下沉操作,但這里源碼中並沒有實際設置元素5到堆頂,而是經過比較的過程將元素5從堆頂進行下沉的操作,接下來一步一步分析,目前可以將堆看做如下:
    14

    然后通過源碼可以看到它是現將堆頂也就是元素2的左右節點相比較,比較3<9所以最小節點c=3,然后再跟元素5(可以看做現在是在堆頂)進行比較3<5,發現元素5大於元素3,將元素3的位置替換原堆頂的元素2(此時我們可以完全可以看成元素5的位置也再堆頂,其實就是替換元素5的位置內容),然后元素5的下標從0調整到原來元素3下標1的位置,這個動作我們稱之為下沉操作,下沉過程中並沒有進行內容交換,只是坐標進行下沉操作了,此時二叉堆內容如下所示:
    14

    half=3>>>1=1,k的下標在這個時候已經變為1,進行k<half時發現等於false,本次循環結束,將下標1位置替換真實下沉的內容 array[k] = key=array[1]=5,此時二叉堆內容如下所示:

14

最后將原調整前的result返回。

put操作

put的操作內部其實就是調用了offer操作,由於是無界數組可以進行擴充,所以不需要進行阻塞操作。

public void put(E e) {   
  offer(e); // never need to block
}

take操作

take操作當隊列為空時進行阻塞操作,當隊列不為空調用offer操作時會調用notEmpty.signal();通知等待的線程,隊列中已經有數據了,可以繼續獲取數據了,源碼如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
  	// 調用的事可以相應中斷。
    lock.lockInterruptibly();
    E result;
    try {
        while ( (result = dequeue()) == null)
          	// 如果隊列為空時等待。
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

總結

  1. PriorityBlockingQueue是一個阻塞隊列,繼承自BlockingQueue
  2. PriorityBlockingQueue的元素必須實現Comparable接口。
  3. PriorityBlockingQueue內部實現原理是基於二叉堆來實現的,二叉堆的實現是基於數組來實現的。
  4. PriorityBlockingQueue是一個無界數組,插入值時不需要進行阻塞操作,獲取值如果隊列為空時阻塞線程獲取值。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM