ZMQ和MessagePack的簡單使用


  近段日子在做一個比較復雜的項目,其中用到了開源軟件ZMQ和MessagePack。ZMQ對底層網絡通信進行了封裝,是一個消息處理隊列庫,使用起來非常方便。MessagePack是一個基於二進制的對象序列化類庫,具有跨語言的特性,同樣非常容易使用。在我做的項目中,消息類通過MessagePack進行壓包,然后寫入ZMQ的消息結構體,通過ZMQ傳遞,最后接收者利用MessagePack進行解包,從而分析命令。由於我英語水平實在不高,所以我並沒有通過閱讀它們的說明文檔來對它們進行了解,而僅僅是通過它們的示例代碼進行探索。雖然因此遇到了一些不解問題,但這種方式卻為我節省了很多時間。不過,對於英語好的人,還是應該通過閱讀說明文檔來去了解它們。

  為了說明如何使用它們,在這里構造一個使用場景:有N個Client,一個Server,M個Agent,Client使用ZMQ的請求-響應模式和Server通信,Server收到Client的命令后,通過ZMQ的發布-訂閱模式與各個Agent進行通信。下面的代碼封裝並使用了ZMQ和MessagePack,為了簡便,我把類的定義和實現都寫在了頭文件。

  1.對ZMQ的簡單封裝:

  1 #include"Msgpack.h"
  2 #include<zmq.h>
  3 #include<string>
  4 #include<cassert>
  5 #include<iostream>
  6 
  7 namespace Tool
  8 {
  9     //網絡工具類
 10     class Network
 11     {
 12     public:
 13 
 14         // 功能 :構造函數。
 15         // 參數 :無。
 16         // 返回 :無。
 17         Network() : m_socket(NULL) { }
 18 
 19         // 功能 :初始化socket。
 20         // 參數 :zmqType表示ZMQ的模式,address表示socket綁定或連接地址。
 21         // 返回 :true表示初始化成功,false表示失敗。
 22         bool Init(int zmqType,const std::string& address)
 23         {
 24             try
 25             {
 26                 m_socket = zmq_socket(Context,zmqType);
 27                 return SetSocket(zmqType,address);
 28             }
 29             catch(...)
 30             {
 31                 std::cout << "Network初始化失敗。" << std::endl;
 32                 return false;
 33             }
 34         }
 35 
 36         // 功能 :發送消息。
 37         // 參數 :指向Msgpack的指針,isRelease如果為true表示發送消息后即刻釋放資源。
 38         // 返回 :true表示發送成功,false表示發送失敗。
 39         bool SendMessage(Msgpack *msgpack,bool isRelease = true) const
 40         {
 41             try
 42             {
 43                 zmq_msg_t msg;
 44                 zmq_msg_init(&msg);
 45                 if(isRelease)
 46                 {
 47                     zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),Tool::Network::Release,msgpack);
 48                 }
 49                 else
 50                 {
 51                     zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),0,0);
 52                 }
 53                 zmq_msg_send(&msg,m_socket,0);
 54                 return true;
 55             }
 56             catch(...)
 57             {
 58                 std::cout << "Network發送失敗。" << std::endl;
 59                 return false;
 60             }
 61         }
 62 
 63         // 功能 :接收消息。
 64         // 參數 :無。
 65         // 返回 :指向消息的指針。
 66         zmq_msg_t* ReceiveMessage() const
 67         {
 68             zmq_msg_t *reply = NULL;
 69             try
 70             {
 71                 reply = new zmq_msg_t();
 72                 zmq_msg_init(reply);
 73                 zmq_msg_recv(reply,m_socket,0);
 74                 return reply;
 75             }
 76             catch(...)
 77             {
 78                 if( reply != NULL )
 79                 {
 80                     delete reply;
 81                 }
 82                 return NULL;
 83             }
 84         }
 85 
 86         // 功能 :關閉消息。
 87         // 參數 :指向消息的指針。
 88         // 返回 :無。
 89         void CloseMsg(zmq_msg_t* msg)
 90         {
 91             try
 92             {
 93                 zmq_msg_close(msg);
 94                 msg = NULL;
 95             }
 96             catch(...)
 97             {
 98                 msg = NULL;
 99             }
100         }
101 
102         // 功能 :析構函數。
103         // 參數 :無。
104         // 返回 :無。
105         ~Network()
106         {
107             if( m_socket != NULL )
108             {
109                 zmq_close(m_socket);
110                 m_socket = NULL;
111             }
112         }
113 
114     private:
115 
116         //通信socket
117         void *m_socket;
118 
119         //網絡環境
120         static void *Context;
121 
122     private:
123 
124         // 功能 :設置socket。
125         // 參數 :zmqType表示ZMQ的模式,address表示socket綁定或連接地址。
126         // 返回 :true表示設置成功,false表示設置失敗。
127         bool SetSocket(int zmqType,const std::string& address)
128         {
129             int result = -1;
130             switch(zmqType)
131             {
132             case ZMQ_REP:
133             case ZMQ_PUB:
134                 result = zmq_bind(m_socket,address.c_str());
135                 break;
136             case ZMQ_REQ:
137                 result = zmq_connect(m_socket,address.c_str());
138                 break;
139             case ZMQ_SUB:
140                 result = zmq_connect(m_socket,address.c_str());
141                 assert(result == 0);
142                 result = zmq_setsockopt(m_socket,ZMQ_SUBSCRIBE,"",0);
143                 break;
144             default:
145                 return false;
146             }
147             assert( result == 0 );
148             return true;
149         }
150 
151         // 功能 :發送完消息后,釋放消息資源。
152         // 參數 :function為函數地址,hint指向要釋放資源的對象。
153         // 返回 :無。
154         static void Release(void *function, void *hint)
155         {
156             Msgpack *msgpack = (Msgpack*)hint;
157             if( msgpack != NULL )
158             {
159                 delete msgpack;
160                 msgpack = NULL;
161             }
162         }
163     };
164 
165     //整個程序共用一個context
166     void *Tool::Network::Context = zmq_ctx_new();
167 };

  說明:

  (1)由zmq_ctx_new創建出來的Context,整個應用程序共用一個就可以了,具體的通信是由zmq_socket創建的socket來完成的。上述代碼中沒有去釋放Context指向的資源。

  (2)在zmq_msg_init_data函數的參數中,需要傳入一個釋放資源的函數地址,在ZMQ發送完消息后就調用這個函數來釋放資源。如果沒有傳入這個參數,而且傳入的信息是臨時變量,那么接收方很有可能接收不到信息,甚至拋出異常。如果不傳入這個參數,那么就要記得由自己去釋放資源了。

  2.對MessagePack的簡單封裝:

  1 #include"BaseMessage.h"
  2 #include"ClientMessage.h"
  3 #include"ServerMessage.h"
  4 #include<zmq.h>
  5 #include<msgpack.hpp>
  6 
  7 namespace Tool
  8 {
  9     using namespace Message;
 10 
 11     //壓包/解包工具類
 12     class Msgpack
 13     {
 14     public:
 15 
 16         // 功能 :構造函數。
 17         // 參數 :無。
 18         // 返回 :無。
 19         Msgpack(void) { }
 20 
 21         // 功能 :析構函數。
 22         // 參數 :無。
 23         // 返回 :無。
 24         ~Msgpack(void) { }
 25 
 26         // 功能 :壓包數據。
 27         // 參數 :要壓包的數據。
 28         // 返回 :true表示壓包成功。
 29         template<typename T>
 30         bool Pack(const T& t)
 31         {
 32             try
 33             {
 34                 Release();
 35                 msgpack::pack(m_sbuf,t);
 36                 return true;
 37             }
 38             catch(...)
 39             {
 40                 std::cout << "Msgpack壓包數據失敗。" << std::endl;
 41                 return false;
 42             }
 43         }
 44 
 45         // 功能 :解包數據。
 46         // 參數 :zmq消息體。
 47         // 返回 :返回指向基類消息的指針。
 48         BaseMessage* Unpack(zmq_msg_t& msg)
 49         {
 50             try
 51             {
 52                 int size = zmq_msg_size(&msg);
 53                 if( size > 0 )
 54                 {
 55                     Release();
 56                     m_sbuf.write((char*)zmq_msg_data(&msg),size);
 57                     size_t offset = 0;
 58                     msgpack::zone z;
 59                     msgpack::object obj;
 60                     msgpack::unpack(m_sbuf.data(),m_sbuf.size(),&offset,&z,&obj);
 61                     return GetMessage(obj);
 62                 }
 63             }
 64             catch(...)
 65             {
 66                 //吃掉異常
 67             }
 68             return NULL;
 69         }
 70 
 71         // 功能 :獲取壓包/解包工具。
 72         // 參數 :無。
 73         // 返回 :壓包/解包工具。
 74         inline msgpack::sbuffer& GetSbuf()
 75         {
 76             return m_sbuf;
 77         }
 78 
 79     private:
 80 
 81         //壓包/解包工具
 82         msgpack::sbuffer m_sbuf;
 83 
 84     private:
 85 
 86         // 功能 :釋放上一次的數據資源。
 87         // 參數 :無。
 88         // 返回 :無。
 89         void Release()
 90         {
 91             m_sbuf.clear();
 92             m_sbuf.release();
 93         }
 94         
 95         // 功能 :獲取消息。
 96         // 參數 :用於轉換的msgpack::object。
 97         // 返回 :指向消息基類的指針。
 98         BaseMessage* GetMessage(const msgpack::object& obj)
 99         {
100             BaseMessage bmessage;
101             obj.convert(&bmessage);
102             switch(bmessage.Type)
103             {
104             case 1024:
105                 return Convert<ClientMessage>(obj);
106             case 2048:
107                 return Convert<ServerMessage>(obj);
108             default:
109                 return NULL;
110             }
111         }
112 
113         // 功能 :將壓包后的數據轉換為具體的類。
114         // 參數 :用於轉換的msgpack::object。
115         // 返回 :指向T的指針。
116         template<typename T>
117         T* Convert(const msgpack::object& obj)
118         {
119             T *t = new T();
120             obj.convert(t);
121             return t;
122         }
123     };
124 };

  說明:

  壓包時將zmq_msg_t消息體壓包到msgpack::sbuffer,然后就可以關閉這個消息體了。要將解包后的數據轉換成具體的某一個類,需要知道這個類是什么類,這里有三種方法:

  (1)可以先發送一個消息告知接收者即將收到什么消息,然后接收者將消息解包后轉換成對應的類。這種方式需要額外的一次通信,不建議使用。

  (2)所有的消息都繼承自一個基類,這個基類存儲有消息類型的字段。解包后,先將數據轉換為基類,然后根據類型再轉換為具體的派生類。這種方式需要多轉換一次,上面的代碼也正是采用這種方式。

  (3)壓包時先壓包一個消息類,然后再壓包一個標識這個消息是什么類型的標識類,即壓包兩次。解包時,先解包標識類,得知消息類的具體類型,然后再解包消息類,即解包兩次,轉換兩次。與(2)相比,除了要做更多的壓包、解包工作外,這里還需要對解包的偏移量進行計算,否則容易出錯。

  3.使用到的消息類:

namespace Message
{
    //消息基類
    class  BaseMessage
    {
    public:

        MSGPACK_DEFINE(Type);

        //消息類型
        int Type;

        //默認構造函數
        BaseMessage()
        {
            Type = 0;
        }
    };

    //來自客戶端的消息
    class ClientMessage : public BaseMessage
    {
    public:

        MSGPACK_DEFINE(Type,Information);

        //信息
        std::string Information;

        //默認構造函數
        ClientMessage()
        {
            Type = 1024;
        }
    };

    //來自服務端的消息
    class ServerMessage : public BaseMessage
    {
    public:

        MSGPACK_DEFINE(Type,Information);

        //信息
        std::vector<std::string> Information;

        //默認構造函數
        ServerMessage()
        {
            Type = 2048;
        }
    };
}; 

  說明:

  (1)MSPACK_DEFINE標識了一個類的哪些成員可以進行壓包/解包。派生類中的MSGPACK_DEFINE還需要寫上基類的成員,否則無法使用對MessagePack封裝說明的第二個方法。

  (2)C++版本的MessagePack壓/解包的數據成員,只能是一個類、結構或者聯合體,不能使用指針(包括boost庫的智能指針)、數組,枚舉值也不適用。因此,BaseMessage使用int值來標識派生類屬於哪個類型。C#版本的MessagePack可以對枚舉值進行壓包。

  4.Client的示例代碼:

 1 int _tmain(int argc, _TCHAR* argv[])
 2 {
 3     Network network;
 4     bool result = network.Init(ZMQ_REQ,"tcp://192.168.10.179:8888");
 5     if(result)
 6     {
 7         ClientMessage cmessage;
 8         cmessage.Information = "I come form Client.";
 9 
10         Msgpack msgpack;
11         result = msgpack.Pack<ClientMessage>(cmessage);
12         if(result)
13         {
14             result = network.SendMessageW(&msgpack,false);
15             if(result)
16             {
17                 zmq_msg_t *msg = network.ReceiveMessage();
18                 if( msg != NULL )
19                 {
20                     BaseMessage *bmessage = msgpack.Unpack(*msg);
21                     network.CloseMsg(msg);
22                     if( bmessage != NULL && bmessage->Type == 2048 )
23                     {
24                         ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);
25                         if( smessage != NULL && smessage->Information.size() > 0 )
26                         {
27                             std::cout << smessage->Information[0] << std::endl;
28                         }
29                         delete smessage;
30                         smessage = NULL;
31                         bmessage = NULL;
32                     }
33                 }
34             }
35         }
36     }
37 
38     system("pause");
39     return 0;
40 }

  5.Server的示例代碼:

 1 int _tmain(int argc, _TCHAR* argv[])
 2 {
 3     Network responder;
 4     bool result = responder.Init(ZMQ_REP,"tcp://192.168.10.179:8888");
 5     if(result)
 6     {
 7         Network publisher;
 8         result = publisher.Init(ZMQ_PUB,"tcp://192.168.10.179:9999");
 9         if(result)
10         {
11             Msgpack msgpack;
12             while(true)
13             {
14                 zmq_msg_t *msg = responder.ReceiveMessage();
15                 BaseMessage *bmessage = msgpack.Unpack(*msg);
16                 responder.CloseMsg(msg);
17 
18                 ServerMessage smessage;
19                 smessage.Information.push_back("I come from Server.");
20                 msgpack.Pack<ServerMessage>(smessage);
21                 result = responder.SendMessageW(&msgpack,false);
22 
23                 if( result )
24                 {
25                     if( bmessage != NULL && bmessage->Type == 1024 )
26                     {
27                         ClientMessage *cmessage = static_cast<ClientMessage*>(bmessage);
28                         if( cmessage != NULL )
29                         {
30                             std::cout << cmessage->Information << std::endl;
31                             for( int counter = 0 ; counter < 100 ; counter++ )
32                             {
33                                 publisher.SendMessageW(&msgpack,false);
34                             }
35                         }
36                         delete cmessage;
37                         cmessage = NULL;
38                         bmessage = NULL;
39                     }
40                 }
41             }
42         }
43     }
44 
45     return 0;
46 }

  6.Agent的示例代碼:

int _tmain(int argc, _TCHAR* argv[])
{
    Network network;
    bool result = network.Init(ZMQ_SUB,"tcp://192.168.10.179:9999");
    if(result)
    {
        zmq_msg_t *msg = network.ReceiveMessage();
        if( msg != NULL )
        {
            Msgpack msgpack;
            BaseMessage *bmessage = msgpack.Unpack(*msg);
            network.CloseMsg(msg);
            if( bmessage->Type == 2048 )
            {
                ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);
                if( smessage->Information.size() > 0 )
                {
                    std::cout << smessage->Information[0] << std::endl;
                }
                delete smessage;
                smessage = NULL;
                bmessage = NULL;
            }
        }
    }

    system("pause");
    return 0;
}

  7.啟動這三個程序,Client將要發送的消息壓包后發給Server,Server接收到消息后反饋一個信息給Client,然后循環發布消息給Agent,Agent不需要回復Server。最后着重說明兩點:

  (1)ZMQ創建的socket發送數據和接收數據要處在同一條線程。Server接收到Client的數據后,不能通過開一條線程來給Client反饋信息,必須要在接收數據的線程中反饋信息。

  (2)ZMQ並不要求發送者和接收者有一定的啟動順序,但在Server中如果只發布一次消息,那么Agent很有可能收不到信息。不管是Agent先啟動,還是Server先啟動,Agent都有可能收不到信息。在Server的代碼中,通過循環發布一百次,來讓Agent收到信息。至於實際應用中,可以結合請求-響應模式來保證訂閱消息者都收到了發布者的消息。

 

 參考資料:

ZMQ:http://zguide.zeromq.org/page:all

MessagePack:http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387#QuickStartforC%2B%2B-ImplementationStatus


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM