近段日子在做一個比較復雜的項目,其中用到了開源軟件ZMQ和MessagePack。ZMQ對底層網絡通信進行了封裝,是一個消息處理隊列庫,使用起來非常方便。MessagePack是一個基於二進制的對象序列化類庫,具有跨語言的特性,同樣非常容易使用。在我做的項目中,消息類通過MessagePack進行壓包,然后寫入ZMQ的消息結構體,通過ZMQ傳遞,最后接收者利用MessagePack進行解包,從而分析命令。由於我英語水平實在不高,所以我並沒有通過閱讀它們的說明文檔來對它們進行了解,而僅僅是通過它們的示例代碼進行探索。雖然因此遇到了一些不解問題,但這種方式卻為我節省了很多時間。不過,對於英語好的人,還是應該通過閱讀說明文檔來去了解它們。
為了說明如何使用它們,在這里構造一個使用場景:有N個Client,一個Server,M個Agent,Client使用ZMQ的請求-響應模式和Server通信,Server收到Client的命令后,通過ZMQ的發布-訂閱模式與各個Agent進行通信。下面的代碼封裝並使用了ZMQ和MessagePack,為了簡便,我把類的定義和實現都寫在了頭文件。
1.對ZMQ的簡單封裝:
1 #include"Msgpack.h" 2 #include<zmq.h> 3 #include<string> 4 #include<cassert> 5 #include<iostream> 6 7 namespace Tool 8 { 9 //網絡工具類 10 class Network 11 { 12 public: 13 14 // 功能 :構造函數。 15 // 參數 :無。 16 // 返回 :無。 17 Network() : m_socket(NULL) { } 18 19 // 功能 :初始化socket。 20 // 參數 :zmqType表示ZMQ的模式,address表示socket綁定或連接地址。 21 // 返回 :true表示初始化成功,false表示失敗。 22 bool Init(int zmqType,const std::string& address) 23 { 24 try 25 { 26 m_socket = zmq_socket(Context,zmqType); 27 return SetSocket(zmqType,address); 28 } 29 catch(...) 30 { 31 std::cout << "Network初始化失敗。" << std::endl; 32 return false; 33 } 34 } 35 36 // 功能 :發送消息。 37 // 參數 :指向Msgpack的指針,isRelease如果為true表示發送消息后即刻釋放資源。 38 // 返回 :true表示發送成功,false表示發送失敗。 39 bool SendMessage(Msgpack *msgpack,bool isRelease = true) const 40 { 41 try 42 { 43 zmq_msg_t msg; 44 zmq_msg_init(&msg); 45 if(isRelease) 46 { 47 zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),Tool::Network::Release,msgpack); 48 } 49 else 50 { 51 zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),0,0); 52 } 53 zmq_msg_send(&msg,m_socket,0); 54 return true; 55 } 56 catch(...) 57 { 58 std::cout << "Network發送失敗。" << std::endl; 59 return false; 60 } 61 } 62 63 // 功能 :接收消息。 64 // 參數 :無。 65 // 返回 :指向消息的指針。 66 zmq_msg_t* ReceiveMessage() const 67 { 68 zmq_msg_t *reply = NULL; 69 try 70 { 71 reply = new zmq_msg_t(); 72 zmq_msg_init(reply); 73 zmq_msg_recv(reply,m_socket,0); 74 return reply; 75 } 76 catch(...) 77 { 78 if( reply != NULL ) 79 { 80 delete reply; 81 } 82 return NULL; 83 } 84 } 85 86 // 功能 :關閉消息。 87 // 參數 :指向消息的指針。 88 // 返回 :無。 89 void CloseMsg(zmq_msg_t* msg) 90 { 91 try 92 { 93 zmq_msg_close(msg); 94 msg = NULL; 95 } 96 catch(...) 97 { 98 msg = NULL; 99 } 100 } 101 102 // 功能 :析構函數。 103 // 參數 :無。 104 // 返回 :無。 105 ~Network() 106 { 107 if( m_socket != NULL ) 108 { 109 zmq_close(m_socket); 110 m_socket = NULL; 111 } 112 } 113 114 private: 115 116 //通信socket 117 void *m_socket; 118 119 //網絡環境 120 static void *Context; 121 122 private: 123 124 // 功能 :設置socket。 125 // 參數 :zmqType表示ZMQ的模式,address表示socket綁定或連接地址。 126 // 返回 :true表示設置成功,false表示設置失敗。 127 bool SetSocket(int zmqType,const std::string& address) 128 { 129 int result = -1; 130 switch(zmqType) 131 { 132 case ZMQ_REP: 133 case ZMQ_PUB: 134 result = zmq_bind(m_socket,address.c_str()); 135 break; 136 case ZMQ_REQ: 137 result = zmq_connect(m_socket,address.c_str()); 138 break; 139 case ZMQ_SUB: 140 result = zmq_connect(m_socket,address.c_str()); 141 assert(result == 0); 142 result = zmq_setsockopt(m_socket,ZMQ_SUBSCRIBE,"",0); 143 break; 144 default: 145 return false; 146 } 147 assert( result == 0 ); 148 return true; 149 } 150 151 // 功能 :發送完消息后,釋放消息資源。 152 // 參數 :function為函數地址,hint指向要釋放資源的對象。 153 // 返回 :無。 154 static void Release(void *function, void *hint) 155 { 156 Msgpack *msgpack = (Msgpack*)hint; 157 if( msgpack != NULL ) 158 { 159 delete msgpack; 160 msgpack = NULL; 161 } 162 } 163 }; 164 165 //整個程序共用一個context 166 void *Tool::Network::Context = zmq_ctx_new(); 167 };
說明:
(1)由zmq_ctx_new創建出來的Context,整個應用程序共用一個就可以了,具體的通信是由zmq_socket創建的socket來完成的。上述代碼中沒有去釋放Context指向的資源。
(2)在zmq_msg_init_data函數的參數中,需要傳入一個釋放資源的函數地址,在ZMQ發送完消息后就調用這個函數來釋放資源。如果沒有傳入這個參數,而且傳入的信息是臨時變量,那么接收方很有可能接收不到信息,甚至拋出異常。如果不傳入這個參數,那么就要記得由自己去釋放資源了。
2.對MessagePack的簡單封裝:
1 #include"BaseMessage.h" 2 #include"ClientMessage.h" 3 #include"ServerMessage.h" 4 #include<zmq.h> 5 #include<msgpack.hpp> 6 7 namespace Tool 8 { 9 using namespace Message; 10 11 //壓包/解包工具類 12 class Msgpack 13 { 14 public: 15 16 // 功能 :構造函數。 17 // 參數 :無。 18 // 返回 :無。 19 Msgpack(void) { } 20 21 // 功能 :析構函數。 22 // 參數 :無。 23 // 返回 :無。 24 ~Msgpack(void) { } 25 26 // 功能 :壓包數據。 27 // 參數 :要壓包的數據。 28 // 返回 :true表示壓包成功。 29 template<typename T> 30 bool Pack(const T& t) 31 { 32 try 33 { 34 Release(); 35 msgpack::pack(m_sbuf,t); 36 return true; 37 } 38 catch(...) 39 { 40 std::cout << "Msgpack壓包數據失敗。" << std::endl; 41 return false; 42 } 43 } 44 45 // 功能 :解包數據。 46 // 參數 :zmq消息體。 47 // 返回 :返回指向基類消息的指針。 48 BaseMessage* Unpack(zmq_msg_t& msg) 49 { 50 try 51 { 52 int size = zmq_msg_size(&msg); 53 if( size > 0 ) 54 { 55 Release(); 56 m_sbuf.write((char*)zmq_msg_data(&msg),size); 57 size_t offset = 0; 58 msgpack::zone z; 59 msgpack::object obj; 60 msgpack::unpack(m_sbuf.data(),m_sbuf.size(),&offset,&z,&obj); 61 return GetMessage(obj); 62 } 63 } 64 catch(...) 65 { 66 //吃掉異常 67 } 68 return NULL; 69 } 70 71 // 功能 :獲取壓包/解包工具。 72 // 參數 :無。 73 // 返回 :壓包/解包工具。 74 inline msgpack::sbuffer& GetSbuf() 75 { 76 return m_sbuf; 77 } 78 79 private: 80 81 //壓包/解包工具 82 msgpack::sbuffer m_sbuf; 83 84 private: 85 86 // 功能 :釋放上一次的數據資源。 87 // 參數 :無。 88 // 返回 :無。 89 void Release() 90 { 91 m_sbuf.clear(); 92 m_sbuf.release(); 93 } 94 95 // 功能 :獲取消息。 96 // 參數 :用於轉換的msgpack::object。 97 // 返回 :指向消息基類的指針。 98 BaseMessage* GetMessage(const msgpack::object& obj) 99 { 100 BaseMessage bmessage; 101 obj.convert(&bmessage); 102 switch(bmessage.Type) 103 { 104 case 1024: 105 return Convert<ClientMessage>(obj); 106 case 2048: 107 return Convert<ServerMessage>(obj); 108 default: 109 return NULL; 110 } 111 } 112 113 // 功能 :將壓包后的數據轉換為具體的類。 114 // 參數 :用於轉換的msgpack::object。 115 // 返回 :指向T的指針。 116 template<typename T> 117 T* Convert(const msgpack::object& obj) 118 { 119 T *t = new T(); 120 obj.convert(t); 121 return t; 122 } 123 }; 124 };
說明:
壓包時將zmq_msg_t消息體壓包到msgpack::sbuffer,然后就可以關閉這個消息體了。要將解包后的數據轉換成具體的某一個類,需要知道這個類是什么類,這里有三種方法:
(1)可以先發送一個消息告知接收者即將收到什么消息,然后接收者將消息解包后轉換成對應的類。這種方式需要額外的一次通信,不建議使用。
(2)所有的消息都繼承自一個基類,這個基類存儲有消息類型的字段。解包后,先將數據轉換為基類,然后根據類型再轉換為具體的派生類。這種方式需要多轉換一次,上面的代碼也正是采用這種方式。
(3)壓包時先壓包一個消息類,然后再壓包一個標識這個消息是什么類型的標識類,即壓包兩次。解包時,先解包標識類,得知消息類的具體類型,然后再解包消息類,即解包兩次,轉換兩次。與(2)相比,除了要做更多的壓包、解包工作外,這里還需要對解包的偏移量進行計算,否則容易出錯。
3.使用到的消息類:
namespace Message { //消息基類 class BaseMessage { public: MSGPACK_DEFINE(Type); //消息類型 int Type; //默認構造函數 BaseMessage() { Type = 0; } }; //來自客戶端的消息 class ClientMessage : public BaseMessage { public: MSGPACK_DEFINE(Type,Information); //信息 std::string Information; //默認構造函數 ClientMessage() { Type = 1024; } }; //來自服務端的消息 class ServerMessage : public BaseMessage { public: MSGPACK_DEFINE(Type,Information); //信息 std::vector<std::string> Information; //默認構造函數 ServerMessage() { Type = 2048; } }; };
說明:
(1)MSPACK_DEFINE標識了一個類的哪些成員可以進行壓包/解包。派生類中的MSGPACK_DEFINE還需要寫上基類的成員,否則無法使用對MessagePack封裝說明的第二個方法。
(2)C++版本的MessagePack壓/解包的數據成員,只能是一個類、結構或者聯合體,不能使用指針(包括boost庫的智能指針)、數組,枚舉值也不適用。因此,BaseMessage使用int值來標識派生類屬於哪個類型。C#版本的MessagePack可以對枚舉值進行壓包。
4.Client的示例代碼:
1 int _tmain(int argc, _TCHAR* argv[]) 2 { 3 Network network; 4 bool result = network.Init(ZMQ_REQ,"tcp://192.168.10.179:8888"); 5 if(result) 6 { 7 ClientMessage cmessage; 8 cmessage.Information = "I come form Client."; 9 10 Msgpack msgpack; 11 result = msgpack.Pack<ClientMessage>(cmessage); 12 if(result) 13 { 14 result = network.SendMessageW(&msgpack,false); 15 if(result) 16 { 17 zmq_msg_t *msg = network.ReceiveMessage(); 18 if( msg != NULL ) 19 { 20 BaseMessage *bmessage = msgpack.Unpack(*msg); 21 network.CloseMsg(msg); 22 if( bmessage != NULL && bmessage->Type == 2048 ) 23 { 24 ServerMessage *smessage = static_cast<ServerMessage*>(bmessage); 25 if( smessage != NULL && smessage->Information.size() > 0 ) 26 { 27 std::cout << smessage->Information[0] << std::endl; 28 } 29 delete smessage; 30 smessage = NULL; 31 bmessage = NULL; 32 } 33 } 34 } 35 } 36 } 37 38 system("pause"); 39 return 0; 40 }
5.Server的示例代碼:
1 int _tmain(int argc, _TCHAR* argv[]) 2 { 3 Network responder; 4 bool result = responder.Init(ZMQ_REP,"tcp://192.168.10.179:8888"); 5 if(result) 6 { 7 Network publisher; 8 result = publisher.Init(ZMQ_PUB,"tcp://192.168.10.179:9999"); 9 if(result) 10 { 11 Msgpack msgpack; 12 while(true) 13 { 14 zmq_msg_t *msg = responder.ReceiveMessage(); 15 BaseMessage *bmessage = msgpack.Unpack(*msg); 16 responder.CloseMsg(msg); 17 18 ServerMessage smessage; 19 smessage.Information.push_back("I come from Server."); 20 msgpack.Pack<ServerMessage>(smessage); 21 result = responder.SendMessageW(&msgpack,false); 22 23 if( result ) 24 { 25 if( bmessage != NULL && bmessage->Type == 1024 ) 26 { 27 ClientMessage *cmessage = static_cast<ClientMessage*>(bmessage); 28 if( cmessage != NULL ) 29 { 30 std::cout << cmessage->Information << std::endl; 31 for( int counter = 0 ; counter < 100 ; counter++ ) 32 { 33 publisher.SendMessageW(&msgpack,false); 34 } 35 } 36 delete cmessage; 37 cmessage = NULL; 38 bmessage = NULL; 39 } 40 } 41 } 42 } 43 } 44 45 return 0; 46 }
6.Agent的示例代碼:
int _tmain(int argc, _TCHAR* argv[]) { Network network; bool result = network.Init(ZMQ_SUB,"tcp://192.168.10.179:9999"); if(result) { zmq_msg_t *msg = network.ReceiveMessage(); if( msg != NULL ) { Msgpack msgpack; BaseMessage *bmessage = msgpack.Unpack(*msg); network.CloseMsg(msg); if( bmessage->Type == 2048 ) { ServerMessage *smessage = static_cast<ServerMessage*>(bmessage); if( smessage->Information.size() > 0 ) { std::cout << smessage->Information[0] << std::endl; } delete smessage; smessage = NULL; bmessage = NULL; } } } system("pause"); return 0; }
7.啟動這三個程序,Client將要發送的消息壓包后發給Server,Server接收到消息后反饋一個信息給Client,然后循環發布消息給Agent,Agent不需要回復Server。最后着重說明兩點:
(1)ZMQ創建的socket發送數據和接收數據要處在同一條線程。Server接收到Client的數據后,不能通過開一條線程來給Client反饋信息,必須要在接收數據的線程中反饋信息。
(2)ZMQ並不要求發送者和接收者有一定的啟動順序,但在Server中如果只發布一次消息,那么Agent很有可能收不到信息。不管是Agent先啟動,還是Server先啟動,Agent都有可能收不到信息。在Server的代碼中,通過循環發布一百次,來讓Agent收到信息。至於實際應用中,可以結合請求-響應模式來保證訂閱消息者都收到了發布者的消息。
參考資料:
ZMQ:http://zguide.zeromq.org/page:all
MessagePack:http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387#QuickStartforC%2B%2B-ImplementationStatus