消息隊列庫——ZeroMQ
ZeroMQ(簡稱ZMQ)是一個基於消息隊列的多線程網絡庫,其對套接字類型、連接處理、幀、甚至路由的底層細節進行抽象,提供跨越多種傳輸協議的套接字。
ZMQ是網絡通信中新的一層,介於應用層和傳輸層之間(按照TCP/IP划分),其是一個可伸縮層,可並行運行,分散在分布式系統間。
ZMQ不是單獨的服務,而是一個嵌入式庫,它封裝了網絡通信、消息隊列、線程調度等功能,向上層提供簡潔的API,應用程序通過加載庫文件,調用API函數來實現高性能網絡通信。
主線程與I/O線程:
I/O線程,ZMQ根據用戶調用zmq_init函數時傳入的參數,創建對應數量的I/O線程。每個I/O線程都有與之綁定的Poller,Poller采用經典的Reactor模式實現。
Poller根據不同操作系統平台使用不同的網絡I/O模型(select、poll、epoll、devpoll、kequeue等),所有的I/O操作都是異步的,線程不會被阻塞。。
主線程與I/O線程通過Mail Box傳遞消息來進行通信。
Server,在主線程創建zmq_listener,通過Mail Box發消息的形式將其綁定到I/O線程,I/O線程把zmq_listener添加到Poller中用以偵聽讀事件。
Client,在主線程中創建zmq_connecter,通過Mail Box發消息的形式將其綁定到I/O線程,I/O線程把zmq_connecter添加到Poller中用以偵聽寫事件。
Client與Server第一次通信時,會創建zmq_init來發送identity,用以進行認證。認證結束后,雙方會為此次連接創建Session,以后雙方就通過Session進行通信。
每個Session都會關聯到相應的讀/寫管道, 主線程收發消息只是分別從管道中讀/寫數據。Session並不實際跟kernel交換I/O數據,而是通過plugin到Session中的Engine來與kernel交換I/O數據。
ZMQ將消息通信分成4種模型:
- 一對一結對模型(Exclusive-Pair),可以認為是一個TCP Connection,但是TCP Server只能接受一個連接。數據可以雙向流動,這點不同於后面的請求回應模型。
- 請求回應模型(Request-Reply),由Client發起請求,並由Server響應,跟一對一結對模型的區別在於可以有多個Client。
- 發布訂閱模型(Publish-Subscribe),Publish端單向分發數據,且不關心是否把全部信息發送給Subscribe端。如果Publish端開始發布信息時,Subscribe端尚未連接進來,則這些信息會被直接丟棄。Subscribe端只能接收,不能反饋,且在Subscribe端消費速度慢於Publish端的情況下,會在Subscribe端堆積數據。
- 管道模型(Push-Pull),從 PUSH 端單向的向 PULL 端單向的推送數據流。如果有多個PULL端同時連接到PUSH端,則PUSH端會在內部做一個負載均衡,采用平均分配的算法,將所有消息均衡發布到PULL端上。與發布訂閱模型相比,管道模型在沒有消費者的情況下,發布的消息不會被消耗掉;在消費者能力不夠的情況下,能夠提供多消費者並行消費解決方案。該模型主要用於多任務並行。
這4種模型總結出了通用的網絡通信模型,在實際中可以根據應用需要,組合其中的2種或多種模型來形成自己的解決方案。
ZMQ提供進程內(inproc://)、進程間(ipc://)、機器間(tcp://)、廣播(pgm://)等四種通信協議。
ZMQ API
ZMQ提供的所有API均以zmq_開頭,
#include <zmq.h>
gcc [flags] files -lzmq [libraries]
例如,返回當前ZMQ庫的版本信息
void zmq_version (int *major, int *minor, int *patch);
Context
在使用任何ZQM庫函數之前,必須首先創建ZMQ context(上下文),程序終止時,也需要銷毀context。
創建context
void *zmq_ctx_new ();
ZMQ context是線程安全的,可以在多線程環境使用,而不需要程序員對其加/解鎖。
在一個進程中,可以有多個ZMQ context並存。
設置context選項
int zmq_ctx_set (void *context, int option_name, int option_value); int zmq_ctx_get (void *context, int option_name);
銷毀context
int zmq_ctx_term (void *context);
Sockets
ZMQ Sockets 是代表異步消息隊列的一個抽象,注意,這里的ZMQ socket和POSIX套接字的socket不是一回事,ZMQ封裝了物理連接的底層細節,對用戶不透明。
傳統的POSIX套接字只能支持1對1的連接,而ZMQ socket支持多個Client的並發連接,甚至在沒有任何對端(peer)的情況下,ZMQ sockets上也能放入消息;
ZMQ sockets不是線程安全的,因此,不要在多個線程中並行操作同一個sockets。
創建ZMQ Sockets
void *zmq_socket (void *context, int type);
注意,ZMQ socket在bind之前還不能使用。
pattern |
type |
description |
一對一結對模型 |
ZMQ_PAIR |
|
請求回應模型 |
ZMQ_REQ |
client端使用 |
ZMQ_REP |
server端使用 |
|
ZMQ_DEALER |
將消息以輪詢的方式分發給所有對端(peers) |
|
ZMQ_ROUTER |
|
|
發布訂閱模型 |
ZMQ_PUB |
publisher端使用 |
ZMQ_XPUB |
|
|
ZMQ_SUB |
subscriber端使用 |
|
ZMQ_XSUB |
|
|
管道模型 |
ZMQ_PUSH |
push端使用 |
ZMQ_PULL |
pull端使用 |
|
原生模型 |
ZMQ_STREAM |
|
設置socket選項
int zmq_getsockopt (void *socket, int option_name, void *option_value, size_t *option_len); int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len);
關閉socket
int zmq_close (void *socket);
創建一個消息流
int zmq_bind (void *socket, const char *endpoint); int zmq_connect (void *socket, const char *endpoint);
bind函數是將socket綁定到本地的端點(endpoint),而connect函數連接到指定的peer端點。
endpoint支持的類型:
transports |
description |
uri example |
zmp_tcp |
TCP的單播通信 |
tcp://*:8080 |
zmp_ipc |
本地進程間通信 |
ipc:// |
zmp_inproc |
本地線程間通信 |
inproc:// |
zmp_pgm |
PGM廣播通信 |
pgm:// |
收發消息
int zmq_send (void *socket, void *buf, size_t len, int flags); int zmq_recv (void *socket, void *buf, size_t len, int flags); int zmq_send_const (void *socket, void *buf, size_t len, int flags);
zmq_recv()函數的len參數指定接收buf的最大長度,超出部分會被截斷,函數返回的值是接收到的字節數,返回-1表示出錯;
zmq_send()函數將指定buf的指定長度len的字節寫入隊列,函數返回值是發送的字節數,返回-1表示出錯;
zmq_send_const()函數表示發送的buf是一個常量內存區(constant-memory),這塊內存不需要復制、釋放。
socket事件監控
int zmq_socket_monitor (void *socket, char * *addr, int events);
zmq_socket_monitor()函數會生成一對sockets,publishers端通過inproc://協議發布 sockets狀態改變的events;
消息包含2幀,第1幀包含events id和關聯值,第2幀表示受影響的endpoint。
監控支持的events:
ZMQ_EVENT_CONNECTED: 建立連接
ZMQ_EVENT_CONNECT_DELAYED: 連接失敗
ZMQ_EVENT_CONNECT_RETRIED: 異步連接/重連
ZMQ_EVENT_LISTENING: bind到端點
ZMQ_EVENT_BIND_FAILED: bind失敗
ZMQ_EVENT_ACCEPTED: 接收請求
ZMQ_EVENT_ACCEPT_FAILED: 接收請求失敗
ZMQ_EVENT_CLOSED: 關閉連接
ZMQ_EVENT_CLOSE_FAILED: 關閉連接失敗
ZMQ_EVENT_DISCONNECTED: 會話(tcp/ipc)中斷
I/O多路復用
int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
對sockets集合的I/O多路復用,使用水平觸發。
與epoll類似,items參數指定一個結構體數組(結構體定義如下),nitems指定數組的元素個數,timeout參數是超時時間(單位:ms,0表示不等待立即返回,-1表示阻塞等待)。
typedef struct { void *socket; int fd; short events; short revents; } zmq_pollitem_t;
對於每個zmq_pollitem_t元素,ZMQ會同時檢查其socket(ZMQ套接字)和fd(原生套接字)上是否有指定的events發生,且ZMQ套接字優先。
events指定該sockets需要關注的事件,revents返回該sockets已發生的事件,它們的取值為:
- ZMQ_POLLIN,可讀;
- ZMQ_POLLOUT,可寫;
- ZMQ_POLLERR,出錯;
Messages
一個ZMQ消息就是一個用於在消息隊列(進程內部或跨進程)中進行傳輸的數據單元,ZMQ消息本身沒有數據結構,因此支持任意類型的數據,這完全依賴於程序員如何定義消息的數據結構。
一條ZMQ消息可以包含多個消息片(multi-part messages),每個消息片都是一個獨立zmq_msg_t結構。
ZMQ保證以原子方式傳遞消息,要么所有消息片都發送成功,要么都不成功。
初始化消息
typedef void (zmq_free_fn) (void *data, void *hint); int zmq_msg_init (zmq_msg_t *msg); int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint); int zmq_msg_init_size (zmq_msg_t *msg, size_t size);
zmq_msg_init()函數初始化一個消息對象zmq_msg_t ,不要直接訪問zmq_msg_t對象,可以通過zmq_msg_* 函數來訪問它。
zmq_msg_init()、zmq_msg_init_data()、zmq_msg_init_size() 三個函數是互斥的,每次使用其中一個即可。
設置消息屬性
int zmq_msg_get (zmq_msg_t *message, int property); int zmq_msg_set (zmq_msg_t *message, int property, int value);
釋放消息
int zmq_msg_close (zmq_msg_t *msg);
收發消息
int zmq_msg_send (zmq_msg_t *msg, void *socket, int flags); int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags);
其中,flags參數如下:
ZMQ_DONTWAIT,非阻塞模式,如果沒有可用的消息,將errno設置為EAGAIN;
ZMQ_SNDMORE,發送multi-part messages時,除了最后一個消息片外,其它每個消息片都必須使用 ZMQ_SNDMORE 標記位。
獲取消息內容
void *zmq_msg_data (zmq_msg_t *msg); int zmq_msg_more (zmq_msg_t *message); size_t zmq_msg_size (zmq_msg_t *msg);
zmq_msg_data()返回指向消息對象所帶內容的指針;
zmq_msg_size()返回消息的字節數;
zmq_msg_more()標識該消息片是否是整個消息的一部分,是否還有更多的消息片待接收;
控制消息
int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src);
zmq_msg_copy()函數實現的是淺拷貝;
zmq_msg_move()函數中,將dst指向src消息,然后src被置空。
eg,接收消息的代碼示例:
zmq_msg_t part; while (true) { // Create an empty ØMQ message to hold the message part int rc = zmq_msg_init (&part); assert (rc == 0); // Block until a message is available to be received from socket rc = zmq_msg_recv (socket, &part, 0); assert (rc != -1); if (zmq_msg_more (&part)) fprintf (stderr, "more\n"); else { fprintf (stderr, "end\n"); break; } zmq_msg_close (&part); }
代理
ZMQ提供代理功能,代理可以在前端socket和后端socket之間轉發消息。
int zmq_proxy (const void *frontend, const void *backend, const void *capture); int zmq_proxy_steerable (const void *frontend, const void *backend, const void *capture, const void *control);
共享隊列(shared queue),前端是ZMQ_ROUTER socket,后端是ZMQ_DEALER socket,proxy會把clients發來的請求,公平地分發給services;
轉發隊列(forwarded),前端是ZMQ_XSUB socket, 后端是ZMQ_XPUB socket, proxy會把從publishers收到的消息轉發給所有的subscribers;
流(streamer),前端是ZMQ_PULL socket, 后端是ZMQ_PUSH socket.
proxy使用的一個示例:
// Create frontend and backend sockets void *frontend = zmq_socket (context, ZMQ_ROUTER); assert (backend); void *backend = zmq_socket (context, ZMQ_DEALER); assert (frontend); // Bind both sockets to TCP ports assert (zmq_bind (frontend, "tcp://*:5555") == 0); assert (zmq_bind (backend, "tcp://*:5556") == 0); // Start the queue proxy, which runs until ETERM zmq_proxy frontend, backend, NULL);
錯誤處理
ZMQ庫使用POSIX處理函數錯誤,返回NULL指針或者負數時表示調用出錯。
int zmq_errno (void); const char *zmq_strerror (int errnum);
zmq_errno()函數返回當前線程的錯誤碼errno變量的值;
zmq_strerror()函數將錯誤映射成錯誤字符串。
加密傳輸
ZQM可以為IPC和TCP連接提供安全機制:
- 不加密,zmq_null
- 使用用戶名/密碼授權,zmq_plain
- 橢圓加密,zmq_curve
這些通過 zmq_setsockopt()函數設置socket選項的時候配置。
總結:
1、僅僅提供24個API接口,風格類似於BSD Socket。
2、處理了網絡異常,包括連接異常中斷、重連等。
3、改變TCP基於字節流收發數據的方式,處理了粘包、半包等問題,以msg為單位收發數據,結合Protocol Buffers,可以對應用層徹底屏蔽網絡通信層。
4、對大數據通過SENDMORE/RECVMORE提供分包收發機制。
5、通過線程間數據流動來保證同一時刻任何數據都只會被一個線程持有,以此實現多線程的“去鎖化”。
6、通過高水位HWM來控制流量,用交換SWAP來轉儲內存數據,彌補HWM丟失數據的缺陷。
7、服務器端和客戶端的啟動沒有先后順序。