本篇簡單介紹ZMQ
的使用。廣泛應用於本公司的Android 主板
板間通訊、局域網通訊,學習一哈。
ZMQ簡介
ZMQ被稱為史上最快的消息隊列,它處於會話層之上,應用層之下,使用后台異步線程完成消息的接受和發送,完美的封裝了Socket API,大大簡化了編程人員的復雜度。
- ZMQ發送和接受的是具有固定長度的二進制對象,ZMQ的消息包最大254個字節,前6個字節是協議,然后是數據包。
如果超過255個字節(有一個字節表示包屬性),則ZMQ會自動分包傳輸;而對於TCP Socket,是面向字節流的連接。
-
傳統的TCP Socket的連接是1對1的,ZMQ的Socket可以很輕松的實現1對N,N對1和N對N的連接模式。
-
ZMQ使用異步后台線程處理接受和發送請求,這意味着發送完消息,不可以立即釋放資源,消息什么時候發送用戶是無法控制的,同時,ZMQ自動重連,
這意味着用戶可以以任意順序加入到網絡中,服務器也可以隨時加入或者退出網絡;
ZMQ幾種模式
- 請求-回復模式
這種模式主要用於從客戶端向一個或多個服務器發送請求,客戶端首先使用zmq_send 發送消息,再用zmq_recv來接收消息。
服務端先用zmq_recv接收消息如果收到了客戶端的消息,則使用zmq_send向客戶端發送消息。如此循環。形成請求-回復模式。
- 發布-訂閱模式(PUB-SUB)
這種模式主要用於一個服務器對應一個或多個客戶端,該模式相對來說是異步的。客戶端在一個循環體中執行zmq_recv來接收消息。
如果嘗試向SUB socket發送消息會導致錯誤。
TIPS: 你無法知道訂閱者從什么時候開始獲取消息。即使是啟動訂閱者,過一段時間啟動發布者。訂閱者總是會錯過發布者的第一條信息。
因為訂閱者連接到發布者需要一點時間(雖然可能很小)。
3)管道模式
這種模式會將數據發布(PUSH)到由管道排列的節點上面,數據總是沿着管道流動。每個管道至少連接了一個節點(Worker),節點不會主動從管道中獲取(PULL)數據,數據會負載均衡的分配在節點上。。
如果平均的批次時間為5S。
-
1個節點,總時長 5034 ms
-
2個節點,總時長 2421 ms
-
4個節點,總時長 1018 ms
請求-回復實例 (JAVA)
Client:
import org.zeromq.ZMQ;
public class hwclient {
public static void main(String[] args) {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to server
System.out.println("Connecting to hello world server…");
ZMQ.Socket requester = context.socket(ZMQ.REQ);
requester.connect("tcp://localhost:5555");
for (int requestNbr = 0; requestNbr != 10; requestNbr++) {
String request = "Hello";
System.out.println("Sending Hello " + requestNbr);
requester.send(request.getBytes(), 0);
byte[] reply = requester.recv(0);
System.out.println("Received " + new String(reply) + " " + requestNbr);
}
requester.close();
context.term();
}
}
Server:
import org.zeromq.ZMQ;
public class hwserver {
public static void main(String[] args) throws Exception {
ZMQ.Context context = ZMQ.context(1);
// Socket to talk to clients
ZMQ.Socket responder = context.socket(ZMQ.REP);
responder.bind("tcp://*:5555");
while (!Thread.currentThread().isInterrupted()) {
// Wait for next request from the client
byte[] request = responder.recv(0);
System.out.println("Received Hello");
// Do some 'work'
Thread.sleep(1000);
// Send reply back to client
String reply = "World";
responder.send(reply.getBytes(), 0);
}
responder.close();
context.term();
}
}
Client:
Server:
發布-訂閱模式實例 (JAVA)
Client:
import org.zeromq.ZMQ;
public class hwclient {
public static void client() {
new Thread(new Runnable() {
@Override
public void run() {
ZMQ.Context zContext = ZMQ.context(1);
ZMQ.Socket socket = zContext.socket(ZMQ.SUB);
socket.connect("tcp://" + "127.0.0.1" + ":5553");
socket.subscribe("Hello World!".getBytes());
while(true){
System.out.println("client:"+new String(socket.recv(0)));
}
//socket.close();
}
}).start();
}
public static void main(String args[]){
client();
}
}
Server:
import org.zeromq.ZMQ;
public class hwserver {
private static void server() {
new Thread(new Runnable() {
@Override
public void run() {
ZMQ.Context zContext = ZMQ.context(1);
ZMQ.Socket publisher = zContext.socket(ZMQ.PUB);
publisher.bind("tcp://*:5553");
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.currentThread().sleep(1000);
String reply = "Hello World!";
System.out.println("server:"+reply);
publisher.send(reply.getBytes(), 0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*publisher.close ();
zContext.term ();*/
}
}).start();
}
public static void main(String args[]){
server();
}
}
Client:
more in ZMQ
more in other blog 雲風
2017/11/22 update
org.zeromq.ZMQException: Operation cannot be accomplished in current state
ZMQ不可以線程之間共享Socket
2018/01/18 update
UnsatisfiedLinkError
網上下載的zmq.jar中,缺少了arm64-v8a
的libjzmq.so
文件,在使用arm64-v8a
架構手機的時候會出現此問題。
解決方案:
1.可以使用網上的源碼替代之。
缺少jnacl.jar自行下載。
2.也可以自己編譯 arm64-v8a
libjzmq.so
文件。