Java 使用線程池執行若干任務


在執行一系列帶有IO操作(例如下載文件),且互不相關的異步任務時,采用多線程可以很極大的提高運行效率。線程池包含了一系列的線程,並且可以管理這些線程。例如:創建線程,銷毀線程等。本文將介紹如何使用Java中的線程池執行任務。

1. 任務類型

在使用線程池執行任務之前,我們弄清楚什么任務可以被線程池調用。按照任務是否有返回值可以將任務分為兩種,分別是實現Runnable的任務類(無參數無返回值)和實現Callable接口的任務類(無參數有返回值)。在打代碼時根據需求選擇對應的任務類型。

1.1 實現Runnable接口的類

多線程任務類型,首先自然想到的就是實現 Runnable 接口的類,Runnable接口提供了一個抽象方法run,這個方法無參數,無返回值。例如:

Runnable task = new Runnable() {
    @Override
    public void run() {
        System.out.println("Execute task.");
    }
};

或者Java 8 及以上版本更簡潔的寫法:

Runnable task = ()->{
    System.out.println("Execute task.");
};

1.2 實現Callable接口的類

與 Runnable 類似, Callable 也只有一個抽象方法,不過該抽象方法有返回值。在實現該接口的時候需要制定返回值的類型。例如:

Callable<String> callableTask = ()-> "finished";

2. 線程池類型

java.util.concurrent.Executors 提供了一系列靜態方法來創建各種線程池。下面例舉出了主要的一些線程池及特性,其它未例舉線程池的特性可由下面這些推導出來。

2.1 線程數固定的線程池 Fixed Thread Pool

顧名思義,這種類型線程池線程數量是固定的。如果線程數量設置為n,則任何時刻該線程池最多只有n個線程處於運行狀態。當線程池中處於飽和運行狀態時,再往線程池中提交的任務會被放到執行隊列中。如果線程池處於不飽和狀態,線程池也會一直存在,直到ExecuteService 的shutdown方法被調用,線程池才會被清除。

// 創建線程數量為5的線程池。
ExecutorService executorService = Executors.newFixedThreadPool(5);

2.2 可緩存的線程池 Cached Thread Pool

這種類型的線程池初始大小為0個線程,隨着往池里不斷提交任務,如果線程池里面沒有閑置線程(0個線程也表示沒有閑置線程),則會創建新的線程,保證沒有任務在等待;如果有閑置線程,則復用閑置狀態線程執行任務。處於閑置狀態的線程只會在線程池中緩存60秒,閑置時間達到60s的線程會被關閉並移出線程池。在處理大量短暫的(官方說法:short-lived)異步任務時可以顯著得提供程序性能。

//創建一個可緩存的線程池 
ExecutorService executorService = Executors.newCachedThreadPool();

2.3 單線程池

這或許不能叫線程池了,由於它里面的線程永遠只有1個,而且自始至終都只有1個(為什么說這句話,因為要和 Executors.newFixedThreadPool(1) 區別開來),所以還是叫它“單線程池把”。

可以往單線程池中添加任務,但是每次只執行1個,且任務是按順序執行的。如果前面的任務出現了異常,當前線程會被銷毀,但1個新的線程會被創建用來執行后面的任務。

以上這些和線程數只有1個的線程Fixed Thread Pool一樣。兩者唯一不同的是, Executors.newFixedThreadPool(1) 可以在運行時修改它里面的線程數,而 Executors.newSingleThreadExecutor() 永遠只能有1個線程。兩者之間的區別可以查看“Java 中 Executors.newSingleThreadExecutor() 與Executors.newFixedThreadPool(1)有什么區別”。

//創建一個單線程池
ExecutorService executorService = Executors.newSingleThreadExecutor();

2.4 工作竊取線程池

扒開源碼,會發現工作竊取線程池本質是 ForkJoinPool ,這類線程池充分利用CPU多核處理任務,適合處理消耗CPU資源多的任務。它的線程數不固定,維護的任務隊列有多個,當一個任務隊列完成時,相應的線程會從其它的任務隊列中竊取任務執行,這也意味着任務的開始執行順序並和提交順序相同。如果有更高的需求,可以直接通過ForkJoinPool獲取線程池。

//創建一個工作竊取線程池,使用CPU核數等於機器的CPU核數
ExecutorService executorService = Executors.newWorkStealingPool();

//創建一個工作竊取線程池,使用CPU 3 個核進行計算,工作竊取線程池不能設置線程數
ExecutorService executorService2 = Executors.newWorkStealingPool(3);

2.5 計划任務線程池

計划任務線程池可以按計划執行某些任務,例如:周期性的執行某項任務。

// 獲取一個大小為2的計划任務線程池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
// 添加一個打印當前線程信息計划任務,該任務在3秒后執行
scheduledExecutorService.schedule(() -> { System.out.println(Thread.currentThread()); }, 3,  TimeUnit.SECONDS);
// 添加一個打印當前線程信息計划任務,該任務在2秒后首次執行,之后每5秒執行一次。如果任務執行時間超過了5秒,則下一次將會在前一次執行完成之后立即執行
scheduledExecutorService.scheduleAtFixedRate(() -> { System.out.println(Thread.currentThread()); }, 2, 5, TimeUnit.SECONDS);
// 添加一個打印當前線程信息計划任務,該任務在2秒后首次執行,之后每次在任務執行之后5秒執行下一次。
scheduledExecutorService.scheduleWithFixedDelay(() -> { System.out.println(Thread.currentThread()); }, 2, 5, TimeUnit.SECONDS);
// 逐個清除 idle 狀態的線程
scheduledExecutorService.shutdown();
// 阻塞,在線程池被關調之前代碼不再往下走
scheduledExecutorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

3. 使用線程池執行任務

前面提到,任務類型分為有返回值和無返回值的類型,這里的調用也分為有返回值調用和無返回值的調用。

3.1 無返回值任務的調用

如果是無返回值任務的調用,可以用execute或者submit方法,這種情況下二者本質上一樣。為了於有返回值任務調用保持統一,建議采用submit方法。

//創建一個線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);

//提交一個無返回值的任務(實現了Runnable接口)
executorService.submit(()->System.out.println("Hello"));

executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

如果有一個任務集合,可以一個個提交。

//創建一個線程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Runnable> tasks = Arrays.asList(
        ()->System.out.println("Hello"),
        ()->System.out.println("World"));

//逐個提交任務
tasks.forEach(executorService::submit);

executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

3.2 有返回值任務的調用

有返回值的任務需要實現Callable接口,實現的時候在泛型位置指定返回值類型。在調用submit方法時會返回一個Future對象,通過Future的方法get()可以拿到返回值。這里需要注意的是,調用get()時代碼會阻塞,直到任務完成,有返回值。

ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> future = executorService.submit(()->"Hello");
System.out.println(future.isDone());//false
String value = future.get();
System.out.println(future.isDone());//true
System.out.println(value);//Hello

如果要提交一批任務,ExecutorService除了可以逐個提交之外,還可以調用invokeAll一次性提交,invokeAll的內部實現其實就是用一個循環逐個提交任務。invokeAll 返回的值是一個Future List。

ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(()->"Hello", ()->"World");
List<Future<String>> futures = executorService.invokeAll(tasks);

invokeAny 方法也很有用,線程池執行若干個實現了 Callable 的任務,然后返回最先執行結束的任務的值,其它未完成的任務將被正常取消掉不會有異常。如下代碼不會輸出“Hello”

ExecutorService executorService = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(
        () -> {
            Thread.sleep(500L);
            System.out.println("Hello");
            return "Hello";
        }, () -> {
            System.out.println("World");
            return "World";
        });
String s = executorService.invokeAny(tasks);
System.out.println(s);//World

輸出:

World
World

另外,在查看ExecutorService源碼時發現它還提供了一個方法 <T> Future<T> submit(Runnable task, T result); ,可以通過這個方法提交一個實現了Runnable接口的任務,然后有返回值,而Runnable接口中的run方法時沒有返回值的。那它的返回值是哪來的呢?其實問題在於該submit方法后面的一個參數,這個參數值就是返回的值。調用submit方法之后,有一通操作,然后直接把result參數返回了。

4. 小結

在利用多線程處理任務時,應該根據情況選擇合適的任務類型和線程池類型。如果無返回值,可以采用實現Runnable或Callable接口的任務;如果有返回值,應該使用實現Callable接口的任務,返回值通過Future的get方法取到。

選用線程池時,如果只用1個線程,用單線程池或者容量為1的固定容量線程池;處理大量short-live任務是,使用可緩存的線程池;若要有計划或者循環執行某些任務,可以采用計划任務線程池;如果任務需要消耗大量的CPU資源,則應用工作竊取線程池。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM