概述
對於稍微熟悉這兩個優秀的項目來說,每個內容單獨介紹都不為過,本文只是簡介並探討如何將兩部分內容合並起來,使其在某些場景下更適合、更高效。
NetMQ:ZeroMQ的.Net版本,ZeroMQ簡單來說就是局域網內的消息中間件(與MSMQ類似),包括了進程間通訊、點對點通訊、訂閱模式通訊等等,底層用更“完美”的Socket實現,ZeroMQ實現了多語言、跨平台、高效率等諸多優勢。詳細介紹請參考ZeroMQ和NetMQ官方文檔:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns,http://netmq.readthedocs.org/en/latest/introduction/。
Protocol Buffer:源自與Google內部的開源項目,作為高效的RPC消息協議,相比較Json、XML協議的消息格式,Protobuf在序列化以及數據大小上都具有十分明顯的優勢,跨平台,協議可讀性也接近於Json等等。這里也推薦一篇文章:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
定義Protobuf協議
Protocol Buffer(簡稱Protobuf)是以.proto的腳本形式實現的通用語義形式,類似於Json格式:
message WeatherMessage { enum CommandType { Debug=0; Weather=1; Other=2; } required CommandType Command=1 [default=Weather]; optional string Content=2; message Loaction { required int32 East=1; required int32 North=2; } repeated Loaction UserLocation=3; }
這里的Message、required(必選屬性)、optional(可有可無屬性)、repeated(內部嵌套的類型屬性)等都是proto的關鍵字,具體意義以及為關鍵字的功能大家可以查看官方文檔,這里只介紹如何應用,或者Stephen Liu的文章也不錯。
當然,光定義腳本是不能實現應用的,還需要根據特定的編碼語言進行描述,這里利用Protobuf-Net來實現.Net平台的協議實現。
首先,下載軟件包:https://code.google.com/p/protobuf-net/(肯能需要FQ)
然后,解壓並將剛才的.proto文件復制到文件夾ProtoGen下。
最后,啟動CMD並cd到ProtoGen文件夾目錄下,運行命令:
protogen -i: PBWeatherMessage.proto -0: PBWeatherMessage.cs -ns:ProtobufNameSpace
(-i指定了輸入,-o指定了輸出,-ns指定了生成代碼的namespace)
如果,正確的話(當然了,我給出的腳本是不會錯的),就會生成一個PBWeatherMessage.cs文件,這樣的話就可以將.cs文件加入到項目中當做一個純粹的類來使用了。
代碼中使用,就是類似於二進制序列化一樣,只是這回序列化的是Protobuf專用的序列化方式而已。
序列化:
#region Protobuf var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage() { Command = PBProtocol.WeatherMessage.CommandType.Weather, Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), }; using (var sm = new MemoryStream()) { ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg); publisher.Send(sm.ToArray()); } #endregion
反序列化:
var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(); var receivedBytes = subscriber.Receive(); using (var sm = new MemoryStream(receivedBytes)) { weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm); }
這里就簡單介紹完了protobuf協議的使用,下面介紹一下NetMQ+Protobuf的使用。
NetMQ+Protobuf
接下來我們來改造下NetMQ Sample中的Publisher-Subscriber模式:
首先下載從GitHub上下載NetMQ Sample: https://github.com/zeromq/netmq
或者下載我的示例代碼,其中包含了一個No Protobuf的工程,這個是直接摘自原作者的示例代碼。
服務端Publisher:
using (var context = NetMQContext.Create())// NetMQ全局維護的Content上下文,建議只有一個並且使用完畢后及時回收。 using (var publisher = context.CreatePublisherSocket())// 從Content上下文中創建CreatePublisherSocket,這里如果用其他四種模式之一需要Create其他類型。 { publisher.Bind("tcp://127.0.0.1:5556");// Bind到指定的IP及端口。 var rng = new Random(); while (!stopRequested) { int zipcode = rng.Next(0, 99999);// 這里模擬一個隨機命令編號(如果非10001,客戶端直接丟棄此Publisher發布的消息,實現消息過濾) int temperature = rng.Next(-80, 135); int relhumidity = rng.Next(0, 90); publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity));// 直接Send,干凈整潔。 } }
客戶端Subscriber:
using (var context = NetMQContext.Create())// 創建全局NetMQ句柄,建議唯一,使用完畢及時回收。 using (var subscriber = context.CreateSubscriberSocket())// 創建Publisher-Subscriber模式的客戶端監聽。 { subscriber.Connect("tcp://127.0.0.1:5556");// 連接到指定Socket subscriber.Subscribe(zipToSubscribeTo.ToString(CultureInfo.InvariantCulture));// 這里創建消息內容的過濾,如果不包含“zipToSubscribeTo”值則不接收消息。 for (int i = 0; i < iterations; i++) { string results = subscriber.ReceiveString(); // 如果消息以“zipToSubscribeTo”開頭,則會返回整條信息。 Console.Write("."); // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"] string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);// 按照固定模式解碼。 int zip = int.Parse(split[0]); if (zip != zipToSubscribeTo) { throw new Exception(string.Format("Received message for unexpected zipcode: {0} (expected {1})", zip, zipToSubscribeTo)); } totalTemp += int.Parse(split[1]); totalHumidity += int.Parse(split[2]); } }
這就是四種模式之一的發布者模式,使用起來很方便,但是這僅僅傳遞的是基於String的字符串,還不是一個可以序列化的對象,下一步我們將把消息字符串用Protobuf進行序列化與反序列化,來優化我們的消息格式。
請參考,我的示例代碼中的Publisher Pattern工程:
服務端Publisher:

using (var context = NetMQContext.Create()) using (var publisher = context.CreatePublisherSocket()) { publisher.Bind("tcp://127.0.0.1:5556"); var rng = new Random(); while (!stopRequested) { int zipcode = rng.Next(10000,10010); //Relpace: rng.Next(0, 99999); int temperature = rng.Next(-80, 135); int relhumidity = rng.Next(0, 90); #region Protobuf var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage() { Command = PBProtocol.WeatherMessage.CommandType.Weather, Content = string.Format("{0} {1} {2}", zipcode, temperature, relhumidity), }; using (var sm = new MemoryStream()) { ProtoBuf.Serializer.Serialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm, weatherMsg); publisher.Send(sm.ToArray()); } #endregion // publisher.Send(string.Format("{0} {1} {2}", zipcode, temperature, relhumidity)); WriteLine(string.Format("Publisher send message: {0} {1} {2}", zipcode, temperature, relhumidity)); System.Threading.Thread.Sleep(100); } }
客戶端Subscriber:
using (var context = NetMQContext.Create()) using (var subscriber = context.CreateSubscriberSocket()) { subscriber.Connect("tcp://127.0.0.1:5556"); subscriber.SubscribeToAnyTopic(); // No Command Filter, warn if not set thie method SubscribeToAnyTopic, it will receive nothing. while (true) { if (curIndex > iterations) break; var weatherMsg = new NetMQAndProtocolBuffer.PBProtocol.WeatherMessage(); var receivedBytes = subscriber.Receive(); using (var sm = new MemoryStream(receivedBytes)) { weatherMsg = ProtoBuf.Serializer.Deserialize<NetMQAndProtocolBuffer.PBProtocol.WeatherMessage>(sm); } // "zip temp relh" ... "10001 84 23" -> ["10001", "84", "23"] string[] split = weatherMsg.Content.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries); int cmdId = int.Parse(split[0]); if (weatherMsg.Command == PBProtocol.WeatherMessage.CommandType.Weather) { if (cmdId == zipToSubscribeTo) { curIndex++; WriteLine(string.Format("Subscriber receive message: {0}", weatherMsg.Content)); totalTemp += int.Parse(split[1]); totalHumidity += int.Parse(split[2]); } } }
好了,其實單獨來看,這兩部分內容並為涉及的很深入,只是作為一個技術實踐、技術儲備,希望其中有問題或者有更好的應用場景,還請各位留言,不勝感謝!
冷靜下來
這里補充一些不足:
- NetMQ中的過濾:默認NetMQ支持過濾,可是當我們摒棄String類型傳遞而轉向Protobuf格式的時候NetMQ通道是無法解析其內容的,所以我們需要先解析內容,然后手寫一些過濾代碼,放棄了原生的支持。subscriber.SubscribeToAnyTopic()監聽所有非過濾模式。
- NetMQ消息持久化:基於ZMQ的NetMQ設計理念中均不支持數據持久化(相比MSMQ而言,NetMQ不能接收當客戶端不在線情況下的消息),所以如果需要持久化還需要做其他工作或者轉戰其他MQ家族。
引用
ZMQ:http://zguide.zeromq.org/page:all#Chapter-Sockets-and-Patterns
NetMQ:http://netmq.readthedocs.org/en/latest/introduction/
Protocol Buffer:http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/
Stephen Liu:http://www.cnblogs.com/stephen-liu74/archive/2013/01/02/2841485.html
Protobuf-Net:https://code.google.com/p/protobuf-net/