DelayQueue詳解


一、DelayQueue是什么

  DelayQueue是一個無界的BlockingQueue,用於放置實現了Delayed接口的對象,其中的對象只能在其到期時才能從隊列中取走。這種隊列是有序的,即隊頭對象的延遲到期時間最長。注意:不能將null元素放置到這種隊列中。

二、DelayQueue能做什么

 1. 淘寶訂單業務:下單之后如果三十分鍾之內沒有付款就自動取消訂單。 
 2. 餓了嗎訂餐通知:下單成功后60s之后給用戶發送短信通知。

 3.關閉空閑連接。服務器中,有很多客戶端的連接,空閑一段時間之后需要關閉之。

 4.緩存。緩存中的對象,超過了空閑時間,需要從緩存中移出。

 5.任務超時處理。在網絡協議滑動窗口請求應答式交互時,處理超時未響應的請求等。

三、實例展示

 定義元素類,作為隊列的元素

 DelayQueue只能添加(offer/put/add)實現了Delayed接口的對象,意思是說我們不能想往DelayQueue里添加什么就添加什么,不能添加int、也不能添加String進去,必須添加我們自己的實現了Delayed接口的類的對象,來代碼:

/**
 *  compareTo 方法必須提供與 getDelay 方法一致的排序
 */
class MyDelayedTask implements Delayed{

    private String name ;
    private long start = System.currentTimeMillis();
    private long time ;

    public MyDelayedTask(String name,long time) {
        this.name = name;
        this.time = time;
    }

    /**
     * 需要實現的接口,獲得延遲時間   用過期時間-當前時間
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((start+time) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    /**
     * 用於延遲隊列內部比較排序   當前時間的延遲時間 - 比較對象的延遲時間
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        MyDelayedTask o1 = (MyDelayedTask) o;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "MyDelayedTask{" +
                "name='" + name + '\'' +
                ", time=" + time +
                '}';
    }
}

  其中,compareTo 方法 getDelay 方法 就是Delayed接口的方法,我們必須實現,而且按照JAVASE文檔,compareTo 方法必須提供與 getDelay 方法一致的排序,也就是說compareTo方法里可以按照getDelay方法的返回值大小排序,即在compareTo方法里比較getDelay方法返回值大小

寫main方法測試

  定義一個DelayQueue,添加幾個元素,while循環獲取元素

private static DelayQueue delayQueue  = new DelayQueue();
    public static void main(String[] args) throws InterruptedException {

        new Thread(new Runnable() {
            @Override
            public void run() {

                delayQueue.offer(new MyDelayedTask("task1",10000));
                delayQueue.offer(new MyDelayedTask("task2",3900));
                delayQueue.offer(new MyDelayedTask("task3",1900));
                delayQueue.offer(new MyDelayedTask("task4",5900));
                delayQueue.offer(new MyDelayedTask("task5",6900));
                delayQueue.offer(new MyDelayedTask("task6",7900));
                delayQueue.offer(new MyDelayedTask("task7",4900));

            }
        }).start();

        while (true) {
            Delayed take = delayQueue.take();
            System.out.println(take);
        }
    }

執行結果

MyDelayedTask{name='task3', time=1900}
MyDelayedTask{name='task2', time=3900}
MyDelayedTask{name='task7', time=4900}
MyDelayedTask{name='task4', time=5900}
MyDelayedTask{name='task5', time=6900}
MyDelayedTask{name='task6', time=7900}
MyDelayedTask{name='task1', time=10000}

 DelayQueue屬於排序隊列,它的特殊之處在於隊列的元素必須實現Delayed接口,該接口需要實現compareTo和getDelay方法。

static class Task implements Delayed{
        @Override
                //比較延時,隊列里元素的排序依據
        public int compareTo(Delayed o) {
            return 0;
        }
        
        @Override
                //獲取剩余時間
        public long getDelay(TimeUnit unit) {
            return 0;
        }
    }

 

  元素進入隊列后,先進行排序,然后,只有getDelay也就是剩余時間為0的時候,該元素才有資格被消費者從隊列中取出來,所以構造函數一般都有一個時間傳入。

具體另一個實例:

import java.sql.Time;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Delayquue {

    public static void main(String[] args) throws Exception {
        BlockingQueue<Task> delayqueue = new DelayQueue<>();
        long now = System.currentTimeMillis();
        delayqueue.put(new Task(now+3000));
        delayqueue.put(new Task(now+4000));
        delayqueue.put(new Task(now+6000));
        delayqueue.put(new Task(now+1000));
        System.out.println(delayqueue);
        
        for(int i=0; i<4; i++) {
            System.out.println(delayqueue.take());
        }
        
    }
    
    static class Task implements Delayed{
        long time = System.currentTimeMillis();
        public Task(long time) {
            this.time = time;
        }
        @Override
        public int compareTo(Delayed o) {
            if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                return -1;
            else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                return 1;
            else 
                return 0;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        }
        @Override
        public String toString() {
            return "" + time;
        }
    }
} 

輸出結果:

  可以看出來,每隔一段時間就會輸出一個元素,這個間隔時間就是由構造函數定義的秒數來決定的。


 

原理分析:

 內部結構

  • 可重入鎖
  • 用於根據delay時間排序的優先級隊列
  • 用於優化阻塞通知的線程元素leader
  • 用於實現阻塞和通知的Condition對象

delayed和PriorityQueue

 在理解delayQueue原理之前我們需要先了解兩個東西,delayed和PriorityQueue.

  • delayed是一個具有過期時間的元素
  • PriorityQueue是一個根據隊列里元素某些屬性排列先后的順序隊列

  delayQueue其實就是在每次往優先級隊列中添加元素,然后以元素的delay/過期值作為排序的因素,以此來達到先過期的元素會拍在隊首,每次從隊列里取出來都是最先要過期的元素

offer方法

  1. 執行加鎖操作
  2. 吧元素添加到優先級隊列中
  3. 查看元素是否為隊首
  4. 如果是隊首的話,設置leader為空,喚醒所有等待的隊列
  5. 釋放鎖
代碼如下:
public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

take方法

  1. 執行加鎖操作
  2. 取出優先級隊列元素q的隊首
  3. 如果元素q的隊首/隊列為空,阻塞請求
  4. 如果元素q的隊首(first)不為空,獲得這個元素的delay時間值
  5. 如果first的延遲delay時間值為0的話,說明該元素已經到了可以使用的時間,調用poll方法彈出該元素,跳出方法
  6. 如果first的延遲delay時間值不為0的話,釋放元素first的引用,避免內存泄露
  7. 判斷leader元素是否為空,不為空的話阻塞當前線程
  8. 如果leader元素為空的話,把當前線程賦值給leader元素,然后阻塞delay的時間,即等待隊首到達可以出隊的時間,在finally塊中釋放leader元素的引用
  9. 循環執行從1~8的步驟
  10. 如果leader為空並且優先級隊列不為空的情況下(判斷還有沒有其他后續節點),調用signal通知其他的線程
  11. 執行解鎖操作
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }  

get點

 整個代碼的過程中並沒有使用上太難理解的地方,但是有幾個比較難以理解他為什么這么做的地方

leader元素的使用

 大家可能看到在我們的DelayQueue中有一個Thread類型的元素leader,那么他是做什么的呢,有什么用呢?

 讓我們先看一下元素注解上的doc描述:

Thread designated to wait for the element at the head of the queue.
This variant of the Leader-Follower pattern serves to minimize unnecessary timed waiting.
when a thread becomes the leader, it waits only for the next delay to elapse, but other threads await indefinitely.
The leader thread must signal some other thread before returning from take() or poll(...), unless some other thread becomes leader in the interim.
Whenever the head of the queue is replaced with an element with an earlier expiration time, the leader field is invalidated by being reset to null, and some waiting thread, but not necessarily the current leader, is signalled.
So waiting threads must be prepared to acquire and lose leadership while waiting.

 上面主要的意思就是說用leader來減少不必要的等待時間,那么這里我們的DelayQueue是怎么利用leader來做到這一點的呢:

 這里我們想象着我們有個多個消費者線程用take方法去取,內部先加鎖,然后每個線程都去peek第一個節點.
 如果leader不為空說明已經有線程在取了,設置當前線程等待

if (leader != null) available.await(); 

 如果為空說明沒有其他線程去取這個節點,設置leader並等待delay延時到期,直到poll后結束循環

     else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } 

take方法中為什么釋放first元素

first = null; // don't retain ref while waiting 

 我們可以看到doug lea后面寫的注釋,那么這段代碼有什么用呢?

 想想假設現在延遲隊列里面有三個對象。

  • 線程A進來獲取first,然后進入 else 的else ,設置了leader為當前線程A
  • 線程B進來獲取first,進入else的阻塞操作,然后無限期等待
  • 這時在JDK 1.7下面他是持有first引用的
  • 如果線程A阻塞完畢,獲取對象成功,出隊,這個對象理應被GC回收,但是他還被線程B持有着,GC鏈可達,所以不能回收這個first.
  • 假設還有線程C 、D、E.. 持有對象1引用,那么無限期的不能回收該對象1引用了,那么就會造成內存泄露.

鏈接:
  https://www.jianshu.com/p/e0bcc9eae0ae
  https://www.jianshu.com/p/bf9f6b08ba5b
  https://blog.csdn.net/toocruel/article/details/82769595

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM