ScheduledThreadPoolExecutor
提交的任務按照執行的時間排序放入到 DelayQueue 隊列中。
- DelayQueue內部封裝了一個PriorityQueue,它會根據time的先后時間排序(time小的排在前面),若time相同則根據sequenceNumber排序( sequenceNumber小的排在前面);
- DelayQueue也是一個無界隊列;
ScheduledThreadPoolExecutor 定時線程池類的類結構圖
SchedualedThreadPoolExecutor 接收SchduledFutureTask類型的任務,是線程池調度任務的最小單位,有三種提交任務的方式:
- schedule:延遲多長時間之后只執行一次;
- scheduledAtFixedRate:延遲指定時間后執行一次,之后按照固定的時長周期執行;
- scheduledWithFixedDelay:延遲指定時間后執行一次,之后按照:上一次任務執行時長 + 周期的時長 的時間去周期執行;
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
pool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("延遲執行");
}
},1, TimeUnit.SECONDS);
/**
* 這個執行周期是固定,不管任務執行多長時間,還是每過3秒中就會產生一個新的任務
*/
pool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//這個業務邏輯需要很長的時間,定時任務去統計一張數據上億的表,財務財務信息,需要30min
System.out.println("重復執行1");
}
},1,3,TimeUnit.SECONDS);
/**
* 假設12點整執行第一次任務12:00,執行一次任務需要30min,下一次任務 12:30 + 3s 開始執行
*/
pool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
//30min
try {
Thread.sleep(60000 * 30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("" + new Date() +"重復執行2");
}
},1, 3, TimeUnit.SECONDS);
}
任務提交到線程池中,任務排序的 siftUp 方法
private void siftUp(int k, RunnableScheduledFuture<?> key) {
// 找到父節點的索引
while (k > 0) {
// 獲取父節點
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
// 如果key節點的執行時間大於父節點的執行時間,不需要再排序了
if (key.compareTo(e) >= 0)
break;
// 如果key.compareTo(e) < 0,說明key節點的執行時間小於父節點的執行時間,需要把父節點移到后面
queue[k] = e;
setIndex(e, k);
// 設置索引為k
k = parent;
}
// key設置為排序后的位置中
queue[k] = key;
setIndex(key, k);
}
循環的根據key節點與它的父節點來判斷,如果key節點的執行時間小於父節點,則將兩個節點交換,使執行時間靠前的節點排列在隊列的前面。
可以理解為一個樹形的結構,最小點堆的結構;父節點一定小於子節點;
實際上所謂的排序並不是絕對的按照順序大小去排的,只保證了隊列最前端的最小。為什么要這樣設計呢?
因為當隊列中的數據過大的時候,要保證絕對的排序消耗是比較大的,而且我們沒有必要去保證絕對排序,因為只需要保證隊列頭的數是最小的就可以了。
30min