參考鏈接:http://tool.oschina.net/apidocs/apidoc?api=jdk-zh
1.DelayedQueue是一個無界的阻塞隊列,其內的元素是實現了Delayed接口的元素。
Delayed,一種混合風格的接口,用來標記那些應該在給定延遲時間之后執行的對象。此接口的實現必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。
getDelay的方法返回時剩余的延遲時間。
創建一個Delayed接口的實現類,該元素是要放入到QueueDelayed隊列中的。

1 package com.zimo.mybaties.delayed; 2 3 import org.springframework.transaction.annotation.Transactional; 4 5 import java.util.concurrent.Delayed; 6 import java.util.concurrent.TimeUnit; 7 8 public class MyDelayedEvent implements Delayed{ 9 //要執行的任務 10 private Task task; 11 12 private Long endTime; 13 14 public MyDelayedEvent(Task task, Long endTime) { 15 this.task = task; 16 this.endTime = endTime; 17 } 18 19 //獲取剩余的時間,為0獲取負數時取出 20 //TimeUnit.NANOSECONDS 毫微妙 21 @Override 22 public long getDelay(TimeUnit unit) { 23 // return unit.convert(endTime,TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.MILLISECONDS); 24 return unit.convert(endTime,TimeUnit.NANOSECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.NANOSECONDS); 25 } 26 27 @Override 28 public int compareTo(Delayed o) { 29 if (this == o) 30 return 1; 31 if (o==null) 32 return -1; 33 long diff = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); 34 return diff<0?-1:(diff==0?0:1); 35 } 36 37 public Task getTask() { 38 return task; 39 } 40 41 public void setTask(Task task) { 42 this.task = task; 43 } 44 45 public Long getEndTime() { 46 return endTime; 47 } 48 49 public void setEndTime(Long endTime) { 50 this.endTime = endTime; 51 } 52 }
其中Task是要執行的任務。創建一個Task接口,我們的任務都實現該接口,表示是一個任務調度到期后要執行的任務。

1 /** 2 * 任務 3 */ 4 public interface Task { 5 //調用該方法,則會執行任務 6 void executeTask(); 7 }
下面是我們實現的其中一個任務

1 package com.zimo.mybaties.delayed.delayedtest; 2 3 import com.zimo.mybaties.delayed.Task; 4 5 public class StudentTask implements Task{ 6 7 private Integer student; 8 9 public StudentTask(Integer student) { 10 this.student = student; 11 } 12 13 @Override 14 public void executeTask() { 15 System.out.println("學生任務執行"+student); 16 } 17 18 public Integer getStudent() { 19 return student; 20 } 21 22 public void setStudent(Integer student) { 23 this.student = student; 24 } 25 }
上面的類創建完畢后,我們需要創建一個守護線程來后台執行任務調度是否到到期的查詢,時間到了后執行。getDelay()返回剩余的時間,

1 public interface MyDelayedService { 2 //插入任務調度 3 void put(MyDelayedEvent delayed); 4 //移除任務調度 5 boolean remove(MyDelayedEvent delayed); 6 //初始化該任務調度 7 void init(); 8 }
MyDelayedService的實現類

package com.zimo.mybaties.delayed; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.concurrent.DelayQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @Service public class MyDelayedServiceImp implements MyDelayedService{ private static final Logger logger = LoggerFactory.getLogger(MyDelayedServiceImp.class); private static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private DelayQueue<MyDelayedEvent> queue = new DelayQueue<>(); private Executor executor = Executors.newFixedThreadPool(30);//線程池,保證同一時刻執行的任務能執行s private Thread damon;//守護線程 @Override public void init(){ logger.info("初始化學生守護線程"); damon = new Thread(() -> execute()); //新建一個線程,執行execute方法 damon.setDaemon(true); //設置為守護線程 damon.setName("student queue thread"); //線程名稱 damon.start(); //啟動線程 } @Override public void put(MyDelayedEvent delayed){ logger.info("插入任務"); queue.put(delayed); } @Override public boolean remove(MyDelayedEvent delayed){ logger.info("移除任務"); return queue.remove(delayed); } private void execute(){ while (true){ //該線程要執行的內容 try { MyDelayedEvent delayed = queue.take(); if (delayed!=null){ logger.info("執行任務,任務執行時當前時間是 {}",TIME_FORMAT.format(delayed.getEndTime())); executor.execute(new Runnable() { //將執行的任務放入線程池,同一個時刻可能有多個任務要執行 @Override public void run() { delayed.getTask().executeTask();//執行任務 } }); } }catch (InterruptedException e){ logger.error("任務調度被中斷"); } } } }
通過測試可知,設置同一時刻的多個任務調度時,在時間到了之后,會全部進行執行。

1 2018-08-28 18:04:19.020 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 2 2018-08-28 18:04:19.020 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 3 學生任務執行0 4 學生任務執行8 5 2018-08-28 18:04:19.021 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 6 2018-08-28 18:04:19.021 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 7 學生任務執行7 8 2018-08-28 18:04:19.021 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 9 學生任務執行6 10 學生任務執行5 11 2018-08-28 18:04:19.022 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 12 2018-08-28 18:04:19.022 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 13 學生任務執行4 14 2018-08-28 18:04:19.022 INFO 48417 --- [nt queue thread] c.z.mybaties.delayed.MyDelayedEventList : 執行任務,任務執行時當前時間是 2018-08-28 18:04:19 15 學生任務執行2 16 學生任務執行1
當要執行的線程數大於線程池的最大數目時,會進行等待。
?如何移除我們要取消的任務呢。
首先我們的任務是存放如QueueDelayed隊列中的,在本文中。他存放於類MyDelayedServiceImp中的 queue變量中。其內存放是實現了delayed接口的元素。通過查看DelayedQueue的文檔可知。
其通過remove(Object o)來移除,o是我們的元素。通過查看remove的實現可知。其內是通過equals比較兩個對象是否相等來判斷是否是同一個delayed。所以我們需要在我們的QueueDelayed中實現我們的equals方法,使其兩個對象之間的相等能夠進行判斷。
內部實現:
/** * Removes a single instance of the specified element from this * queue, if it is present, whether or not it has expired. */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o);//進入該方法 } finally { lock.unlock(); } }
public boolean remove(Object o) { int i = indexOf(o); if (i == -1) return false; else { removeAt(i); return true; } } private int indexOf(Object o) { if (o != null) { for (int i = 0; i < size; i++) if (o.equals(queue[i])) //比較 return i; } return -1; }
我們的實現;實現我們的Delayed元素的equals 和hashCode。
package com.zimo.mybaties.delayed; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class MyDelayedEvent implements Delayed{ //要執行的任務 private Task task; private String uniqueKey; //該uniqueKey的生成規則根據業務來進行確定。只需要確保uniqueKey是唯一標識的 private Long endTime; public MyDelayedEvent(Task task, Long endTime) { this.task = task; this.endTime = endTime; } public MyDelayedEvent(Task task, Long endTime, Integer targetClassId) { this.task = task; this.endTime = endTime; setUniqueKey(targetClassId); System.out.println("unique key : "+getUniqueKey()); } //獲取剩余的時間,為0獲取負數時取出 //TimeUnit.NANOSECONDS 毫微妙 @Override public long getDelay(TimeUnit unit) { // return unit.convert(endTime,TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.MILLISECONDS); return unit.convert(endTime,TimeUnit.NANOSECONDS) - unit.convert(System.currentTimeMillis(),TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { if (this == o) return 1; if (o==null) return -1; long diff = this.getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return diff<0?-1:(diff==0?0:1); } @Override public int hashCode() { final int prime = 31; //hashCode就是用的31 int result = 1; result = prime*result + endTime.hashCode(); result = prime*result + ((uniqueKey==null)?0:uniqueKey.hashCode()); //我這里因為Task也是一個對象,為了簡便,所以不用task作為hashCode生成對象。而是新增加一個uniqueKey。 return result; } @Override public boolean equals(Object obj) { if (this==obj) return true; if (obj == null) return false; if (getClass()!=obj.getClass()) return false; MyDelayedEvent o = (MyDelayedEvent)obj; //Long對象的比較,判斷值是否相同也是通過equals if (!getEndTime().equals(o.getEndTime())) return false; if (!getUniqueKey().equals(o.getUniqueKey())) return false; return true; } public String getUniqueKey() { return uniqueKey; } public void setUniqueKey(Integer targetClassId) { //uniqueKey生成規則 this.uniqueKey = new StringBuffer().append(task.getClass()).append(targetClassId).append(endTime).toString(); } public Task getTask() { return task; } public void setTask(Task task) { this.task = task; } public Long getEndTime() { return endTime; } public void setEndTime(Long endTime) { this.endTime = endTime; } }
后面有需要新的定時任務需求的時候,只需要新增一個實現了Task接口的實現類即可。
public class AssistantTask implements Task { @Override public void executeTask() { System.out.println("assistant task run "); } }
MyDelayedEvent delayed_assistant = new MyDelayedEvent(new AssistantTask(),nowTime+delay*1000+1000,2); myDelayedService.put(delayed_assistant);