java延遲隊列DelayQueue使用及原理


概述

  java延遲隊列提供了在指定時間才能獲取隊列元素的功能,隊列頭元素是最接近過期的元素。沒有過期元素的話,使用poll()方法會返回null值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小於等於0來判斷。延時隊列不能存放空元素。

   延時隊列實現了Iterator接口,但iterator()遍歷順序不保證是元素的實際存放順序。

隊列元素

  DelayQueue<E extends Delayed>的隊列元素需要實現Delayed接口,該接口類定義如下:

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

 

  由Delayed定義可以得知,隊列元素需要實現getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定義了剩余到期時間,compareTo方法定義了元素排序規則,注意,元素的排序規則影響了元素的獲取順序,將在后面說明。

內部存儲結構  

DelayedQuene的元素存儲交由優先級隊列存放。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();//元素存放

 DelayedQuene的優先級隊列使用的排序方式是隊列元素的compareTo方法,優先級隊列存放順序是從小到大的,所以隊列元素的compareTo方法影響了隊列的出隊順序。

若compareTo方法定義不當,會造成延時高的元素在隊頭,延時低的元素無法出隊。

  獲取隊列元素

  非阻塞獲取

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
------------------------------------------------------------------------------------------------------------------------
  PriorityQueue隊列peek()方法。
public E peek() {
return (size == 0) ? null : (E) queue[0];
}
 

  由代碼我們可以看出,獲取元素時,總是判斷PriorityQueue隊列的隊首元素是否到期,若未到期,返回null,所以compareTo()的方法實現不當的話,會造成隊首元素未到期,當隊列中有到期元素卻獲取不到的情況。因此,隊列元素的compareTo方法實現需要注意。

  阻塞方式獲取

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) //沒有元素,讓出線程,等待java.lang.Thread.State#WAITING
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0) // 已到期,元素出隊
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null) 
                        available.await();// 其它線程在leader線程TIMED_WAITING期間,會進入等待狀態,這樣可以只有一個線程去等待到時喚醒,避免大量喚醒操作
else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay);// 等待剩余時間后,再嘗試獲取元素,他在等待期間,由於leader是當前線程,所以其它線程會等待。 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

 

 

示例:

package org.dromara.hmily.demo.springcloud.account.service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @description: 延時隊列測試
 * @author: hh
 */
public class DelayedQueneTest {

    public static void main(String[] args) throws InterruptedException {
        Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
        Item item2 = new Item("item2",10, TimeUnit.SECONDS);
        Item item3 = new Item("item3",15, TimeUnit.SECONDS);
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(item1);
        queue.put(item2);
        queue.put(item3);
        System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        for (int i = 0; i < 3; i++) {
            Item take = queue.take();
            System.out.format("name:{%s}, time:{%s}\n",take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
        }
    }

}

class Item implements Delayed{
    /* 觸發時間*/
    private long time;
    String name;

    public Item(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Item item = (Item) o;
        long diff = this.time - item.time;
        if (diff <= 0) {// 改成>=會造成問題
            return -1;
        }else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "Item{" +
                "time=" + time +
                ", name='" + name + '\'' +
                '}';
    }
}

  運行結果:每5秒取出一個

begin time:2019-05-31T11:58:24.445
name:{item1}, time:{2019-05-31T11:58:29.262}
name:{item2}, time:{2019-05-31T11:58:34.262}
name:{item3}, time:{2019-05-31T11:58:39.262}

  修改compareTo方法 diff >= 0 后的運行結果: 在15秒之后幾乎同時取出,

begin time:2019-05-31T12:02:50.157
name:{item3}, time:{2019-05-31T12:03:04.959}
name:{item2}, time:{2019-05-31T12:03:04.999}
name:{item1}, time:{2019-05-31T12:03:05}

  

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM