https://mp.weixin.qq.com/s/yDeH0ei6Sq4zos11K0I9Rg
一、應用場景
在需求開發過程中,我們經常會遇到一些類似下面的場景:
a. 外賣訂單超過15分鍾未支付,自動取消
b. 使用搶票軟件訂到車票后,1小時內未支付,自動取消
c. 待處理申請超時1天,通知審核人員經理,超時2天通知審核人員總監
d. 客戶預定自如房子后,24小時內未支付,房源自動釋放
二、JDK 延時隊列實現
DelayQueue 是 JDK 中 java.util.concurrent 包下的一種無界阻塞隊列,底層是優先隊列 PriorityQueue。對於放到隊列中的任務,可以按照到期時間進行排序,只需要取已經到期的元素處理即可。
具體的步驟是,要放入隊列的元素需要實現 Delayed 接口並實現 getDelay 方法來計算到期時間,compare 方法來對比到期時間以進行排序。一個簡單的使用例子如下:
package com.lyqiang.delay.jdk;
import java.time.LocalDateTime;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author lyqiang
*/
public class TestDelayQueue {
public static void main(String[] args) throws InterruptedException {
// 新建3個任務,並依次設置超時時間為 20s 10s 30s
DelayTask d1 = new DelayTask(1, System.currentTimeMillis() + 20000L);
DelayTask d2 = new DelayTask(2, System.currentTimeMillis() + 10000L);
DelayTask d3 = new DelayTask(3, System.currentTimeMillis() + 30000L);
DelayQueue<DelayTask> queue = new DelayQueue<>();
queue.add(d1);
queue.add(d2);
queue.add(d3);
int size = queue.size();
System.out.println("當前時間是:" + LocalDateTime.now());
// 從延時隊列中獲取元素, 將輸出 d2 、d1 、d3
for (int i = 0; i < size; i++) {
System.out.println(queue.take() + " ------ " + LocalDateTime.now());
}
}
}
class DelayTask implements Delayed {
private Integer taskId;
private long exeTime;
DelayTask(Integer taskId, long exeTime) {
this.taskId = taskId;
this.exeTime = exeTime;
}
@Override
public long getDelay(TimeUnit unit) {
return exeTime - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
DelayTask t = (DelayTask) o;
if (this.exeTime - t.exeTime <= 0) {
return -1;
} else {
return 1;
}
}
@Override
public String toString() {
return "DelayTask{" +
"taskId=" + taskId +
", exeTime=" + exeTime +
'}';
}
}
代碼的執行結果如下:
使用 DelayQueue, 只需要有一個線程不斷從隊列中獲取數據即可,它的優點是不用引入第三方依賴,實現也很簡單,缺點也很明顯,它是內存存儲,對分布式支持不友好,如果發生單點故障,可能會造成數據丟失,無界隊列還存在 OOM 的風險。
三、時間輪算法實現
1996 年 George Varghese 和 Tony Lauck 的論文《Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facility》中提出了一種時間輪管理 Timeout 事件的方式。其設計非常巧妙,並且類似時鍾的運行,如下圖的原始時間輪有 8 個格子,假定指針經過每個格子花費時間是 1 個時間單位,當前指針指向 0,一個 17 個時間單位后超時的任務則需要運轉 2 圈再通過一個格子后被執行,放在相同格子的任務會形成一個鏈表。
Netty 包里提供了一種時間輪的實現——HashedWheelTimer,其底層使用了數組+鏈表的數據結構,使用方式如下:
package com.lyqiang.delay.wheeltimer;
import io.netty.util.HashedWheelTimer;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
/**
* @author lyqiang
*/
public class WheelTimerTest {
public static void main(String[] args) {
//設置每個格子是 100ms, 總共 256 個格子
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 256);
//加入三個任務,依次設置超時時間是 10s 5s 20s
System.out.println("加入一個任務,ID = 1, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("執行一個任務,ID = 1, time= " + LocalDateTime.now());
}, 10, TimeUnit.SECONDS);
System.out.println("加入一個任務,ID = 2, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("執行一個任務,ID = 2, time= " + LocalDateTime.now());
}, 5, TimeUnit.SECONDS);
System.out.println("加入一個任務,ID = 3, time= " + LocalDateTime.now());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("執行一個任務,ID = 3, time= " + LocalDateTime.now());
}, 20, TimeUnit.SECONDS);
System.out.println("等待任務執行===========");
}
}
代碼執行結果如下:
四、Redis ZSet 實現
Redis 里有 5 種數據結構,最常用的是 String 和 Hash,而 ZSet 是一種支持按 score 排序的數據結構,每個元素都會關聯一個 double 類型的分數,Redis 通過分數來為集合中的成員進行從小到大的排序,借助這個特性我們可以把超時時間作為 score 來將任務進行排序。
使用 zadd key score member
命令向 redis 中放入任務,超時時間作為 score, 任務 ID 作為 member, 使用 zrange key start stop withscores
命令從 redis 中讀取任務,使用 zrem key member
命令從 redis 中刪除任務。代碼如下:
package com.lyqiang.delay.redis;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author lyqiang
*/
public class TestRedisDelay {
public static void main(String[] args) {
TaskProducer taskProducer = new TaskProducer();
//創建 3個任務,並設置超時間為 10s 5s 20s
taskProducer.produce(1, System.currentTimeMillis() + 10000);
taskProducer.produce(2, System.currentTimeMillis() + 5000);
taskProducer.produce(3, System.currentTimeMillis() + 20000);
System.out.println("等待任務執行===========");
//消費端從redis中消費任務
TaskConsumer taskConsumer = new TaskConsumer();
taskConsumer.consumer();
}
}
class TaskProducer {
public void produce(Integer taskId, long exeTime) {
System.out.println("加入任務, taskId: " + taskId + ", exeTime: " + exeTime + ", 當前時間:" + LocalDateTime.now());
RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}
}
class TaskConsumer {
public void consumer() {
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
while (true) {
Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
if (taskIdSet == null || taskIdSet.isEmpty()) {
//System.out.println("沒有任務");
} else {
taskIdSet.forEach(id -> {
long result = RedisOps.getJedis().zrem(RedisOps.key, id);
if (result == 1L) {
System.out.println("從延時隊列中獲取到任務,taskId:" + id + " , 當前時間:" + LocalDateTime.now());
}
});
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
}
執行結果如下:
相比前兩種實現方式,使用 Redis 可以將數據持久化到磁盤,規避了數據丟失的風險,並且支持分布式,避免了單點故障。
五、MQ 延時隊列實現
以 RabbitMQ 為例,它本身並沒有直接支持延時隊列的功能,但是通過一些特性,我們可以達到實現延時隊列的效果。
RabbitMQ 可以為 Queue 設置 TTL,,到了過期時間沒有被消費的消息將變為死信——Dead Letter。我們還可以為Queue 設置死信轉發 x-dead-letter-exchange,過期的消息可以被路由到另一個 Exchange。下圖說明了這個流程,生產者通過不同的 RoutingKey 發送不同過期時間的消息,多個隊列分別消費並產生死信后被路由到 exe-dead-exchange,再有一些隊列綁定到這個 exchange,從而進行不同業務邏輯的消費。
使用 MQ 實現的方式,支持分布式,並且消息支持持久化,在業內應用比較多,它的缺點是每種間隔時間的場景需要分別建立隊列。
六、總結
通過上面不同實現方式的比較,可以很明顯的看出各個方案的優缺點,在分布式系統中我們會優先考慮使用 Redis 和 MQ 的實現方式。
在需求開發中實現一個功能的方式多種多樣,需要我們進行多維度的比較,才能選擇出合理的、可靠的、高效的並且適合自己業務的解決方案。
https://mp.weixin.qq.com/s/VqXjGvfMunE4DWGXOOXVWg
引言
很多時候,業務系統有延時處理任務的需求,當任務量很大時,可能需要維護大量的定時器,或者進行低效的掃描。例如:電商下單成功后60s之后給用戶發送短信通知;電商下單后30分鍾未支付,自動取消訂單;出行乘客叫單后30秒沒有司機接單,重新給周邊司機推單等。實現這類需求有一些常見方案。
在討論方案前我們需要搞清楚,延時任務與定時任務究竟有啥區別?定時任務有明確的執行時間或周期性,比如定時充電,要選開始充電的具體時間點,再比如每10分鍾做一次未支付訂單的狀態檢查。而延時任務沒有這些特性,它具有不確定性,是在某個事件觸發后一段時間內執行。
下面以“出行乘客叫單后30秒內沒有司機接單,重新給周邊司機推單,直到司機接單”為例,講解每種方案的實現。
方案分析
一、輪詢掃描
啟動一個Timer,30秒間隔輪詢掃描訂單表,檢查每個未接訂單創建時間是否超過30秒的整數倍,如果超過,重新給周邊司機推送訂單。
優點:簡單易行。
缺點:如果訂單量過大,延遲會比較高。
適用范圍:這種方案一般適用於延時任務量比較少,對於延時精確度要求不高的任務。
二、多Timer觸發
為每個訂單創建一個Timer,並且設置為30秒的觸發時間間隔,事件觸發后檢查訂單狀態,如果是初始狀態,則繼續執行,否則停止並釋放Timer。
優點:簡單易行,不需要輪詢,精確度較高。
缺點:但每個訂單要啟動一個Timer,比較耗資源。
適用范圍:同樣適用於延時任務量比較少的系統。
三、RabbitMQ死信隊列
死信:Dead Letter,是指被拒絕或TTL過期或隊列已達到最大長度限制,無法再入隊的消息。利用DLX,當消息在一個隊列中變成死信之后,它能被重新publish到另一個Exchange,這個Exchange就是DLX。
(死信隊列生產消費模型)
利用死信隊列可以實現延時任務,每個訂單創建一個消息,消息的TTL被設置為30秒。當消息過期后,通過交換機轉發給業務消費隊列,消費處理程序訂閱業務消費隊列,有消息則進行處理,檢查訂單狀態,如果是初始狀態,則重新對該訂單創建一個消息。
優點:不需要輪詢,精確度高。
缺點:引入消息組件,系統復雜度提高。
適用范圍:適用於有大量延時任務需求的系統。
四、環形隊列
環形隊列本質上就是一個數組,首尾相接,形成一個環。數組中的每個索引位稱為槽(Slot),每個槽中放一個集合,用於存放需要處理的任務。啟動一個Timer,從Slot=0處開始,每秒鍾向前移動一次,拿到當前Slot中任務數據進行處理,直到數組的最后一個Slot,再從Slot=0開始,循環往復。
(環形隊列任務處理流程)
實際的業務場景中還要考慮任務不丟失,故障恢復等問題,所以增加了持久化任務隊列和移除任務隊列,將新加入槽中的任務持久化到Redis,將處理完的任務清除出Redis,進程故障恢復后初始化時將保存在Redis中的任務還原到環形隊列的相應槽位中。
實際的業務場景中還要考慮任務不丟失,故障恢復等問題,所以增加了持久化任務隊列和移除任務隊列,將新加入槽中的任務持久化到Redis,將處理完的任務清除出Redis,進程故障恢復后初始化時將保存在Redis中的任務還原到環形隊列的相應槽位中。
(推單任務處理模型)
總結
本文主要講解了實現延時任務的不同方案,各自有不同的優缺點及適用范圍,大家有延時任務的需求時可參考,希望給大家帶來幫助。
一口氣說出 6種 延時隊列的實現方案,面試穩穩的_博客_雲社區_開發者中心-華為雲 https://bbs.huaweicloud.com/blogs/174555
下邊會介紹多種實現延時隊列的思路,哪種方式都沒有絕對的好與壞,只是看把它用在什么業務場景中,技術這東西沒有最好的只有最合適的。
一、延時隊列的應用
什么是延時隊列?顧名思義:首先它要具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。
延時隊列在項目中的應用還是比較多的,尤其像電商類平台:
1、訂單成功后,在30分鍾內沒有支付,自動取消訂單
2、外賣平台發送訂餐通知,下單成功后60s給用戶推送短信。
3、如果訂單一直處於某一個未完結狀態時,及時處理關單,並退還庫存
4、淘寶新建商戶一個月內還沒上傳商品信息,將凍結商鋪等
。。。。
上邊的這些場景都可以應用延時隊列解決。
二、延時隊列的實現
我個人一直秉承的觀點:工作上能用JDK
自帶API
實現的功能,就不要輕易自己重復造輪子,或者引入三方中間件。一方面自己封裝很容易出問題(大佬除外),再加上調試驗證產生許多不必要的工作量;另一方面一旦接入三方的中間件就會讓系統復雜度成倍的增加,維護成本也大大的增加。
1、DelayQueue 延時隊列
JDK
中提供了一組實現延遲隊列的API
,位於Java.util.concurrent
包下DelayQueue
。
DelayQueue
是一個BlockingQueue
(***阻塞)隊列,它本質就是封裝了一個PriorityQueue
(優先隊列),PriorityQueue
內部使用完全二叉堆
(不知道的自行了解哈)來實現隊列元素排序,我們在向DelayQueue
隊列中添加元素時,會給元素一個Delay
(延遲時間)作為排序條件,隊列中最小的元素會優先放在隊首。隊列中的元素只有到了Delay
時間才允許從隊列中取出。隊列中可以放基本數據類型或自定義實體類,在存放基本數據類型時,優先隊列中元素默認升序排列,自定義實體類就需要我們根據類屬性值比較計算了。
先簡單實現一下看看效果,添加三個order
入隊DelayQueue
,分別設置訂單在當前時間的5秒
、10秒
、15秒
后取消。
要實現DelayQueue
延時隊列,隊中元素要implements
Delayed
接口,這哥接口里只有一個getDelay
方法,用於設置延期時間。Order
類中compareTo
方法負責對隊列中的元素進行排序。
public class Order implements Delayed {
/**
* 延遲時間
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
DelayQueue
的put
方法是線程安全的,因為put
方法內部使用了ReentrantLock
鎖進行線程同步。DelayQueue
還提供了兩種出隊的方法 poll()
和 take()
, poll()
為非阻塞獲取,沒有到期的元素直接返回null;take()
阻塞方式獲取,沒有到期的元素線程將會等待。
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
System.out.println("訂單延遲隊列開始時間:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
/**
* 取隊列頭部元素是否過期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("訂單:{%s}被取消, 取消時間:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}
上邊只是簡單的實現入隊與出隊的操作,實際開發中會有專門的線程,負責消息的入隊與消費。
執行后看到結果如下,Order1
、Order2
、Order3
分別在 5秒
、10秒
、15秒
后被執行,至此就用DelayQueue
實現了延時隊列。
訂單延遲隊列開始時間:2020-05-06 14:59:09
訂單:{Order1}被取消, 取消時間:{2020-05-06 14:59:14}
訂單:{Order2}被取消, 取消時間:{2020-05-06 14:59:19}
訂單:{Order3}被取消, 取消時間:{2020-05-06 14:59:24}
2、Quartz 定時任務
Quartz
一款非常經典任務調度框架,在Redis
、RabbitMQ
還未廣泛應用時,超時未支付取消訂單功能都是由定時任務實現的。定時任務它有一定的周期性,可能很多單子已經超時,但還沒到達觸發執行的時間點,那么就會造成訂單處理的不夠及時。
引入quartz
框架依賴包
org.springframework.boot
spring-boot-starter-quartz
復制代碼
在啟動類中使用@EnableScheduling注解開啟定時任務功能。
@EnableScheduling
@SpringBootApplication
public class DelayqueueApplication {
public static void main(String[] args) {
SpringApplication.run(DelayqueueApplication.class, args);
}
}
編寫一個定時任務,每個5秒執行一次。
@Component
public class QuartzDemo {
//每隔五秒
@Scheduled(cron = "0/5 * * * * ? ")
public void process(){
System.out.println("我是定時任務!");
}
}
3、Redis sorted set
Redis
的數據結構Zset
,同樣可以實現延遲隊列的效果,主要利用它的score
屬性,redis
通過score
來為集合中的成員進行從小到大的排
通過zadd
命令向隊列delayqueue
中添加元素,並設置score
值表示元素過期的時間;向delayqueue
添加三個order1
、order2
、order3
,分別是10秒
、20秒
、30秒
后過期。
zadd delayqueue 3 order3
消費端輪詢隊列delayqueue
, 將元素排序后取最小時間與當前時間比對,如小於當前時間代表已經過期移除key
。
/**
* 消費消息
*/
public void pollOrderQueue() {
while (true) {
Set set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);
String value = ((Tuple) set.toArray()[0]).getElement();
int score = (int) ((Tuple) set.toArray()[0]).getScore();
Calendar cal = Calendar.getInstance();
int nowSecond = (int) (cal.getTimeInMillis() / 1000);
if (nowSecond >= score) {
jedis.zrem(DELAY_QUEUE, value);
System.out.println(sdf.format(new Date()) + " removed key:" + value);
}
if (jedis.zcard(DELAY_QUEUE) <= 0) {
System.out.println(sdf.format(new Date()) + " zset empty ");
return;
}
Thread.sleep(1000);
}
}
我們看到執行結果符合預期
2020-05-07 13:24:09 add finished.
2020-05-07 13:24:19 removed key:order1
2020-05-07 13:24:29 removed key:order2
2020-05-07 13:24:39 removed key:order3
2020-05-07 13:24:39 zset empty
4、Redis 過期回調
Redis
的key
過期回調事件,也能達到延遲隊列的效果,簡單來說我們開啟監聽key是否過期的事件,一旦key過期會觸發一個callback事件。
修改redis.conf
文件開啟notify-keyspace-events Ex
notify-keyspace-events Ex
Redis
監聽配置,注入Bean RedisMessageListenerContainer
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
編寫Redis過期回調監聽方法,必須繼承KeyExpirationEventMessageListener
,有點類似於MQ的消息監聽。
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("監聽到key:" + expiredKey + "已過期");
}
}
到這代碼就編寫完成,非常的簡單,接下來測試一下效果,在redis-cli
客戶端添加一個key
並給定3s
的過期時間。
set xiaofu 123 ex 3
在控制台成功監聽到了這個過期的key
。
監聽到過期的key為:xiaofu
5、RabbitMQ 延時隊列
利用 RabbitMQ
做延時隊列是比較常見的一種方式,而實際上RabbitMQ
自身並沒有直接支持提供延遲隊列功能,而是通過 RabbitMQ
消息隊列的 TTL
和 DXL
這兩個屬性間接實現的。
先來認識一下 TTL
和 DXL
兩個概念:
Time To Live
(TTL
) :
TTL
顧名思義:指的是消息的存活時間,RabbitMQ
可以通過x-message-tt
參數來設置指定Queue
(隊列)和 Message
(消息)上消息的存活時間,它的值是一個非負整數,單位為微秒。
RabbitMQ
可以從兩種維度設置消息過期時間,分別是隊列
和消息本身
- 設置隊列過期時間,那么隊列中所有消息都具有相同的過期時間。
- 設置消息過期時間,對隊列中的某一條消息設置過期時間,每條消息
TTL
都可以不同。
如果同時設置隊列和隊列中消息的TTL
,則TTL
值以兩者中較小的值為准。而隊列中的消息存在隊列中的時間,一旦超過TTL
過期時間則成為Dead Letter
(死信)。
Dead Letter Exchanges
(DLX
)
DLX
即死信交換機,綁定在死信交換機上的即死信隊列。RabbitMQ
的 Queue
(隊列)可以配置兩個參數x-dead-letter-exchange
和 x-dead-letter-routing-key
(可選),一旦隊列內出現了Dead Letter
(死信),則按照這兩個參數可以將消息重新路由到另一個Exchange
(交換機),讓消息重新被消費。
x-dead-letter-exchange
:隊列中出現Dead Letter
后將Dead Letter
重新路由轉發到指定 exchange
(交換機)。
x-dead-letter-routing-key
:指定routing-key
發送,一般為要指定轉發的隊列。
隊列出現Dead Letter
的情況有:
- 消息或者隊列的
TTL
過期 - 隊列達到最大長度
- 消息被消費端拒絕(basic.reject or basic.nack)
下邊結合一張圖看看如何實現超30分鍾未支付關單功能,我們將訂單消息A0001發送到延遲隊列order.delay.queue
,並設置x-message-tt
消息存活時間為30分鍾,當到達30分鍾后訂單消息A0001成為了Dead Letter
(死信),延遲隊列檢測到有死信,通過配置x-dead-letter-exchange
,將死信重新轉發到能正常消費的關單隊列,直接監聽關單隊列處理關單邏輯即可。
發送消息時指定消息延遲的時間
public void send(String delayTimes) {
amqpTemplate.convertAndSend("order.pay.exchange", "order.pay.queue","大家好我是延遲數據", message -> {
// 設置延遲毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
return message;
});
}
}
設置延遲隊列出現死信后的轉發規則
/**
* 延時隊列
*/
@Bean(name = "order.delay.queue")
public Queue getMessageQueue() {
return QueueBuilder
.durable(RabbitConstant.DEAD_LETTER_QUEUE)
// 配置到期后轉發的交換
.withArgument("x-dead-letter-exchange", "order.close.exchange")
// 配置到期后轉發的路由鍵
.withArgument("x-dead-letter-routing-key", "order.close.queue")
.build();
}
6、時間輪
前邊幾種延時隊列的實現方法相對簡單,比較容易理解,時間輪算法就稍微有點抽象了。kafka
、netty
都有基於時間輪算法實現延時隊列,下邊主要實踐Netty
的延時隊列講一下時間輪是什么原理。
先來看一張時間輪的原理圖,解讀一下時間輪的幾個基本概念
wheel
:時間輪,圖中的圓盤可以看作是鍾表的刻度。比如一圈round
長度為24秒
,刻度數為 8
,那么每一個刻度表示 3秒
。那么時間精度就是 3秒
。時間長度 / 刻度數值越大,精度越大。
當添加一個定時、延時任務A
,假如會延遲25秒
后才會執行,可時間輪一圈round
的長度才24秒
,那么此時會根據時間輪長度和刻度得到一個圈數 round
和對應的指針位置 index
,也是就任務A
會繞一圈指向0格子
上,此時時間輪會記錄該任務的round
和 index
信息。當round=0,index=0 ,指針指向0格子
任務A
並不會執行,因為 round=0不滿足要求。
所以每一個格子代表的是一些時間,比如1秒
和25秒
都會指向0格子上,而任務則放在每個格子對應的鏈表中,這點和HashMap
的數據有些類似。
Netty
構建延時隊列主要用HashedWheelTimer
,HashedWheelTimer
底層數據結構依然是使用DelayedQueue
,只是采用時間輪的算法來實現。
下面我們用Netty
簡單實現延時隊列,HashedWheelTimer
構造函數比較多,解釋一下各參數的含義。
ThreadFactory
:表示用於生成工作線程,一般采用線程池;tickDuration
和unit
:每格的時間間隔,默認100ms;ticksPerWheel
:一圈下來有幾格,默認512,而如果傳入數值的不是2的N次方,則會調整為大於等於該參數的一個2的N次方數值,有利於優化hash
值的計算。
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
TimerTask
:一個定時任務的實現接口,其中run方法包裝了定時任務的邏輯。Timeout
:一個定時任務提交到Timer
之后返回的句柄,通過這個句柄外部可以取消這個定時任務,並對定時任務的狀態進行一些基本的判斷。Timer
:是HashedWheelTimer
實現的父接口,僅定義了如何提交定時任務和如何停止整個定時機制。
public class NettyDelayQueue {
public static void main(String[] args) {
final Timer timer = new HashedWheelTimer(Executors.defaultThreadFactory(), 5, TimeUnit.SECONDS, 2);
//定時任務
TimerTask task1 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order1 5s 后執行 ");
timer.newTimeout(this, 5, TimeUnit.SECONDS);//結束時候再次注冊
}
};
timer.newTimeout(task1, 5, TimeUnit.SECONDS);
TimerTask task2 = new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order2 10s 后執行");
timer.newTimeout(this, 10, TimeUnit.SECONDS);//結束時候再注冊
}
};
timer.newTimeout(task2, 10, TimeUnit.SECONDS);
//延遲任務
timer.newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
System.out.println("order3 15s 后執行一次");
}
}, 15, TimeUnit.SECONDS);
}
}
從執行的結果看,order3
、order3
延時任務只執行了一次,而order2
、order1
為定時任務,按照不同的周期重復執行。
order1 5s 后執行
order2 10s 后執行
order3 15s 后執行一次
order1 5s 后執行
order2 10s 后執行
原文鏈接:https://blog.51cto.com/14570694/2502712