針對高並發,可擴展的互聯網架構,搭建消息隊列(一)


針對高並發,可擴展的互聯網架構,搭建消息隊列(一)

  想開發高並發可擴展的互聯網架構,消息隊列是不可缺少的,目前主流的消息隊列,有windows自帶的MSMQ,還有跨平台的強大的ZeroMQ,這里我們就選用ZeroMQ.

  ZeroMQ介紹:(也拼寫作 ØMQ、 0MQ 或 ZMQ) 是個非常輕量級的開源消息隊列軟件。它沒有獨立的服務器,消息直接從一個應用程序被發送到另一個應用程序。ZeroMQ的學習和應用也非常簡單,它只有一個 C++ 編寫成的單個庫文件libzmq.dll, 可以鏈接到任何應用程序中。如果要在.NET 環境中使用,我們需要用到一個C#編寫的名為 clrzmq.dll 包裝庫。ZeroMQ可以在 Windows、 OS X 和 Linux 等多種操作系統上運行, C、 C++、C#、 Java、 Python 等語言都可以編寫ZeroMQ 應用程序這使得不同平台上的不同應用程序之間可以相互通訊。

1、環境搭建:

  codeproject專題,下載對應的Download binaries - 377.6 KB,解壓縮到你的指定路徑。

  這里我們就不詳細介紹,主要說一下C#封裝好的版本,NetMQ,是基於ZeroMQ進行封裝的。就不需要下載了,直接nuget上獲取:

  PM> Install-Package NetMQ

  為什么不直接用ZeroMQ,而使用NetMQ,運行非托管代碼的托管應用程序內可能會出現許多想不到的問題,像內存泄漏和奇怪的沒有訪問錯誤。而NetMQ使用原生的C#語言,它更容易調試原生C#代碼,你可以下載代碼,調試你的系統。你可以在github上貢獻。

  待安裝好后,系統會自動添加NetMQ的引用。

  

  可以看到,NetMQ是基於zmq進行開發的,其實就是ZeroMQ了,並且已經為我們封裝了各種功能的MQ對象,比如REP/REQ ,PUB/SUB(主題式訂閱),XPUB/XSUB(非主題訂閱),Push/Pull,甚至還有路由模式等,從字面意義上,應該能看出個大概,后面我們一個一個進行測試使用。

  先看個簡單的demo,初步了解一下:

  

 1 class Program
 2 {
 3   static void Main(string[] args)
 4   {
 5     using (NetMQContext context = NetMQContext.Create())
 6     {
 7       Task serverTask = Task.Factory.StartNew(() =>Server(context));
 8       Task clientTask = Task.Factory.StartNew(() => Client(context));
 9       Task.WaitAll(serverTask, clientTask);
10     }
11   }
12  
13   static void Server(NetMQContext context)
14   {
15     using (NetMQSocket serverSocket = context.CreateResponseSocket())
16     {
17       serverSocket.Bind("tcp://*:5555");
18  
19       while (true)
20       {
21         string message = serverSocket.ReceiveString();
22  
23         Console.WriteLine("Receive message {0}", message);
24  
25         serverSocket.Send("World");          
26  
27         if (message == "exit")
28         {
29           break;
30         }
31       }
32     }      
33   }
34  
35   static void Client(NetMQContext context)
36   {
37     using (NetMQSocket clientSocket = context.CreateRequestSocket())
38     {
39       clientSocket.Connect("tcp://127.0.0.1:5555");
40  
41       while (true)
42       {
43         Console.WriteLine("Please enter your message:");
44         string message = Console.ReadLine();
45         clientSocket.Send(message);
46  
47         string answer = clientSocket.ReceiveString();
48  
49         Console.WriteLine("Answer from server: {0}", answer);
50  
51         if (message == "exit")
52         {
53           break;
54         }
55       }
56     }
57   }
58 }

  代碼比較簡潔的介紹了REP/REQ模式下NetMQ的使用,而且我們可以看到,這個Mq對象是可以在不同的線程間切換使用的,也許你會測試中文,那就先序列化再反序列化吧,因為可能會出現亂碼喲。

  這里,我先簡單根據NetMQ,封裝一個Server端和一個Client端,方便后面使用,當然也可以不封裝,直接使用:

  Server:

  

 1 /// <summary>
 2     /// Mq服務端
 3     /// </summary>
 4     public class OctMQServer : IDisposable
 5     {
 6         public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
 7 
 8         protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
 9         {
10             EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
11             if (handler != null) handler(this, e);
12         }
13 
14         private int _port;
15         private NetMQSocket _serverSocket;
16         private ServerType _type;
17         private NetMQContext _context;
18 
19         public void Init(int port, ServerType type)
20         {
21             _type = type;
22             _port = port;
23             _context = NetMQContext.Create();
24             CreateServer();
25         }
26 
27         void CreateServer()
28         {
29             switch (_type)
30             {
31                 case ServerType.Response:
32                     _serverSocket = _context.CreateResponseSocket(); break;
33                 case ServerType.Pub:
34                     _serverSocket = _context.CreatePushSocket(); break;
35                 case ServerType.Router:
36                     _serverSocket = _context.CreateRouterSocket(); break;
37                 case ServerType.Stream:
38                     _serverSocket = _context.CreateStreamSocket(); break;
39                 case ServerType.Push:
40                     _serverSocket = _context.CreatePushSocket(); break;
41                 case ServerType.XPub:
42                     _serverSocket = _context.CreateXPublisherSocket(); break;
43                 default:
44                     _serverSocket = _context.CreateResponseSocket(); break;
45             }
46             _serverSocket.Bind("tcp://*:" + _port);
47             Task.Factory.StartNew(() =>
48             AsyncRead(_serverSocket), TaskCreationOptions.LongRunning);
49         }
50 
51         private void AsyncRead(NetMQSocket serverSocket)
52         {
53             while (true)
54             {
55                 var msg = serverSocket.ReceiveMessage();
56                 OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(serverSocket, msg));
57             }
58         }
59 
60 
61         public NetMQSocket Server
62         {
63             get { return _serverSocket; }
64         }
65 
66         public void Dispose()
67         {
68             _serverSocket.Dispose();
69             _context.Dispose();
70         }
71 
72         public void Send(NetMQMessage msg)
73         {
74             _serverSocket.SendMessage(msg);
75         }
76 
77         public NetMQMessage CreateMessage()
78         {
79             return new NetMQMessage();
80         }
81     }

  這樣,使用者就可以根據枚舉進行服務端的創建, 不用糾結到底用哪一種服務端,並且封裝了一些消息的異步事件,方便在開發中使用,可以使用多播委托,針對不同的消息進行不同的處理,我這里使用的while循環,當然,在netmq內部提供了循環器和心跳等,都可以在實際的開發中進行擴展和使用:Poller和NetMQTimer。

 

  Client:

  

  1  /// <summary>
  2     /// MQ客戶端
  3     /// </summary>
  4     public class OctMQClient:IDisposable
  5     {
  6         public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive;
  7 
  8         protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e)
  9         {
 10             EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive;
 11             if (handler != null) handler(this, e);
 12         }
 13 
 14         private int _port;
 15         private NetMQSocket _clientSocket;
 16         private ClientType _type;
 17         private NetMQContext _context;
 18         private string _ip;
 19         private Task task;
 20         public void Init(string ip, int port, ClientType type)
 21         {
 22             _type = type;
 23             _ip = ip;
 24             _port = port;
 25             _context = NetMQContext.Create();
 26             CreateClient();
 27         }
 28 
 29         void CreateClient()
 30         {
 31             switch (_type)
 32             {
 33                 case ClientType.Request:
 34                     _clientSocket = _context.CreateRequestSocket(); break;
 35                 case ClientType.Sub:
 36                     _clientSocket = _context.CreateSubscriberSocket(); break;
 37                 case ClientType.Dealer:
 38                     _clientSocket = _context.CreateDealerSocket(); break;
 39                 case ClientType.Stream:
 40                     _clientSocket = _context.CreateStreamSocket(); break;
 41                 case ClientType.Pull:
 42                     _clientSocket = _context.CreatePullSocket(); break;
 43                 case ClientType.XSub:
 44                     _clientSocket = _context.CreateXSubscriberSocket(); break;
 45                 default:
 46                     _clientSocket = _context.CreateRequestSocket(); break;
 47             }
 48             _clientSocket.Connect("tcp://" + _ip + ":" + _port);
 49         }
 50 
 51         public void StartAsyncReceive()
 52         {
 53             task = Task.Factory.StartNew(() =>
 54          AsyncRead(_clientSocket), TaskCreationOptions.LongRunning);
 55            
 56         }
 57 
 58         private void AsyncRead(NetMQSocket cSocket)
 59         {
 60             while (true)
 61             {
 62                 var msg = cSocket.ReceiveMessage();
 63                 OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(cSocket, msg));
 64             }
 65         }
 66 
 67         public NetMQSocket Client
 68         {
 69             get { return _clientSocket; }
 70         }
 71 
 72         public T GetClient<T>() where T : NetMQSocket
 73         {
 74             return (T)_clientSocket;
 75         }
 76 
 77         public void Send(NetMQMessage msg)
 78         {
 79             _clientSocket.SendMessage(msg);
 80         }
 81 
 82         public NetMQMessage CreateMessage()
 83         {
 84             return new NetMQMessage();
 85         }
 86 
 87         public NetMQMessage ReceiveMessage()
 88         {
 89             return _clientSocket.ReceiveMessage();
 90         }
 91 
 92         public void Dispose()
 93         {
 94             _clientSocket.Dispose();
 95             _context.Dispose();
 96             if (task != null)
 97             {
 98                 task.Dispose();
 99             }
100         }
101     }

 

  客戶端提供了,同步接受消息和異步接收消息兩種方式,當啟動異步時,就開始循環的讀取消息了,當讀到消息時拋出事件,並且針對任務等做了資源的釋放。並提供創建消息和返回MQ對象等公共方法,可以在開發過程中快速的入手和使用。

  先簡單說一下response和request模式,就是響應模式,當mq客戶端向mq的服務端發送消息時,需要得到及時的響應,並返回給使用者或者是用戶,這就需要及時響應的服務端程序,一般的MQ都會有這種功能,也是使用最廣泛的,我們就先寫一個這種類型的demo,基於我們前面提供的客戶端和服務端。

  Server Console

  這里我提供了2種也是最常用的2種服務端方式,並且提供了不同的處理方式。

  1 class Program
  2     {
  3         private static OctMQServer _server;
  4         static ServerType _type;
  5         static void Main(string[] args)
  6         {
  7             AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException;
  8             CreateCmd();
  9 
 10         }
 11 
 12         /// <summary>
 13         /// 創建mq對象
 14         /// </summary>
 15         static void Create()
 16         {
 17             _server = new OctMQServer();
 18             _server.OnReceive += server_OnReceive;
 19             _server.Init(5555, _type);
 20            
 21         }
 22 
 23         /// <summary>
 24         /// 選擇類型
 25         /// </summary>
 26         private static void CreateCmd()
 27         {
 28             Csl.Wl(ConsoleColor.Red, "請選擇您要創建的MQ服務端類型");
 29             Csl.Wl(ConsoleColor.Yellow, "1.PUB   2.REP");
 30             var key = System.Console.ReadLine();
 31             switch (key)
 32             {
 33                 case "1":
 34                     {
 35                         _type = ServerType.Pub;
 36                         Create();
 37                         Cmd();
 38                     }
 39 
 40                     break;
 41                 case "2":
 42                     _type = ServerType.Response;
 43                     Create();
 44                     Cmd();
 45                     break;
 46                 default:
 47                     {
 48                         CreateCmd();
 49                        
 50                     }
 51                     break;
 52             }
 53         }
 54 
 55         static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e)
 56         {
 57             Csl.WlEx((Exception)e.ExceptionObject);
 58         }
 59 
 60         /// <summary>
 61         /// 接收消息
 62         /// </summary>
 63         private static void Cmd()
 64         {
 65             if (_type == ServerType.Pub)
 66             {
 67                 Csl.Wl(ConsoleColor.Red, "請輸入您要發個訂閱者的信息主題與信息用空格分開");
 68             }
 69             else
 70             {
 71                 Csl.Wl(ConsoleColor.Red, "等待消息");
 72             }
 73             var cmd = System.Console.ReadLine();
 74 
 75             switch (cmd)
 76             {
 77                 case "exit":
 78                     Csl.Wl("正在關閉應用程序。。。等待最后一個心跳執行完成。。。");
 79                     _server.Dispose();
 80                     break;
 81 
 82                 default:
 83                     {
 84                         var str = cmd.Split(' ');
 85                         var msg = _server.CreateMessage();
 86                         msg.Append(str[0],Encoding.UTF8);
 87                         msg.Append(str[1],Encoding.UTF8);
 88                         _server.Send(msg);
 89                         Cmd();
 90                         break;
 91                     }
 92                     return;
 93             }
 94         }
 95 
 96         static void server_OnReceive(object sender, DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e)
 97         {
 98             var msg = e.Arg2;
 99             var server = e.Arg1;
100             Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8));
101             server.Send("你好,您的請求已處理,並返回消息及處理結果",Encoding.UTF8);
102         }
103     }

  Client Form 

  客戶端,我使用winform來處理,並且配合控制台使用,這個用法有些巧妙,不會的同學可以私密我,嘿嘿,先上截圖,也是可以同時處理兩種方式,給個demo,方便大家在實際項目中使用:

  響應式:

  

  訂閱者式:

  

  不會做gif ,我就逐步說吧,從訂閱者模式中我們可以看到,我的打開順序1-》2->3,先打開1,訂閱了t的主題,發了2個消息,內容1和內容2,第一個程序均收到,這時我啟動另外一個程序,同樣訂閱t這個主題,發現消息是通過輪詢的方式分別向兩個訂閱者發送,這樣,我們在處理一些比較耗時的業務邏輯,並且不會因為並發出現問題時,就可以使用多個訂閱者,分別處理業務從而大大的提高我們的系統性能。

  然后打開第三個,訂閱y這個主題,這時發送y的主題消息,前2個訂閱者就無法收到了,這樣我們還可以區分業務,進行多進程的處理,更高的提高可用性和可擴展性,並結合高性能的緩存解決方案處理高並發的業務邏輯。

  貼出客戶端代碼:

  

 1 public partial class Form1 : Form
 2     {
 3         public Form1()
 4         {
 5             InitializeComponent();
 6             Csl.Init();
 7         }
 8 
 9         /// <summary>
10         /// mq客戶端
11         /// </summary>
12         private OctMQClient _client;
13 
14         /// <summary>
15         /// 訂閱者模式連接
16         /// </summary>
17         /// <param name="sender"></param>
18         /// <param name="e"></param>
19         private void btnConn_Click(object sender, EventArgs e)
20         {
21             _client = new OctMQClient();
22             _client.OnReceive += _client_OnReceive;
23 
24             _client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Sub);
25             var sub = (SubscriberSocket) _client.Client;
26             sub.Subscribe(txtTop.Text);
27             _client.StartAsyncReceive();
28             
29         }
30 
31         /// <summary>
32         /// 訂閱者模式受到消息
33         /// </summary>
34         /// <param name="sender"></param>
35         /// <param name="e"></param>
36         void _client_OnReceive(object sender, Core.Args.DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e)
37         {
38             var msg = e.Arg2;
39             Csl.Wl("主題:"+msg.Pop().ConvertToString(Encoding.UTF8));
40             Csl.Wl("內容:" + msg.Pop().ConvertToString(Encoding.UTF8));
41         }
42 
43         /// <summary>
44         /// 發送響應消息
45         /// </summary>
46         /// <param name="sender"></param>
47         /// <param name="e"></param>
48         private void btnSend_Click(object sender, EventArgs e)
49         {
50             using (_client = new OctMQClient())
51             {
52                 _client.Init(txtip.Text, int.Parse(txtport.Text), ClientType.Request);
53                 var content = txtContent.Text;
54                 var msg = _client.CreateMessage();
55                 msg.Append(content, Encoding.UTF8);
56                 _client.Send(msg);
57                 var rmsg = _client.ReceiveMessage();
58                 var reqStr = rmsg.Pop().ConvertToString(Encoding.UTF8);
59                 Csl.Wl(reqStr);
60             }
61             
62         }
63 
64         /// <summary>
65         /// 釋放資源
66         /// </summary>
67         /// <param name="e"></param>
68         protected override void OnClosed(EventArgs e)
69         {
70             base.OnClosed(e);
71             if (_client != null)
72             {
73                 _client.Dispose();                
74             }
75         }
76     }

  好了,大家先消化一下,等系列寫完了,我會提交到github上。下一期,會寫一些並發情況下的應用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM