一、簡介
ExecutorService是Java中對線程池定義的一個接口,它java.util.concurrent包中,在這個接口中定義了和后台任務執行相關的方法。
二、線程池
Java給我們提供了一個Executors工廠類
,它可以幫助我們很方便的創建各種類型ExecutorService線程池,Executors一共可以創建下面這四類線程池:
- newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
- newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
- newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
三、代碼舉例
import org.junit.Test; import java.util.concurrent.*; /** * @author :gongxr * @description:ExecutorService是Java中對線程池定義的一個接口,它java.util.concurrent包中,在這個接口中定義了和后台任務執行相關的方法。 Java通過Executors提供四種創建線程池的工廠類,分別為: * newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。 * newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。 * newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。 * newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。 */ public class TestExecutorService { /** * newCachedThreadPool:創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。 */ @Test public void test1() throws InterruptedException { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 開始工作!"); Thread.sleep((long) (Math.random() * 3000)); System.out.println(Thread.currentThread().getName() + " 工作結束!"); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 5; i++) { cachedThreadPool.execute(runnable); } Thread.sleep(3000); System.out.println("主線程結束!"); cachedThreadPool.shutdown(); } /** * newFixedThreadPool:創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。 */ @Test public void test2() throws InterruptedException { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 開始工作!"); Thread.sleep((long) (Math.random() * 3000)); System.out.println(Thread.currentThread().getName() + " 工作結束!"); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 6; i++) { final int index = i; fixedThreadPool.execute(runnable); } Thread.sleep(6000); fixedThreadPool.shutdown(); System.out.println("主線程結束!"); } /** * newScheduledThreadPool:創建一個定長線程池,支持定時及周期性任務執行。 * 該例中延遲3秒執行。 */ @Test public void test3_1() throws InterruptedException { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("delay 3 seconds"); } }; scheduledThreadPool.schedule(runnable, 3, TimeUnit.SECONDS); Thread.sleep(10000); scheduledThreadPool.shutdown(); } /** * newScheduledThreadPool:創建一個定長線程池,支持定時及周期性任務執行。 * 延遲1秒后每3秒執行一次。 */ @Test public void test3_2() throws InterruptedException { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("delay 1 seconds, and excute every 3 seconds"); } }; scheduledThreadPool.scheduleAtFixedRate(runnable, 1, 3, TimeUnit.SECONDS); Thread.sleep(10000); scheduledThreadPool.shutdown(); } /** * newSingleThreadExecutor:創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。 */ @Test public void test4() throws InterruptedException { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int index = i; singleThreadExecutor.execute(new Runnable() { @Override public void run() { try { System.out.println(index); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } Thread.sleep(10000); singleThreadExecutor.shutdown(); } /** * submit(Callable) * submit(Callable)和submit(Runnable)類似,也會返回一個Future對象, * 但是除此之外,submit(Callable)接收的是一個Callable的實現, * Callable接口中的call()方法有一個返回值,可以返回任務的執行結果, * 而Runnable接口中的run()方法是void的,沒有返回值。 */ @Test public void testCallable() throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); Callable callable = new Callable() { @Override public Object call() throws InterruptedException { String name = Thread.currentThread().getName(); System.out.println("Thread Name:" + name + " 開始工作!"); Thread.sleep((long) (Math.random() * 3000)); System.out.println("Thread Name:" + name + " 結束工作!"); return name; } }; Future future = executorService.submit(callable); System.out.println("主線程繼續執行!"); Thread.sleep(3000); // future.get()使得主線程阻塞,等待子線程執行結束 System.out.println("子線程執行完回調結果:" + future.get().toString()); System.out.println("主線程結束!"); executorService.shutdown(); } /** * ExecutorService的shutdown、shutdownNow、awaitTermination用法: * 調用shutdown方法之后,ExecutorService不再接收新的任務,直到當前所有線程執行完成才會關閉,所有之前提交的任務都會被執行。 * 調用shutdownNow方法后,將試圖阻止所有正在執行的任務和被提交還沒有執行的任務。 * awaitTermination方法的作用:監視各任務的狀態是否已經結束,經過指定的時間后,全部完成返回true,反之返回false。 */ @Test public void testShutDown() throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); Runnable runnable = new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getName() + " 開始工作!"); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + " 工作結束!"); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 子線程中斷!"); e.printStackTrace(); } } }; // 線程1 executorService.execute(runnable); Thread.sleep(1000); // 線程2 executorService.execute(runnable); executorService.shutdown(); // 等待一秒后檢查子線程運行狀態,全部完成返回true if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { System.out.println("主線程強制中斷子線程!"); executorService.shutdownNow(); } Thread.sleep(4000); System.out.println("主線程結束!"); } }
與多線程同步工具結合使用:
/** * ExecutorService實戰用法: * 與CountDownLatch線程同步工具結合使用 */ @Test public void testExe() throws Exception { int count = 5; // 任務數量 ExecutorService executorService = Executors.newCachedThreadPool(); CountDownLatch countDownLatch = new CountDownLatch(count); //線程同步工具 for (int i = 1; i <= count; i++) { int taskId = i; executorService.execute(() -> task(taskId, countDownLatch)); } executorService.shutdown(); // 不再接受新任務 countDownLatch.await(); // 等待所有任務完成 // 等待一秒后檢查子線程運行狀態,全部完成返回true if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { System.out.println("主線程強制中斷子線程!"); executorService.shutdownNow(); } System.out.println("主線程結束!"); } private void task(int index, CountDownLatch countDownLatch) { try { System.out.println("任務" + index); System.out.println(Thread.currentThread().getName() + " 開始工作!"); Thread.sleep((long) (Math.random() * 3000)); System.out.println(Thread.currentThread().getName() + " 工作結束!"); countDownLatch.countDown(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 子線程中斷!"); e.printStackTrace(); } }
運行結果:
任務1 任務3 任務4 pool-1-thread-4 開始工作! 任務2 pool-1-thread-2 開始工作! 任務5 pool-1-thread-3 開始工作! pool-1-thread-1 開始工作! pool-1-thread-5 開始工作! pool-1-thread-3 工作結束! pool-1-thread-2 工作結束! pool-1-thread-5 工作結束! pool-1-thread-4 工作結束! pool-1-thread-1 工作結束! 主線程結束!