NetMQ(三): 發布訂閱模式 Publisher-Subscriber


ZeroMQ系列 之NetMQ

一:zeromq簡介

二:NetMQ 請求響應模式 Request-Reply

三:NetMQ 發布訂閱模式 Publisher-Subscriber

四:NetMQ 推拉模式 Push-Pull

NetMQ 發布訂閱模式 Publisher-Subscriber

1:簡單介紹

PUB-SUB模式一般處理的都不是系統的關鍵數據。發布者不關注訂閱者是否收到發布的消息,訂閱者也不知道自己是否收到了發布者發出的所有消息。你也不知道訂閱者何時開始收到消息。類似於廣播,收音機。因此邏輯上,它都不是可靠的。這個可以通過與請求響應模型組合來解決。

簡單的發布訂閱模式
圖1:簡單的發布訂閱模式


圖2:與請求響應模式組合的發布訂閱模式

2:案例

接下來,我們通過寫一個天氣預報的例子,來說明發布訂閱模式。發布端一直在發布大量的天氣信息,訂閱端通過過濾字段,接收到想要的數據。

使用的NetMQ版本是3.3.2.2

發布端代碼:

主程序:

class Program
{
    static void Main(string[] args)
    {
        NetMQPub.Start();
    }
}  

發布類:

public class NetMQPub
{
    readonly static ManualResetEvent _terminateEvent = new ManualResetEvent(false);
    /// <summary>
    /// NetMQ 發布端
    /// </summary>
    public static void Start()
    {
        string[] weathers = new string[6] { "晴朗", "多雲", "陰天", "霾", "雨", "雪" };
         
        Console.WriteLine("發布多個地區天氣預報:");

        using (NetMQContext context = NetMQContext.Create())
        {
            using (var publisher = context.CreatePublisherSocket())
            {
                publisher.Bind("tcp://127.0.0.1:5556");

                var rng = new Random();
                string msg;
                int sleeptime = 1000;//1秒

                ///指定發布的時間間隔,1秒
                while (_terminateEvent.WaitOne(1000) == false)
                {
                    //隨機生成天氣數據
                    int zipcode = rng.Next(0, 99);
                    int temperature = rng.Next(-50, 50);
                    int weatherId = rng.Next(0, 5);

                    msg = string.Format("{0} {1} {2}", zipcode, temperature, weathers[weatherId]);
                    publisher.SendFrame(msg);

                    Console.WriteLine(msg);
                    Thread.Sleep(sleeptime);
                }
            }
        }
    }

    private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        Console.WriteLine("exit……");
        _terminateEvent.Set();
    }
}

訂閱端代碼

主程序:

class Program
{
    static void Main(string[] args)
    {
        NetMQSub.Start();
    }
}

訂閱類:

public class NetMQSub
{
    public delegate void GetDataHandler(string message);
    public static event GetDataHandler OnGetData;
    /// <summary>
    /// NetMQ 訂閱端
    /// </summary>
    public static void Start()
    {
        var rng = new Random();
        int zipcode = rng.Next(0, 99);
        Console.WriteLine("接收本地天氣預報{0}……", zipcode);

        OnGetData += new GetDataHandler(ProcessData);

        using (var context = NetMQContext.Create())
        {
            using (var subscriber = context.CreateSubscriberSocket())
            {
                subscriber.Connect("tcp://127.0.0.1:5556");
				//設置過濾字符串
                subscriber.Subscribe(zipcode.ToString(CultureInfo.InvariantCulture));
				//訂閱所有的發布端內容
				//subscriber.Subscribe("");
                while (true)
                {
                    string results = subscriber.ReceiveFrameString(Encoding.UTF8);
                    Console.WriteLine(".");

                    string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);

                    int zip = int.Parse(split[0]);
                    if (zip == zipcode)
                    {
                        OnGetData(results);
                    }
                }
            }
        }
    }

    private static void ProcessData(string message)
    {
        Console.WriteLine("天氣情況:" + message);
    }
}

3:總結

  1. 一個發布端可以有多個訂閱端
  2. 如果只想要接收指定的數據,訂閱端必須要設置過濾字符
  3. 訂閱端設置空字符串,訂閱所有的發布內容。【You can set topic an empty string to subscribe to everything】
  4. 發布端和訂閱端的套接字綁定的地址必須一樣的。比如:tcp://127.0.0.1:5556,使用tcp協議,監聽端口5556

4:下載

NetMQ3.3.3.1例子
NetMQ3.3.2.2例子


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM