NetMQ發布訂閱C#示例


      NetMQ (ZeroMQ to .Net),ØMQ號稱史上最快中間件。它對socket通信進行了封裝,使得我們不需要寫socket函數調用就能完成復雜的網絡通信。和一般意義上的消息隊列產品不同的是,它沒有消息隊列服務器,而更像是一個網絡通信庫。從網絡通信的角度看,它處於會話層之上,應用層之下。【ZeroMQ 官網】:http://zeromq.org

      ØMQ有4個基本通信模型:分別是一對一結對模型(Exclusive-Pair)、請求回應模型(Request-Reply)、發布訂閱模型(Publish-Subscribe)、推拉模型(Push-Pull)。

Request-reply pattern 請求-回復模型 

  • 這種模型主要用於從客戶端向一個或多個服務實例發送請求,然后等待緊接着對於每個請求的回復
  • 里面又具體分了ZMQ_REQ ZMQ_REP ZMQ_DEALER ZMQ_ROUTER 
  • REQ 發送完消息后,必須接收一個回應消息后,才能發送新的消息
  • REP當接收消息時,都會返回一個消息 

Publish-subscribe pattern 發布-訂閱模式

  • 這種模式主要用於1對多的數據發布(一個發布者,多個訂閱者)
  • 里面又具體分了ZMQ_PUB ZMQ_SUB 
  • PUB發送消息給所有的SUB。如果此時SUB沒有啟動,下次啟動時會漏掉該消息 

Pipeline pattern 管道模式

  • 這種模式主要用於發布數據到由管道排列的節點上面,數據總是沿着管道流動。每個管道階段連接了至少一個節點
  • 里面又具體分了ZMQ_PUSH ZMQ_PULL
  • 一個1對N隊列的實現,PUSH將數據放入隊列,PULL從隊列中不取出數據。數據會負載均衡的發送給每一個PULL 

Exclusive pair pattern 獨立對模式

  • peer to peer 模式。主要用於進程內部線程間通信
  • 里面又具體分了ZMQ_PAIR
  • 線程間1-to-1隊列的實現,采用了lock free實現,所以速度很快

下面是訂閱發布的示例代碼:

發布服務端:

  public static class NetMQPub
    {
        readonly static ManualResetEvent _terminateEvent = new ManualResetEvent(false);
        /// <summary>
        /// NetMQ 發布模式
        /// </summary>
        public static void Start()
        {
            string[] wethers = new string[5] {"晴朗","多雲","陰天","小雨","暴雪" };

            //CTRL+C 退出程序
            Console.CancelKeyPress += Console_CancelKeyPress;
            Console.WriteLine("發布多個地區天氣預報:");

            using (var context = NetMQContext.Create())
            {
                using (var publisher = context.CreatePublisherSocket())
                {
                    publisher.Bind("tcp://127.0.0.1:5556");

                    var rng = new Random();
                    string msg;
                    int sleeptime = 10;

                    while (_terminateEvent.WaitOne(0) == false)
                    {
                        //隨機生成天氣數據
                        int zipcode = rng.Next(0, 99);
                        int temperature = rng.Next(-50, 50);
                        int wetherId = rng.Next(0, 4);

                        msg = string.Format("{0} {1} {2}", zipcode, temperature, wethers[wetherId]);
                        publisher.Send(msg,Encoding.UTF8, zmq.SendReceiveOptions.DontWait);

                        Console.WriteLine(msg);
                        Thread.Sleep(sleeptime);
                    }
                }
            }
        }

        static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            Console.WriteLine("exit...");
            _terminateEvent.Set();
        }
    }

  訂閱客戶端,可啟動多個實例來模擬接收天氣信息:

  public static class NetMQSub
    {
        public delegate void GetDataHandler(string message);
        public static event GetDataHandler OnGetData;

        /// <summary>
        /// NetMQ 訂閱模式
        /// </summary>
        public static void Start()
        {            
            var rng = new Random();
            int zipcode = rng.Next(0, 99);
            Console.WriteLine("接收本地天氣預報 {0}...", zipcode);

            OnGetData += new GetDataHandler(ProcessData);
    
            using (var context = NetMQContext.Create())
            using (var subscriber = context.CreateSubscriberSocket())
            {
                subscriber.Connect("tcp://127.0.0.1:5556");
                subscriber.Subscribe(zipcode.ToString(CultureInfo.InvariantCulture));

                while(true)
                {
                    string results = subscriber.ReceiveString(Encoding.UTF8);
                    Console.Write(".");

                    string[] split = results.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);

                    int zip = int.Parse(split[0]);
                    if (zip == zipcode)
                    {
                        OnGetData(results);
                    }                  
                }
            }
        }

        public static void ProcessData(string msg)
        {
            Console.WriteLine("天氣情況:" + msg);
        }
    }

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM