master-worker模式的幾種實現與應用


master-worker模式是一種並行模式,它的核心思想,系統有兩個進程或者線程協議工作,master負責接收和分配並整合任務(merge),worker進程負責處理子任務(divide),可見這也是一種歸並的思想,當客戶端進程啟動后,開啟master進程,流程如圖所示

1.ZooKeeper中的master-worker實現

每個worker的監控與調度可以交給第三方工具去實現,比如Zookeeper便可以充當這樣的角色,zookeeper是一個分布式文件系統,我們可以把worker掛載在它的/worker目錄(結點)下,注意這個worker目錄應該是一個臨時的目錄,且目錄下的worker結點應該是有序的,但當worker掛掉后,又該如何知道它被分配了哪些任務呢,所以還應該有個/assign目錄,記錄worker結點已被分配的任務,當然,任務被掛載在/job結點下面,所以通過Zookeeper監控和調度時,應當有四個結點,三個角色。監聽/job目錄,如果該目錄下的結點發生了變化,就將待分配的job分配給空閑的worker結點,而如何知道該worker是否空閑呢,就看其對應的assign節點是否為空,如果為空,便可繼續分配,

  private void onNodeChange(WatchedEvent event){
 
        // event的類型為數量更新
        if (Watcher.Event.EventType.NodeChildrenChanged != event.getType()) {
            return;
        }
 
        try {
        // 如果節點數量更新,那么遍歷子節點,還沒被處理的job
        List<String> jobPathList = zooKeeper.getChildren("/job", false);
        if (CollectionUtils.isEmpty(jobPathList)) {
            return;
        }
 
        JobMessage initJobMessage = null;
        String initJobPath = "";
        for (String jobPath : jobPathList) {
            String jobCurrentPath = "/job/" + jobPath;
            byte[] jobDataByteArray = zooKeeper.getData(jobCurrentPath, false, null);
            String jobData = new String(jobDataByteArray);
 
            if (StringUtils.isEmpty(jobData)) {
                continue;
            }
 
            // 將其轉換為 jobMessage ,其中的status為 0 ,也就是沒被分配的時候,才會分配
            JobMessage jobMessage = JSON.parseObject(jobData, JobMessage.class);
            if (jobMessage == null) {
                continue;
            }
 
            // 將path設置進去
            if (JobMessage.StatusEnum.INIT.getValue() == jobMessage.getStatus()) {
                initJobMessage = jobMessage;
                initJobPath = jobPath;
                break;
            }
        }
 
        if (initJobMessage == null) {
            return;
        }
 
        // 遍歷 /worker 節點,如果對應節點在 /assign 中沒有子節點,那么將其分配在/ assgin 中
        List<String> workNodeList = zooKeeper.getChildren("/worker", false);
        if (CollectionUtils.isEmpty(slaveNodeList)) {
            return;
        }
 
        boolean assignSuccess = false;
        for (String workerNodePath : workerNodeList) {
 
            String assignWorkerCurrentPath = "/assign/" + workerNodePath;
 
            // 查詢其有無子節點,如果沒有子節點,說明可以分配給任務
            List<String> assignWorkerChildNodeList = zooKeeper.getChildren(assignWorkerCurrentPath, false);
            if (CollectionUtils.isNotEmpty(assignWorkerChildNodeList)) {
                continue;
            }
 
            // 給assign的對應節點增加一個子節點
            String jobAssignPath = assignWorkerCurrentPath + "/" + initJobPath;
            zooKeeper.create(jobAssignPath, JSON.toJSONString(initJobMessage).getBytes(), OPEN_ACL_UNSAFE , CreateMode.PERSISTENT);
            assignSuccess = true;
            break;
        }
        LogUtils.printLog("分配  " + (assignSuccess ? "成功" : "失敗"));
 
    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

代碼中的onNodeChange是個回調方法,當master結點監聽到node節點有變動時,回調該方法。

既然master結點負責任務的分配和調度,那worker結點也要監聽自己被分配的任務的變化,執行完畢任務后,還要刪除在/assign目錄下的自己的結點信息,基本原理和上述代碼差不多。

 

 

 

2.Netty中的master-worker實現

netty中的serverBootstrap的啟動代碼如下

bootstrap bootstrap = new ServerBootstrap(    
    new NioServerSocketChannelFactory(    
        Executors.newCachedThreadPool(),//master線程池    
        Executors.newCachedThreadPool(),//worker線程池    
    )    
);   

在netty中,一個master對應一個端口,當master接收了socket的連接請求后,channel通道就此建立,建立連接請求后的消息交給worker線程處理,實際上是master線程調用serverSocketChannel后,由factory從worker線程池(NioEventLoopGroup)中找出一個worker線程(NioEventLoop)進行后續處理,一個worker可以服務不同的socket(即IO多路復用,原理是NioEventLoop持有selector多路復用器和任務隊列queue,每個channel都會注冊在selector上,並持有selectKey,select表示其注冊在哪個選擇器上,所以worker線程可以同時接收到多個就緒事件),當然,也可以設置成阻塞模式,即一個worker只能服務一個socket,但是現在服務器都會采取keep-alive,所以最好設置成非阻塞的模式,不然對worrker資源會造成很大的消耗與浪費。

在worker線程中,接收到的消息實際上交給ChannelPipeline處理(這個pipeline實際上就是filter,而filter通常采用責任鏈模式,由許多handler組成),當所有的handler走完沒有異常,證明worker的工作完畢,會被worker線程池回收,其中還可以再優化,比如碰到耗時的handler(通常是業務handler),可以再開一個新線程處理,這個新線程也最好來自別的新線程池,從而當前worker可以盡早釋放,提高worker線程的周轉率。

 

3.Golang中的master-worker實現

通過channel來進行通信

下面是worker的實現代碼

package worker
import (
    "fmt"
)
//需要處理的任務,簡單定義一下
type Job struct {
    num int
}
func NewJob(num int) Job {
    return Job{num: num}
}
type Worker struct {
    id        int                //workerID
    WorkerPool chan chan Job      //worker池
    JobChannel chan Job            //worker從JobChannel中獲取Job進行處理
    Result    map[interface{}]int //worker將處理結果放入reuslt
    quit      chan bool          //停止worker信號
}
func NewWorker(workerPool chan chan Job, result map[interface{}]int, id int) Worker {
    return Worker{
        id:        id,
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        Result:    result,
        quit:      make(chan bool),
    }
}
func (w Worker) Start() {
    go func() {
        for {
            //將worker的JobChannel放入master的workerPool中
            w.WorkerPool <- w.JobChannel
            select {
                //從JobChannel中獲取Job進行處理,JobChannel是同步通道,會阻塞於此
                case job := <-w.JobChannel:
                    //處理這個job
                    //並將處理得到的結果存入master中的結果集
                    x := job.num * job.num
                    fmt.Println(w.id, ":", x)
                    w.Result[x] = w.id
                //停止信號
                case <-w.quit:
                    return
            }
       }
    }()
}
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

master

package master
import (
    "MasterWorkerPattern/worker"
)
type Master struct {
    WorkerPool chan chan worker.Job //worker池
    Result    map[interface{}]int  //存放worker處理后的結果集
    jobQueue  chan worker.Job      //待處理的任務chan
    workerList []worker.Worker      //存放worker列表,用於停止worker
}
var maxworker int
//maxWorkers:開啟線程數
//result :結果集
func NewMaster(maxWorkers int, result map[interface{}]int) *Master {
    pool := make(chan chan worker.Job, maxWorkers)
    maxworker = maxWorkers
    return &Master{WorkerPool: pool, Result: result, jobQueue: make(chan worker.Job,                                                  2*maxWorkers)}
}
func (m *Master) Run() {
    //啟動所有的Worker
    for i := 0; i < maxworker; i++ {
        work := worker.NewWorker(m.WorkerPool, m.Result, i)
        m.workerList = append(m.workerList, work)
        work.Start()
    }
    go m.dispatch()
}
func (m *Master) dispatch() {
    for {
        select {
        case job := <-m.jobQueue:
            go func(job worker.Job) {
                //從workerPool中取出一個worker的JobChannel
                jobChannel := <-m.WorkerPool
                //向這個JobChannel中發送job,worker中的接收配對操作會被喚醒
                jobChannel <- job
            }(job)
        }
    }
}
//添加任務到任務通道
func (m *Master) AddJob(num int) {
    job := worker.NewJob(num)
    //向任務通道發送任務
    m.jobQueue <- job
}
//停止所有任務
func (m *Master) Stop() {
    for _, v := range m.workerList {
    v.Stop()
    }
}

  test

package main
import (
    "MasterWorkerPattern/master"
    "fmt"
    "time"
)
func main() {
    result := map[interface{}]int{}
    mas := master.NewMaster(4, result)
    mas.Run()
    for i := 0; i < 10; i++ {
        mas.AddJob(i)
    }
    time.Sleep(time.Millisecond)
    //mas.Stop()
    fmt.Println("result=", result)
}

  運行結果是不確定的

 

 

 

4.Nginx中的master-worker實現

基本原理和Netty差不多,如圖所示

 

 Nginx采用的就是大名鼎鼎的Linux中的IO復用模型Epoll,這里簡單描述一下

首先,Epoll會在Linux內核中申請一顆B+樹作為文件系統,然后會調用epoll_create方法建立一個epoll對象,用於存放通過event_ctl()方法向epoll對象注冊的事件,這些注冊的事件掛載在紅黑樹下,這些注冊的事件都會與設備驅動(都可以抽象成一個socket)建立回調關系,當相應的事件發生時,便會把這些事件(通常是一個類似於key的標識,即socketFd)放入event_poll結構體(雙向鏈表,在linux內核中)。然后event_wait()返回給用戶時只要檢查內核中的雙向鏈表是不是為空就行,(與select相比,不需要輪詢了,因為事件就緒觸發回調函數后會自動放入鏈表)不為空直接返回,這就是一個reactor反應器模式的實現,即事件驅動,事實上,事件驅動非常適用於IO密集型的場所

 

for( ; ; )  //  無限循環
      {
          nfds = epoll_wait(epfd,events,20,500);  //  最長阻塞 500s
          for(i=0;i<nfds;++i)
          {
              if(events[i].data.fd==listenfd) //有新的連接
              {
                  connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept這個連接
                  ev.data.fd=connfd;
                 ev.events=EPOLLIN|EPOLLET;
                 epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //將新的fd添加到epoll的監聽隊列中
             }
             else if( events[i].events&EPOLLIN ) //接收到數據,讀socket
             {
                 n = read(sockfd, line, MAXLINE)) < 0    //
                 ev.data.ptr = md;     //md為自定義類型,添加數據
                 ev.events=EPOLLOUT|EPOLLET;
                 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改標識符,等待下一個循環時發送數據,異步處理的精髓
             }
             else if(events[i].events&EPOLLOUT) //有數據待發送,寫socket
             {
                 struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取數據
                 sockfd = md->fd;
                 send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //發送數據
                 ev.data.fd=sockfd;
                 ev.events=EPOLLIN|EPOLLET;
                 epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改標識符,等待下一個循環時接收數據
             }
             else
             {
                 //其他的處理
             }
         }
     }

 

總結:

可以看出,master-worker模型在大數據計算場景下和fork-join思想一致,而在一些開發場景下,則是很明顯的I/O多路復用的思想。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM