並發隊列之DelayQueue


  已經說了四個並發隊列了,DelayQueue這是最后一個,這是一個無界阻塞延遲隊列,底層基於前面說過的PriorityBlockingQueue實現的 ,隊列中每個元素都有過期時間,當從隊列獲取元素時,只有過期元素才會出隊列,而隊列頭部的元素是過期最快的元素;

 

一.簡單使用

  可以看到我們可以自己設置超時時間和優先級隊列中的比較規則,這樣我們在隊列中取的時候,按照最快超時的先出隊;

package com.example.demo.study;

import java.util.Random;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import lombok.Data;

public class Study0210 {

    @Data
    static class MyDelayed implements Delayed {
        private long delayTime;//該任務需要再隊列中的延遲的時候
        private long expire;//這個時間表示當前時間和延遲時間相加,這里就叫做到期時間
        private String taskName;//任務的名稱

        public MyDelayed(long delayTime, String taskName) {
            this.delayTime = delayTime;
            this.taskName = taskName;
            this.expire = System.currentTimeMillis()+delayTime;
        }
        
        //指定優先級隊列里面的比較規則,就跟上篇博客中說的優先級隊列中說的比較器一樣
        @Override
        public int compareTo(Delayed o) {
            return (int)(this.getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS));
        }

        //這個方法表示該任務在隊列中還有多少剩余時間,也就是expire-當前時間
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire-System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //創建延遲隊列
        DelayQueue<MyDelayed> queue = new DelayQueue<MyDelayed>();
        
        //創建任務丟到隊列中
        Random random = new Random();
        for (int i = 1; i < 11; i++) {
            MyDelayed myDelayed = new MyDelayed(random.nextInt(500),"task"+i);
            queue.add(myDelayed);
        }
        
        //獲取隊列中的任務,這里只會跟超時時間最小的有關,和入隊順序無關
        MyDelayed myDelayed = queue.take();
        while(myDelayed!=null) {
            System.out.println(myDelayed.toString());
            myDelayed = queue.take();
        }
    }
}

 

 

 

二.基本組成 

//由此可是這個隊列中存放的任務必須是Delayed類型的
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    //獨占鎖
    private final transient ReentrantLock lock = new ReentrantLock();
    //優先級隊列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    //leader線程,實際上每次進行入隊和出隊操作的只能是leader線程,其余的都叫做fallower線程,這里會用到一個leader-follower模式
    private Thread leader = null;
    //條件變量
    private final Condition available = lock.newCondition();

    //省略很多代碼
}

 

  

  具體的繼承關系可以看看下面這個圖,實際操作的都是內部的PriorityQueue;

 

 

三.offer方法

  上面代碼中我們雖然說調用的是add方法,其實就是調用的是offer方法;

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    //獲取鎖
    lock.lock();
    try {
        //往優先級隊列中添加一個元素
        q.offer(e);
        //注意,peek方法只是獲取優先級隊列中第一個元素,並不會刪除
        //如果優先級隊列中取的元素就是和當前添加的元素一樣,說明當前元素就是達到過期要求的,於是設置leader線程為null
        //然后通知條件隊列中的線程優先級隊列中已經有元素了,可以過來取了
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //釋放鎖
        lock.unlock();
    }
}

 

 

四.take方法

  獲取並移除隊列中達到超時時間要求的元素,如果隊列中沒有元素,就把當前線程丟到條件隊列中阻塞;

  從下面的代碼邏輯中我們可以知道:線程分為兩種,一種是leader線程,一種是follower線程,其中leader線程只會阻塞一定的時間,follower線程會在條件隊列阻塞無限長的時間;當leader線程執行完take操作之后,就會重置leader線程為null,然后從條件隊列中拿一個出來設置為leader線程

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //獲取鎖,可中斷
    lock.lockInterruptibly();
    try {
        for (;;) {
            //這里先是嘗試從優先級隊列中獲取一下節點,獲取不到的話,說明當前優先級隊列為空,就阻塞當前線程
            E first = q.peek();
            if (first == null)
                available.await();
            else {
                //如果優先級隊列中有元素,那么肯定能走到這里來,然后取到該元素的超時時間,如果小於0,說明已經達到要求了,可以獲取並刪除隊列中的元素
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                first = null; // don't retain ref while waiting
                //如果leader隊列不為空,說明有其他線程正在執行take,於是就把當前線程放到條件隊列中
                if (leader != null)
                    available.await();
                //到這里,說明優先級隊列中沒有元素到超時時間,而且此時沒有其他線程調用take方法,於是就把leader線程設置為當前線程,
                //然后當前leader線程就會等待一定的時間,等優先級隊列中最快超時的元素;
                //在等待的時候,leader線程會釋放鎖,這時其他線程B可以調用offer方法添加元素,線程C也可以調用take方法,然后線程C就會在
                //上面這里阻塞無限長的時間,直到被喚醒
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        //當前線程阻塞一定時間之后,不管成功了沒有,都會把leader線程重置為null,然后重新循環
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    //這里的意思就是當前線程移除元素成功之后,喚醒條件隊列中的線程去繼續從隊列中獲取元素
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        //釋放鎖
        lock.unlock();
    }
}

 

 

五.poll操作

  獲取並移除隊頭過期元素,如果隊列為空,或者對頭元素沒有過超時時間就返回null

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //嘗試獲取隊頭元素,如果隊頭元素為空或者該延遲過期時間還沒到,就返回null
        E first = q.peek();
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
        //否則就獲取並移除隊頭元素
            return q.poll();
    } finally {
        lock.unlock();
    }
}

 

 

六.總結

  這個隊列其實很容易,主要的是有一個延遲時間,我們從優先級隊列中獲取的根節點首先會判斷有沒有過超時時間,有的話就移除並返回就好了,沒有的話,就看看還剩下多少時間才會超時(由於是優先級隊列,所以根節點一般就是最快超時時間的,當然,也可以修改優先級隊列的比較規則),於是當前線程就會等這個節點超時,此時leader等於當前線程,在等待的過程中,會釋放鎖,所以其他線程可以往隊列中添加元素,也可以獲取元素(但是由於此時leader!=null,這些線程會阻塞無限長時間直到被喚醒);

  在leader線程超時時間到了之后自動喚醒,再進行一次循環,就會獲取並移除根節點了,最后再重置leader節點為null,順便喚醒條件隊列中的節點;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM