背景
最近項目中有個業務,需要對用戶新增任務到期后進行業務處理。使用定時任務定時掃描過期時間,浪費資源,且不實時。只能使用延時隊列處理。
DelayQueue
第一想到的是java自帶的延時隊列delayqueue。
首先實現一個Delyed類。
實現兩個最重要方法。第一個是隊列里面的消息排序。DelayQueue底層使用的是阻塞隊列。隊列的消費端會去take隊列的頭部元素,沒有元素就阻塞在那里。因此,延遲隊列中的元素必須按執行時間順序排列。
@Override public int compareTo(Delayed delayed) { Message message = (Message) delayed; return this.exceptTime > message.getExceptTime() ? 1 : 0; }
第二個方法是剩余時間延遲時間。每加入一個元素時將延遲時間傳入,得到一個預期執行時間。每當執行此方法的時候,使用預期時間減去當前時間,即時剩余延遲時間。換句話說,還有多長時間執行。為0時立即執行。
@Override public long getDelay(TimeUnit unit) { System.out.println(exceptTime - System.nanoTime()); return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS); }
全部代碼:

import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Message implements Delayed{ private Integer id; private String content; private long delay;//延遲時間 private long exceptTime;//執行時間 public Message() {} public Message(Integer id, String content, long delay) { this.id = id; this.content = content; this.delay = delay; this.exceptTime = System.nanoTime() + delay; } @Override public int compareTo(Delayed delayed) { Message message = (Message) delayed; return this.exceptTime > message.getExceptTime() ? 1 : 0; } @Override public long getDelay(TimeUnit unit) { System.out.println(exceptTime - System.nanoTime()); return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS); } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public long getDelay() { return delay; } public void setDelay(long delay) { this.delay = delay; } public long getExceptTime() { return exceptTime; } public void setExceptTime(long exceptTime) { this.exceptTime = exceptTime; } }
然后初始化一個DelayQueue,加入任務。並創建一個線程異步執行。

DelayQueue<Message> delayqueue = new DelayQueue<>(); Random random = new Random(); for (int i = 0; i < 10; i++) { Message message = new Message(i, "content" + i, random.nextInt(1000000)); delayqueue.add(message); } new Thread(new Runnable() { @Override public void run() { while (true) { Message message; try { message = delayqueue.take(); System.out.println("message = " + message.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start();
缺陷
1.畢竟是jdk級別的,不可能做過多的封裝。很多API並不是那么好直接使用。比如直接傳入一個延遲時間是並不能自動實現的,需要手動封裝。
2.DelayQueue並沒有長度限制。有內存占用的風險。
3.效率,穩定性方面,在DelayQueue本身肯定是沒有問題的,但是在項目中使用,勢必需要做一些封裝,直接上生產環境心里並沒有底。
HashedWheelTimer
netty畢竟是一個大名鼎鼎的框架,廣泛使用於業界。它有許多心跳檢測等定時任務,使用延時隊列來實現。HashedWheelTimer底層數據結構依然是使用DelayedQueue。加上一種叫做時間輪的算法來實現。
關於時間輪算法,有點類似於HashMap。在new 一個HashedWheelTimer實例的時候,可以傳入幾個參數。
第一,一個時間長度,這個時間長度跟具體任務何時執行沒有關系,但是跟執行精度有關。這個時間可以看作手表的指針循環一圈的長度。
然后第二,刻度數。這個可以看作手表的刻度。比如第一個參數為24小時,刻度數為12,那么每一個刻度表示2小時。時間精度只能到兩小時。時間長度/刻度數值越大,精度越大。
然后添加一個任務的時候,根據hash算法得到hash值並對刻度數求模得到一個下標,這個下標就是刻度的位置。
然而有一些任務的執行周期超過了第一個參數,比如超過了24小時,就會得到一個圈數round。
簡點說,添加一個任務時會根據任務得到一個hash值,並根據時間輪長度和刻度得到一個商值round和模index,比如時間長度24小時,刻度為12,延遲時間為32小時,那么round=1,index=8。時間輪從開啟之時起每24/12個時間走一個指針,即index+1,第一圈round=0。當走到第7個指針時,此時index=7,此時剛才的任務並不能執行,因為剛才的任務round=1,必須要等到下一輪index=7的時候才能執行。
如圖所示
對於Delayed兩個重要實現方法,第一排序,其實是通過hash求商和模決定放入哪個位置。這些位置本身就已經按照時間順序排序了。第二,延遲時間,已經被封裝好了,傳入一個延遲的時間就好了。
代碼實例:
得到一個延遲隊列實例
HashedWheelTimer timer = new HashedWheelTimer(24, //時間輪一圈的長度 TimeUnit.SECONDS, 12);//時間輪的度刻
創建一個任務
TimerTask task = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("任務執行"); } };
將任務加入延遲隊列
timer.newTimeout(task, 1000, TimeUnit.SECONDS);
總結
以上兩種方案都沒有實現持久化和分布式。持久化可以借助數據庫來達到。分布式的話還是使用消息中間件吧。RabbitMq聽說已經可以借助某些參數實現。