java集合(五)Queue集合之PriorityBlockingQueue詳解


Java並發編程筆記之PriorityBlockingQueue源碼分析

 

JDK 中無界優先級隊列PriorityBlockingQueue 內部使用堆算法保證每次出隊都是優先級最高的元素,元素入隊時候是如何建堆的,元素出隊后如何調整堆的平衡的?

PriorityBlockingQueue是帶優先級的無界阻塞隊列,每次出隊都返回優先級最好或者最低的元素,內部是平衡二叉樹堆的實現。

首先看一下PriorityBlockingQueue類圖結構,如下:

可以看到PriorityBlockingQueue內部有個數組queue用來存放隊列元素,size用來存放隊列元素個數,allocationSpinLock 是個自旋鎖,用CAS操作來保證只有一個線程可以擴容隊列,

狀態為0 或者1,其中0標示當前沒有在進行擴容,1標示當前正在擴容。

 

我們首先看看PriorityBlockingQueue的構造函數,源碼如下:

private static final int DEFAULT_INITIAL_CAPACITY = 11;

 public PriorityBlockingQueue() {
        this(DEFAULT_INITIAL_CAPACITY, null);
    }

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }

    public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

如上構造函數,默認隊列容量為11,默認比較器為null,也就是使用元素的compareTo方法進行比較來確定元素的優先級,這意味着隊列元素必須實現Comparable接口。

 

接下來我們主要看PriorityBlockingQueue的幾個操作的源碼,如下:

  1.offer 操作,offer操作的作用是在隊列插入一個元素,由於是無界隊列,所以一直返回true,源碼如下:

public boolean offer(E e) {

    if (e == null)
        throw new NullPointerException();

    //獲取獨占鎖
    final ReentrantLock lock = this.lock;
    lock.lock();

    int n, cap;
    Object[] array;

    //如果當前元素個數>=隊列容量,則擴容(1)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);

    try {
        Comparator<? super E> cmp = comparator;

        //默認比較器為null (2)
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            //自定義比較器 (3)
            siftUpUsingComparator(n, e, array, cmp);

        //隊列元素增加1,並且激活notEmpty的條件隊列里面的一個阻塞線程(9)
        size = n + 1;
        notEmpty.signal();//激活調用take()方法被阻塞的線程
    } finally {
        //釋放獨占鎖
        lock.unlock();
    }
    return true;
}

可以看到上面代碼,offer操作主流程比較簡單,接下來主要關注PriorityBlockingQueue是如何進行擴容的和內部如何建立堆的,首先看擴容源碼如下:

private void tryGrow(Object[] array, int oldCap) {
    lock.unlock(); //釋放獲取的鎖
    Object[] newArray = null;

    //cas成功則擴容(4)
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            //oldGap<64則擴容新增oldcap+2,否者擴容50%,並且最大為MAX_ARRAY_SIZE
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // 如果一開始容量很小,則擴容寬度變大
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // 可能溢出
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }

    //第一個線程cas成功后,第二個線程會進入這個地方,然后第二個線程讓出cpu,盡量讓第一個線程執行下面點獲取鎖,但是這得不到肯定的保證。(5)
    if (newArray == null) // 如果兩外一個線程正在分配,則讓出
        Thread.yield();
    lock.lock();//(6)
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}

tryGrow 目的是擴容,這里要思考下為啥在擴容前要先釋放鎖,然后使用 cas 控制只有一個線程可以擴容成功呢?

其實這里不先釋放鎖也是可以的,也就是在整個擴容期間一直持有鎖,但是擴容是需要花時間的,如果擴容的時候還占用鎖,那么其他線程在這個時候是不能進行出隊和入隊操作的,

這大大降低了並發性。所以為了提高性能,使用CAS控制只有一個線程可以進行擴容,並且在擴容前釋放了鎖,讓其他線程可以進行入隊和出隊操作。

 

spinlock鎖使用CAS控制只有一個線程可以進行擴容,CAS失敗的線程會調用Thread.yield() 讓出 cpu,目的是為了讓擴容線程擴容后優先調用 lock.lock 重新獲取鎖,

但是這得不到一定的保證。有可能yield的線程在擴容線程擴容完成前已經退出,並執行了代碼(6)獲取到了鎖。如果當前數組擴容還沒完畢,當前線程會再次調用tryGrow方法,

然后釋放鎖,這又給擴容線程獲取鎖提供了機會,如果這時候擴容線程還沒擴容完畢,則當前線程釋放鎖后又調用yield方法讓出CPU。可知當擴容線程進行擴容期間,

其他線程是原地自旋通過代碼(1)檢查當前擴容是否完畢,等擴容完畢后才退出代碼(1)的循環。

 

當擴容線程擴容完畢后會重置自旋鎖變量allocationSpinLock 為 0,這里並沒有使用UNSAFE方法的CAS進行設置是因為同時只可能有一個線程獲取了該鎖,並且 allocationSpinLock 被修飾為了 volatile。

當擴容線程擴容完畢后會執行代碼 (6) 獲取鎖,獲取鎖后復制當前 queue 里面的元素到新數組。

 

接下來我們看看建堆算法,源碼如下:

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    Comparable<? super T> key = (Comparable<? super T>) x;

    //隊列元素個數>0則判斷插入位置,否者直接入隊(7)
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;(8)
}

接下來用圖來解釋上面的算法過程,假設隊列初始化容量為2,創建的優先級隊列的泛型參數為Integer。

首先調用隊列offer(2)方法,希望插入元素2到隊列,插入前隊列狀態如下圖所示:

 

 首先執行代碼(1),從上圖變量值可以知道判斷值為false,所以緊接着執行代碼(2),由於 k=n=size=0 所以代碼(7)判斷結果為 false,所以會執行代碼(8)直接把元素 2 入隊,最后執行代碼(9)設置 size 的值加 1,這時候隊列的狀態如下圖:

然后調用隊列的 offer(4) 時候,首先執行代碼(1),從上圖變量值可知判斷為 false,所以執行代碼(2),由於 k=1, 所以進入 while 循環,由於 parent=0;e=2;key=4; 默認元素比較器是使用元素的 compareTo 方法,

可知 key>e 所以執行 break 退出 siftUpComparable 中的循環; 然后把元素存到數組下標為 1 的地方,最后執行代碼(9)設置 size 的值加 1,這時候隊列狀態為:

然后調用隊列的offer(6) 時候,首先執行代碼(1),從上圖變量值知道這時候判斷值為true,所以嗲用tryGrow進行數組擴容,由於2 < 64 所以newCap=2 + (2+2)=6; 然后創建新數組並拷貝,

然后調用siftUpComparable 方法,由於 k=2>0 進入 while 循環,由於 parent=0;e=2;key=6;key>e 所以 break 后退出 while 循環; 並把元素 6 放入數組下標為 2 的地方,最后設置 size 的值加 1,現在隊列狀態:

然后調用隊列的 offer(1) 時候,首先執行代碼(1),從上圖變量值知道這次判斷值為 false,所以執行代碼(2),由於k=3, 所以進入 while 循環,由於parent=0;e=4;key=1; key<e,所以把元素 4 復制到數組下標為 3 的地方,

然后 k=0 退出 while 循環;然后把元素 1 存放到下標為 0 地方,現在狀態:

此時此刻的二叉樹堆的樹形圖如下:

可知堆的根元素是 1,也就是這是一個最小堆,那么當調用這個優先級隊列的 poll 方法時候,會一次返回堆里面值最小的元素。

 

  2.poll操作,poll 操作作用是獲取隊列內部堆樹的根節點元素,如果隊列為空,則返回 null。源碼如下:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();//獲取獨占鎖
    try {
        return dequeue();
    } finally {
        lock.unlock();//釋放獨占鎖
    }
}

如上代碼可以知道在進行出隊操作過程中要先加鎖,這意味着,當前線程進行出隊操作的時候,其他線程不能再進行入隊和出隊操作,但是從前面介紹offer函數的時候,知道這時候可以有其他線程進行擴容,

接下來,我們要看一下出隊操作的dequeue方法的源碼如下:

private E dequeue() {

    //隊列為空,則返回null
    int n = size - 1;
    if (n < 0)
        return null;
    else {

        //獲取隊頭元素(1)
        Object[] array = queue;
        E result = (E) array[0];

        //獲取隊尾元素,並值null(2)
        E x = (E) array[n];
        array[n] = null;

        Comparator<? super E> cmp = comparator;
        if (cmp == null)//(3)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;//(4)
        return result;
    }
}

如上代碼,如果隊列為空則直接返回 null,否者執行代碼(1)獲取數組第一個元素作為返回值存放到變量 Result,這里要注意一下數組里面第一個元素是優先級最小或者最大的元素,出隊操作就是返回這個元素。

然后代碼(2)獲取隊列尾部元素存放到變量X,並且置空尾部節點,然后執行代碼(3)插入變量X 到數組下標為 0 的位置后,重新調整堆為最大或者最小堆,然后返回。

這里重要的是看如何去掉堆的根節點后,使用剩下的節點重新調整為一個最大或者最小堆。

接下來我們看看siftDownComparable 的源碼,如下:

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable<? super T> key = (Comparable<? super T>)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                int child = (k << 1) + 1; // 假設左邊子樹最小
                Object c = array[child];(5int right = child + 1;(6)
                if (right < n &&
                    ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)(7)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)(8)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;(9)
        }
    }

下面我們結合圖來模擬上面調整堆的算法過程,接着上節隊列的狀態繼續講解,上節隊列元素序列為 1,2,6,4:

第一次調用隊列的 poll() 方法時候,首先執行代碼(1)(2),這時候變量 size =4;n=3;result=1;x=4; 這時候隊列狀態圖如下:

然后執行代碼(3),調整堆后隊列狀態圖,如下:

第二次調用隊列的 poll() 方法時候,首先執行代碼(1)(2),這時候變量 size =3;n=2;result=2;x=6; 這時候隊列狀態圖,如下:

然后執行代碼(3)調整堆后隊列狀態圖,如下:

第三次調用隊列的 poll() 方法時候,首先執行代碼(1)(2),這時候變量 size =2;n=1;result=4;x=6; 這時候隊列狀態圖,如下:

然后執行代碼(3)調整堆后隊列狀態圖,如下:

第四次直接返回元素 6.

 

 

接下來重點說說 siftDownComparable 這個調整堆的算法: 首先說下堆調整的思路,由於隊列數組第 0 個元素為樹根,出隊時候要被移除,這時候數組就不在是最小堆了,所以需要調整堆,

具體是要從被移除的樹根的左右子樹中找一個最小的值來當樹根,左右子樹又會看自己作為樹根節點的樹的左右子樹里面哪個是最小值,這是一個遞歸的過程,直到樹葉節點結束遞歸,

如果不明白,下面結合圖形來說明,假如當前隊列內容如下:

對應的二叉堆樹如下:

這時候如果調用了 poll(); 那么 result=2;x=11;隊列末尾的元素設置為 null 后,剩下的元素調整堆的步驟如下圖:

 

 

如上圖(1)樹根的 leftChildVal = 4;rightChildVal = 6; 4<6; 所以 c=4; 然后 11>4 也就是 key>c;所以使用元素 4 覆蓋樹根節點的值,現在堆對應的樹如圖(2)。

然后樹根的左子樹樹根的左右孩子節點中 leftChildVal = 8;rightChildVal = 10; 8<10; 所以 c=8; 然后發現 11>8 也就是 key>c;所以元素 8 作為樹根左子樹的根節點,現在樹的形狀如圖(3),

這時候判斷 k<half 為 false 就會退出循環,然后把 x=11 設置到數組下標為 3 的地方,這時候堆樹如圖(4),至此調整堆完畢,siftDownComparable 返回 result=2,poll 方法也返回了。

 

 

  3.put操作,put 操作內部調用的 offer, 由於是無界隊列,所以不需要阻塞,源碼如下:

public void put(E e) {
    offer(e); // never need to block
}

4.take 操作,take 操作作用是獲取隊列內部堆樹的根節點元素,如果隊列為空則阻塞,源碼如下:

public E take() throws InterruptedException {
    //獲取鎖,可被中斷
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {

        //如果隊列為空,則阻塞,把當前線程放入notEmpty的條件隊列
        while ( (result = dequeue()) == null)
            notEmpty.await();//阻塞當前線程
    } finally {
        lock.unlock();//釋放鎖
    }
    return result;
}

如上代碼,首先通過 lock.lockInterruptibly() 獲取獨占鎖,這個方式獲取的鎖是對中斷進行響應的。然后調用 dequeue 方法返回堆樹根節點元素,如果隊列為空,則返回 false,

然后當前線程調用 notEmpty.await() 阻塞掛起當前線程,直到有線程調用了 offer()方法(offer 方法內在添加元素成功后調用了 notEmpty.signal 方法會激活一個阻塞在 notEmpty 的條件隊列里面的一個線程)。

另外這里使用 while 而不是 if 是為了避免虛假喚醒。

 

  5.size操作,獲取隊列元個數,如下代碼,在返回 size 前加了鎖,保證在調用 size() 方法時候不會有其它線程進行入隊和出隊操作,另外由於 size 變量沒有被修飾為 volatie,這里加鎖也保證了多線程下 size 變量的內存可見性。源碼如下:

public int size() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return size;
    } finally {
        lock.unlock();
    }
}

總結:PriorityBlockingQueue 隊列內部使用二叉樹堆維護元素優先級,內部使用數組作為元素存儲的數據結構,這個數組是可以擴容的,當前元素個數 >= 最大容量的時候會通過算法擴容,

出隊的時候始終保證出隊的元素是堆樹的根節點,而不是在隊列里面停留時間最長的元素,默認元素優先級比較規則是使用元素的compareTo方法來做,用戶可以自定義優先級的比較優先級。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM