ZMQ


本篇簡單介紹ZMQ的使用。廣泛應用於本公司的Android 主板 板間通訊、局域網通訊,學習一哈。

ZMQ簡介

ZMQ被稱為史上最快的消息隊列,它處於會話層之上,應用層之下,使用后台異步線程完成消息的接受和發送,完美的封裝了Socket API,大大簡化了編程人員的復雜度。

  • ZMQ發送和接受的是具有固定長度的二進制對象,ZMQ的消息包最大254個字節,前6個字節是協議,然后是數據包。

如果超過255個字節(有一個字節表示包屬性),則ZMQ會自動分包傳輸;而對於TCP Socket,是面向字節流的連接。

  • 傳統的TCP Socket的連接是1對1的,ZMQ的Socket可以很輕松的實現1對N,N對1和N對N的連接模式。

  • ZMQ使用異步后台線程處理接受和發送請求,這意味着發送完消息,不可以立即釋放資源,消息什么時候發送用戶是無法控制的,同時,ZMQ自動重連,

這意味着用戶可以以任意順序加入到網絡中,服務器也可以隨時加入或者退出網絡;

ZMQ幾種模式

  1. 請求-回復模式

這種模式主要用於從客戶端向一個或多個服務器發送請求,客戶端首先使用zmq_send 發送消息,再用zmq_recv來接收消息。

服務端先用zmq_recv接收消息如果收到了客戶端的消息,則使用zmq_send向客戶端發送消息。如此循環。形成請求-回復模式。

outPut

  1. 發布-訂閱模式(PUB-SUB)

這種模式主要用於一個服務器對應一個或多個客戶端,該模式相對來說是異步的。客戶端在一個循環體中執行zmq_recv來接收消息。

如果嘗試向SUB socket發送消息會導致錯誤。

TIPS: 你無法知道訂閱者從什么時候開始獲取消息。即使是啟動訂閱者,過一段時間啟動發布者。訂閱者總是會錯過發布者的第一條信息。

因為訂閱者連接到發布者需要一點時間(雖然可能很小)。

outPut

3)管道模式

這種模式會將數據發布(PUSH)到由管道排列的節點上面,數據總是沿着管道流動。每個管道至少連接了一個節點(Worker),節點不會主動從管道中獲取(PULL)數據,數據會負載均衡的分配在節點上。。

如果平均的批次時間為5S。

  • 1個節點,總時長 5034 ms

  • 2個節點,總時長 2421 ms

  • 4個節點,總時長 1018 ms

outPut

請求-回復實例 (JAVA)

Client:


import org.zeromq.ZMQ;

public class hwclient {

    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);

        //  Socket to talk to server
        System.out.println("Connecting to hello world server…");

        ZMQ.Socket requester = context.socket(ZMQ.REQ);
        requester.connect("tcp://localhost:5555");

        for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
            String request = "Hello";
            System.out.println("Sending Hello " + requestNbr);
            requester.send(request.getBytes(), 0);

            byte[] reply = requester.recv(0);
            System.out.println("Received " + new String(reply) + " " + requestNbr);
        }
        requester.close();
        context.term();
    }
}

Server:


import org.zeromq.ZMQ;

public class hwserver {

    public static void main(String[] args) throws Exception {
        ZMQ.Context context = ZMQ.context(1);

        //  Socket to talk to clients
        ZMQ.Socket responder = context.socket(ZMQ.REP);
        responder.bind("tcp://*:5555");

        while (!Thread.currentThread().isInterrupted()) {
            // Wait for next request from the client
            byte[] request = responder.recv(0);
            System.out.println("Received Hello");

            // Do some 'work'
            Thread.sleep(1000);

            // Send reply back to client
            String reply = "World";
            responder.send(reply.getBytes(), 0);
        }
        responder.close();
        context.term();
    }
}

Client:

outPut

Server:

outPut

發布-訂閱模式實例 (JAVA)

Client:


import org.zeromq.ZMQ;

public class hwclient {

    public static void client() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                ZMQ.Context zContext = ZMQ.context(1);
                ZMQ.Socket socket = zContext.socket(ZMQ.SUB);
                socket.connect("tcp://" + "127.0.0.1" + ":5553");
                socket.subscribe("Hello World!".getBytes());
                while(true){
                    System.out.println("client:"+new String(socket.recv(0)));
                }
					
                //socket.close();
            }
        }).start();
    }


    public static void  main(String args[]){
        client();
    }
}

Server:


import org.zeromq.ZMQ;

public class hwserver {

     private static void server() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                ZMQ.Context zContext = ZMQ.context(1);
                ZMQ.Socket publisher = zContext.socket(ZMQ.PUB);
                publisher.bind("tcp://*:5553");
					while (!Thread.currentThread().isInterrupted()) {
                        try {

                            Thread.currentThread().sleep(1000);
                            String reply = "Hello World!";
                            System.out.println("server:"+reply);
                            publisher.send(reply.getBytes(), 0);

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

					}
                /*publisher.close ();
                zContext.term ();*/

            }
        }).start();

    }

    public static void  main(String args[]){
        server();
    }
}

Client:

outPut

more in ZMQ

more in other blog 雲風

2017/11/22 update

org.zeromq.ZMQException: Operation cannot be accomplished in current state

ZMQ不可以線程之間共享Socket

2018/01/18 update

UnsatisfiedLinkError

網上下載的zmq.jar中,缺少了arm64-v8alibjzmq.so文件,在使用arm64-v8a架構手機的時候會出現此問題。

解決方案:

1.可以使用網上的源碼替代之。

zmq通訊包 github

缺少jnacl.jar自行下載。

2.也可以自己編譯 arm64-v8a libjzmq.so文件。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM