Java Worker 設計模式


Worker模式

想解決的問題

異步執行一些任務,有返回或無返回結果

使用動機

有些時候想執行一些異步任務,如異步網絡通信、daemon任務,但又不想去管理這任務的生命周。這個時候可以使用Worker模式,它會幫您管理與執行任務,並能非常方便地獲取結果

結構

很多人可能為覺得這與executor很像,但executor是多線程的,它的作用更像是一個規划中心。而Worker則只是個搬運工,它自己本身只有一個線程的。每個worker有自己的任務處理邏輯,為了實現這個目的,有兩種方式

1. 建立一個抽象的AbstractWorker,不同邏輯的worker對其進行不同的實現;

2. 對worker新增一個TaskProcessor不同的任務傳入不同的processor即可。

第二種方式worker的角色可以很方便地改變,而且可以隨時更換processor,可以理解成可”刷機”的worker

 


^ ^。這里我們使用第二種方式來介紹此模式的整體結構。

針對上圖,詳細介紹一下幾個角色:

  • ConfigurableWorker:顧名思義這個就是真正干活的worker了。要實現自我生命周期管理,需要實現Runable,這樣其才能以單獨的線程運行,需要注意的是work最好以daemon線程的方式運行。worker里面還包括幾個其它成員:taskQueue,一個阻塞性質的queue,一般BlockingArrayList就可以了,這樣任務是FIFO(先進先出)的,如果要考慮任務的優先級,則可以考慮使用PriorityBlockingQueue;listeners,根據事件進行划分的事件監聽者,以便於當一個任務完成的時候進行處理,需要注意的是,為了較高效地進行listener遍歷,這里我推薦使用CopyOnWriteArrayList,免得每次都復制。其對應的方法有addlistener、addTask等配套方法,這個都不多說了,更詳細的可以看后面的示例代碼。
  • WorkerTask:實際上這是一個抽象的工內容,其包括基本的id與,task的ID是Worker生成的,相當於遞wtte后的一個執回,當數據執行完了的時候需要使用這個id來取結果。而后面真正實現的實體task則包含任務處理時需要的數據。
  • Processor:為了實現可”刷機”的worker,我們將處理邏輯與worker分開來,processor的本職工作很簡單,只需要加工傳入的task數據即可,加工完成后觸發fireEvent(WorkerEvent.TASK_COMPLETE)事件,之后通過Future的get即可得到最終的數據。

另外再說一點,對於addTask,可以有一個overload的方法,即在輸入task的同時,傳入一個RejectPolice,這樣可以在size過大的時候做出拒絕操作,有效避免被撐死。

適用性/問題

這種設計能自動處理任務,並能根據任務的優先級自動調節任務的執行順序,一個完全獨立的thread,你完全可以將其理解成一專門負責干某種活的”機器人”。它可以用於處理一些定時、請求量固定均勻且對實時性要求不是太高的任務,如日志記錄,數據分析等。當然,如果想提高任務處理的數據,可以生成多個worker,就相當於雇佣更多的人來為你干活,非常直觀的。當然這樣一來,誰來維護這worker便成了一個問題,另外就目前這種設計下worker之間是沒有通信與協同的,這些都是改進點。

那么對於多個worker,有什么組織方式呢?這里我介紹三種,算是拋磚引玉:

流水線式worker(assembly-line worker)

就像生產車間上的流水線工人一樣,將任務切分成幾個小塊,每個worker負責自己的一部分,以提高整體的生產、產出效率,如下圖:

 

假設完成任務 t 需要的時間為:W(t)=n,那么將任務分解成m份,流水線式的執行,每小份需要的時間便為 W(t/m)=n/m,那么執行1000條任務的時間,單個為1000n,流水線長度為L,則用這種方式所用的時間為(1000-1)*(m-L+1)*n/m+n
其中L<m,由此可見,流水線的worker越多、任務越細分,工作的效率將越高。這種主方式的問題在於,如果一個worker出現問題,那么整個流水線就將停止工作。而且任務的優先級不能動態調用,必須事先告知。

多級反饋隊列(Multilevel Feedback Queue)

這是一個有Q1、Q2…Qn個多重流水線方式,從高到低分別代碼不同的優先級,高優先級的worker要多於低優先級的,一般是2的倍數,即Q4有16個worker、Q3有8個,后面類推。任務根據預先估計好的優先級進入,如果任務在某步的執行過長,直接踢到下一級,讓出最快的資源。如下圖所示:

顯然這種方式的好處就在於可以動態地調整任務的優級,及時做出反應。當然,為了實現更好的高度,我們可以在低級里增加一個閥值,使得放偶然放入低級的task可以有復活的機會^
^。

MapReduce式

流水線雖然有一定的並行性,但總體來說仍然是串行的,因為只要有一個節點出了問題,那都是致命的錯誤。MapReduce是Google率先實現的一個分布式算法,有非常好的並行執行效率。

 

如上圖所示,只要我們將Map與Reduce都改成Worker就行了,如MapWorker與ReduceWorker。這樣,可以看見,Map的過程是完全並行的,當然這樣就需要在Map與Reduce上的分配與數據組合上稍稍下一點功夫了。

樣例實現

這里我們實現一個PageURLMiningWorker,對給定的URL,打開頁面后,采取所有的URL,並反回結果進行匯總輸出。由於時間有限,這里我只實現了單worker與MapReduce worker集兩種方式,有興趣的同學可以實現其它類型,如多級反饋隊列。注意!我這里只是向大家展示這種設計模式,URL
抓取的效率不在本次考慮之列。

所有的代碼可以在這里獲取:https://github.com/sefler1987/javaworker

結果對比

Y軸為抓取X軸URL個數所用的時間

 

總結

我們可以看到,worker模式組合是非常靈活的,它真的就像一個活生生的工人,任你調配。使用worker,我們可以更方便地實現更復雜的結構。

教程:http://how2j.cn?p=35025


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM