Java多線程編程中,常用的多線程設計模式包括:Future模式、Master-Worker模式、Guarded Suspeionsion模式、不變模式和生產者-消費者模式等。這篇文章主要講述Master-Worker模式,關於其他多線程設計模式的地址如下:
關於Future模式的詳解: 並行設計模式(一)-- Future模式
關於Guarded Suspeionsion模式的詳解: 並行設計模式(三)-- Guarded Suspeionsion模式
關於不變模式的詳解: 並行設計模式(四)-- 不變模式
關於生產者-消費者模式的詳解:並行設計模式(五)-- 生產者-消費者模式
1. Master-Worker模式
Master-Worker模式是常用的並行模式之一,它的核心思想是:系統由兩類進程協同工作,即Master進程和Worker進程,Master負責接收和分配任務,Wroker負責處理子任務。當各個Worker進程將子任務處理完成后,將結果返回給Master進程,由Master進程進行匯總,從而得到最終的結果,其具體處理過程如下圖所示。
Master-Worker 模式的好處,它能夠將一個大任務分解成若干個小任務並行執行,從而提高系統的吞吐量。而對於系統請求者 Client 來說,任務一旦提交,Master進程會分配任務並立即返回,並不會等待系統全部處理完成后再返回,其處理過程是異步的。因此,Client 不會出現等待現象。
2. Master-Worker模式結構
Master-Worker 模式的結構相對比較簡單,Master 進程為主要進程,它維護了一個Worker 進程隊列、子任務隊列和子結果集、Worker 進程隊列中的 Worker 進程,不停地從任務隊列中提取要處理的子任務,並將子任務的處理結果寫入結果集。具體的結構圖如下所示:
注意:Master-Worker 模式是一種使用多線程進行數據處理的結構。多個 Worker 進程協作處理用戶請求,Master 進程負責維護 Worker 進程,並整合最終處理結果。
3. 代碼實現
Master-Worker 主要角色分配如下所示:
角 色 | 作 用 |
Worker |
用於實際處理一個任務 |
Master |
用於任務的分配和最終結果的合成 |
Main |
啟動系統,調用開啟Master |
下面是一個簡易版的 Master-Worker 框架 Java 代碼實現
1. Master 部分源碼實現:
1 import java.util.HashMap; 2 import java.util.Map; 3 import java.util.Queue; 4 import java.util.concurrent.ConcurrentHashMap; 5 import java.util.concurrent.ConcurrentLinkedQueue; 6 7 public class Master { 8 // 任務隊列 9 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); 10 // Worker進程隊列 11 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); 12 // 子任務處理結果集 13 protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); 14 // 構造函數 15 public Master(Worker worker, int countWorker) { 16 worker.setWorkQueue(workQueue); //添加任務隊列 17 worker.setResultMap(resultMap); //添加計算結果集合 18 for(int i=0; i<countWorker; i++) { 19 threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); //循環添加任務進程 20 } 21 } 22 23 //是否所有的子任務都結束了 24 public boolean isComplete() { 25 for(Map.Entry<String, Thread> entry : threadMap.entrySet()) { 26 if(entry.getValue().getState() != Thread.State.TERMINATED) 27 return false; //存在未完成的任務 28 } 29 return true; 30 } 31 32 //提交一個子任務 33 public void submit(Object job) { 34 workQueue.add(job); 35 } 36 37 //返回子任務結果集 38 public Map<String, Object> getResultMap() { 39 return resultMap; 40 } 41 42 //執行所有Worker進程,進行處理 43 public void execute() { 44 for(Map.Entry<String, Thread> entry : threadMap.entrySet()) { 45 entry.getValue().start(); 46 } 47 } 48 }
2. Worker 進程的源代碼實現
1 import java.util.Map; 2 import java.util.Queue; 3 4 public class Worker implements Runnable{ 5 //任務隊列,用於取得子任務 6 protected Queue<Object> workQueue; 7 //子任務處理結果集 8 protected Map<String ,Object> resultMap; 9 public void setWorkQueue(Queue<Object> workQueue){ 10 this.workQueue= workQueue; 11 } 12 public void setResultMap(Map<String ,Object> resultMap){ 13 this.resultMap=resultMap; 14 } 15 //子任務處理的邏輯,在子類中實現具體邏輯 16 public Object handle(Object input){ 17 return input; 18 } 19 20 @Override 21 public void run() { 22 while(true){ 23 //獲取子任務 24 Object input= workQueue.poll(); 25 if(input==null){ 26 break; 27 } 28 //處理子任務 29 Object re = handle(input); 30 resultMap.put(Integer.toString(input.hashCode()), re); 31 } 32 } 33 }
以上兩段代碼已經展示了 Master-Worker 框架的全貌。應用程序中通過重載Worker.handle()方法實現應用層邏輯。
注意:Master-Worker 模式是一種將串行任務並行化的方法,被分解的子任務在系統中可以被並行處理。同時,如果有需要,Master 進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果。
現應用這個 Master-Worker 框架,實現一個計算立方和的應用,並計算 1 ~~ 100 的立方和,即 13 + 23 + 33 + ... + 1003。
計算任務可被分解為 100 個子任務,每個子任務僅用於計算單獨的立方和。Master 產生固定個數的 Worker 來處理所有這些子任務。Worker 不斷地從任務集合中取得這些計算立方和的子任務,並將計算結果返回給 Master。Master 負責將所有 Worker 的任務結果進行累加,從而產生最終的立方和。在整個計算過程中,Master 與 Worker 的運行也是完全異步的,Master 不必等到所有的 Worker 都執行完成后,就可以進行求和操作。即,Master 在獲得部分子任務結果集時,就已經可以開始對最終結果進行計算,從而進一步提高系統的並行度和吞吐量。具體的任務分解如下圖所示:
3.子任務 PlusWork 源碼實現
計算任務被划分成100個子任務,每個任務僅僅用於計算單獨的立方和,對應的 PlusWork 源碼如下:
public class PlusWorker extends Worker { //求立方和 @Override public Object handle(Object input) { int i = (Integer)input; return i * i * i; } }
4. 進行計算的 Main 函數
運行的調用函數如下。在主函數中首先通過Master類創建5個Worker工作進程和Worker工作實例PlusWorker。在提交了100個子任務后,邊開始子任務的計算。這些子任務中由這5個進程共同完成。Master不用等待所有Worker計算完成才開始匯總,而是子任務在計算的過程中,Master就開始匯總了。
1 import java.util.Map; 2 import java.util.Set; 3 4 public class Application { 5 public static void main(String[] args) { 6 //固定使用5個Workde 7 Master master = new Master(new PlusWorker(), 5); 8 for(int i=1; i<=100; i++) //提交100個子任務 9 master.submit(i); 10 master.execute(); //開始計算 11 int re = 0; //最終計算結果保存在此 12 Map<String, Object> resultMap = master.getResultMap(); 13 14 while(true) {//不需要等待所有Worker都執行完成,即可開始計算最終結果 15 Set<String> keys = resultMap.keySet(); //開始計算最終結果 16 String key = null; 17 for(String k : keys) { 18 key = k; 19 break; 20 } 21 Integer i = null; 22 if(key != null) 23 i = (Integer)resultMap.get(key); 24 if(i != null) 25 re += i; //最終結果 26 if(key != null) 27 resultMap.remove(key); //移除已被計算過的項目 28 if(master.isComplete() && resultMap.size()==0) 29 break; 30 } 31 System.out.println(re); 32 } 33 }
運行結果:
25502500
總結:
重要的事情說三遍,Master-Worker 模式是一種將串行任務並行化的方案,被分解的子任務在系統中可以被並行處理,同時,如果有需要,Master進程不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果集。