Java在語言層面提供了多線程的支持,線程池能夠避免頻繁的線程創建和銷毀的開銷,因此很多時候在項目當中我們是使用的線程池去完成多線程的任務。
Java提供了Executors 框架提供了一些基礎的組件能夠輕松的完成多線程異步的操作,Executors提供了一系列的靜態工廠方法能夠獲取不同的ExecutorService實現,ExecutorService擴展了Executors接口,Executors相當簡單:
public interface Executor {
void execute(Runnable command);
}
- 1
- 2
- 3
- 4
把任務本身和任務的執行解耦了,如果說Runnable是可異步執行任務的抽象,那Executor就是如何執行可異步執行任務的抽象,說起來比較繞口。
本文不講解線程的一些基礎知識,因為網上的其他文章已經寫的足夠詳細和泛濫。我寫寫多個異步任務的並發執行與結果的獲取問題。假設這樣一個場景:我們要組裝一個對象,這個對象由大量小的內容組成,這些內容是無關聯無依賴關系的,如果我們串行的去執行,如果每個任務耗時10秒鍾,一共有10個任務,那我們就需要100秒才能獲取到結果。顯然我們可以采用線程池,每個任務起一個線程,忽略線程啟動時間,我們只需要10秒鍾就能獲取到結果。這里還有兩種選擇,這10秒鍾我們可以去做其他事,也可以等待結果。
我們來完成這樣的操作:
// 這是任務的抽象
class GetContentTask implements Callable<String> {
private String name;
private Integer sleepTimes;
public GetContentTask(String name, Integer sleepTimes) {
this.name = name;
this.sleepTimes = sleepTimes;
}
public String call() throws Exception {
// 假設這是一個比較耗時的操作
Thread.sleep(sleepTimes * 1000);
return "this is content : hello " + this.name;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
采用completionService :
// 方法一
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService(executorService);
ExecuteServiceDemo executeServiceDemo = new ExecuteServiceDemo();
// 十個
long startTime = System.currentTimeMillis();
int count = 0;
for (int i = 0;i < 10;i ++) {
count ++;
GetContentTask getContentTask = new ExecuteServiceDemo.GetContentTask("micro" + i, 10);
completionService.submit(getContentTask);
}
System.out.println("提交完任務,主線程空閑了, 可以去做一些事情。");
// 假裝做了8秒種其他事情
try {
Thread.sleep(8000);
System.out.println("主線程做完了,等待結果");
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
// 做完事情要結果
for (int i = 0;i < count;i ++) {
Future<String> result = completionService.take();
System.out.println(result.get());
}
long endTime = System.currentTimeMillis();
System.out.println("耗時 : " + (endTime - startTime) / 1000);
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
執行結果為:
提交完任務,主線程空閑了, 可以去做一些事情。
主線程做完了,等待結果
this is content : hello micro9
this is content : hello micro7
this is content : hello micro2
this is content : hello micro5
this is content : hello micro4
this is content : hello micro8
this is content : hello micro1
this is content : hello micro3
this is content : hello micro0
this is content : hello micro6
耗時 : 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
如果多個不想一個一個提交,可以采用 invokeAll一並提交,但是會同步等待這些任務
// 方法二
ExecutorService executeService = Executors.newCachedThreadPool();
List<GetContentTask> taskList = new ArrayList<GetContentTask>();
long startTime = System.currentTimeMillis();
for (int i = 0;i < 10;i ++) {
taskList.add(new GetContentTask("micro" + i, 10));
}
try {
System.out.println("主線程發起異步任務請求");
List<Future<String>> resultList = executeService.invokeAll(taskList);
// 這里會阻塞等待resultList獲取到所有異步執行的結果才會執行
for (Future<String> future : resultList) {
System.out.println(future.get());
}
// 主線程假裝很忙執行8秒鍾
Thread.sleep(8);
long endTime = System.currentTimeMillis();
System.out.println("耗時 : " + (endTime - startTime) / 1000);
} catch (Exception e) {
e.printStackTrace();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
主線程發起異步任務請求
this is content : hello micro0
this is content : hello micro1
this is content : hello micro2
this is content : hello micro3
this is content : hello micro4
this is content : hello micro5
this is content : hello micro6
this is content : hello micro7
this is content : hello micro8
this is content : hello micro9
耗時 : 10
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
如果一系列請求,我們並不需要等待每個請求,我們可以invokeAny,只要某一個請求返回即可。
ExecutorService executorService = Executors.newCachedThreadPool();
ArrayList<GetContentTask> taskList = new ArrayList<GetContentTask>();
taskList.add(new GetContentTask("micro1",3));
taskList.add(new GetContentTask("micro2", 6));
try {
List<Future<String>> resultList = executorService.invokeAll(taskList);// 等待6秒
// String result2 = executorService.invokeAny(taskList); // 等待3秒
// invokeAll 提交一堆任務並行處理並拿到結果
// invokeAny就是提交一堆並行任務拿到一個結果即可
for (Future<String> result : resultList) {
System.out.println(result.get());
}
// System.out.println(result2);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("主線程等待");
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
如果我雖然發送了一堆異步的任務,但是我只等待一定的時間,在規定的時間沒有返回我就不要了,例如很多時候的網絡請求其他服務器如果要數據,由於網絡原因不能一直等待,在規定時間內去拿,拿不到就我使用一個默認值。這樣的場景,我們可以使用下面的寫法:
try {
ExecutorService executorService = Executors.newCachedThreadPool();
List<Callable<String>> taskList = new ArrayList<Callable<String>>();
taskList.add(new GetContentTask("micro1", 4));
taskList.add(new GetContentTask("micro2", 6));
// 等待五秒
List<Future<String>> resultList = executorService.invokeAll(taskList, 5, TimeUnit.SECONDS);
for (Future<String> future : resultList) {
System.out.println(future.get());
}
} catch (Exception e) {
e.printStackTrace();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
this is content : hello micro1
java.util.concurrent.CancellationException
at java.util.concurrent.FutureTask.report(FutureTask.java:121)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.micro.demo.spring.ExecuteServiceDemo.main(ExecuteServiceDemo.java:105)
- 1
- 2
- 3
- 4
- 5
因為只等待5秒,6秒的那個任務自然獲取不到,拋出異常,如果將等待時間設置成8秒,就都能獲取到。
