1.ScheduledExecutorService
介紹
Timer
對應的是單個后台線程,ScheduledExecutorService
可以在構造函數中指定多個核心線程數,並且其最大線程數默認為Integer.MAX_VALUE
。
對於希望某段時間后執行一次的定時任務和某段時間后周期執行(周期為兩次任務開始間隔時間-可能延遲,或者下次開始距離上次任務時間),可以使用ScheduledExecutorService
來提交執行。
注意:
- 調用周期任務執行方法
scheduleAtFixedRate
如果任務有延遲,兩個任務是不能並行執行的,例如周期設置2s,任務執行時間3s,則周期實際表現為3s,不會出現上個任務執行到第2s時下一個任務開始並行執行; - 核心線程池數量一般設置成要執行周期任務的數量,如1,設置多了浪費;
ScheduledExecutorService
最多可設置核心線程池、線程工廠和拒絕策略三個參數,最大線程數為Integer.MAX_VALUE
,工作隊列使用DelayedWorkQueue
,額外線程存活時間為0s,構造器源碼如下
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2.周期任務使用代碼示例
public static void main(String[] args) throws ExecutionException, InterruptedException {
//如果要提交兩個周期任務並且耗時特別長,則可以使用兩個線程
ScheduledExecutorService schedulePool= Executors.newScheduledThreadPool(2);
//只執行一次:有返回值
Callable cal=()->1;
int res=(int)schedulePool.schedule(cal, 1, TimeUnit.SECONDS).get();
Runnable task=()->System.out.println("dgenkui");
//只執行一次
schedulePool.schedule(task,5,TimeUnit.SECONDS);
/**
*
* 創建並執行一個周期性操作,該操作在給定的初始延遲后首先啟用,隨后在給定的時間段內啟用;
* 即執行將在initialDelay之后開始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此類推。
* 1.如果任務的任何執行遇到異常,則后續執行被禁止。 否則,任務將僅通過取消或終止執行者來終止。
* 2.如果此任務的執行時間超過其周期,則后續執行可能會延遲,但不會同時執行。
*
*/
schedulePool.scheduleAtFixedRate(task,0,5, TimeUnit.SECONDS);
/**
* 同上也是周期動作。但周期是上一次任務結束和下一次任務開始的時間間隔
* 如果任務的任何執行遇到異常,則后續執行被禁止。 否則,任務將僅通過取消或終止執行者來終止
*/
schedulePool.scheduleWithFixedDelay(task,2,10,TimeUnit.SECONDS);
}
3.DelayedWorkQueue
延遲隊列
DelayedWorkQueue
基本結構
延時任務使用的是靜態內部類DelayedWorkQueue
,其任務放在數組中,存取都需要鎖:
private RunnableScheduledFuture<?>[] queue =new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//存放
public void put(Runnable e) {
offer(e);
}
public boolean offer(Runnable e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {
//存入元素為null則拋異常
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
//獲取鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
//釋放鎖
lock.unlock();
}
return true;
}
//取
//不需要移除元素
public RunnableScheduledFuture<?> peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return queue[0];
} finally {
lock.unlock();
}
}
2)延時隊列和定時任務線程池的交互
- 當調用
ScheduledThreadPoolExecutor
的scheduleXXX(Runnable command,long delay,TimeUnit unit)
方法時,會向延時隊列添加了實現了RunnableScheduledFuture
接口的ScheduledFutureTask
。源碼如下:
//ScheduledThreadPoolExecutor
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
...判斷command是否為null和period是否為正...
//構造含有觸發時間和周期信息的任務對象
ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
delayedExecute(sft);
return sft;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
super.getQueue().add(task);
}
線程執行某個周期任務的流程如下:
- 獲取任務:線程從延時隊列
DelayWorkQueue
中獲取到期的任務ScheduledFutureTask
; - 執行任務:線程執行
ScheduledFutureTask
並修改其time屬性(初始延時+周期); - 將任務放回延時隊列中;
源碼如下:
public RunnableScheduledFuture<?> take() throws InterruptedException{
//獲取鎖
final ReentrantLock lock = this.lock;//private final Condition available = lock.newCondition();
lock.lockInterruptibly();
try {
//輪詢檢查延時隊列中的任務是否到期
for (;;) {
//獲取隊頭元素
RunnableScheduledFuture<?> first = queue[0];
//如果元素為空,則放棄鎖等待,直到其他線程放進元素並喚醒此線程時,開始下一輪檢查
if (first == null)
available.await();
else {
//注意getDelay()是返回ScheduleFutureTask的time屬性和當前時間的納秒差值
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)//time-now()<=0標示到達了開始執行任務的時間,則返回任務
return finishPoll(first);
//否則在等待一段時間
else {
available.awaitNanos(delay);
}
}
}
} finally {
//如果隊列中含有任務元素,則喚醒其他等待獲取任務的線程,並放棄鎖
if (queue[0] != null)
available.signal();
lock.unlock();
}
}
//執行任務
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
//執行任務
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();//設置下次執行時間
reExecutePeriodic(outerTask);//重新返回延時隊列
}
}
//設置下次執行時間
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
}
//放回延時隊列
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
3)ScheduledFutureTask
ScheduledFutureTask
是延時隊列中的任務元素,主要變量有:
long time
:第一次執行的延時;long sequenceNumber
:任務被添加到ScheduledThreadPoolExecutor
中的序號;long period
:周期;
隊列中的任務會先比較time
,如果相同則比較sequenceNumber
,都是小的先執行。