面試侃集合 | DelayQueue篇


面試官:好久不見啊,上次我們聊完了PriorityBlockingQueue,今天我們再來聊聊和它相關的DelayQueue吧。

Hydra:就知道你前面肯定給我挖了坑,DelayQueue也是一個無界阻塞隊列,但是和之前我們聊的其他隊列不同,不是所有類型的元素都能夠放進去,只有實現了Delayed接口的對象才能放進隊列。Delayed對象具有一個過期時間,只有在到達這個到期時間后才能從隊列中取出。

面試官:有點意思,那么它有什么使用場景呢?

Hydra:不得不說,由於DelayQueue的精妙設計,使用場景還是蠻多的。例如在電商系統中,如果有一筆訂單在下單30分鍾內沒有完成支付,那么就需要自動取消這筆訂單。還有,如果我們緩存了一些數據,並希望這些緩存在一定時間后失效的話,也可以使用延遲隊列將它從緩存中刪除。

以電商系統為例,可以簡單看一下這個流程:

面試官:看起來和任務調度有點類似啊,它們之間有什么區別嗎?

Hydra:任務調度更多的偏向於定時的特性,是在指定的時間點時間間隔執行特定的任務,而延遲隊列更多偏向於在指定的延遲時間后執行任務。相對任務調度來說,上面舉的例子中的延遲隊列場景都具有高頻率的特性,使用定時任務來實現它們的話會顯得有些過於笨重了。

面試官:好了,你也白話了半天了,能動手就別吵吵,還是先給我寫個例子吧。

Hydra:好嘞,前面說過存入隊列的元素要實現Delayed接口,所以我們先定義這么一個類:

public class Task implements Delayed {
    private String name;
    private long delay,expire;
    public Task(String name, long delay) {
        this.name = name;
        this.delay = delay;
        this.expire=System.currentTimeMillis()+delay;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

}

實現了Delayed接口的類必須要實現下面的兩個方法:

  • getDelay方法用於計算對象的剩余延遲時間,判斷對象是否到期,計算方法一般使用過期時間減當前時間。如果是0或負數,表示延遲時間已經用完,否則說明還沒有到期

  • compareTo方法用於延遲隊列的內部排序比較,這里使用當前對象的延遲時間減去被比較對象的延遲時間

在完成隊列中元素的定義后,向隊列中加入5個不同延遲時間的對象,並等待從隊列中取出:

public void delay() throws InterruptedException {
    DelayQueue<Task> queue=new DelayQueue<>();
    queue.offer(new Task("task1",5000));
    queue.offer(new Task("task2",1000));
    queue.offer(new Task("task3",6000));
    queue.offer(new Task("task4",100));
    queue.offer(new Task("task5",3000));

    while(true){
        Task task = queue.take();
        System.out.println(task);
    }
}

運行結果如下,可以看到按照延遲時間從短到長的順序,元素被依次從隊列中取出。

Task{name='task4', delay=100}
Task{name='task2', delay=1000}
Task{name='task5', delay=3000}
Task{name='task1', delay=5000}
Task{name='task3', delay=6000}

面試官:看起來應用還是挺簡單的,今天也不能這么草草了事吧,還是說說原理吧。

Hydra:開始的時候你自己不都說了嗎,今天咱們聊的DelayQueue和前幾天聊過的PriorityBlockingQueue多少有點關系。DelayQueue的底層是PriorityQueue,而PriorityBlockingQueue和它的差別也沒有多少,只是在PriorityQueue的基礎上加上鎖和條件等待,入隊和出隊用的都是二叉堆的那一套邏輯。底層使用的有這些:

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
private final Condition available = lock.newCondition();

面試官:你這樣也有點太糊弄我了吧,這就把我敷衍過去了?

Hydra:還沒完呢,還是先看入隊的offer方法,它的源碼如下:

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

DelayQueue每次向優先級隊列PriorityQueue中添加元素時,會以元素的剩余延遲時間delay作為排序的因素,來實現使最先過期的元素排在隊首,以此達到在之后從隊列中取出的元素都是先取出最先到達過期的元素。

二叉堆的構造過程我們上次講過了,就不再重復了。向隊列中添加完5個元素后,二叉堆和隊列中的結構是這樣的:

當每個元素在按照二叉堆的順序插入隊列后,會查看堆頂元素是否剛插入的元素,如果是的話那么設置leader線程為空,並喚醒在available上阻塞的線程。

這里先簡單的介紹一下leader線程的作用,leader是等待獲取元素的線程,它的作用主要是用於減少不必要的等待,具體的使用在后面介紹take方法的時候我們細說。

面試官:也別一會了,趁熱打鐵直接講隊列的出隊方法吧。

Hydra:這還真沒法着急,在看阻塞方法take前還得先看看非阻塞的poll方法是如何實現的:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}

代碼非常短,理解起來非常簡單,在加鎖后首先檢查堆頂元素,如果堆頂元素為空或沒有到期,那么直接返回空,否則返回堆頂元素,然后解鎖。

面試官:好了,鋪墊完了吧,該講阻塞方法的過程了吧?

Hydra:阻塞的take方法理解起來會比上面稍微困難一點,我們還是直接看它的源碼:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                if (leader != null)
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

阻塞過程中分支條件比較復雜,我們一個一個看:

  • 首先獲取堆頂元素,如果為空,那么說明隊列中還沒有元素,讓當前線程在available上進行阻塞等待
  • 如果堆頂元素不為空,那么查看它的過期時間,如果已到期,那么直接彈出堆頂元素
  • 如果堆頂元素還沒有到期,那么查看leader線程是否為空,如果leader線程不為空的話,表示已經有其他線程在等待獲取隊列的元素,直接阻塞當前線程。
  • 如果leader為空,那么把當前線程賦值給它,並調用awaitNanos方法,在阻塞delay時間后自動醒來。喚醒后,如果leader還是當前線程那么把它置為空,重新進入循環,再次判斷堆頂元素是否到期。

當有隊列中的元素完成出隊后,如果leader線程為空,並且堆中還有元素,就喚醒阻塞在available上的其他線程,並釋放持有的鎖。

面試官:我注意到一個問題,在上面的代碼中,為什么要設置first = null呢?

Hydra:假設有多個線程在執行take方法,當第一個線程進入時,堆頂元素還沒有到期,那么會將leader指向自己,然后阻塞自己一段時間。如果在這期間有其他線程到達,會因為leader不為空阻塞自己。

當第一個線程阻塞結束后,如果將堆頂元素彈出成功,那么first指向的元素應該被gc回收掉。但是如果還被其他線程持有的話,它就不會被回收掉,所以將first置為空可以幫助完成垃圾回收。

面試官:我突然有一個發散性的疑問,定時任務線程池ScheduledThreadPoolExecutor,底層使用的也是DelayQueue嗎?

Hydra:問題很不錯,但很遺憾並不是,ScheduledThreadPoolExecutor在類中自己定義了一個DelayedWorkQueue內部類,並沒有直接使用DelayQueue。不過如果你看一下源碼,就會看到它們實現的邏輯基本一致,同樣是基於二叉堆的上浮、下沉、擴容,也同樣基於leader、鎖、條件等待等操作,只不過自己用數組又實現了一遍而已。說白了,看看兩個類的作者,都是Doug Lea大神,所以差異根本沒有多大。

面試官:好了,今天先到這吧,能最后再總結一下嗎?

Hydra:DelayQueue整體理解起來也沒有什么困難的點,難的地方在前面聊優先級隊列的時候基本已經掃清了,新加的東西也就是一個對於leader線程的操作,使用了leader線程來減少不必要的線程等待時間。

面試官:今天的面試有點短啊,總是有點意猶未盡的感覺,看來下次得給你加點料了。

Hydra:

如果文章對您有所幫助,歡迎關注公眾號 碼農參上


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM