消息中間件NetMQ結合Protobuf簡介


概述

  對於稍微熟悉這兩個優秀的項目來說,每個內容單獨介紹都不為過,本文只是簡介並探討如何將兩部分內容合並起來,使其在某些場景下更適合、更高效。

 

  NetMQ:ZeroMQ的.Net版本,ZeroMQ簡單來說就是局域網內的消息中間件(與MSMQ類似),包括了進程間通訊、點對點通訊、訂閱模式通訊等等,底層用更“完美”的Socket實現,ZeroMQ實現了多語言、跨平台、高效率等諸多優勢。詳細介紹請參考ZeroMQ和NetMQ官方文檔:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/

 

  Protocol Buffer:源自與Google內部的開源項目,作為高效的RPC消息協議,相比較Json、XML協議的消息格式,Protobuf在序列化以及數據大小上都具有十分明顯的優勢,跨平台,協議可讀性也接近於Json等等。這里也推薦一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

 

定義Protobuf協議

  Protocol Buffer(簡稱Protobuf)是以.proto的腳本形式實現的通用語義形式,類似於Json格式:

message  WeatherMessage 
{ 
    enum CommandType 
    {
        Debug=0;
        Weather=1;
        Other=2;
    }
 
    required CommandType Command=1 [default=Weather];
    optional string Content=2;

    message Loaction 
    {
        required int32 East=1;
        required int32 North=2;
    }

    repeated Loaction UserLocation=3;
}

  這里的Message、required(必選屬性)、optional(可有可無屬性)、repeated(內部嵌套的類型屬性)等都是proto的關鍵字,具體意義以及為關鍵字的功能大家可以查看官方文檔,這里只介紹如何應用,或者Stephen Liu的文章也不錯。

  當然,光定義腳本是不能實現應用的,還需要根據特定的編碼語言進行描述,這里利用Protobuf-Net來實現.Net平台的協議實現。

  首先,下載軟件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)

  然后,解壓並將剛才的.proto文件復制到文件夾ProtoGen下。

  最后,啟動CMD並cd到ProtoGen文件夾目錄下,運行命令:

  protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace

(-i指定了輸入,-o指定了輸出,-ns指定了生成代碼的namespace)

  如果,正確的話(當然了,我給出的腳本是不會錯的),就會生成一個PBWeatherMessage.cs文件,這樣的話就可以將.cs文件加入到項目中當做一個純粹的類來使用了。

代碼中使用,就是類似於二進制序列化一樣,只是這回序列化的是Protobuf專用的序列化方式而已。

  序列化:

                        #region Protobuf
                        var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                        {
                            Command = PBProtocol.WeatherMessage.CommandType.Weather,
                            Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), 
                        };
                        
                        using (var sm = new MemoryStream())
                        {
                            ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                            publisher.Send(sm.ToArray());
                        }
                        #endregion

  反序列化:

                      var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                      var receivedBytes = subscriber.Receive();
                      using (var sm = new MemoryStream(receivedBytes))
                      {
                          weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                      }

  這里就簡單介紹完了protobuf協議的使用,下面介紹一下NetMQ+Protobuf的使用。

 

NetMQ+Protobuf

  接下來我們來改造下NetMQ Sample中的Publisher-Subscriber模式:

  首先下載從GitHub上下載NetMQ Sample: https://github.com/zeromq/netmq

或者下載我的示例代碼,其中包含了一個No Protobuf的工程,這個是直接摘自原作者的示例代碼。

  服務端Publisher:

            using (var context = NetMQContext.Create())// NetMQ全局維護的Content上下文,建議只有一個並且使用完畢后及時回收。
            using (var publisher = context.CreatePublisherSocket())// 從Content上下文中創建CreatePublisherSocket,這里如果用其他四種模式之一需要Create其他類型。
            {
                publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。

                var rng = new Random();

                while (!stopRequested)
                {
                    int zipcode =  rng.Next(0, 99999);// 這里模擬一個隨機命令編號(如果非10001,客戶端直接丟棄此Publisher發布的消息,實現消息過濾)
                    int temperature = rng.Next(-80, 135);
                    int relhumidity = rng.Next(0, 90);

                    publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干凈整潔。
                }
            }

  客戶端Subscriber:

 using (var context = NetMQContext.Create())// 創建全局NetMQ句柄,建議唯一,使用完畢及時回收。 
            using (var subscriber = context.CreateSubscriberSocket())// 創建Publisher-Subscriber模式的客戶端監聽。
            {
                subscriber.Connect("tcp://127.0.0.1:5556");// 連接到指定Socket
                subscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 這里創建消息內容的過濾,如果不包含“zipToSubscribeTo”值則不接收消息。

                for (int i = 0; i < iterations; i++)
                {
                    string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”開頭,則會返回整條信息。
                    Console.Write(".");

                    // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                    string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解碼。

                    int zip = int.Parse(split[0]);
                    if (zip != zipToSubscribeTo)
                    {
                        throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo));
                    }

                    totalTemp += int.Parse(split[1]);
                    totalHumidity += int.Parse(split[2]);
                }
            }

  這就是四種模式之一的發布者模式,使用起來很方便,但是這僅僅傳遞的是基於String的字符串,還不是一個可以序列化的對象,下一步我們將把消息字符串用Protobuf進行序列化與反序列化,來優化我們的消息格式。

請參考,我的示例代碼中的Publisher Pattern工程:

  服務端Publisher:

                using (var context = NetMQContext.Create())
                using (var publisher = context.CreatePublisherSocket()) 
                {
                    publisher.Bind("tcp://127.0.0.1:5556");
                    var rng = new Random();
                    while (!stopRequested)
                    { 
                        int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999);
                        int temperature = rng.Next(-80, 135);
                        int relhumidity = rng.Next(0, 90);

                        #region Protobuf
                        var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage()
                        {
                            Command = PBProtocol.WeatherMessage.CommandType.Weather,
                            Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), 
                        };
                        
                        using (var sm = new MemoryStream())
                        {
                            ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg);
                            publisher.Send(sm.ToArray());
                        }
                        #endregion

                        // publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));
                       
                        WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity));
                        System.Threading.Thread.Sleep(100);
                    }
                }
View Code

  客戶端Subscriber:

 using (var context = NetMQContext.Create())
              using (var subscriber = context.CreateSubscriberSocket())
              {
                  subscriber.Connect("tcp://127.0.0.1:5556");
                  subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing.

                  while (true)
                  {
                      if (curIndex > iterations) break;

                      var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage();
                      var receivedBytes = subscriber.Receive();
                      using (var sm = new MemoryStream(receivedBytes))
                      {
                          weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm);
                      }

                      // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"]
                      string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
                      int cmdId = int.Parse(split[0]);
                      if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather)
                      {
                          if (cmdId == zipToSubscribeTo)
                          {
                              curIndex++;
                              WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content));
                              totalTemp += int.Parse(split[1]);
                              totalHumidity += int.Parse(split[2]);
                          }
                      }
                  }

 

  好了,其實單獨來看,這兩部分內容並為涉及的很深入,只是作為一個技術實踐、技術儲備,希望其中有問題或者有更好的應用場景,還請各位留言,不勝感謝!

 

  我的示例代碼下載

冷靜下來

這里補充一些不足:

  1. NetMQ中的過濾:默認NetMQ支持過濾,可是當我們摒棄String類型傳遞而轉向Protobuf格式的時候NetMQ通道是無法解析其內容的,所以我們需要先解析內容,然后手寫一些過濾代碼,放棄了原生的支持。subscriber.SubscribeToAnyTopic()監聽所有非過濾模式。
  2. NetMQ消息持久化:基於ZMQ的NetMQ設計理念中均不支持數據持久化(相比MSMQ而言,NetMQ不能接收當客戶端不在線情況下的消息),所以如果需要持久化還需要做其他工作或者轉戰其他MQ家族。

 

引用

ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns

NetMQ:http://netmq.readthedocs.org/en/latest/introduction/

Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/

Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html

Protobuf-Net:https://code.google.com/p/protobuf-net/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM