一. ZeroMQ概述
ZeroMQ是一種基於消息隊列的多線程網絡庫,其對套接字類型、連接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。ZeroMQ是網絡通信中新的一層,介於應用層和傳輸層之間(按照TCP/IP划分),其是一個可伸縮層,可並行運行,分散在分布式系統間。
ZeroMQlooks like an embeddable networking library but acts like a concurrency framework. It gives you sockets that carry atomic messages across various transports like in-process, inter-process, TCP, and multicast. You can connect sockets N-to-N with patterns like fan-out, pub-sub, task distribution, and request-reply. It's fast enough to be the fabric for clustered products. Its asynchronous I/O model gives you scalable multicore applications, built as asynchronous message-processing tasks. It has a score of language APIs and runs on most operating systems.
ZMQ對系統調用進行封裝,屏蔽了底層技術細節,實現了進程內、進程間、TCP和廣播通信,可采用多種消息模型實現N-M通信。
針對C語言應用有兩類庫可應用,基礎的libzmq和在基礎庫上封裝的czmq。CZMQ對ZMQ進一步封裝成class,並提供額外的功能封裝。如下主要介紹標准的ZMQ。
二. 消息模型
2.1 消息模型基礎
創建一個context時同步創建了一個I/O線程,其在后台處理I/O。線程數量的設定原則是每秒一GB數據的進出需要一個線程。對於大多數程序,一個線程足夠了。無論是發送消息還是接收消息,ZMQ都會先將消息放入隊列中,並保證進程不會因為內存溢出而崩潰,適時地將消息寫入磁盤。
2.2四種核心消息模型
- 請求回應模型,Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
- 發布訂閱模型,Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
- 流水線模型,Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
- 一對一結對模型,Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with "normal" pairs of sockets.
在socket的connect-bind應用中,消息模型可以根據需要組合使用:
- PUB and SUB
- REQ and REP
- REQ and ROUTER (take care, REQ inserts an extra null frame)
- DEALER and REP (take care, REP assumes a null frame)
- DEALER and ROUTER
- DEALER and DEALER
- ROUTER and ROUTER
- PUSH and PULL
- PAIR and PAIR
除上述組合外,其他組合模式不支持。
2.3 socket類型
與通用socket不同
l 通用socket是同步接口,zmq的socket是異步消息隊列。
l 通用socket傳輸字節流stream或數據報datagrams,zmq的socket傳輸分散的messages。
l zmq的socket自動處理網絡檢測與重連。
l 通用socket支持1-1或1-N,zmq的socket只是N-N(不包括ZMQ_PAIR)。
zmq_socket()用於創建socket,綁定特定的socket類型,socket類型決定了socket通信的規則。不同消息模型支持不同的socket類型,常用socket類型如下,以消息類型分類如下:參考:http://api.zeromq.org/4-2:zmq-socket
請求回復模型
ZMQ_REQ:client request,僅允許一問(zmq_send)一答(zmq_recv),同步。This socket type allows only an alternating sequence of zmq_send(request) and subsequent zmq_recv(reply) calls. Each request sent is round-robined among all services, and each reply received is matched with the last issued request.若服務不可用,阻塞。不會刪除messages。
zmq對ZMQ_REQ消息封裝成如下格式,在消息數據前有空的分割幀:
ZMQ_REP:server reply,同步。This socket type allows only an alternating sequence of zmq_recv(request) and subsequent zmq_send(reply) calls. Each request received is fair-queued from among all clients, and each reply sent is routed to the client that issued the last request. 假如請求者不再存在,刪除回復。
zmq對ZMQ_REP消息封裝成如下格式:
ZMQ_DEALER:擴展了request/reply,異步,Each message sent is round-robined among all connected peers, and each message received is fair-queued from all connected peers.輪詢發送,公平隊列接收。HWM時阻塞,刪除消息。ZMQ_DEALER連接到ZMQ_REP時,發送的消息包含空消息部分(用作分割消息主體),后跟消息主體部分。對消息無封裝,都是原始幀。
ZMQ_ROUTER:擴展了request/reply,異步,接收消息時,會在消息主體前增加id部分,然后發送給應用。公平隊列接收;發送時根據消息id路由(刪除消息的第一部分即id然后發送)。ZMQ_REQ連接ZMQ_ROUTER時,ZMQ_ROUTER接收到的消息包含id、分割部分和主體,ZMQ_ROUTER發送給ZMQ_REQ的消息應包含分割部分(delimiter)。對消息無封裝,都是原始幀。
Think of REQ and DEALER sockets as "clients" and REP and ROUTER sockets as "servers". Mostly, you'll want to bind REP and ROUTER sockets, and connect REQ and DEALER sockets to them. It's not always going to be this simple, but it is a clean and memorable place to start.
發布訂閱模型
ZMQ_PUB:分發消息到所有訂閱者,不提供zmq_recv()函數(只發送)。在mute狀態下(超過閾值HWM),ZMQ_PUB將丟棄所有發向指定訂閱者的消息。絕不會阻塞。
ZMQ_SUB:訂閱消息。須通過zmq_setsockopt()的ZMQ_SUBSCRIBE指定訂閱選項。不提供zmq_send()函數(只接收)。
ZMQ_XPUB:和ZMQ_PUB等同,除了一點:可以接收訂閱信息。 Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix are also received, but have no effect on subscription status. ZMQ_XPUB主要(或只)應用在PUB與SUB間的proxy中。
ZMQ_XSUB:和ZMQ_SUB等同,除了一點:可以發送訂閱信息。Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix may also be sent, but have no effect on subscription status. ZMQ_XSUB主要(或只)應用在PUB與SUB間的proxy中。
流水線模型
The pipeline pattern is used for distributing data to nodes arranged in a pipeline. Data always flows down the pipeline, and each stage of the pipeline is connected to at least one node. When a pipeline stage is connected to multiple nodes data is round-robined among all connected nodes.
流水線模式包含多個階段,每個階段至少連接一個node。當一個階段連接多個node時,采用輪詢分發數據。
ZMQ_PUSH:下發消息,輪詢分發消息。不提供zmq_recv()(只發送)。處於mute狀態時或沒有node時,阻塞,刪除消息。
ZMQ_PULL:公平隊列接收上游消息。不提供zmq_send()(只接收)。
一對一結對模型
ZMQ_PAIR:用於進程內線程間一對一通信。沒有消息路由或過濾機制。
三. 安裝
可參考https://github.com/zeromq/czmq安裝部署zmq和czmq。
安裝依賴
sudo apt-get install -y \ git build-essential libtool \ pkg-config autotools-dev autoconf automake cmake \ uuid-dev libpcre3-dev libsodium-dev valgrind
安裝zmq
git clone git://github.com/zeromq/libzmq.git cd libzmq ./autogen.sh # do not specify "--with-libsodium" if you prefer to use internal tweetnacl security implementation (recommended for development) ./configure --with-libsodium make check sudo make install sudo ldconfig
安裝czmq
git clone git://github.com/zeromq/czmq.git cd czmq ./autogen.sh && ./configure && make check sudo make install sudo ldconfig
四. 編程應用
zmq將通信實體抽象為socket和message,zmq編程主要是應用socket和message API。
注意:sockets是空指針類型(void pointers),而messages是結構體。因此,C中調用socket直接使用變量即可,但調用message需要使用地址(引用)。ZMQ中多有套接字都由ZMQ管理,只有消息是由程序員管理的。
3.1 socket API
zmq socket API類似BSD sockets:
創建和關閉:zmq_socket(),zmq_close()
配置socket:zmq_setsockopt(),zmq_getsockopt()
綁定連接:zmq_bind(),zmq_connect()
收發消息:zmq_send(),zmq_recv(),zmq_msg_send(),zmq_msg_recv()
注:綁定連接的地址形如:transport://address,支持的transport有tcp、ipc、inproc和pgm或epgm,如下示例:
tcp://*:5555 udp://192.168.1.1:5555 ipc:///tmp/feeds/0 inproc://somename
3.2 message API
有兩類message API,簡單的為zmq_send()和zmq_recv(),只適用於簡單消息(消息被截斷到所提供的的buffer大小;復雜的消息API基於zmq_msg_t結構體,有如下API:
- Initialise a message:
zmq_msg_init(), zmq_msg_init_size(), zmq_msg_init_data().
- Sending and receiving a message: zmq_msg_send(), zmq_msg_recv().
- Release a message: zmq_msg_close().
- Access message content:
zmq_msg_data(), zmq_msg_size(), zmq_msg_more().
- Work with message properties: zmq_msg_get(), zmq_msg_set().
- Message manipulation: zmq_msg_copy(), zmq_msg_move().
需要注意的是,當你將一個消息對象傳遞給zmq_send()函數后,該對象的長度就會被清零,因此你無法發送同一個消息對象兩次,也無法獲得已發送消息的內容。
可以發送0字節長度的消息,作為一種信號。
3.3 CZMQ
基礎版本zmq有些需要改進的地方,以便代碼更易使用和閱讀:
l 自動處理套接字。每次都要手動關閉套接字是很麻煩的事,手動定義過期時間也不是太有必要,所以,如果能在關閉上下文時自動關閉套接字就太好了。
l 便捷的線程管理。基本上所有的ØMQ應用都會用到多線程,但POSIX的多線程接口不可移植,所以也可以封裝一下。
l 便捷的時鍾管理。想要獲取毫秒數、或是暫停運行幾毫秒都不太方便,我們的API應該提供這個接口。
l 一個能夠替代zmq_poll()的反應器。poll循環很簡單,但比較笨拙,會造成重復代碼:計算時間、處理套接字中的信息等。若有一個簡單的反應器來處理套接字的讀寫以及時間的控制,將會很方便。
l 恰當地處理Ctrl-C按鍵。我么已經看到如何處理中斷了,最好這一機制可以用到所有的程序里。
CZMQ實現了上述需求,(采用對象模型)提供了ZMQ的上層封裝,甚至是數據結構(hashes和lists)。
五. 應用示例
zmq提供了多個語言版本的應用示例,可通過如下命令獲取:
git clone --depth=1 https://github.com/imatix/zguide.git
參考:
1. http://zguide.zeromq.org/page:all zmq指導文檔
2. http://api.zeromq.org/4-3:_start v4.3.1版本API
3. https://github.com/zeromq/libzmq zmq github
4. https://github.com/zeromq/czmq czmq github
5. http://czmq.zeromq.org/ CZMQ
7. https://www.cnblogs.com/fengbohello/tag/zeromq/ 中文API文檔 博客
8. https://github.com/anjuke/zguide-cn 中文zguide文檔 基於2.1.0
9. https://github.com/pebbe/zmq4 zmq golang 綁定
轉載於:https://www.cnblogs.com/embedded-linux/p/11191239.html
https://blog.csdn.net/weixin_30376509/article/details/96738526