Executor框架(四)周期/延時任務ScheduleThreadPoolExecutor


ScheduledThreadPoolExecutor 介紹

  ScheduledThreadPoolExecutor 是一個可以實現定時任務的 ThreadPoolExecutor(線程池)。比 timer 更加靈活,效率更高!

//繼承自 ThreadPoolExecutor 類,並實現了 ScheduledExecutorService 接口
public class ScheduledThreadPoolExecutorextends extends  ThreadPoolExecutor implements ScheduledExecutorService

ScheduledThreadPoolExecutor的四個構造方法如下:

	public ScheduledThreadPoolExecutor(int corePoolSize) {
		super(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue());
	}

	public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
		super(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory);
	}

	public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
		super(corePoolSize, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler);
	}

	public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
		super(arg0, Integer.MAX_VALUE, 0L, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler);
	}

  ScheduledThreadPoolExecutor 繼承於 ThreadPoolExecutor ,從其構造方法可以看出,此線程池的線程也不會空閑超時(keepAliveTime = 0),同時使用隊列是無邊界的DelayedWorkQueue;要注意是,雖然此類繼承自 ThreadPoolExecutor,但是有幾個繼承的調整方法對此類並無作用,特別是在此類中設置 maximumPoolSize 是沒有意義的,因為ScheduleThreadPoolExecutor 使用了無邊界的任務隊列,所以根本不需要創建多於 corePoolsize 數量的線程。

擴展此類的注意事項:

  此類重寫 AbstractExecutorService 的 submit 方法,以生成內部對象控制每個任務的延遲和調度。若要保留功能性,子類中任何進一步重寫的這些方法都必須調用超類版本,超類版本有效地禁用附加任務的定制。但是,此類提供 protected 訪問類型的擴展方法 decorateTask(為 Runnable 和 Callable 各提供一種版本),可定制用於通過 execute、submit、schedule、scheduleAtFixedRate 和 scheduleWithFixedDelay 進入的執行命令的具體任務類型。

//修改或替換用於執行 callable 的任務。
protected <V> RunnableScheduledFuture<V> decorateTask( Callable<V> callable, RunnableScheduledFuture<V> task) 

//修改或替換用於執行 runnable 的任務。
protected <V> RunnableScheduledFuture<V> decorateTask( Runnable runnable, RunnableScheduledFuture<V> task)

下面是 schedule()方法的源碼,可以看出提交的 Runnable 任務通過 decorateTask() 方法封裝成 RunnableScheduledFuture 對象,然后才處理這個對象。其他的submit()、execute() 等也是如此。

public ScheduledFuture<?> schedule(Runnable arg0, long arg1, TimeUnit arg3) {
		if (arg0 != null && arg3 != null) {
            //封裝Runnable對象
			RunnableScheduledFuture arg4 = this.decorateTask((Runnable) arg0,
					new ScheduledFutureTask(this, arg0, (Object) null, this.triggerTime(arg1, arg3)));
			this.delayedExecute(arg4);
			return arg4;
		} else {
			throw new NullPointerException();
		}
	}

  默認情況下,ScheduledThreadPoolExecutor 使用一個擴展 FutureTask 的任務類型。但是,可以使用下列形式的子類修改或替換該類型。

 public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {

   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }

   protected <V> RunnableScheduledFuture<V> decorateTask(
                Runnable r, RunnableScheduledFuture<V> task) {
       return new CustomTask<V>(r, task);
   }

   protected <V> RunnableScheduledFuture<V> decorateTask(
                Callable<V> c, RunnableScheduledFuture<V> task) {
       return new CustomTask<V>(c, task);
   }
   // ... add constructors, etc.
 }

ScheduleThreadPoolExecutor 主要的方法介紹

1. 零延時的 execute()、submit() 方法

   execute()、submit() 方法都被重寫了,本質上調用的還是 schedule() 方法;從下面的源碼可以看出,這兩個方法提交的任務都是延時為0的 “實時任務”;

public void execute(Runnable arg0) {
		this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
	}

	public Future<?> submit(Runnable arg0) {
		return this.schedule(arg0, 0L, TimeUnit.NANOSECONDS);
	}

2. 提交一個延時任務的 schedule() 方法

方法描述:

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit):
創建並執行在給定延遲后啟用的 ScheduledFuture。
ScheduledFuture<?> schedule(Runnable command, long delay,imeUnit unit):
創建並執行在給定延遲后啟用的一次性操作。

3、 提交周期性的任務 scheduleAtFixedRate()scheduleWithFixedDelay()

方法描述:

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):
initialDelay 是此周期任務的開始執行時的延時時間(即只在第一次開始執行時延時,此后周期性地執行這個任務)。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit):
指定了首次執行前的初始延時時間,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

兩者的區別:

scheduleAtFixedRate: 固定的周期時間。此方法的 period 參數所指的間隔時間是 從上一周期的任務開始時間到當前周期的任務開始時間的間隔。當上一周期任務執行結束了,如果任務的執行時間大於 指定的周期時間period ,那么便可以開始此周期任務的下一周期的執行。否則,便是間隔時間還沒有達到一周期的時間,還需要繼續等待,直到周期時間到來;總的來說,可以分為以下兩種情況:

  • 任務的執行時間 > period參數:那么周期運行的時間便是 任務的執行時間。
  • 任務的執行時間 < period參數:那么周期運行的時間便是 period參數。

scheduleWithFixedDelay: 固定的間隔時間。此方法的 delay 參數所指的間隔時間是 從上一周期的任務的執行結束時間到當前周期的任務開始時間的間隔,是指定任務的固定的運行間隔,與任務的執行時間無關。

@ Example1 scheduleAtFixedRate 測試

簡單起見,下面創建了只有一個線程 ScheduledThreadPoolExecutor 對象,也只提交一個周期任務。 下面的例子中,任務的執行時間大於 period 參數。

public class Test {
	public static void main(String[] args) {
		//池中只有一個線程
		ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
		//作為一個周期任務提交,period 為1000ms,任務執行時間為2000ms
		schedulePool.scheduleAtFixedRate(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
	}
}

class MyRunnable implements Runnable {

	int perio = 1;

	@Override
	public void run() {
       //為周期任務捕獲異常,避免異常影響下一周期的任務執行
		try {
			System.out.println("---------------第 " + perio + " 周期-------------");
			System.out.println("begin = " + System.currentTimeMillis() / 1000);//秒
			//任務執行時間
			Thread.sleep(2000);
			System.out.println("end =   " + System.currentTimeMillis() / 1000);
			perio++;
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (Exception e) { 
			e.printStackTrace();
		}

	}
}

運行結果:

---------------第 1 周期-------------
begin = 1513938114
end = 1513938116
---------------第 2 周期-------------
begin = 1513938116
end = 1513938118
---------------第 3 周期-------------
begin = 1513938118
end = 1513938120
---------------第 4 周期-------------
begin = 1513938120
end = 1513938122
---------------第 5 周期-------------
begin = 1513938122
end = 1513938124

從結果可以看出,任務的周期執行是連着的,沒有間隔時間。這是因為任務的運行時間大於周期執行時間,即當任務還沒結束時,周期時間已經到了,所以任務剛結束,就可以進行下一周期的執行。


@ Example2 scheduleWithFixedDelay 測試

  同樣也是上面的例子,將周期方法換成 scheduleWithFixedDelay( )

public class Test {
	public static void main(String[] args) {
		//池中只有一個線程
		ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
		//作為一個周期任務提交,delay 為1000ms
		schedulePool.scheduleAtFixedRate(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
	}
}

運行結果:

---------------第 1 周期-------------
begin = 1513938832
end = 1513938834
---------------第 2 周期-------------
begin = 1513938835
end = 1513938837
---------------第 3 周期-------------
begin = 1513938838
end = 1513938840
---------------第 4 周期-------------
begin = 1513938841
end = 1513938843

上面的scheduleWithFixedDelay例子的任務是間隔一個固定的時間執行的,無論任務的執行時間是否大於周期時間。

4. 線程池關閉

兩個關閉線程池的方法,一旦線程池被關閉,就會拒絕以后提交的所有任務

void shutdown()
在以前已提交任務的執行中發起一個有序的關閉,但是不接受新任務。線程池中的周期任務、延時任務,根據下面的兩個策略來判斷是否繼續正常運行,還是停止運行。

List<Runnable> shutdownNow()
嘗試停止所有正在執行的任務、暫停等待任務的處理,並返回等待執行的任務列表。對於正在運行,嘗試通過中斷該線程來結束線程。對於尚未運行的任務,則都不再執行。

線程池關閉(shutdown())下的兩個策略的描述

  • void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value):
    在調用線程池調用了 shutdown()方法后,是否繼續執行現有延時任務(就是通過 schedule()方法提交的延時任務 )的策略;默認值為false;在以下兩種種的情況下,延時任務將會被終止:

  • void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
    在調用線程池調用了 shutdown()方法后,是否繼續執行現有周期任務(通過 scheduleAtFixedRate、scheduleWithFixedDelay 提交的周期任務)的策略;默認值為false;在以下兩種的情況下,周期任務將會被終止:

獲取這個兩個策略的設置值:

boolean getContinueExistingPeriodicTasksAfterShutdownPolicy():
取有關在此執行程序已 shutdown 的情況下、是否繼續執行現有定期任務的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy():
獲取有關在此執行程序已 shutdown 的情況下是否繼續執行現有延遲任務的策略

@ Example3 shoutdown下的周期任務測試

  還是基於上面的例子進行改造,main線程休眠10秒后,shutdown線程池。在默認的情況下(策略為false),因為間隔為1s,任務執行時間為2s,所以 shutdown 后,最多能執行4個周期;但是下面的例子,將策略的值設置為true,shutdown后,周期任務也可以正常運行下去。

public class Test {
	public static void main(String[] args) throws InterruptedException {
		//池中只有一個線程
		ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
		//shutdown時,周期任務的策略
		schedulePool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
         //作為周期任務提交 
		ScheduledFuture future = schedulePool.scheduleWithFixedDelay(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
		
		Thread.sleep(10*1000);
		
		schedulePool.shutdown();
	}
}

運行結果:

---------------第 1 周期-------------
begin = 1513945378
end = 1513945380
---------------第 2 周期-------------
begin = 1513945381
end = 1513945383
---------------第 3 周期-------------
begin = 1513945384
end = 1513945386
---------------第 4 周期-------------
begin = 1513945387
end = 1513945389
---------------第 5 周期-------------
begin = 1513945390
end = 1513945392
---------------第 6 周期-------------
begin = 1513945393
end = 1513945395
.......

5. 移除任務、取消任務

BlockingQueue getQueue():
返回此執行程序使用的任務隊列。此隊列中的每個元素都是一個 ScheduledFuture,包括用 execute 所提交的那些任務
boolean remove(Runnable task):
從執行程序的內部隊列中移除此任務(如果存在),從而如果尚未開始,則其不再運行。
void setRemoveOnCancelPolicy(boolean value):
此方法是在1.7引入的,是用於對調用cancel()的任務的處理策略:是否馬上移除出隊列;默認為false;
周期任務也可以通過 ScheduledFuture的 cancel()取消運行;

Executors 提供了兩個常用的ScheduledThreadPoolExecutor

  這兩個常用的ScheduledThreadPoolExecutor:SingleThreadScheduledExecutor(單線程的線程池)、ScheduledThreadPool(線程數量固定的線程池),下面是 Executors 對應的源代碼。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
		return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
	}

	public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory arg) {
		return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, arg));
	}

	public static ScheduledExecutorService newScheduledThreadPool(int arg) {
		return new ScheduledThreadPoolExecutor(arg);
	}

	public static ScheduledExecutorService newScheduledThreadPool(int arg, ThreadFactory arg0) {
		return new ScheduledThreadPoolExecutor(arg, arg0);
	}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM