背景介紹
有時候我們需要執行一批相似的任務,並且要求這些任務能夠並行執行。通常,我們的需求會分為兩種情況:
- 並行執行一批任務,等待耗時最長的任務完成之后,再處理所有任務的結果。
- 並行執行一批任務,依次處理完成的任務結果(哪個任務先執行完就先處理哪個)。
這篇文章要介紹的兩種批量執行任務的方式,正好對應了上述兩種情況,下面分別介紹在Java中,如何使用並發包里面的API完成我們的需求。
使用ExecutorSevice#invokeAll()
通過向線程池提交一組任務,可以實現上述第一種批量執行的需求。下面來看具體例子。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 創建兩個任務
Callable<Integer> task1 = () -> {
TimeUnit.SECONDS.sleep(3);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task1 finished");
return 1;
};
Callable<Integer> task2 = () -> {
TimeUnit.SECONDS.sleep(10);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task2 finished");
return 2;
};
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(task1);
tasks.add(task2);
// 創建線程池
ExecutorService service = Executors.newFixedThreadPool(2);
// 調用invokeAll方法,批量提交一組任務
List<Future<Integer>> futures = service.invokeAll(tasks);
for (Future<Integer> future : futures) {
// 在這里獲取任務的返回值時,會等待所有任務都執行完才返回結果
Integer result = future.get();
System.out.println("[" + Thread.currentThread().getName() + "]" + " " + result);
}
// 關閉線程池
service.shutdown();
}
}
執行結果:
使用CompletionService
CompletionService也可以用來提交一組任務,讓這些任務並行執行,任何一個任務執行完之后別的線程可以立即獲得計算結果。看一下具體例子:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 創建兩個任務
Callable<Integer> task1 = () -> {
TimeUnit.SECONDS.sleep(3);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task1 finished");
return 1;
};
Callable<Integer> task2 = () -> {
try {
TimeUnit.SECONDS.sleep(10);
System.out.println("[" + Thread.currentThread().getName() + "]" + " task2 finished");
}catch (InterruptedException e) {
// 第二個任務耗時長,等第一個任務完成之后,手動取消第二個任務的執行,此時第二個任務可能會收到中斷。
e.printStackTrace();
}
return 2;
};
List<Callable<Integer>> tasks = new ArrayList<>();
tasks.add(task1);
tasks.add(task2);
// 創建線程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 使用CompletionService批量提交任務
CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);
List<Future<Integer>> futures = new ArrayList<>();
for (Callable<Integer> task : tasks) {
Future<Integer> f = cs.submit(task);
futures.add(f);
}
// 獲取第一個執行完的任務結果
Future<Integer> f = cs.take();
System.out.println("[" + Thread.currentThread().getName() + "]" + f.get());
futures.remove(f);
// 取消其他任務
for (Future<Integer> future : futures) {
future.cancel(true);
}
// 關閉線程池
pool.shutdown();
}
}
運行結果:
第一個任務執行完之后,第二個任務還在運行中,但是我們已經不關心其計算結果了,手動取消這個任務的執行,因此第二個任務會收到一個中斷。