master-worker模式是一種並行模式,它的核心思想,系統有兩個進程或者線程協議工作,master負責接收和分配並整合任務(merge),worker進程負責處理子任務(divide),可見這也是一種歸並的思想,當客戶端進程啟動后,開啟master進程,流程如圖所示
1.ZooKeeper中的master-worker實現
每個worker的監控與調度可以交給第三方工具去實現,比如Zookeeper便可以充當這樣的角色,zookeeper是一個分布式文件系統,我們可以把worker掛載在它的/worker目錄(結點)下,注意這個worker目錄應該是一個臨時的目錄,且目錄下的worker結點應該是有序的,但當worker掛掉后,又該如何知道它被分配了哪些任務呢,所以還應該有個/assign目錄,記錄worker結點已被分配的任務,當然,任務被掛載在/job結點下面,所以通過Zookeeper監控和調度時,應當有四個結點,三個角色。監聽/job目錄,如果該目錄下的結點發生了變化,就將待分配的job分配給空閑的worker結點,而如何知道該worker是否空閑呢,就看其對應的assign節點是否為空,如果為空,便可繼續分配,
private void onNodeChange(WatchedEvent event){ // event的類型為數量更新 if (Watcher.Event.EventType.NodeChildrenChanged != event.getType()) { return; } try { // 如果節點數量更新,那么遍歷子節點,還沒被處理的job List<String> jobPathList = zooKeeper.getChildren("/job", false); if (CollectionUtils.isEmpty(jobPathList)) { return; } JobMessage initJobMessage = null; String initJobPath = ""; for (String jobPath : jobPathList) { String jobCurrentPath = "/job/" + jobPath; byte[] jobDataByteArray = zooKeeper.getData(jobCurrentPath, false, null); String jobData = new String(jobDataByteArray); if (StringUtils.isEmpty(jobData)) { continue; } // 將其轉換為 jobMessage ,其中的status為 0 ,也就是沒被分配的時候,才會分配 JobMessage jobMessage = JSON.parseObject(jobData, JobMessage.class); if (jobMessage == null) { continue; } // 將path設置進去 if (JobMessage.StatusEnum.INIT.getValue() == jobMessage.getStatus()) { initJobMessage = jobMessage; initJobPath = jobPath; break; } } if (initJobMessage == null) { return; } // 遍歷 /worker 節點,如果對應節點在 /assign 中沒有子節點,那么將其分配在/ assgin 中 List<String> workNodeList = zooKeeper.getChildren("/worker", false); if (CollectionUtils.isEmpty(slaveNodeList)) { return; } boolean assignSuccess = false; for (String workerNodePath : workerNodeList) { String assignWorkerCurrentPath = "/assign/" + workerNodePath; // 查詢其有無子節點,如果沒有子節點,說明可以分配給任務 List<String> assignWorkerChildNodeList = zooKeeper.getChildren(assignWorkerCurrentPath, false); if (CollectionUtils.isNotEmpty(assignWorkerChildNodeList)) { continue; } // 給assign的對應節點增加一個子節點 String jobAssignPath = assignWorkerCurrentPath + "/" + initJobPath; zooKeeper.create(jobAssignPath, JSON.toJSONString(initJobMessage).getBytes(), OPEN_ACL_UNSAFE , CreateMode.PERSISTENT); assignSuccess = true; break; } LogUtils.printLog("分配 " + (assignSuccess ? "成功" : "失敗")); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
代碼中的onNodeChange是個回調方法,當master結點監聽到node節點有變動時,回調該方法。
既然master結點負責任務的分配和調度,那worker結點也要監聽自己被分配的任務的變化,執行完畢任務后,還要刪除在/assign目錄下的自己的結點信息,基本原理和上述代碼差不多。
2.Netty中的master-worker實現
netty中的serverBootstrap的啟動代碼如下
bootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(),//master線程池 Executors.newCachedThreadPool(),//worker線程池 ) );
在netty中,一個master對應一個端口,當master接收了socket的連接請求后,channel通道就此建立,建立連接請求后的消息交給worker線程處理,實際上是master線程調用serverSocketChannel后,由factory從worker線程池(NioEventLoopGroup)中找出一個worker線程(NioEventLoop)進行后續處理,一個worker可以服務不同的socket(即IO多路復用,原理是NioEventLoop持有selector多路復用器和任務隊列queue,每個channel都會注冊在selector上,並持有selectKey,select表示其注冊在哪個選擇器上,所以worker線程可以同時接收到多個就緒事件),當然,也可以設置成阻塞模式,即一個worker只能服務一個socket,但是現在服務器都會采取keep-alive,所以最好設置成非阻塞的模式,不然對worrker資源會造成很大的消耗與浪費。
在worker線程中,接收到的消息實際上交給ChannelPipeline處理(這個pipeline實際上就是filter,而filter通常采用責任鏈模式,由許多handler組成),當所有的handler走完沒有異常,證明worker的工作完畢,會被worker線程池回收,其中還可以再優化,比如碰到耗時的handler(通常是業務handler),可以再開一個新線程處理,這個新線程也最好來自別的新線程池,從而當前worker可以盡早釋放,提高worker線程的周轉率。
3.Golang中的master-worker實現
通過channel來進行通信
下面是worker的實現代碼
package worker import ( "fmt" ) //需要處理的任務,簡單定義一下 type Job struct { num int } func NewJob(num int) Job { return Job{num: num} } type Worker struct { id int //workerID WorkerPool chan chan Job //worker池 JobChannel chan Job //worker從JobChannel中獲取Job進行處理 Result map[interface{}]int //worker將處理結果放入reuslt quit chan bool //停止worker信號 } func NewWorker(workerPool chan chan Job, result map[interface{}]int, id int) Worker { return Worker{ id: id, WorkerPool: workerPool, JobChannel: make(chan Job), Result: result, quit: make(chan bool), } } func (w Worker) Start() { go func() { for { //將worker的JobChannel放入master的workerPool中 w.WorkerPool <- w.JobChannel select { //從JobChannel中獲取Job進行處理,JobChannel是同步通道,會阻塞於此 case job := <-w.JobChannel: //處理這個job //並將處理得到的結果存入master中的結果集 x := job.num * job.num fmt.Println(w.id, ":", x) w.Result[x] = w.id //停止信號 case <-w.quit: return } } }() } func (w Worker) Stop() { go func() { w.quit <- true }() }
master
package master import ( "MasterWorkerPattern/worker" ) type Master struct { WorkerPool chan chan worker.Job //worker池 Result map[interface{}]int //存放worker處理后的結果集 jobQueue chan worker.Job //待處理的任務chan workerList []worker.Worker //存放worker列表,用於停止worker } var maxworker int //maxWorkers:開啟線程數 //result :結果集 func NewMaster(maxWorkers int, result map[interface{}]int) *Master { pool := make(chan chan worker.Job, maxWorkers) maxworker = maxWorkers return &Master{WorkerPool: pool, Result: result, jobQueue: make(chan worker.Job, 2*maxWorkers)} } func (m *Master) Run() { //啟動所有的Worker for i := 0; i < maxworker; i++ { work := worker.NewWorker(m.WorkerPool, m.Result, i) m.workerList = append(m.workerList, work) work.Start() } go m.dispatch() } func (m *Master) dispatch() { for { select { case job := <-m.jobQueue: go func(job worker.Job) { //從workerPool中取出一個worker的JobChannel jobChannel := <-m.WorkerPool //向這個JobChannel中發送job,worker中的接收配對操作會被喚醒 jobChannel <- job }(job) } } } //添加任務到任務通道 func (m *Master) AddJob(num int) { job := worker.NewJob(num) //向任務通道發送任務 m.jobQueue <- job } //停止所有任務 func (m *Master) Stop() { for _, v := range m.workerList { v.Stop() } }
test
package main import ( "MasterWorkerPattern/master" "fmt" "time" ) func main() { result := map[interface{}]int{} mas := master.NewMaster(4, result) mas.Run() for i := 0; i < 10; i++ { mas.AddJob(i) } time.Sleep(time.Millisecond) //mas.Stop() fmt.Println("result=", result) }
運行結果是不確定的
4.Nginx中的master-worker實現
基本原理和Netty差不多,如圖所示
Nginx采用的就是大名鼎鼎的Linux中的IO復用模型Epoll,這里簡單描述一下
首先,Epoll會在Linux內核中申請一顆B+樹作為文件系統,然后會調用epoll_create方法建立一個epoll對象,用於存放通過event_ctl()方法向epoll對象注冊的事件,這些注冊的事件掛載在紅黑樹下,這些注冊的事件都會與設備驅動(都可以抽象成一個socket)建立回調關系,當相應的事件發生時,便會把這些事件(通常是一個類似於key的標識,即socketFd)放入event_poll結構體(雙向鏈表,在linux內核中)。然后event_wait()返回給用戶時只要檢查內核中的雙向鏈表是不是為空就行,(與select相比,不需要輪詢了,因為事件就緒觸發回調函數后會自動放入鏈表)不為空直接返回,這就是一個reactor反應器模式的實現,即事件驅動,事實上,事件驅動非常適用於IO密集型的場所
for( ; ; ) // 無限循環 { nfds = epoll_wait(epfd,events,20,500); // 最長阻塞 500s for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) //有新的連接 { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept這個連接 ev.data.fd=connfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //將新的fd添加到epoll的監聽隊列中 } else if( events[i].events&EPOLLIN ) //接收到數據,讀socket { n = read(sockfd, line, MAXLINE)) < 0 //讀 ev.data.ptr = md; //md為自定義類型,添加數據 ev.events=EPOLLOUT|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改標識符,等待下一個循環時發送數據,異步處理的精髓 } else if(events[i].events&EPOLLOUT) //有數據待發送,寫socket { struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取數據 sockfd = md->fd; send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //發送數據 ev.data.fd=sockfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改標識符,等待下一個循環時接收數據 } else { //其他的處理 } } }
總結:
可以看出,master-worker模型在大數據計算場景下和fork-join思想一致,而在一些開發場景下,則是很明顯的I/O多路復用的思想。