Executors框架之ScheduledExecutorService實現定時任務


一、簡介

An ExecutorService that can schedule commands to run after a given delay, or to execute periodically.
(ExecutorService可以安排命令在給定的延遲后運行或定期執行。)

The schedule methods create tasks with various delays and return a task object that can be used to cancel or check execution. The scheduleAtFixedRate and scheduleWithFixedDelay methods create and execute tasks that run periodically until cancelled.
(調度方法會創建具有各種延遲的任務,並返回可用於取消或檢查執行的任務對象。 scheduleAtFixedRate和scheduleWithFixedDelay方法創建並執行定期運行的任務,直到被取消為止。
)

Commands submitted using the Executor.execute(java.lang.Runnable) and ExecutorService submit methods are scheduled with a requested delay of zero. Zero and negative delays (but not periods) are also allowed in schedule methods, and are treated as requests for immediate execution.
(使用Executor.execute(java.lang.Runnable)和ExecutorService提交方法提交的命令的計划延遲為零。調度方法中還允許零延遲和負延遲(但不允許使用周期),並將其視為立即執行的請求。
)

All schedule methods accept relative delays and periods as arguments, not absolute times or dates. It is a simple matter to transform an absolute time represented as a Date to the required form. For example, to schedule at a certain future date, you can use: schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS). Beware however that expiration of a relative delay need not coincide with the current Date at which the task is enabled due to network time synchronization protocols, clock drift, or other factors. The Executors class provides convenient factory methods for the ScheduledExecutorService implementations provided in this package.
(所有調度方法都接受相對延遲和周期作為參數,而不是絕對時間或日期作為參數。將代表日期的絕對時間轉換為所需的形式很簡單。例如,要計划在某個將來的日期進行計划,可以使用:schedule(task,date.getTime()-System.currentTimeMillis(),TimeUnit.MILLISECONDS)。但是請注意,由於網絡時間同步協議,時鍾漂移或其他因素,相對延遲的到期時間不必與啟用任務的當前日期一致。 Executors類為此程序包中提供的ScheduledExecutorService實現提供了方便的工廠方法。
)

(jdk7 doc , translate by google )

二、兩個常用定時任務

1. 固定周期的定時任務

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

創建並執行一個周期性操作,該操作將在給定的初始延遲后首先啟用,然后在給定的時間段內啟用;即后執行將開始 在initialDelay然后在initialDelay +周期,然后 在initialDelay + 2 *周期,依此類推。

2. 固定延遲的定時任務

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

創建並執行一個周期性操作,該操作將在給定的初始延遲后首先啟用,然后在一個執行的終止與下一個執行的開始之間具有給定的延遲。

3. 一次性延遲任務 Runnable

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)

創建並執行一次操作,該操作在給定的延遲后變為啟用狀態。

ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
System.out.println("一次性的延遲任務, 10S 后執行");
executorService.schedule(new Runnable() {
    @Override
    public void run() {
        System.out.println("一次性延遲任務");
    }
}, 10L, SECONDS);
        

4. 一次性延遲任務 Callable

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)

創建並執行ScheduledFuture,該ScheduledFuture在給定的延遲后變為啟用狀態。

三、示例代碼

class PrintControl {
    private final SimpleDateFormat SDF = new SimpleDateFormat("hh:mm:ss");

    /**
     * @param int corePoolSize 線程池中最小的線程數, 無任務時保持, 任務過多時可以超出這個值
     */
    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);

    /**
     * 給定延遲的打印任務
     * 以固定的延遲時間(delay)去執行任務
     */
    public void printForDelay() {
        Runnable print = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("ScheduledWithFixedDelay" + SDF.format(new Date()));
        };

        /**
         * @param Runnable command
         * @param long initialDelay
         * @param long delay
         * @param TimeUnit unit
         */
        scheduled.scheduleWithFixedDelay(print, 0L, 5L, SECONDS);
    }

    /**
     * 定期去執行打印任務
     * 以固定的周期(period)去執行任務
     */
    public void printForRate() {
        Runnable print = () -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("ScheduledAtFixedRate" + SDF.format(new Date()));
        };

        /**
         * @param Runnable command
         * @param long initialDelay
         * @param long period
         * @param TimeUnit unit
         */
        scheduled.scheduleAtFixedRate(print, 0L, 5L, SECONDS);
    }


}

四、通過ThreadFactory 指定任務線程名稱

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)

使用ThreadFactory設置明確的線程名稱, 這樣在調試的時候就可以很清晰的找到任務線程, 便於調試

參考 Executors.defaultThreadFactory()

class PrintControl {
    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1, ExecutorsefaultThreadFactory());
}
class PrintThreadFactory implements ThreadFactory {

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "PrintThreadFactory");
    }
}

五、結束任務

1. 異常拋出

如果任務拋出異常會導致周期任務無法繼續進行, 所以最好結合try cache處理.

此時線程不會結束.

final AtomicInteger count = new AtomicInteger(0);

ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());

StringBuilder nullPoint = null;

Runnable print = new Runnable() {
    @Override
    public void run() {
        System.out.println("print " + count.getAndIncrement());
        // print 4 之后, 任務不再繼續執行
        if (count.get() == 5) {
            nullPoint.toString();
        }
    }
};

schedule.scheduleAtFixedRate(print, 0L, 2L, SECONDS);

2. 結束任務

Future.cancle(boolean mayInterruptIfRunning)

通過Debugger可以看出, 只是任務結束了,線程還在繼續wait

// 最終打印 "print 3"之后打印"is canclelled", 任務不再繼續, 線程不被關閉

final AtomicInteger count = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
Future future = null;
ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());

Runnable print = new Runnable() {
    @Override
    public void run() {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
        }
    }
};

future = schedule.scheduleAtFixedRate(print, 0L, 2L, SECONDS);
try {
    countDownLatch.await();
    future.cancel(true);
    if (future.isCancelled()) {
        System.out.println("is canclelled");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

3. 結束線程

3.1. void ExecutorService.shutdown();

Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
(啟動有序關閉,在該關閉中執行先前提交的任務,但不接受任何新任務。)

final AtomicInteger count = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());

Runnable print = new Runnable() {
    @Override
    public void run() {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
            System.out.println("任務還在繼續...");
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務結束");
        }
    }
};

schedule.scheduleAtFixedRate(print, 0L, 2L, SECONDS);
try {
    countDownLatch.await();
    schedule.shutdown();
    // schedule.shutdownNow();

    if (schedule.isShutdown()) {
        System.out.println("Schedule is shutdown");
    }
    // 阻塞
    if (schedule.awaitTermination(10L, SECONDS)) {
        System.out.println("termination");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

打印結果:

print 0
print 1
print 2
任務還在繼續...
Schedule is shutdown
任務結束
termination

3.2. List ExecutorService.shutdownNow();

Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
(嘗試停止所有正在執行的任務,暫停正在等待的任務的處理,並返回正在等待執行的任務的列表。)

final AtomicInteger count = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(1);
ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1, new PrintThreadFactory());

Runnable print = new Runnable() {
    @Override
    public void run() {
        System.out.println("print " + count.getAndIncrement());
        if (count.get() == 3) {
            countDownLatch.countDown();
            System.out.println("任務還在繼續...");
            try {
                Thread.sleep(3000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("任務結束");
        }
    }
};

schedule.scheduleAtFixedRate(print, 0L, 2L, SECONDS);
try {
    countDownLatch.await();
    // schedule.shutdown();
    schedule.shutdownNow();

    if (schedule.isShutdown()) {
        System.out.println("Schedule is shutdown");
    }
    // 阻塞
    if (schedule.awaitTermination(10L, SECONDS)) {
        System.out.println("termination");
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

打印結果: 可以看出 Thread.sleep()發生了中斷異常

print 0
print 1
print 2
任務還在繼續...
Schedule is shutdown
java.lang.InterruptedException: sleep interrupted
    ...
任務結束
termination

3.3. boolean ExecutorService.awaitTermination(long timeout, TimeUnit unit)

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
(阻塞直到關閉請求后所有任務完成執行,或者發生超時,或者當前線程被中斷(以先發生的為准)。)

六、參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM