並行設計模式(二)-- Master-Worker模式


  Java多線程編程中,常用的多線程設計模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不變模式和生產者-消費者模式等。這篇文章主要講述Master-Worker模式,關於其他多線程設計模式的地址如下:
  關於Future模式的詳解: 並行設計模式(一)-- Future模式
  關於Guarded Suspeionsion模式的詳解: 並行設計模式(三)-- Guarded Suspeionsion模式
  關於不變模式的詳解: 並行設計模式(四)-- 不變模式
  關於生產者-消費者模式的詳解:並行設計模式(五)-- 生產者-消費者模式

 1. Master-Worker模式

  Master-Worker模式是常用的並行模式之一,它的核心思想是:系統由兩類進程協同工作,即Master進程和Worker進程,Master負責接收和分配任務,Wroker負責處理子任務。當各個Worker進程將子任務處理完成后,將結果返回給Master進程,由Master進程進行匯總,從而得到最終的結果,其具體處理過程如下圖所示。

http://img.blog.csdn.net/20141113102527554

 

  Master-Worker 模式的好處,它能夠將一個大任務分解成若干個小任務並行執行,從而提高系統的吞吐量。而對於系統請求者 Client 來說,任務一旦提交,Master進程會分配任務並立即返回,並不會等待系統全部處理完成后再返回,其處理過程是異步的。因此,Client 不會出現等待現象。

2. Master-Worker模式結構

  Master-Worker 模式的結構相對比較簡單,Master 進程為主要進程,它維護了一個Worker 進程隊列、子任務隊列和子結果集、Worker 進程隊列中的 Worker 進程,不停地從任務隊列中提取要處理的子任務,並將子任務的處理結果寫入結果集。具體的結構圖如下所示:

注意:Master-Worker 模式是一種使用多線程進行數據處理的結構。多個 Worker 進程協作處理用戶請求,Master 進程負責維護 Worker 進程,並整合最終處理結果。

3. 代碼實現

  Master-Worker 主要角色分配如下所示:

角 色 作 用

            Worker

 用於實際處理一個任務

            Master

 用於任務的分配和最終結果的合成

           Main

 啟動系統,調用開啟Master

下面是一個簡易版的 Master-Worker 框架 Java 代碼實現

1. Master 部分源碼實現:

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 import java.util.Queue;
 4 import java.util.concurrent.ConcurrentHashMap;
 5 import java.util.concurrent.ConcurrentLinkedQueue;
 6 
 7 public class Master {
 8     // 任務隊列
 9     protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
10     // Worker進程隊列
11     protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
12     // 子任務處理結果集
13     protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
14     // 構造函數
15     public Master(Worker worker, int countWorker) {
16         worker.setWorkQueue(workQueue); //添加任務隊列
17         worker.setResultMap(resultMap); //添加計算結果集合
18         for(int i=0; i<countWorker; i++) {
19             threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); //循環添加任務進程
20         }
21     }
22     
23     //是否所有的子任務都結束了
24     public boolean isComplete() {
25         for(Map.Entry<String, Thread> entry : threadMap.entrySet()) {
26             if(entry.getValue().getState() != Thread.State.TERMINATED)
27                 return false; //存在未完成的任務
28         }
29         return true;
30     }
31     
32     //提交一個子任務
33     public void submit(Object job) {
34         workQueue.add(job);
35     }
36     
37     //返回子任務結果集
38     public Map<String, Object> getResultMap() {
39         return resultMap;
40     }
41     
42     //執行所有Worker進程,進行處理
43     public void execute() {
44         for(Map.Entry<String, Thread> entry : threadMap.entrySet()) {
45             entry.getValue().start();
46         }
47     }
48 }

2. Worker 進程的源代碼實現

 1 import java.util.Map;
 2 import java.util.Queue;
 3 
 4 public class Worker  implements Runnable{
 5     //任務隊列,用於取得子任務
 6     protected Queue<Object> workQueue;
 7     //子任務處理結果集
 8     protected Map<String ,Object> resultMap;
 9     public void setWorkQueue(Queue<Object> workQueue){
10         this.workQueue= workQueue;
11     }
12     public void setResultMap(Map<String ,Object> resultMap){
13         this.resultMap=resultMap;
14     }
15     //子任務處理的邏輯,在子類中實現具體邏輯
16     public Object handle(Object input){ 17         return input; 18  } 19     
20     @Override
21     public void run() {
22         while(true){
23             //獲取子任務
24             Object input= workQueue.poll(); 25             if(input==null){
26                 break;
27             }
28             //處理子任務
29             Object re = handle(input);
30             resultMap.put(Integer.toString(input.hashCode()), re);
31         }
32     }
33 }

  以上兩段代碼已經展示了 Master-Worker 框架的全貌。應用程序中通過重載Worker.handle()方法實現應用層邏輯。

注意:Master-Worker 模式是一種將串行任務並行化的方法,被分解的子任務在系統中可以被並行處理。同時,如果有需要,Master 進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果。

  現應用這個 Master-Worker 框架,實現一個計算立方和的應用,並計算 1 ~~ 100 的立方和,即 13 + 23  + 33 + ... + 1003

  計算任務可被分解為 100 個子任務,每個子任務僅用於計算單獨的立方和。Master 產生固定個數的 Worker 來處理所有這些子任務。Worker 不斷地從任務集合中取得這些計算立方和的子任務,並將計算結果返回給 Master。Master 負責將所有 Worker 的任務結果進行累加,從而產生最終的立方和。在整個計算過程中,Master 與 Worker 的運行也是完全異步的,Master 不必等到所有的 Worker 都執行完成后,就可以進行求和操作。即,Master 在獲得部分子任務結果集時,就已經可以開始對最終結果進行計算,從而進一步提高系統的並行度和吞吐量。具體的任務分解如下圖所示:

  

3.子任務 PlusWork 源碼實現

  計算任務被划分成100個子任務,每個任務僅僅用於計算單獨的立方和,對應的 PlusWork 源碼如下:

public class PlusWorker extends Worker { //求立方和
    @Override
    public Object handle(Object input) {
        int i = (Integer)input;
        return i * i * i;
    }
}

4. 進行計算的 Main 函數

  運行的調用函數如下。在主函數中首先通過Master類創建5個Worker工作進程和Worker工作實例PlusWorker。在提交了100個子任務后,邊開始子任務的計算。這些子任務中由這5個進程共同完成。Master不用等待所有Worker計算完成才開始匯總,而是子任務在計算的過程中,Master就開始匯總了。

 1 import java.util.Map;
 2 import java.util.Set;
 3 
 4 public class Application {
 5     public static void main(String[] args) {
 6         //固定使用5個Workde
 7         Master master = new Master(new PlusWorker(), 5);
 8         for(int i=1; i<=100; i++) //提交100個子任務
 9             master.submit(i);
10         master.execute(); //開始計算
11         int re = 0;  //最終計算結果保存在此
12         Map<String, Object> resultMap = master.getResultMap();
13         
14         while(true) {//不需要等待所有Worker都執行完成,即可開始計算最終結果
15             Set<String> keys = resultMap.keySet();  //開始計算最終結果
16             String key = null;
17             for(String k : keys) {
18                 key = k;
19                 break;
20             }
21             Integer i = null;
22             if(key != null)
23                 i = (Integer)resultMap.get(key);
24             if(i != null)
25                 re += i; //最終結果
26             if(key != null)
27                 resultMap.remove(key); //移除已被計算過的項目
28             if(master.isComplete() && resultMap.size()==0)
29                 break;
30         }
31         System.out.println(re);
32     }
33 }

運行結果:

25502500

 總結:

  重要的事情說三遍,Master-Worker 模式是一種將串行任務並行化的方案,被分解的子任務在系統中可以被並行處理,同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果集。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM