Java Master-Worker模式實現


引用:http://blog.51cto.com/zhangfengzhe/1879323

Master-Worker模式簡介

Master-Worker模式是非常經典的常用的一個並行計算模式,它的核心思想是2類進程協作工作:Master進程和Worker進程。Master負責接收客戶端請求,分配任務;Worker負責具體處理任務。當各個Worker處理完任務后,統一將結果返回給Master,由Master進行整理和總結。其好處是能夠將一個大JOB分解成若干小JOB,並行執行,從而提高系統的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在這種Master-Worker工作模式;離線分布式計算框架Hadoop的JobTracker和TaskTracker,實時流計算框架Strom的Nimbus和Supervisor都涉及到這種思想。那么下面我們來具體分析下Java Master-Worker模式的實現。

 

Master-Worker模式分析

 

wKiom1hDkfzB-5wbAAArs5hrPOc274.png

 

我們重點分析下Master,Worker這2個角色。

 

Master

Master需要接受Client端提交過來的任務Task,而且還得將Task分配給Worker進行處理,因此Master需要一個存儲來存放Task。那么采用哪種存儲集合呢?首先來說,需要支持並發的集合類,因為多個Worker間可能存在任務競爭,因此我們需要考慮java.util.concurrent包下的集合。這里可以考慮采用非阻塞的ConcurrentLinkedQueue。

Master需要清楚的知道各個Woker的基本信息,如是否各個Worker都運行完畢,因此Master端需要保存Worker的信息,可以采用Map存儲。

由於最后各個Worker都會上報運行結果,Master端需要有一個存儲結果的Map,可以采用支持並發的ConcurrentHashMap。

 

Worker

Worker需要持有Master端的任務Task集合的引用,因為Worker需要從里面拿取Task。

同上,Worker需要持有Master端的存儲結果的引用。

 

綜上,我們可以得到如下:

wKioL1hDlqHwZeM3AABZMBWVf3Q051.png

 

我們可以進一步細化,Master/Worker應該提供什么操作?

 

Master:

  1. 通過構造方法以初始化workers

  2. 應該提供submit(Task)方法接受Client端提交過來的任務

  3. start()讓workers開始處理任務

  4. 提供isComplete()判斷各個worker的狀態,是否都處理完畢

  5. 提供getResult()給客戶端返回結果

 

Worker:

  1. Worker本質上就是Runnable,提供run()

  2. 負責處理業務邏輯的handle()

 

 

Java Master-Worker代碼實現

 

wKiom1hDxaDg3Zb_AAAJ9yFjHPc824.png

 

Task

public class Task {

    private long id;
    private String name;

    public Task(long id, String name) {
        this.id = id;
        this.name = name;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }


}

Worker

public class Worker implements Runnable {

    private long id;
    private String name;

    private ConcurrentLinkedQueue<Task> workQueue;

    private ConcurrentHashMap<Long,Object> results;

    public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResults(ConcurrentHashMap<Long, Object> results) {
        this.results = results;
    }

    public Worker(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void run() {

        while(true){

            Task task = workQueue.poll();

            if(task == null){
                break;
            }

            long start = System.currentTimeMillis();
            long result = handle(task);

            this.results.put(task.getId(),result);

            System.out.println(this.name + " handle " + task.getName() + " success . result is " + result + " cost time : " + (System.currentTimeMillis() - start));
        }



    }

    /**
     * 負責處理具體業務邏輯
     * @param task
     * @return
     */
    private long handle(Task task) {

        //這里只是模擬下,在真實環境也許是查詢數據庫,也許是查緩存等
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return new Random().nextLong();
    }
}

Master

public class Master {

    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

    private Map<Long,Thread> workers = new HashMap<Long, Thread>();

    private ConcurrentHashMap<Long,Object> results = new ConcurrentHashMap<Long, Object>();

    public Master(int num){

        for(int i = 0 ; i < num ; i++){

            Worker worker = new Worker(i,"worker-" + i);
            worker.setResults(results);
            worker.setWorkQueue(workQueue);

            workers.put(Long.valueOf(i),new Thread(worker));
        }

    }

    public void submit(Task task){
        workQueue.add(task);
    }

    public void start(){

        for (Map.Entry<Long,Thread> entry : workers.entrySet()){

            entry.getValue().start();
        }

    }

    public boolean isComlepte(){

        for(Map.Entry<Long,Thread> entry : workers.entrySet()){

            if(entry.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }

        }

        return true;
    }

    public long getSumResult(){

        long value = 0;
        for(Map.Entry<Long,Object> entry : results.entrySet()){

            value = value + (Long)entry.getValue();

        }
        return value;
    }
}

Main

public class Main {

    public static void main(String[] args) {

        Master master = new Master(10);

        for(int i = 0 ; i < 10 ; i++){

            Task task = new Task(i,"task-" + i);

            master.submit(task);
        }

        long start = System.currentTimeMillis();
        master.start();

        while(true){

            if(master.isComlepte()){

                System.out.println("sum result is " + master.getSumResult() + " . cost time : " + (System.currentTimeMillis() - start));
                break;
            }
        }


    }

}

 

運行結果

wKioL1hDxjzwX2K2AACBfC_nvdY147.png

 

總結

在單線程的時候,處理一個Task需要500ms,那么處理10個Task需要5S,如果采用Master-Worker這種並行模型,可以大大縮短計算處理時間。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM