ZeroMQ(java)之Router/Dealer模式


 本教程轉自:http://blog.csdn.net/kobejayandy/article/details/20163527

在開始之前先把guid里面提到的幾個ZeroMQ的特性列一下吧:

(1)ZeroMQ有自己的I/O線程來異步的處理I/O,而且后台采用了無鎖的數據結構

(2)在ZeroMQ中,所有的組件都可以動態的加入和移除,而且可以啟動組件以任何的順利,例如我們可以先啟動request,再啟動response,依然可以工作,而且還會自動的重連接。

(3)如果有需要的話,會自動的將message進行排隊,當然這都是有一套的模式的,一般情況下會盡量早的將數據發送到receiver。

(4)當緩沖的message隊列滿了以后,ZeroMQ有自己的行為,有的組件會阻塞,有的則會將message拋棄。

(5)底層的通信可以采用各種各樣的都行,例如TCP,IPC啥的。

(6)它會自動的處理那些比較慢而且阻塞的reader

(7)支持message的路由

(8)ZeroMQ確保全部的數據被receiver接收到,例如發送10K,那么也接受到10K

(9)它發送的數據格式是二進制,所以對發送的內容無要求

(10)ZeroMQ會自動處理網絡錯誤,而且會自動嘗試恢復

(11)節能。。。(我擦,居然還有這個)


好了,先來看一下poller這個東西吧,蠻有意思的,類似與epoll或者java里面的selector,

在前面的例子中我們都只是創建一個socket,那如果我們要創建兩個socket在同一個線程中該怎么處理呢,那么這個時候就可以用到poller這東西了。。。可以將已經建立好的socket注冊到poller上面去,並注冊相應的事件。。

這里就用push/pull來舉例子吧,就直接來看pull端的代碼吧:

[java]  view plain copy 在CODE上查看代碼片 派生到我的代碼片
  1. package poller;  
  2.   
  3. import org.zeromq.ZMQ;  
  4.   
  5. public class Pull {  
  6.     public static void main(String args[]) {  
  7.         ZMQ.Context context = ZMQ.context(1);  
  8.           
  9.         ZMQ.Socket pull1 = context.socket(ZMQ.PULL);  //創建一個pull  
  10.         pull1.connect("tcp://127.0.0.1:5555");    //建立於push的連接  
  11.         ZMQ.Socket pull2 = context.socket(ZMQ.PULL);  
  12.         pull2.connect("tcp://127.0.0.1:5555");  
  13.           
  14.         ZMQ.Poller poller = new ZMQ.Poller(2);  //創建一個大小為2的poller  
  15.         poller.register(pull1, ZMQ.Poller.POLLIN);  //分別將上述的pull注冊到poller上,注冊的事件是讀  
  16.         poller.register(pull2, ZMQ.Poller.POLLIN);  
  17.         int i = 0;  
  18.         while (!Thread.currentThread().isInterrupted()) {  
  19.             poller.poll();  
  20.             if (poller.pollin(0)) {  
  21.                 while (null != pull1.recv(ZMQ.NOBLOCK)) {  //這里采用了非阻塞,確保一次性將隊列中的數據讀取完  
  22.                     i++;  
  23.                 }  
  24.                   
  25.             }  
  26.             if (poller.pollin(1)) {  
  27.                 while (null != pull2.recv(ZMQ.NOBLOCK)) {  
  28.                     i++;  
  29.                 }  
  30.                   
  31.                   
  32.             }  
  33.             if (i % 10000000 == 0) {  
  34.                 System.out.println(i);  
  35.             }  
  36.         }  
  37.         pull1.close();  
  38.         pull2.close();  
  39.         context.term();  
  40.           
  41.     }  
  42. }  

這里還算簡單吧,同時創建了兩個pull,都將他們注冊到了poller上面去。。。其實這個樣子很像是selector或者epoll啥的。。。

好啦,接下來進入正題:

request/response算是一種非常常用的模式了,但是前面的例子中,我們的response端都只能在單線程中運行,因為必須要recv與send配對使用,那么就很大程度上限制了response的伸縮性,如果有大量的request來提交很多request請求的話,那么性能將會受到極大的限制,當然這種情況下我們可以采用如下的方式來解決:



這里讓request同時連接到多個response,這里就可以將request請求分散到多個response,這樣當有多個request的時候的性能要求。。。但是有一個問題,如果我們又10個request端,他們每一個都不斷的提交request請求,這個時候我們的reponse可能就會很忙,那么在這種結構下無法動態的添加response,依然限制了整個系統的伸縮性。。。

那么最終的解決方案就來了:



這里可以看到,在request端與response端之間加了一個中間層,可以將其看成一個路由器,它將request端的請求路由到response端,如果性能不夠的話,可以再建立新的response將其連接到中間層就可以了,就方便的解決系統的伸縮性問題了。。。

好了,這里直接就上中間層與response端的代碼吧:

[java]  view plain copy 在CODE上查看代碼片 派生到我的代碼片
  1. package multireqrep;  
  2.   
  3. import org.zeromq.ZMQ;  
  4.   
  5. public class Response {  
  6.     public static void main(String args[]) {  
  7.         final ZMQ.Context context = ZMQ.context(1);  
  8.         ZMQ.Socket router = context.socket(ZMQ.ROUTER);  
  9.         ZMQ.Socket dealer = context.socket(ZMQ.DEALER);  
  10.           
  11.         router.bind("ipc://fjs1");  
  12.         dealer.bind("ipc://fjs2");  
  13.           
  14.         for (int i = 0; i < 20; i++) {  
  15.             new Thread(new Runnable(){  
  16.   
  17.                 public void run() {  
  18.                     // TODO Auto-generated method stub  
  19.       
  20.                     ZMQ.Socket response = context.socket(ZMQ.REP);  
  21.                     response.connect("ipc://fjs2");  
  22.                     while (!Thread.currentThread().isInterrupted()) {  
  23.                         response.recv();  
  24.                         response.send("hello".getBytes());  
  25.                         try {  
  26.                             Thread.currentThread().sleep(1);  
  27.                         } catch (InterruptedException e) {  
  28.                             // TODO Auto-generated catch block  
  29.                             e.printStackTrace();  
  30.                         }  
  31.                     }  
  32.                     response.close();  
  33.                 }  
  34.                   
  35.             }).start();  
  36.         }  
  37.         ZMQ.proxy(router, dealer, null);  
  38.         router.close();  
  39.         dealer.close();  
  40.         context.term();  
  41.     }  
  42. }  

好吧,代碼還算蠻簡單的,直接用了ZeroMQ定義的router和dealer組件,以及內置的proxy方法就好了。。。


嗯,再來贊嘆一次,ZeroMQ確實好用。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM