java延遲隊列DelayQueue及底層優先隊列PriorityQueue實現原理源碼詳解


  DelayQueue是基於java中一個非常牛逼的隊列PriorityQueue(優先隊列),PriorityQueue是java1.5新加入的,當我看到Doug Lea大神的署名之后,我就知道這個隊列不簡單,那我們先來看一下他的源碼吧:

作為一個隊列來說,最基礎的就是新增和查詢,首先我們看下入隊的邏輯:

1.入隊

PriorityQueue提供了offer方法新增元素(add方法其實也是offer實現的),我們直接看下源碼:

public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }

offer方法首先判斷是否需要擴容,若需要則走grow方法:

private void grow(int minCapacity) {
        int oldCapacity = queue.length;
        // Double size if small; else grow by 50%
        int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                         (oldCapacity + 2) :
                                         (oldCapacity >> 1));
        // overflow-conscious code
        if (newCapacity - MAX_ARRAY_SIZE > 0)
            newCapacity = hugeCapacity(minCapacity);
        queue = Arrays.copyOf(queue, newCapacity);
    }

當長度小於64,擴容一倍+2,否則擴容50%。

再往下看若隊列中沒有元素,直接復制下標為0的元素,否則調用siftUp方法:

private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
            siftUpComparable(k, x);
    }

倆方法差不多一個,具體可搜compare和compareTo的區別如:https://blog.csdn.net/fly910905/article/details/81670353,我們直接看siftUpComparable方法:

private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }

     

 結合上圖和代碼可以看出每個節點新增時,首先會根據節點下標計算出當前新節點應該屬於的節點的父節點,比較當小於父節點則交換,無限循環,知道不存在父節點或者當前節點大於父節點的值,這樣可以保證每個節點都比起子節點要小。

2.出隊

入隊的時候基本都差不多,但出隊卻有好幾種,我們首先看peek方法:

public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }
public E peek() {
        return (size == 0) ? null : (E) queue[0];
    }

代碼簡潔明了,就是查詢出第一個,這只能算查詢,算不上出隊,我覺得應該叫點名。

再看poll方法:

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();  //取第一個節點
            if (first == null || first.getDelay(NANOSECONDS) > 0)  //節點為空或者首節點未到延時時間直接返回null
                return null;
            else
                return q.poll();  //PriorityQueue取節點邏輯
        } finally {
            lock.unlock();
        }
    }

再看PriorityQueue.poll方法:

public E poll() {
        if (size == 0)
            return null;
        int s = --size;
        modCount++;
        E result = (E) queue[0];
        E x = (E) queue[s];
        queue[s] = null;
        if (s != 0)
            siftDown(0, x);
        return result;
    }

首先取出第一個節點,然后將最后一個節點放替換首節點,並與子節點對比找出最小的並替換直到當前節點為最小為止,具體替換流程見siftDown代碼:

private void siftDown(int k, E x) {
        if (comparator != null)
            siftDownUsingComparator(k, x);
        else
            siftDownComparable(k, x);
    }
private void siftDownComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>)x;
        int half = size >>> 1;        // loop while a non-leaf
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least  拿到左子節點下標
            Object c = queue[child]; 
            int right = child + 1;  //右子節點下標
            if (right < size &&
                ((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
                c = queue[child = right];  //取出左右節點較小的
            if (key.compareTo((E) c) <= 0)  //當前節點比子節點小,結束流程
                break;
            queue[k] = c;   //替換子節點至父節點
            k = child;
        }
        queue[k] = key;
    }

這個代碼看起來稍微復雜點,會首先拿到左子節點和右子節點,對比取出較小的節點后與當前節點對比,將小的放在父節點位置,其實這里也是保證替換后的節點依然保持每個父節點最小,符合小頂堆。具體流程如下圖所示:

 

 

 

 

我們最后看下take方法的實現:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();  //取出第一個節點
                if (first == null)  //首節點為空說明隊列為空,await等待
                    available.await();
                else {              //說明隊列中有節點
                    long delay = first.getDelay(NANOSECONDS);   //獲取首節點延時時間
                    if (delay <= 0)         //延時時間到期,直接取
                        return q.poll();
                    first = null; // don't retain ref while waiting 
                    if (leader != null)    //說明當前有其他線程在操作(一般是其他線程在await)
                        available.await();
                    else {  //這里設置操作線程為自己,並等待延時時間時長
                        Thread thisThread = Thread.currentThread(); 
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

這個實現一看就是阻塞式等待,取不到不罷休系列。

 

總結:

  這篇寫的還是比較簡單的,大體介紹了DelayQueue的實現,也從底層了解了小頂堆PriorityQueue的實現,算是補充了之前對延時隊列的具體實現,這篇主要是通過一個小頂堆的實現,保證每次取得值都是最小的,而又不用像數組那樣每次插入都要重新排序,這里只要排序一部分就可以,也保證了性能,而DelayQueue中,加入了ReentrantLock保證了多線程的線程安全,同時加入Condition實現了延時阻塞式存取的機制,jdk的代碼還是牛,這里其實就是我之前寫鎖的時候介紹的等待通知模式的一種實現,結合起來看還是有一些收獲的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM