DelayQueue基本原理
DelayQueue是一個沒有邊界BlockingQueue實現,加入其中的元素必需實現Delayed接口。當生產者線程調用put之類的方法加入元素時,會觸發Delayed接口中的compareTo方法進行排序,也就是說隊列中元素的順序是按到期時間排序的,而非它們進入隊列的順序。排在隊列頭部的元素是最早到期的,越往后到期時間赿晚。
消費者線程查看隊列頭部的元素,注意是查看不是取出。然后調用元素的getDelay方法,如果此方法返回的值小0或者等於0,則消費者線程會從隊列中取出此元素,並進行處理。如果getDelay方法返回的值大於0,則消費者線程wait返回的時間值后,再從隊列頭部取出元素,此時元素應該已經到期。
DelayQueue是Leader-Followr模式的變種,消費者線程處於等待狀態時,總是等待最先到期的元素,而不是長時間的等待。消費者線程盡量把時間花在處理任務上,最小化空等的時間,以提高線程的利用效率。
以下通過隊列及消費者線程狀態變化大致說明一下DelayQueue的運行過程。
初始狀態
因為隊列是沒有邊界的,向隊列中添加元素的線程不會阻塞,添加操作相對簡單,所以此圖不考慮向隊列添加元素的生產者線程。假設現在共有三個消費者線程。
隊列中的元素按到期時間排序,隊列頭部的元素2s以后到期。消費者線程1查看了頭部元素以后,發現還需要2s才到期,於是它進入等待狀態,2s以后醒來,等待頭部元素到期的線程稱為Leader線程。
消費者線程2與消費者線程3處於待命狀態,它們不等待隊列中的非頭部元素。當消費者線程1拿到對象5以后,會向它們發送signal。這個時候兩個中的一個會結束待命狀態而進入等待狀態。
2S以后
消費者線程1已經拿到了對象5,從等待狀態進入處理狀態,處理它取到的對象5,同時向消費者線程2與消費者線程3發送signal。
消費者線程2與消費者線程3會爭搶領導權,這里是消費者線程2進入等待狀態,成為Leader線程,等待2s以后對象4到期。而消費者線程3則繼續處於待命狀態。
此時隊列中加入了一個新元素對象6,它10s后到期,排在隊尾。
又2S以后
先看線程1,如果它已經結束了對象5的處理,則進入待命狀態。如果還沒有結束,則它繼續處理對象5。
消費線程2取到對象4以后,也進入處理狀態,同時給處於待命狀態的消費線程3發送信號,消費線程3進入等待狀態,成為新的Leader。現在頭部元素是新插入的對象7,因為它1s以后就過期,要早於其它所有元素,所以排到了隊列頭部。
又1S后
一種不好的結果:
消費線程3一定正在處理對象7。消費線程1與消費線程2還沒有處理完它們各自取得的對象,無法進入待命狀態,也更加進入不了等待狀態。此時對象3馬上要到期,那么如果它到期時沒有消費者線程空下來,則它的處理一定會延期。
可以想見,如果元素進入隊列的速度很快,元素之間的到期時間相對集中,而處理每個到期元素的速度又比較慢的話,則隊列會越來越大,隊列后邊的元素延期處理的時間會越來越長。
另外一種好的結果:
消費線程1與消費線程2很快的完成對取出對象的處理,及時返回重新等待隊列中的到期元素。一個處於等待狀態(Leader),對象3一到期就立刻處理。另一個則處於待命狀態。這樣,每一個對象都能在到期時被及時處理,不會發生明顯的延期。
所以,消費者線程的數量要夠,處理任務的速度要快。否則,隊列中的到期元素無法被及時取出並處理,造成任務延期、隊列元素堆積等情況。
示例代碼
DelayQueue的一個應用場景是定時任務調度。本例中先讓主線程向DelayQueue添加10個任務,任務之間的啟動間隔在1~2s之間,每個任務的執行時間固定為2s,代碼如下:
package com.zhangdb.thread; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; class DelayTask implements Delayed { private static long currentTime = System.currentTimeMillis(); protected final String taskName; protected final int timeCost; protected final long scheduleTime; protected static final AtomicInteger taskCount = new AtomicInteger(0); // 定時任務之間的啟動時間間隔在1~2s之間,timeCost表示處理此任務需要的時間,本示例中為2s public DelayTask(String taskName, int timeCost) { this.taskName = taskName; this.timeCost = timeCost; taskCount.incrementAndGet(); currentTime += 1000 + (long) (Math.random() * 1000); scheduleTime = currentTime; } @Override public int compareTo(Delayed o) { return (int) (this.scheduleTime - ((DelayTask) o).scheduleTime); } @Override public long getDelay(TimeUnit unit) { long expirationTime = scheduleTime - System.currentTimeMillis(); return unit.convert(expirationTime, TimeUnit.MILLISECONDS); } public void execTask() { long startTime = System.currentTimeMillis(); System.out.println("Task " + taskName + ": schedule_start_time=" + scheduleTime + ",real start time=" + startTime + ",delay=" + (startTime - scheduleTime)); try { Thread.sleep(timeCost); } catch (InterruptedException e) { e.printStackTrace(); } } } class DelayTaskComsumer extends Thread { private final BlockingQueue<DelayTask> queue; public DelayTaskComsumer(BlockingQueue<DelayTask> queue) { this.queue = queue; } @Override public void run() { DelayTask task = null; try { while (true) { task = queue.take(); task.execTask(); DelayTask.taskCount.decrementAndGet(); } } catch (InterruptedException e) { System.out.println(getName() + " finished"); } } } public class DelayQueueExample { public static void main(String[] args) { BlockingQueue<DelayTask> queue = new DelayQueue<DelayTask>(); for (int i = 0; i < 10; i++) { try { queue.put(new DelayTask("work " + i, 2000)); } catch (InterruptedException e) { e.printStackTrace(); } } ThreadGroup g = new ThreadGroup("Consumers"); for (int i = 0; i < 1; i++) { new Thread(g, new DelayTaskComsumer(queue)).start(); } while (DelayTask.taskCount.get() > 0) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } g.interrupt(); System.out.println("Main thread finished"); } }
首先啟動一個消費者線程。因為消費者線程處單個任務的時間為2s,而任務的調度間隔為1~2s。這種情況下,每當消費者線程處理完一個任務,回頭再從隊列中新取任務時,新任務肯定延期了,無法按給定的時間調度任務。而且越往后情況越嚴重。運行代碼看一下輸出:
Task work 0: schedule_start_time=1554203579096,real start time=1554203579100,delay=4 Task work 1: schedule_start_time=1554203580931,real start time=1554203581101,delay=170 Task work 2: schedule_start_time=1554203582884,real start time=1554203583101,delay=217 Task work 3: schedule_start_time=1554203584660,real start time=1554203585101,delay=441 Task work 4: schedule_start_time=1554203586075,real start time=1554203587101,delay=1026 Task work 5: schedule_start_time=1554203587956,real start time=1554203589102,delay=1146 Task work 6: schedule_start_time=1554203589041,real start time=1554203591102,delay=2061 Task work 7: schedule_start_time=1554203590127,real start time=1554203593102,delay=2975 Task work 8: schedule_start_time=1554203591903,real start time=1554203595102,delay=3199 Task work 9: schedule_start_time=1554203593577,real start time=1554203597102,delay=3525 Main thread finished Thread-0 finished
最后一個任務的延遲時間已經超過3.5s了。
再作一次測試,將消費者線程的個數調整為2,這時任務應該能按時啟動,延遲應該很小,運行程序看一下結果:
Task work 0: schedule_start_time=1554204395427,real start time=1554204395430,delay=3 Task work 1: schedule_start_time=1554204396849,real start time=1554204396850,delay=1 Task work 2: schedule_start_time=1554204398050,real start time=1554204398051,delay=1 Task work 3: schedule_start_time=1554204399590,real start time=1554204399590,delay=0 Task work 4: schedule_start_time=1554204401289,real start time=1554204401289,delay=0 Task work 5: schedule_start_time=1554204402883,real start time=1554204402883,delay=0 Task work 6: schedule_start_time=1554204404663,real start time=1554204404664,delay=1 Task work 7: schedule_start_time=1554204406154,real start time=1554204406154,delay=0 Task work 8: schedule_start_time=1554204407991,real start time=1554204407991,delay=0 Task work 9: schedule_start_time=1554204409540,real start time=1554204409540,delay=0 Main thread finished Thread-0 finished Thread-2 finished
基本上按時啟動,最大延遲為3毫秒,大部分都是0毫秒。
將消費者線程個數調整成3個,運行看一下結果:
Task work 0: schedule_start_time=1554204499695,real start time=1554204499698,delay=3 Task work 1: schedule_start_time=1554204501375,real start time=1554204501376,delay=1 Task work 2: schedule_start_time=1554204503370,real start time=1554204503371,delay=1 Task work 3: schedule_start_time=1554204504860,real start time=1554204504861,delay=1 Task work 4: schedule_start_time=1554204506419,real start time=1554204506420,delay=1 Task work 5: schedule_start_time=1554204508191,real start time=1554204508192,delay=1 Task work 6: schedule_start_time=1554204509495,real start time=1554204509496,delay=1 Task work 7: schedule_start_time=1554204510663,real start time=1554204510664,delay=1 Task work 8: schedule_start_time=1554204512598,real start time=1554204512598,delay=0 Task work 9: schedule_start_time=1554204514276,real start time=1554204514277,delay=1 Main thread finished Thread-0 finished Thread-2 finished Thread-4 finished
大部分延遲時間變成1毫秒,情況好像還不如2個線程的情況。
將消費者線程數調整成5,運行看一下結果:
Task work 0: schedule_start_time=1554204635015,real start time=1554204635019,delay=4 Task work 1: schedule_start_time=1554204636856,real start time=1554204636857,delay=1 Task work 2: schedule_start_time=1554204637968,real start time=1554204637970,delay=2 Task work 3: schedule_start_time=1554204639758,real start time=1554204639759,delay=1 Task work 4: schedule_start_time=1554204641089,real start time=1554204641090,delay=1 Task work 5: schedule_start_time=1554204642879,real start time=1554204642880,delay=1 Task work 6: schedule_start_time=1554204643941,real start time=1554204643942,delay=1 Task work 7: schedule_start_time=1554204645006,real start time=1554204645007,delay=1 Task work 8: schedule_start_time=1554204646309,real start time=1554204646310,delay=1 Task work 9: schedule_start_time=1554204647537,real start time=1554204647538,delay=1 Thread-2 finished Thread-0 finished Main thread finished Thread-8 finished Thread-4 finished Thread-6 finished
與3個消費者線程的情況差不多。
結論
最優的消費者線程的個數與任務啟動的時間間隔好像存在這樣的關系:單個任務處理時間的最大值 / 相鄰任務的啟動時間最小間隔 = 最優線程數,如果最優線程數是小數,則取整數后加1,比如1.3的話,那么最優線程數應該是2。
本例中,單個任務處理時間的最大值固定為2s。
相鄰任務的啟動時間最小間隔為1s。
則消費者線程數為2/1=2。
如果消費者線程數小於此值,則來不及處理到期的任務。如果大於此值,線程太多,在調度、同步上花更多的時間,無益改善性能。