DelayQueue源碼解析


DelayQueue是一個支持延時獲取元素的無界阻塞隊列。里面的元素全部都是“可延期”的元素,列頭的元素是最先“到期”的元素,如果隊列里面沒有元素到期,是不能從列頭獲取元素的,哪怕有元素也不行。也就是說只有在延遲期到時才能夠從隊列中取元素。

DelayQueue主要用於兩個方面:
- 緩存:清掉緩存中超時的緩存數據
- 任務超時處理 

class DelayedEle implements Delayed {

    private final long delayTime; //延遲時間
    private final long expire;  //到期時間
    private String data;   //數據

    public DelayedEle(long delay, String data) {
        delayTime = delay;
        this.data = data;
        expire = System.currentTimeMillis() + delay; 
    }

    /**
     * 剩余時間=到期時間-當前時間
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 優先隊列里面優先級規則
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delayTime);
        sb.append(", expire=").append(expire);
        sb.append(", data='").append(data).append('\'');
        sb.append('}');
        return sb.toString();
    }
}

看了DelayQueue的內部結構就對上面幾個關鍵點一目了然了,但是這里有一點需要注意,DelayQueue的元素都必須繼承Delayed接口。同時也可以從這里初步理清楚DelayQueue內部實現的機制了:以支持優先級無界隊列的PriorityQueue作為一個容器,容器里面的元素都應該實現Delayed接口,在每次往優先級隊列中添加元素時以元素的過期時間作為排序條件,最先過期的元素放在優先級最高


 基於PriorityQueue,一種延時阻塞隊列,DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。

DelayQueue也是一個無界隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據為空的操作(消費者)才會被阻塞
放的時候不用因為滿了而阻塞,取的時候空了會阻塞等待有時間沒到也會阻塞等到,自己也可以指定超時時間。


 只有一個getDelay(TimeUnit)方法,該方法返回與此對象相關的的剩余時間。同時我們看到Delayed接口繼承自Comperable接口,所以實現Delayed接口的類還必須要定義一個compareTo方法,該方法提供與此接口的getDelay方法一致的排序。

DelayQueue隊列中每個元素都有個過期時間,並且隊列是個優先級隊列,當從隊列獲取元素時候,只有過期元素才會出隊列。

q:優先隊列,用於存儲元素,並按優先級排序
leader:用於優化內部阻塞通知的線程,第一個阻塞等待的線程。
以支持優先級的PriorityQueue無界隊列作為一個容器,因為元素都必須實現Delayed接口,可以根據元素的過期時間來對元素進行排列,因此,先過期的元素會在隊首,每次從隊列里取出來都是最先要過期的元素。

leader是等待獲取隊列元素的線程,應用主從式設計減少不必要的等待。如果leader不等於空,表示已經有線程在等待獲取隊列的元素。所以,通過await()方法讓出當前線程等待信號。如果leader等於空,則把當前線程設置為leader,當一個線程為leader,它會使用awaitNanos()方法讓當前線程等待接收信號或等待delay時間。 

private final PriorityQueue<E> q = new PriorityQueue<E>();
private final transient ReentrantLock lock = new ReentrantLock();
private Thread leader = null;
private final Condition available = lock.newCondition();

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);  //放一個元素
            if (q.peek() == e) {  //隊列的第0個元素是這個元素,放進去的元素成為第0個元素。最大堆第0和元素是最大的,依次是第23大,第4567大,
                leader = null;    //  leader 線程為空
                available.signal();   //其他線程喚醒,如果隊首元素是剛插入的元素,則設置leader為null,並喚醒阻塞在available上的線程
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)   //時間不到不走
            return null;
        else
            return q.poll();   //  取出優先級隊列里面第0個元素,優先級隊列是根據時間排序了的
    } finally {
        lock.unlock();
    }
}

線程阻塞地方,與代碼對應關系。 

推論1:有leader線程等待的時候,新的線程要取,必然加入await隊列排隊。所有已經await的線程,喚醒了非leader線程就會繼續await。每次喚醒一個處於await的線程。同時之能一個線程取就是leader去取。
await釋放鎖別的線程可以執行,singal不會釋放鎖別的線程不執行。
推論2:take(),put()方法全部鎖住,只能一個線程調用take或put。里面有await就不行了,await后別的線程就可以進來take,put方法,只要不是執行take里面的await方法,那么就只能一個線程執行take,put方法。
推論3:如果都不設置等待時間,那么就沒有優先級,只有等到放的時候  或者  成功取到的線程喚醒,都設置等待時間,時間到了一起搶。
推論4:取的時候為空等待,時間到了直接返回,有元素但是時間沒到,就設置leader
推論5:等待取的線程,有一個是leader,就是由第一優先權的線程也是第一個嘗試獲取的線程。所有的等待取的線程中,下一次不一定是ledaer去取,如果喚醒了非leader線程,那也不行,要繼續等到,一定要leader線程喚醒后去取,其他線程才能去取,然后成為新的leader。

 方法調用是根據線程隨機來的,一旦走進去方法就根據代碼來了。

 

public E take() throws InterruptedException {
    // 獲取全局獨占鎖,只能同時一個線程取或者放,喚醒只是喚醒一個線程。 
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            // 獲取隊首元素
            E first = q.peek();
            // 隊首為空,則阻塞當前線程
            if (first == null) 
                available.await();//等待offer方法喚醒,再次循環去peek,
            else {
                // 獲取隊首元素的超時時間
                long delay = first.getDelay(NANOSECONDS);//返回還需等待3秒
                // 已超時,直接出隊
                if (delay <= 0)
                    return q.poll();
                // 釋放first的引用,避免內存泄漏
                first = null; 
                //有元素,但是元素沒到時間  
                // leader != null表明有其他線程在等待,阻塞當前線程,自己不是第一個等待線程
                if (leader != null)
                    available.await();  //等待獲取了頭元素的線程喚醒,或者offer方法喚醒,再次循環去peek,
                else {
                    //沒有其他線程在等待, leader指向當前線程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 超時阻塞,只需要等3秒,醒了之后也要重新獲取。
                        available.awaitNanos(delay);
                    } finally {
                        // 釋放leader
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // leader為null並且隊列不為空,說明沒有其他線程在等待,那就通知條件隊列
        if (leader == null && q.peek() != null)
            available.signal();
        // 釋放全局獨占鎖
        lock.unlock();
    }
}

這里為什么如果不設置first = null,則會引起內存泄漏呢?線程A到達,列首元素沒有到期,設置leader = 線程A,這是線程B來了因為leader != null,則會阻塞,線程C一樣。假如線程阻塞完畢了,獲取列首元素成功,出列。這個時候列首元素應該會被回收掉,但是問題是它還被線程B、線程C持有着,所以不會回收,這里只有兩個線程,如果有線程D、線程E…呢?這樣會無限期的不能回收,就會造成內存泄漏。 

 try里面有return,也要先執行finally在執行return。Condition.await(16,TimeUnit.SECONDS):睡醒之后,獲得了鎖就執行,沒有獲得鎖阻塞。睡的中間被人singnal了,可以醒來,有鎖執行,無鎖阻塞

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM