DelayedQueue的學習


參考鏈接:http://tool.oschina.net/apidocs/apidoc?api=jdk-zh

1.DelayedQueue是一個無界的阻塞隊列,其內的元素是實現了Delayed接口的元素。

Delayed,一種混合風格的接口,用來標記那些應該在給定延遲時間之后執行的對象。此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。

getDelay的方法返回時剩余的延遲時間。

創建一個Delayed接口的實現類,該元素是要放入到QueueDelayed隊列中的。

 1 package com.zimo.mybaties.delayed;
 2 
 3 import org.springframework.transaction.annotation.Transactional;
 4 
 5 import java.util.concurrent.Delayed;
 6 import java.util.concurrent.TimeUnit;
 7 
 8 public class MyDelayedEvent implements Delayed{
 9     //要執行的任務
10     private Task task;
11 
12     private Long endTime;
13 
14     public MyDelayedEvent(Task task, Long endTime) {
15         this.task = task;
16         this.endTime = endTime;
17     }
18 
19     //獲取剩余的時間,為0獲取負數時取出
20     //TimeUnit.NANOSECONDS 毫微妙
21     @Override
22     public long getDelay(TimeUnit unit) {
23 //        return unit.convert(endTime,TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.MILLISECONDS);
24         return unit.convert(endTime,TimeUnit.NANOSECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.NANOSECONDS);
25     }
26 
27     @Override
28     public int compareTo(Delayed o) {
29         if (this == o)
30             return 1;
31         if (o==null)
32             return -1;
33         long diff = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
34         return diff<0?-1:(diff==0?0:1);
35     }
36 
37     public Task getTask() {
38         return task;
39     }
40 
41     public void setTask(Task task) {
42         this.task = task;
43     }
44 
45     public Long getEndTime() {
46         return endTime;
47     }
48 
49     public void setEndTime(Long endTime) {
50         this.endTime = endTime;
51     }
52 }
MyDelayedEvent

其中Task是要執行的任務。創建一個Task接口,我們的任務都實現該接口,表示是一個任務調度到期后要執行的任務。

1 /**
2  * 任務
3  */
4 public interface Task {
5     //調用該方法,則會執行任務
6     void executeTask();
7 }
Task

下面是我們實現的其中一個任務

 1 package com.zimo.mybaties.delayed.delayedtest;
 2 
 3 import com.zimo.mybaties.delayed.Task;
 4 
 5 public class StudentTask implements Task{
 6 
 7     private Integer student;
 8 
 9     public StudentTask(Integer student) {
10         this.student = student;
11     }
12 
13     @Override
14     public void executeTask() {
15         System.out.println("學生任務執行"+student);
16     }
17 
18     public Integer getStudent() {
19         return student;
20     }
21 
22     public void setStudent(Integer student) {
23         this.student = student;
24     }
25 }
StudentTask

 

上面的類創建完畢后,我們需要創建一個守護線程來后台執行任務調度是否到到期的查詢,時間到了后執行。getDelay()返回剩余的時間,

1 public interface MyDelayedService {
2     //插入任務調度
3     void put(MyDelayedEvent delayed);
4     //移除任務調度
5     boolean remove(MyDelayedEvent delayed);
6     //初始化該任務調度
7     void init();
8 }
MyDelayedService

 MyDelayedService的實現類

 

package com.zimo.mybaties.delayed;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Service
public class MyDelayedServiceImp implements MyDelayedService{

    private static final Logger logger = LoggerFactory.getLogger(MyDelayedServiceImp.class);
    private static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private DelayQueue<MyDelayedEvent> queue = new DelayQueue<>();
    private Executor executor = Executors.newFixedThreadPool(30);//線程池,保證同一時刻執行的任務能執行s
    private Thread damon;//守護線程

    @Override
    public void init(){
        logger.info("初始化學生守護線程");
        damon = new Thread(() -> execute()); //新建一個線程,執行execute方法
        damon.setDaemon(true);  //設置為守護線程
        damon.setName("student queue thread");  //線程名稱
        damon.start();  //啟動線程
    }

    @Override
    public void put(MyDelayedEvent delayed){
        logger.info("插入任務");
        queue.put(delayed);
    }

    @Override
    public boolean remove(MyDelayedEvent delayed){
        logger.info("移除任務");
        return queue.remove(delayed);
    }

    private void execute(){
        while (true){
            //該線程要執行的內容
            try {
                MyDelayedEvent delayed = queue.take();
                if (delayed!=null){
                    logger.info("執行任務,任務執行時當前時間是 {}",TIME_FORMAT.format(delayed.getEndTime()));
                    executor.execute(new Runnable() {
                        //將執行的任務放入線程池,同一個時刻可能有多個任務要執行
                        @Override
                        public void run() {
                            delayed.getTask().executeTask();//執行任務
                        }
                    });

                }
            }catch (InterruptedException e){
                logger.error("任務調度被中斷");
            }
        }
    }
}
MyDelayedServiceImp

 

通過測試可知,設置同一時刻的多個任務調度時,在時間到了之后,會全部進行執行。

 1 2018-08-28 18:04:19.020  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
 2 2018-08-28 18:04:19.020  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
 3 學生任務執行0
 4 學生任務執行8
 5 2018-08-28 18:04:19.021  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
 6 2018-08-28 18:04:19.021  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
 7 學生任務執行7
 8 2018-08-28 18:04:19.021  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
 9 學生任務執行6
10 學生任務執行5
11 2018-08-28 18:04:19.022  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
12 2018-08-28 18:04:19.022  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
13 學生任務執行4
14 2018-08-28 18:04:19.022  INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList  : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19
15 學生任務執行2
16 學生任務執行1
Test Code

 當要執行的線程數大於線程池的最大數目時,會進行等待。

 

?如何移除我們要取消的任務呢。

首先我們的任務是存放如QueueDelayed隊列中的,在本文中。他存放於類MyDelayedServiceImp中的 queue變量中。其內存放是實現了delayed接口的元素。通過查看DelayedQueue的文檔可知。

其通過remove(Object o)來移除,o是我們的元素。通過查看remove的實現可知。其內是通過equals比較兩個對象是否相等來判斷是否是同一個delayed。所以我們需要在我們的QueueDelayed中實現我們的equals方法,使其兩個對象之間的相等能夠進行判斷。

內部實現:

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present, whether or not it has expired.
     */
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);//進入該方法
        } finally {
            lock.unlock();
        }
    }
public boolean remove(Object o) {
        int i = indexOf(o);
        if (i == -1)
            return false;
        else {
            removeAt(i);
            return true;
        }
    }

private int indexOf(Object o) {
        if (o != null) {
            for (int i = 0; i < size; i++)
                if (o.equals(queue[i]))  //比較
                    return i;
        }
        return -1;
    }

 

我們的實現;實現我們的Delayed元素的equals 和hashCode。

package com.zimo.mybaties.delayed;

import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MyDelayedEvent implements Delayed{
    //要執行的任務
    private Task task;
    private String uniqueKey; //該uniqueKey的生成規則根據業務來進行確定。只需要確保uniqueKey是唯一標識的
    private Long endTime;

    public MyDelayedEvent(Task task, Long endTime) {
        this.task = task;
        this.endTime = endTime;
    }

    public MyDelayedEvent(Task task, Long endTime, Integer targetClassId) {
        this.task = task;
        this.endTime = endTime;
        setUniqueKey(targetClassId);
        System.out.println("unique key : "+getUniqueKey());
    }

    //獲取剩余的時間,為0獲取負數時取出
    //TimeUnit.NANOSECONDS 毫微妙
    @Override
    public long getDelay(TimeUnit unit) {
//        return unit.convert(endTime,TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.MILLISECONDS);
        return unit.convert(endTime,TimeUnit.NANOSECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if (this == o)
            return 1;
        if (o==null)
            return -1;
        long diff = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
        return diff<0?-1:(diff==0?0:1);
    }

    @Override
    public int hashCode() {
        final int prime = 31; //hashCode就是用的31
        int result = 1;
        result = prime*result + endTime.hashCode();
        result = prime*result + ((uniqueKey==null)?0:uniqueKey.hashCode());
        //我這里因為Task也是一個對象,為了簡便,所以不用task作為hashCode生成對象。而是新增加一個uniqueKey。
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this==obj)
            return true;
        if (obj == null)
            return false;
        if (getClass()!=obj.getClass())
            return false;
        MyDelayedEvent o = (MyDelayedEvent)obj;
        //Long對象的比較,判斷值是否相同也是通過equals
        if (!getEndTime().equals(o.getEndTime()))
            return false;
        if (!getUniqueKey().equals(o.getUniqueKey()))
            return false;
        return true;
    }

    public String getUniqueKey() {
        return uniqueKey;
    }

    public void setUniqueKey(Integer targetClassId) {
        //uniqueKey生成規則
        this.uniqueKey = new StringBuffer().append(task.getClass()).append(targetClassId).append(endTime).toString();
    }

    public Task getTask() {
        return task;
    }

    public void setTask(Task task) {
        this.task = task;
    }

    public Long getEndTime() {
        return endTime;
    }

    public void setEndTime(Long endTime) {
        this.endTime = endTime;
    }
}

 后面有需要新的定時任務需求的時候,只需要新增一個實現了Task接口的實現類即可。

public class AssistantTask implements Task {

    @Override
    public void executeTask() {
        System.out.println("assistant task run ");
    }
}
MyDelayedEvent delayed_assistant = new MyDelayedEvent(new AssistantTask(),nowTime+delay*1000+1000,2);
            myDelayedService.put(delayed_assistant);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM