ZeroMQ系列 之NetMQ
一:zeromq簡介
二:NetMQ 請求響應模式 Request-Reply
三:NetMQ 發布訂閱模式 Publisher-Subscriber
四:NetMQ 推拉模式 Push-Pull
NetMQ 發布訂閱模式 Publisher-Subscriber
1:簡單介紹
PUB-SUB模式一般處理的都不是系統的關鍵數據。發布者不關注訂閱者是否收到發布的消息,訂閱者也不知道自己是否收到了發布者發出的所有消息。你也不知道訂閱者何時開始收到消息。類似於廣播,收音機。因此邏輯上,它都不是可靠的。這個可以通過與請求響應模型組合來解決。
圖1:簡單的發布訂閱模式
圖2:與請求響應模式組合的發布訂閱模式
2:案例
接下來,我們通過寫一個天氣預報的例子,來說明發布訂閱模式。發布端一直在發布大量的天氣信息,訂閱端通過過濾字段,接收到想要的數據。
使用的NetMQ版本是3.3.2.2
發布端代碼:
主程序:
class Program
{
static void Main(string[] args)
{
NetMQPub.Start();
}
}
發布類:
public class NetMQPub
{
readonly static ManualResetEvent _terminateEvent = new ManualResetEvent(false);
/// <summary>
/// NetMQ 發布端
/// </summary>
public static void Start()
{
string[] weathers = new string[6] { "晴朗", "多雲", "陰天", "霾", "雨", "雪" };
Console.WriteLine("發布多個地區天氣預報:");
using (NetMQContext context = NetMQContext.Create())
{
using (var publisher = context.CreatePublisherSocket())
{
publisher.Bind("tcp://127.0.0.1:5556");
var rng = new Random();
string msg;
int sleeptime = 1000;//1秒
///指定發布的時間間隔,1秒
while (_terminateEvent.WaitOne(1000) == false)
{
//隨機生成天氣數據
int zipcode = rng.Next(0, 99);
int temperature = rng.Next(-50, 50);
int weatherId = rng.Next(0, 5);
msg = string.Format("{0} {1} {2}", zipcode, temperature, weathers[weatherId]);
publisher.SendFrame(msg);
Console.WriteLine(msg);
Thread.Sleep(sleeptime);
}
}
}
}
private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
{
Console.WriteLine("exit……");
_terminateEvent.Set();
}
}
訂閱端代碼
主程序:
class Program
{
static void Main(string[] args)
{
NetMQSub.Start();
}
}
訂閱類:
public class NetMQSub
{
public delegate void GetDataHandler(string message);
public static event GetDataHandler OnGetData;
/// <summary>
/// NetMQ 訂閱端
/// </summary>
public static void Start()
{
var rng = new Random();
int zipcode = rng.Next(0, 99);
Console.WriteLine("接收本地天氣預報{0}……", zipcode);
OnGetData += new GetDataHandler(ProcessData);
using (var context = NetMQContext.Create())
{
using (var subscriber = context.CreateSubscriberSocket())
{
subscriber.Connect("tcp://127.0.0.1:5556");
//設置過濾字符串
subscriber.Subscribe(zipcode.ToString(CultureInfo.InvariantCulture));
//訂閱所有的發布端內容
//subscriber.Subscribe("");
while (true)
{
string results = subscriber.ReceiveFrameString(Encoding.UTF8);
Console.WriteLine(".");
string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
int zip = int.Parse(split[0]);
if (zip == zipcode)
{
OnGetData(results);
}
}
}
}
}
private static void ProcessData(string message)
{
Console.WriteLine("天氣情況:" + message);
}
}
3:總結
- 一個發布端可以有多個訂閱端
- 如果只想要接收指定的數據,訂閱端必須要設置過濾字符
- 訂閱端設置空字符串,訂閱所有的發布內容。【You can set topic an empty string to subscribe to everything】
- 發布端和訂閱端的套接字綁定的地址必須一樣的。比如:tcp://127.0.0.1:5556,使用tcp協議,監聽端口5556