ZeroMQ(java)之負載均衡


我們在實際的應用中最常遇到的場景如下:


A向B發送請求,B向A返回結果。。。。

但是這種場景就會很容易變成這個樣子:


很多A向B發送請求,所以B要不斷的處理這些請求,所以就會很容易想到對B進行擴展,由多個B來處理這些請求,那么這里就出現了另外一個問題:

B對請求處理的速度可能不同,那么B之間他們的負載也是不同的,那么應該如何對請求進行分發就成了一個比較重要的問題。。。也就變成了負載均衡的問題了。。。

其實最好的負載均衡解決方案也很簡單:

絕大多數的任務都是獨立的,這里中間層可以將A發送過來的請求先緩存起來,然后B的行為就是主動的找中間層獲取請求處理,然后返回,再獲取。。。。也就是中間層只是做一個請求的緩存。。。由B自己來掌控合適來處理請求,也就是當B已經處理完了任務之后,自己去主動獲取。。。而不是由中間層自己去主動分發。。。。

嗯,那么在ZeroMQ中應該如何實現這種模式呢,恩其實還挺簡單的,如下圖:



這里由兩個Router來作為中間層,具體的數據流程如下:

(1)中間層啟動,Worker連接Backend,向其發送Request請求(ready),這個時候中間層就能夠知道哪一個worker現在是空閑的,將其保存起來(放到worker隊列),可以處理請求

worker的執行流程就是send(發送ready)--->recv(獲取請求),

(2)Client端向Fronted發送請求,中間層將請求緩存到一個任務隊列

(3)中間層從任務隊里里面取出一個任務,將其發送給worker隊列中的一個worker,並將其從woker隊列中移除

(4)worker處理完以后,發送執行結果,也就是send,中間層收到woker的數據 之后,將其發送給相應的client,然后在講這個worker放到worker隊列中,表示當前這個worker可用。。。。


好了,前面就基本上介紹了整個結構用ZeroMQ應該是怎么實現的,那么接下來就直接來上代碼吧:

[java]  view plain copy 在CODE上查看代碼片 派生到我的代碼片
  1. package balance;  
  2.   
  3. import java.util.LinkedList;  
  4.   
  5. import org.zeromq.ZFrame;  
  6. import org.zeromq.ZMQ;  
  7. import org.zeromq.ZMsg;  
  8.   
  9. public class Balance {  
  10.       
  11.     public static class Client {  
  12.         public void start() {  
  13.             new Thread(new Runnable(){  
  14.   
  15.                 public void run() {  
  16.                     // TODO Auto-generated method stub  
  17.                     ZMQ.Context context = ZMQ.context(1);  
  18.                     ZMQ.Socket socket = context.socket(ZMQ.REQ);  
  19.                       
  20.                     socket.connect("ipc://front");  //連接router,想起發送請求  
  21.                       
  22.                     for (int i = 0; i < 1000; i++) {  
  23.                         socket.send("hello".getBytes(), 0);  //發送hello請求  
  24.                         String bb = new String(socket.recv());  //獲取返回的數據  
  25.                         System.out.println(bb);   
  26.                     }  
  27.                     socket.close();  
  28.                     context.term();  
  29.                 }  
  30.                   
  31.             }).start();  
  32.         }  
  33.     }  
  34.       
  35.     public static class Worker {  
  36.         public void start() {  
  37.             new Thread(new Runnable(){  
  38.   
  39.                 public void run() {  
  40.                     // TODO Auto-generated method stub  
  41.                     ZMQ.Context context = ZMQ.context(1);  
  42.                     ZMQ.Socket socket = context.socket(ZMQ.REQ);  
  43.                       
  44.                     socket.connect("ipc://back");  //連接,用於獲取要處理的請求,並發送回去處理結果  
  45.                       
  46.                     socket.send("ready".getBytes());  //發送ready,表示當前可用  
  47.                        
  48.                     while (!Thread.currentThread().isInterrupted()) {  
  49.                         ZMsg msg = ZMsg.recvMsg(socket);  //獲取需要處理的請求,其實這里msg最外面的標志frame是router對分配給client的標志frame  
  50.                         ZFrame request = msg.removeLast();   //最后一個frame其實保存的就是實際的請求數據,這里將其移除,待會用新的frame代替  
  51.                         ZFrame frame = new ZFrame("hello fjs".getBytes());    
  52.                         msg.addLast(frame);  //將剛剛創建的frame放到msg的最后,worker將會收到  
  53.                         msg.send(socket);  //將數據發送回去  
  54.                           
  55.                     }  
  56.                     socket.close();  
  57.                     context.term();  
  58.                 }  
  59.                   
  60.             }).start();  
  61.         }  
  62.     }  
  63.       
  64.     public static class Middle {  
  65.         private LinkedList<ZFrame> workers;  
  66.         private LinkedList<ZMsg> requests;  
  67.         private ZMQ.Context context;  
  68.         private ZMQ.Poller poller;  
  69.           
  70.         public Middle() {  
  71.             this.workers = new LinkedList<ZFrame>();  
  72.             this.requests = new LinkedList<ZMsg>();  
  73.             this.context = ZMQ.context(1);  
  74.             this.poller = new ZMQ.Poller(2);  
  75.         }  
  76.           
  77.         public void start() {  
  78.             ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER);  //創建一個router,用於接收client發送過來的請求,以及向client發送處理結果  
  79.             ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER);  //創建一個router,用於向后面的worker發送數據,然后接收處理的結果  
  80.               
  81.             fronted.bind("ipc://front");  //監聽,等待client的連接  
  82.             backend.bind("ipc://back");  //監聽,等待worker連接  
  83.               
  84.             //創建pollItem  
  85.             ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);    
  86.             ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);  
  87.               
  88.             this.poller.register(fitem);  //注冊pollItem  
  89.             this.poller.register(bitem);  
  90.               
  91.               
  92.             while (!Thread.currentThread().isInterrupted()) {  
  93.                 this.poller.poll();  
  94.                 if (fitem.isReadable()) {  //表示前面有請求發過來了  
  95.                     ZMsg msg = ZMsg.recvMsg(fitem.getSocket());  //獲取client發送過來的請求,這里router會在實際請求上面套一個連接的標志frame  
  96.                     this.requests.addLast(msg);   //將其掛到請求隊列  
  97.                 }  
  98.                 if (bitem.isReadable()) {  //這里表示worker發送數據過來了  
  99.                     ZMsg msg = ZMsg.recvMsg(bitem.getSocket());  //獲取msg,這里也會在實際發送的數據前面包裝一個連接的標志frame  
  100.                     //這里需要注意,這里返回的是最外面的那個frame,另外它還會將后面的接着的空的標志frame都去掉  
  101.                     ZFrame workerID = msg.unwrap();  //把外面那層包裝取下來,也就是router對連接的標志frame  
  102.                     this.workers.addLast(workerID);  //將當前的worker的標志frame放到worker隊列里面,表示這個worker可以用了  
  103.                     ZFrame readyOrAddress = msg.getFirst(); //這里獲取標志frame后面的數據,如果worker剛剛啟動,那么應該是發送過來的ready,  
  104.                       
  105.                       
  106.                     if (new String(readyOrAddress.getData()).equals("ready")) {  //表示是worker剛剛啟動,發過來的ready  
  107.                         msg.destroy();  
  108.                     } else {  
  109.                         msg.send(fronted);  //表示是worker處理完的返回結果,那么返回給客戶端  
  110.                     }  
  111.                 }  
  112.                   
  113.                 while (this.workers.size() > 0 && this.requests.size() > 0) {  
  114.                     ZMsg request = this.requests.removeFirst();  
  115.                     ZFrame worker = this.workers.removeFirst();  
  116.                       
  117.                     request.wrap(worker);  //在request前面包裝一層,把可以用的worker的標志frame包裝上,這樣router就會發給相應的worker的連接  
  118.                     request.send(backend);  //將這個包裝過的消息發送出去  
  119.                 }  
  120.                   
  121.             }  
  122.             fronted.close();  
  123.             backend.close();  
  124.             this.context.term();  
  125.         }  
  126.     }  
  127.       
  128.       
  129.     public static void main(String args[]) {  
  130.         Worker worker = new Worker();  
  131.         worker.start();  
  132.         Client client = new Client();  
  133.         client.start();  
  134.         Middle middle = new Middle();  
  135.         middle.start();  
  136.           
  137.     }  
  138. }  

其實根據前面已經提出來的實現原理來編寫代碼還是比較順利的,中途也沒有遇到什么問題。。。不過要理解這部分要比較了解ZeroMQ的數據格式才行


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM