java多線程之ScheduleThreadPoolExecutor


ScheduledThreadPoolExecutor 介紹

  ScheduledThreadPoolExecutor 是一個可以實現定時任務的 ThreadPoolExecutor(線程池)。比 timer 更加靈活,效率更高!

  ScheduledThreadPoolExecutor結果如下圖所示。

我們,ThreadPoolExecutor的execute和submit方法繼承於AbstractExecutorService。而ScheduleExecutorService是一個接口,里面並沒有execute和submit方法,ScheduleThreadPoolExecutor里面重寫了execute和submit方法。

ScheduledThreadPoolExecutor的四個構造方法如下:

/**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), handler);
    }

    /**
     * Creates a new ScheduledThreadPoolExecutor with the given
     * initial parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if {@code threadFactory} or
     *         {@code handler} is null
     */
    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

  ScheduledThreadPoolExecutor 繼承於 ThreadPoolExecutor ,從其構造方法可以看出,此線程池的線程也不會空閑超時(keepAliveTime = 0),同時使用隊列是無邊界的DelayedWorkQueue;要注意是,雖然此類繼承自 ThreadPoolExecutor,但是有幾個繼承的調整方法對此類並無作用,特別是在此類中設置 maximumPoolSize 是沒有意義的,因為ScheduleThreadPoolExecutor 使用了無邊界的任務隊列,所以根本不需要創建多於 corePoolsize 數量的線程。

ScheduleThreadPoolExecutor 主要的方法介紹

1. 零延時的 execute()、submit() 方法

   execute()、submit() 方法都被重寫了,本質上調用的還是 schedule() 方法;從下面的源碼可以看出,這兩個方法提交的任務都是延時為0的 “實時任務”;

/**
     * Executes {@code command} with zero required delay.
     * This has effect equivalent to
     * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
     * Note that inspections of the queue and of the list returned by
     * {@code shutdownNow} will access the zero-delayed
     * {@link ScheduledFuture}, not the {@code command} itself.
     *
     * <p>A consequence of the use of {@code ScheduledFuture} objects is
     * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
     * called with a null second {@code Throwable} argument, even if the
     * {@code command} terminated abruptly.  Instead, the {@code Throwable}
     * thrown by such a task can be obtained via {@link Future#get}.
     *
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution because the
     *         executor has been shut down
     * @throws NullPointerException {@inheritDoc}
     */
    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

    // Override AbstractExecutorService methods

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

2. 提交一個延時任務的 schedule() 方法

方法描述:

     /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

創建並執行在給定延遲后啟用的 ScheduledFuture。

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

建並執行在給定延遲后啟用的一次性操作。

3、 提交周期性的任務 scheduleAtFixedRate() 和 scheduleWithFixedDelay()

方法描述:

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

initialDelay 是此周期任務的開始執行時的延時時間(即只在第一次開始執行時延時,此后周期性地執行這個任務)。

/**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

指定了首次執行前的初始延時時間,隨后,在每一次執行終止和下一次執行開始之間都存在給定的延遲。

兩者的區別:

scheduleAtFixedRate: 固定的周期時間。此方法的 period 參數所指的間隔時間是 從上一周期的任務開始時間到當前周期的任務開始時間的間隔。當上一周期任務執行結束了,如果任務的執行時間大於 指定的周期時間period ,那么便可以開始此周期任務的下一周期的執行。否則,便是間隔時間還沒有達到一周期的時間,還需要繼續等待,直到周期時間到來;總的來說,可以分為以下兩種情況:

  • 任務的執行時間 > period參數:那么周期運行的時間便是 任務的執行時間。
  • 任務的執行時間 < period參數:那么周期運行的時間便是 period參數。

scheduleWithFixedDelay: 固定的間隔時間。此方法的 delay 參數所指的間隔時間是 從上一周期的任務的執行結束時間到當前周期的任務開始時間的間隔,是指定任務的固定的運行間隔,與任務的執行時間無關。

@ Example1 scheduleAtFixedRate 測試

簡單起見,下面創建了只有一個線程 ScheduledThreadPoolExecutor 對象,也只提交一個周期任務。 下面的例子中,任務的執行時間大於 period 參數。

public class ScheduledThreadPoolExecutorTest {

    public static void main(String[] args) {
        //池中只有一個線程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //作為一個周期任務提交,period 為1000ms,任務執行時間為2000ms
        schedulePool.scheduleAtFixedRate(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);
    }

    static class MyRunnable implements Runnable {

        int period = 1;

        @Override
        public void run() {
            //為周期任務捕獲異常,避免異常影響下一周期的任務執行
            try {
                System.out.println("---------------第 " + period + " 周期-------------");
                System.out.println("begin = " + System.currentTimeMillis() / 1000);////任務執行時間
                Thread.sleep(2000);
                System.out.println("end =   " + System.currentTimeMillis() / 1000);
                period++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }

        }
    }
}

運行結果:

---------------第 1 周期-------------
begin = 1563007610
end =   1563007612
---------------第 2 周期-------------
begin = 1563007612
end =   1563007614
---------------第 3 周期-------------
begin = 1563007614
end =   1563007616
---------------第 4 周期-------------
begin = 1563007616
end =   1563007618
---------------第 5 周期-------------
begin = 1563007618
end =   1563007620

從結果可以看出,任務的周期執行是連着的,沒有間隔時間。這是因為任務的運行時間大於周期執行時間,即當任務還沒結束時,周期時間已經到了,所以任務剛結束,就可以進行下一周期的執行。

@ Example2 scheduleWithFixedDelay 測試

同樣也是上面的例子,將周期方法換成 scheduleWithFixedDelay( )

public static void main(String[] args) {
        //池中只有一個線程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //作為一個周期任務提交,delay 為1000ms
        schedulePool.scheduleWithFixedDelay(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);

    }

運行結果:

---------------第 1 周期-------------
begin = 1563007901
end =   1563007903
---------------第 2 周期-------------
begin = 1563007904
end =   1563007906
---------------第 3 周期-------------
begin = 1563007907
end =   1563007909
---------------第 4 周期-------------
begin = 1563007910
end =   1563007912

上面的scheduleWithFixedDelay例子的任務是間隔一個固定的時間執行的,無論任務的執行時間是否大於周期時間。

4. 線程池關閉

兩個關閉線程池的方法,一旦線程池被關閉,就會拒絕以后提交的所有任務

void shutdown():

在以前已提交任務的執行中發起一個有序的關閉,但是不接受新任務。線程池中的周期任務、延時任務,根據下面的兩個策略來判斷是否繼續正常運行,還是停止運行。

List<Runnable> shutdownNow():

嘗試停止所有正在執行的任務、暫停等待任務的處理,並返回等待執行的任務列表。對於正在運行,嘗試通過中斷該線程來結束線程。對於尚未運行的任務,則都不再執行。

線程池關閉(shutdown())下的兩個策略的描述

  • void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value):
    在調用線程池調用了 shutdown()方法后,是否繼續執行現有延時任務(就是通過 schedule()方法提交的延時任務 )的策略;默認值為false;在以下兩種種的情況下,延時任務將會被終止:

  • void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)
    在調用線程池調用了 shutdown()方法后,是否繼續執行現有周期任務(通過 scheduleAtFixedRate、scheduleWithFixedDelay 提交的周期任務)的策略;默認值為false;在以下兩種的情況下,周期任務將會被終止:

獲取這個兩個策略的設置值:

boolean getContinueExistingPeriodicTasksAfterShutdownPolicy():

取有關在此執行程序已 shutdown 的情況下、是否繼續執行現有定期任務的策略。

boolean getExecuteExistingDelayedTasksAfterShutdownPolicy():

獲取有關在此執行程序已 shutdown 的情況下是否繼續執行現有延遲任務的策略

@ Example3 shoutdown下的周期任務測試

還是基於上面的例子進行改造,main線程休眠10秒后,shutdown線程池。在默認的情況下(策略為false),因為間隔為1s,任務執行時間為2s,所以 shutdown 后,最多能執行4個周期;但是下面的例子,將策略的值設置為true,shutdown后,周期任務也可以正常運行下去。

public static void main(String[] args) throws InterruptedException{

        //池中只有一個線程
        ScheduledThreadPoolExecutor schedulePool = new ScheduledThreadPoolExecutor(1);
        //shutdown時,周期任務的策略
        schedulePool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
        //作為周期任務提交
        ScheduledFuture future = schedulePool.scheduleWithFixedDelay(new MyRunnable(), 50, 1000, TimeUnit.MILLISECONDS);

        Thread.sleep(10*1000);

        schedulePool.shutdown();

    }

運行結果:

---------------第 1 周期-------------
begin = 1563008226
end =   1563008228
---------------第 2 周期-------------
begin = 1563008229
end =   1563008231
---------------第 3 周期-------------
begin = 1563008232
end =   1563008234
---------------第 4 周期-------------
begin = 1563008235
end =   1563008237
---------------第 5 周期-------------
begin = 1563008238
end =   1563008240
---------------第 6 周期-------------
begin = 1563008241
end =   1563008243
---------------第 7 周期-------------
begin = 1563008244

5. 移除任務、取消任務

BlockingQueue getQueue():
返回此執行程序使用的任務隊列。此隊列中的每個元素都是一個 ScheduledFuture,包括用 execute 所提交的那些任務
boolean remove(Runnable task):
從執行程序的內部隊列中移除此任務(如果存在),從而如果尚未開始,則其不再運行。
void setRemoveOnCancelPolicy(boolean value):
此方法是在1.7引入的,是用於對調用cancel()的任務的處理策略:是否馬上移除出隊列;默認為false;
周期任務也可以通過 ScheduledFuture的 cancel()取消運行;

Executors 提供了兩個常用的ScheduledThreadPoolExecutor

  這兩個常用的ScheduledThreadPoolExecutor:SingleThreadScheduledExecutor(單線程的線程池)、ScheduledThreadPool(線程數量固定的線程池),下面是 Executors 對應的源代碼。

/**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.
     * (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newScheduledThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     * @return the newly created scheduled executor
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    /**
     * Creates a single-threaded executor that can schedule commands
     * to run after a given delay, or to execute periodically.  (Note
     * however that if this single thread terminates due to a failure
     * during execution prior to shutdown, a new one will take its
     * place if needed to execute subsequent tasks.)  Tasks are
     * guaranteed to execute sequentially, and no more than one task
     * will be active at any given time. Unlike the otherwise
     * equivalent {@code newScheduledThreadPool(1, threadFactory)}
     * the returned executor is guaranteed not to be reconfigurable to
     * use additional threads.
     * @param threadFactory the factory to use when creating new
     * threads
     * @return a newly created scheduled executor
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @param threadFactory the factory to use when the executor
     * creates a new thread
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @throws NullPointerException if threadFactory is null
     */
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM