並行程序設計模式--Master-Worker模式
- 簡介
Master-Worker模式是常用的並行設計模式。它的核心思想是,系統有兩個進程協議工作:Master進程和Worker進程。Master進程負責接收和分配任務,Worker進程負責處理子任務。當各個Worker進程將子任務處理完后,將結果返回給Master進程,由Master進行歸納和匯總,從而得到系統結果。處理過程如下圖:

Master-Worker模式的好處是,它能將大任務分解成若干個小任務,並發執行,從而提高系統性能。而對於系統請求者Client來說,任務一旦提交,Master進程就會立刻分配任務並立即返回,並不會等系統處理完全部任務再返回,其處理過程是異步的。
- Master-Worker模式結構
Master-Worker模式的主要結構如下圖:

如上圖所示,Master進程是主要進程,它維護着一個Worker進程隊列、子任務隊列和子結果集,Worker進程中的Worker進程不斷的從任務隊列中提取要處理的子任務,並將子任務的處理結果放入到子結果集中。
在上圖中,Master:用於任務的分配和最終結果的合並;Worker:用於實際處理一個任務;客戶端進程:用於啟動系統,調度開啟Master。
- Master-Worker模式代碼實現
Master代碼實現:
1 public class Master {
2 //任務隊列
3 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
4 //worker進程隊列
5 protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
6 //結果集
7 protected Map<String, Object> resultMap = new HashMap<String,Object>();
8
9 //是否所有的子任務都結束
10
11 public boolean isComplete(){
12 for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
13 if(entry.getValue().getState()!=Thread.State.TERMINATED){
14 return false;
15 }
16 }
17 return true;
18 }
19
20 public Master(Worker worker,int countWorker) {
21 worker.setResultMap(resultMap);
22 worker.setWorkQueue(workQueue);
23 for (int i = 0; i < countWorker; i++) {
24 threadMap.put(Integer.toString(i), new Thread(worker,Integer.toString(i)));
25 }
26 }
27
28 //提交任務
29
30 public void submit(Object obj){
31 workQueue.add(obj);
32 //System.out.println(obj.toString());
33 }
34
35
36
37 //返回子任務結果集
38 public Map<String, Object> getResultMap() {
39 return resultMap;
40 }
41
42 //開始運行所有worker進程,並進行處理
43
44 public void execute(){
45 for(Map.Entry<String, Thread> entry:threadMap.entrySet()){
46 entry.getValue().start();
47 }
48 }
49
50 }
Worker代碼實現:
1 public class Worker implements Runnable {
2 //任務隊列
3 protected Queue<Object> workQueue;
4 //子任務結果集
5 protected Map<String,Object> resultMap = new HashMap<String, Object>();
6
7
8 public void setWorkQueue(Queue<Object> workQueue) {
9 this.workQueue = workQueue;
10 }
11 public void setResultMap(Map<String, Object> resultMap) {
12 this.resultMap = resultMap;
13 }
14
15 public Object handle(Object input){
16 return input;
17 }
18 @Override
19 public void run() {
20 while(true){
21 Object input = workQueue.poll();
22
23 if(null==input) break;
24 //處理子任務
25 Object re = handle(input);
26 resultMap.put(Integer.toString(input.hashCode()),re);
27 //System.out.println(re.toString());
28 }
29 }
30
31 }
Master-Worker模式是一種串行任務並行化的方法,被分解的子任務在系統中可以並行處理。同時,如果有需要,Master進程不需要所有子任務都執行完成,就可以根據已有的部分結果集計算最終的結果。
現在以上面的Master-Worker實現為基礎,來實現計算1-100的立方和。計算將被分解為100個子任務,每個子任務僅用於計算單獨的立方和。Master產生固定數目Worker,來處理這些子任務。Worker不斷的從任務集合中取出這些計算立方和的子任務,並將計算結果放入到Master的結果集中。Master負責將所有Worker的任務結果進行累加,從而產生最終的立方和。整個計算過程,Worker和Master的運算也是完全異步的,Master進程不必等所有的Worker進程都執行完成,就可以進行求和操作了。也就是所,Master在獲取部分子任務的結果集時,就可以對最終結果進行計算了,從而提高了系統的並發性和吞吐量。
計算子任務的實現如下:
1 public class PlusWorker extends Worker {
2
3 @Override
4 public Object handle(Object input) {
5 Integer i = (Integer) input;
6 return i*i*i;
7 }
8
9 }
客戶端代碼如下:
1 public class Client {
2 public static void main(String[] args) {
3 Master m = new Master(new PlusWorker(), 5);//啟動五個線程處理
4 for (int i = 0; i < 100; i++) {
5 m.submit(i);
6 }
7 m.execute();
8 int re = 0;
9 Map<String, Object> resultMap = m.getResultMap();
10 while(resultMap.size()>0||!m.isComplete()){
11 Set<String> keys = resultMap.keySet();
12 String key = null;
13 for(String k:keys){
14 key=k;
15 break;
16 }
17 Integer i = null;
18 if(key != null){
19 i = (Integer) resultMap.get(key);
20 }
21 if(i!=null){
22 re+=i;//並行計算結果集
23 }
24
25 if(key!=null){
26 resultMap.remove(key);//將計算完成的結果移除
27 }
28 }
29
30 System.out.println(re);
31 }
32 }
通過Master創建5個Worker工作線程和PlusWorker工作實例。提交完100個任務后,就開始計算子任務。這些子任務,由生成的5個Worker線程共同完成。Master並不等所有的子任務都計算完成,就開始訪問子結果集進行最終結果的計算,直到子結果集中所有的數據都被處理,並且5個活躍的Worker線程全部終止,才能求出最終結果。

