我們在實際的應用中最常遇到的場景如下:
A向B發送請求,B向A返回結果。。。。
但是這種場景就會很容易變成這個樣子:
很多A向B發送請求,所以B要不斷的處理這些請求,所以就會很容易想到對B進行擴展,由多個B來處理這些請求,那么這里就出現了另外一個問題:
B對請求處理的速度可能不同,那么B之間他們的負載也是不同的,那么應該如何對請求進行分發就成了一個比較重要的問題。。。也就變成了負載均衡的問題了。。。
其實最好的負載均衡解決方案也很簡單:
絕大多數的任務都是獨立的,這里中間層可以將A發送過來的請求先緩存起來,然后B的行為就是主動的找中間層獲取請求處理,然后返回,再獲取。。。。也就是中間層只是做一個請求的緩存。。。由B自己來掌控合適來處理請求,也就是當B已經處理完了任務之后,自己去主動獲取。。。而不是由中間層自己去主動分發。。。。
嗯,那么在ZeroMQ中應該如何實現這種模式呢,恩其實還挺簡單的,如下圖:
這里由兩個Router來作為中間層,具體的數據流程如下:
(1)中間層啟動,Worker連接Backend,向其發送Request請求(ready),這個時候中間層就能夠知道哪一個worker現在是空閑的,將其保存起來(放到worker隊列),可以處理請求
worker的執行流程就是send(發送ready)--->recv(獲取請求),
(2)Client端向Fronted發送請求,中間層將請求緩存到一個任務隊列
(3)中間層從任務隊里里面取出一個任務,將其發送給worker隊列中的一個worker,並將其從woker隊列中移除
(4)worker處理完以后,發送執行結果,也就是send,中間層收到woker的數據 之后,將其發送給相應的client,然后在講這個worker放到worker隊列中,表示當前這個worker可用。。。。
好了,前面就基本上介紹了整個結構用ZeroMQ應該是怎么實現的,那么接下來就直接來上代碼吧:
- package balance;
- import java.util.LinkedList;
- import org.zeromq.ZFrame;
- import org.zeromq.ZMQ;
- import org.zeromq.ZMsg;
- public class Balance {
- public static class Client {
- public void start() {
- new Thread(new Runnable(){
- public void run() {
- // TODO Auto-generated method stub
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket socket = context.socket(ZMQ.REQ);
- socket.connect("ipc://front"); //連接router,想起發送請求
- for (int i = 0; i < 1000; i++) {
- socket.send("hello".getBytes(), 0); //發送hello請求
- String bb = new String(socket.recv()); //獲取返回的數據
- System.out.println(bb);
- }
- socket.close();
- context.term();
- }
- }).start();
- }
- }
- public static class Worker {
- public void start() {
- new Thread(new Runnable(){
- public void run() {
- // TODO Auto-generated method stub
- ZMQ.Context context = ZMQ.context(1);
- ZMQ.Socket socket = context.socket(ZMQ.REQ);
- socket.connect("ipc://back"); //連接,用於獲取要處理的請求,並發送回去處理結果
- socket.send("ready".getBytes()); //發送ready,表示當前可用
- while (!Thread.currentThread().isInterrupted()) {
- ZMsg msg = ZMsg.recvMsg(socket); //獲取需要處理的請求,其實這里msg最外面的標志frame是router對分配給client的標志frame
- ZFrame request = msg.removeLast(); //最后一個frame其實保存的就是實際的請求數據,這里將其移除,待會用新的frame代替
- ZFrame frame = new ZFrame("hello fjs".getBytes());
- msg.addLast(frame); //將剛剛創建的frame放到msg的最后,worker將會收到
- msg.send(socket); //將數據發送回去
- }
- socket.close();
- context.term();
- }
- }).start();
- }
- }
- public static class Middle {
- private LinkedList<ZFrame> workers;
- private LinkedList<ZMsg> requests;
- private ZMQ.Context context;
- private ZMQ.Poller poller;
- public Middle() {
- this.workers = new LinkedList<ZFrame>();
- this.requests = new LinkedList<ZMsg>();
- this.context = ZMQ.context(1);
- this.poller = new ZMQ.Poller(2);
- }
- public void start() {
- ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER); //創建一個router,用於接收client發送過來的請求,以及向client發送處理結果
- ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER); //創建一個router,用於向后面的worker發送數據,然后接收處理的結果
- fronted.bind("ipc://front"); //監聽,等待client的連接
- backend.bind("ipc://back"); //監聽,等待worker連接
- //創建pollItem
- ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);
- ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);
- this.poller.register(fitem); //注冊pollItem
- this.poller.register(bitem);
- while (!Thread.currentThread().isInterrupted()) {
- this.poller.poll();
- if (fitem.isReadable()) { //表示前面有請求發過來了
- ZMsg msg = ZMsg.recvMsg(fitem.getSocket()); //獲取client發送過來的請求,這里router會在實際請求上面套一個連接的標志frame
- this.requests.addLast(msg); //將其掛到請求隊列
- }
- if (bitem.isReadable()) { //這里表示worker發送數據過來了
- ZMsg msg = ZMsg.recvMsg(bitem.getSocket()); //獲取msg,這里也會在實際發送的數據前面包裝一個連接的標志frame
- //這里需要注意,這里返回的是最外面的那個frame,另外它還會將后面的接着的空的標志frame都去掉
- ZFrame workerID = msg.unwrap(); //把外面那層包裝取下來,也就是router對連接的標志frame
- this.workers.addLast(workerID); //將當前的worker的標志frame放到worker隊列里面,表示這個worker可以用了
- ZFrame readyOrAddress = msg.getFirst(); //這里獲取標志frame后面的數據,如果worker剛剛啟動,那么應該是發送過來的ready,
- if (new String(readyOrAddress.getData()).equals("ready")) { //表示是worker剛剛啟動,發過來的ready
- msg.destroy();
- } else {
- msg.send(fronted); //表示是worker處理完的返回結果,那么返回給客戶端
- }
- }
- while (this.workers.size() > 0 && this.requests.size() > 0) {
- ZMsg request = this.requests.removeFirst();
- ZFrame worker = this.workers.removeFirst();
- request.wrap(worker); //在request前面包裝一層,把可以用的worker的標志frame包裝上,這樣router就會發給相應的worker的連接
- request.send(backend); //將這個包裝過的消息發送出去
- }
- }
- fronted.close();
- backend.close();
- this.context.term();
- }
- }
- public static void main(String args[]) {
- Worker worker = new Worker();
- worker.start();
- Client client = new Client();
- client.start();
- Middle middle = new Middle();
- middle.start();
- }
- }
其實根據前面已經提出來的實現原理來編寫代碼還是比較順利的,中途也沒有遇到什么問題。。。不過要理解這部分要比較了解ZeroMQ的數據格式才行