【ZeroMQ】消息模式


1、請求/應答模式(REP/REQ)

該模式特征:

  • 服務器使用REP類型套接字而客戶端使用REQ類型套接字
  • 客戶端發送請求和接收答復,而服務器則接收請求並發送答復
  • 客戶端可以連接到一個或多個服務器。在這種情況下,請求會在所有的服務器(Reps)之間循環,一個請求被發送到某個服務器,下一個請求則被發送到下個服務器,如此進行下去。
  • 基於狀態的模式:客戶端在發送另一個請求之前,必須先接收前一個請求的答復。而服務器在接收另一個請求之前,必須答復前一個請求。

 1 //服務器端代碼
 2         private static void Main(string[] args)
 3         {
 4             using (var context = ZContext.Create())
 5             {
 6                 using (var resp = new ZSocket(context, ZSocketType.REP))
 7                 {
 8                     resp.Bind("tcp://*:5555");
 9                     while (true)
10                     {
11                         ZFrame reply = resp.ReceiveFrame();
12                         string message = reply.ReadString();
13 
14                         Console.WriteLine("Receive message {0}", message);
15 
16                         resp.Send(new ZFrame(message));
17 
18                         if (message == "exit")
19                         {
20                             break;
21                         }
22                     }
23                 }
24             }
25         }
服務器端代碼
 1 //客戶端代碼
 2         static void Main(string[] args)
 3         {
 4             using (var context = new ZContext())
 5             using (var requester = new ZSocket(context, ZSocketType.REQ))
 6             {
 7                 // Connect
 8                 requester.Connect("tcp://127.0.0.1:5555");
 9 
10                 while(true)
11                 {
12                     Console.WriteLine("Please enter your message:");
13                     string message = Console.ReadLine();
14                     requester.Send(new ZFrame(message));
15 
16                     // Send
17                     //requester.Send(new ZFrame(requestText));
18 
19                     // Receive
20                     using (ZFrame reply = requester.ReceiveFrame())
21                     {
22                         Console.WriteLine("Received: {0} {1}!", message, reply.ReadString());
23                         if ("exit" == reply.ReadString())
24                         {
25                             break;
26                         }
27                     }
28                 }
29             }
30         }
客戶端代碼

 

2、發布/訂閱模式Publish/Subscribe(PUB/SUB)

該模式具有一下特征:

  • 發布者使用PUB類型套接字和訂閱者則使用SUB類型套接字
  • 一個發布者可以有一個或者多個訂閱者
  • 一個訂閱者可以連接到一個或者多個發布者
  • 發布者發送消息而訂閱者接收消息
  • 訂閱者可以使用SubscribeAll方法訂閱所有的發布者消息,可以使用Subscrube方法訂閱某個特定的消息,這時要將所感興趣的發布者的消息前綴作為參數。對消息的過濾發生在訂閱者端,即發布者將其所有的消息發送給訂閱者,而訂閱者負責將不需要的消息丟棄。
  • 訂閱者可以用UnsubscribeAll方法取消所有訂閱,也可以使用Unsubscribe方法加上消息前綴來退訂某個發布者。
  • 發布者將消息發送到已連接的所有訂閱者。
  • 如果發布者沒有和任何訂閱者連接,那么消息將會被丟棄。
  • 如訂閱者連接到多個發布者,那么它會均勻的接收所有發布者的消息(公平隊列)。

 1 //發布者
 2         static void Publisher()
 3         {
 4             using (var context = new ZContext())
 5             using (var publisher = new ZSocket(context, ZSocketType.PUB))
 6             {
 7                 publisher.Bind("tcp://127.0.0.1:5555");
 8                 Random random = new Random();
 9                 while (true)
10                 {
11                     string message = string.Format("Random: {0}", random.Next(20));
12                     publisher.Send(new ZFrame(message));
13                     Console.WriteLine("Send:{0}", message);
14                     Thread.Sleep(1000);
15                 }
16             }
17         }
發布者
 1 //訂閱者
 2         static void Subscribe()
 3         {
 4             using (var context = new ZContext())
 5             using (var subscribe = new ZSocket(context, ZSocketType.SUB))
 6             {
 7                subscribe.SubscribeAll();
 8                 subscribe.Connect("tcp://127.0.0.1:5555");
 9                 while (true)
10                 { 
11                    
12                     using (ZFrame replay = subscribe.ReceiveFrame())
13                     {
14                         Console.WriteLine("REceive: {0}", replay.ReadString());
15                     }
16                 }
17                 
18             }
19         }
訂閱者

 

3、管道模式(Push/Pull)

當需要進行並行數據處理時,通常會用到該模式。管道模式使用場景如下所示:

  • 1.首先任務分發器通常以循環方式將消息(任務)推送給工作單元(每個工作單元有不同任務)
  • 2.接收到消息時工作單元會先對其進行處理,然后將它推送給接收消息(任務)的某種任務收集器。
  • 3.收集器以公平排隊的方法從期連接的所有工作單元那接收消息。

該模式具有一下特征:

  • 任務分發器使用PUSH類型的套接字。它綁定到端點,並等待工作單元的連接。
  • 工作單元有兩個套接字,一個使用PULL類型連接到任務分發器,另一個則是PUSH類型,負責和收集器的連接。
  • 任務收集器使用PULL類型套接字。它綁定到端點,並等待接收工作單元的連接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM