針對高並發,可擴展的互聯網架構,搭建消息隊列(一)
想開發高並發可擴展的互聯網架構,消息隊列是不可缺少的,目前主流的消息隊列,有windows自帶的MSMQ,還有跨平台的強大的ZeroMQ,這里我們就選用ZeroMQ.
ZeroMQ介紹:(也拼寫作 ØMQ、 0MQ 或 ZMQ) 是個非常輕量級的開源消息隊列軟件。它沒有獨立的服務器,消息直接從一個應用程序被發送到另一個應用程序。ZeroMQ的學習和應用也非常簡單,它只有一個 C++ 編寫成的單個庫文件libzmq.dll, 可以鏈接到任何應用程序中。如果要在.NET 環境中使用,我們需要用到一個C#編寫的名為 clrzmq.dll 包裝庫。ZeroMQ可以在 Windows、 OS X 和 Linux 等多種操作系統上運行, C、 C++、C#、 Java、 Python 等語言都可以編寫ZeroMQ 應用程序這使得不同平台上的不同應用程序之間可以相互通訊。
1、環境搭建:
codeproject專題,下載對應的Download binaries - 377.6 KB,解壓縮到你的指定路徑。
這里我們就不詳細介紹,主要說一下C#封裝好的版本,NetMQ,是基於ZeroMQ進行封裝的。就不需要下載了,直接nuget上獲取:
PM> Install-Package NetMQ
為什么不直接用ZeroMQ,而使用NetMQ,運行非托管代碼的托管應用程序內可能會出現許多想不到的問題,像內存泄漏和奇怪的沒有訪問錯誤。而NetMQ使用原生的C#語言,它更容易調試原生C#代碼,你可以下載代碼,調試你的系統。你可以在github上貢獻。
待安裝好后,系統會自動添加NetMQ的引用。
可以看到,NetMQ是基於zmq進行開發的,其實就是ZeroMQ了,並且已經為我們封裝了各種功能的MQ對象,比如REP/REQ ,PUB/SUB(主題式訂閱),XPUB/XSUB(非主題訂閱),Push/Pull,甚至還有路由模式等,從字面意義上,應該能看出個大概,后面我們一個一個進行測試使用。
先看個簡單的demo,初步了解一下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 using (NetMQContext context = NetMQContext.Create()) 6 { 7 Task serverTask = Task.Factory.StartNew(() =>Server(context)); 8 Task clientTask = Task.Factory.StartNew(() => Client(context)); 9 Task.WaitAll(serverTask, clientTask); 10 } 11 } 12 13 static void Server(NetMQContext context) 14 { 15 using (NetMQSocket serverSocket = context.CreateResponseSocket()) 16 { 17 serverSocket.Bind("tcp://*:5555"); 18 19 while (true) 20 { 21 string message = serverSocket.ReceiveString(); 22 23 Console.WriteLine("Receive message {0}", message); 24 25 serverSocket.Send("World"); 26 27 if (message == "exit") 28 { 29 break; 30 } 31 } 32 } 33 } 34 35 static void Client(NetMQContext context) 36 { 37 using (NetMQSocket clientSocket = context.CreateRequestSocket()) 38 { 39 clientSocket.Connect("tcp://127.0.0.1:5555"); 40 41 while (true) 42 { 43 Console.WriteLine("Please enter your message:"); 44 string message = Console.ReadLine(); 45 clientSocket.Send(message); 46 47 string answer = clientSocket.ReceiveString(); 48 49 Console.WriteLine("Answer from server: {0}", answer); 50 51 if (message == "exit") 52 { 53 break; 54 } 55 } 56 } 57 } 58 }
代碼比較簡潔的介紹了REP/REQ模式下NetMQ的使用,而且我們可以看到,這個Mq對象是可以在不同的線程間切換使用的,也許你會測試中文,那就先序列化再反序列化吧,因為可能會出現亂碼喲。
這里,我先簡單根據NetMQ,封裝一個Server端和一個Client端,方便后面使用,當然也可以不封裝,直接使用:
Server:
1 /// <summary> 2 /// Mq服務端 3 /// </summary> 4 public class OctMQServer : IDisposable 5 { 6 public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive; 7 8 protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e) 9 { 10 EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive; 11 if (handler != null) handler(this, e); 12 } 13 14 private int _port; 15 private NetMQSocket _serverSocket; 16 private ServerType _type; 17 private NetMQContext _context; 18 19 public void Init(int port, ServerType type) 20 { 21 _type = type; 22 _port = port; 23 _context = NetMQContext.Create(); 24 CreateServer(); 25 } 26 27 void CreateServer() 28 { 29 switch (_type) 30 { 31 case ServerType.Response: 32 _serverSocket = _context.CreateResponseSocket(); break; 33 case ServerType.Pub: 34 _serverSocket = _context.CreatePushSocket(); break; 35 case ServerType.Router: 36 _serverSocket = _context.CreateRouterSocket(); break; 37 case ServerType.Stream: 38 _serverSocket = _context.CreateStreamSocket(); break; 39 case ServerType.Push: 40 _serverSocket = _context.CreatePushSocket(); break; 41 case ServerType.XPub: 42 _serverSocket = _context.CreateXPublisherSocket(); break; 43 default: 44 _serverSocket = _context.CreateResponseSocket(); break; 45 } 46 _serverSocket.Bind("tcp://*:" + _port); 47 Task.Factory.StartNew(() => 48 AsyncRead(_serverSocket), TaskCreationOptions.LongRunning); 49 } 50 51 private void AsyncRead(NetMQSocket serverSocket) 52 { 53 while (true) 54 { 55 var msg = serverSocket.ReceiveMessage(); 56 OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(serverSocket, msg)); 57 } 58 } 59 60 61 public NetMQSocket Server 62 { 63 get { return _serverSocket; } 64 } 65 66 public void Dispose() 67 { 68 _serverSocket.Dispose(); 69 _context.Dispose(); 70 } 71 72 public void Send(NetMQMessage msg) 73 { 74 _serverSocket.SendMessage(msg); 75 } 76 77 public NetMQMessage CreateMessage() 78 { 79 return new NetMQMessage(); 80 } 81 }
這樣,使用者就可以根據枚舉進行服務端的創建, 不用糾結到底用哪一種服務端,並且封裝了一些消息的異步事件,方便在開發中使用,可以使用多播委托,針對不同的消息進行不同的處理,我這里使用的while循環,當然,在netmq內部提供了循環器和心跳等,都可以在實際的開發中進行擴展和使用:Poller和NetMQTimer。
Client:
1 /// <summary> 2 /// MQ客戶端 3 /// </summary> 4 public class OctMQClient:IDisposable 5 { 6 public event EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> OnReceive; 7 8 protected virtual void OnOnReceive(DataEventArgs<NetMQSocket, NetMQMessage> e) 9 { 10 EventHandler<DataEventArgs<NetMQSocket, NetMQMessage>> handler = OnReceive; 11 if (handler != null) handler(this, e); 12 } 13 14 private int _port; 15 private NetMQSocket _clientSocket; 16 private ClientType _type; 17 private NetMQContext _context; 18 private string _ip; 19 private Task task; 20 public void Init(string ip, int port, ClientType type) 21 { 22 _type = type; 23 _ip = ip; 24 _port = port; 25 _context = NetMQContext.Create(); 26 CreateClient(); 27 } 28 29 void CreateClient() 30 { 31 switch (_type) 32 { 33 case ClientType.Request: 34 _clientSocket = _context.CreateRequestSocket(); break; 35 case ClientType.Sub: 36 _clientSocket = _context.CreateSubscriberSocket(); break; 37 case ClientType.Dealer: 38 _clientSocket = _context.CreateDealerSocket(); break; 39 case ClientType.Stream: 40 _clientSocket = _context.CreateStreamSocket(); break; 41 case ClientType.Pull: 42 _clientSocket = _context.CreatePullSocket(); break; 43 case ClientType.XSub: 44 _clientSocket = _context.CreateXSubscriberSocket(); break; 45 default: 46 _clientSocket = _context.CreateRequestSocket(); break; 47 } 48 _clientSocket.Connect("tcp://" + _ip + ":" + _port); 49 } 50 51 public void StartAsyncReceive() 52 { 53 task = Task.Factory.StartNew(() => 54 AsyncRead(_clientSocket), TaskCreationOptions.LongRunning); 55 56 } 57 58 private void AsyncRead(NetMQSocket cSocket) 59 { 60 while (true) 61 { 62 var msg = cSocket.ReceiveMessage(); 63 OnOnReceive(new DataEventArgs<NetMQSocket, NetMQMessage>(cSocket, msg)); 64 } 65 } 66 67 public NetMQSocket Client 68 { 69 get { return _clientSocket; } 70 } 71 72 public T GetClient<T>() where T : NetMQSocket 73 { 74 return (T)_clientSocket; 75 } 76 77 public void Send(NetMQMessage msg) 78 { 79 _clientSocket.SendMessage(msg); 80 } 81 82 public NetMQMessage CreateMessage() 83 { 84 return new NetMQMessage(); 85 } 86 87 public NetMQMessage ReceiveMessage() 88 { 89 return _clientSocket.ReceiveMessage(); 90 } 91 92 public void Dispose() 93 { 94 _clientSocket.Dispose(); 95 _context.Dispose(); 96 if (task != null) 97 { 98 task.Dispose(); 99 } 100 } 101 }
客戶端提供了,同步接受消息和異步接收消息兩種方式,當啟動異步時,就開始循環的讀取消息了,當讀到消息時拋出事件,並且針對任務等做了資源的釋放。並提供創建消息和返回MQ對象等公共方法,可以在開發過程中快速的入手和使用。
先簡單說一下response和request模式,就是響應模式,當mq客戶端向mq的服務端發送消息時,需要得到及時的響應,並返回給使用者或者是用戶,這就需要及時響應的服務端程序,一般的MQ都會有這種功能,也是使用最廣泛的,我們就先寫一個這種類型的demo,基於我們前面提供的客戶端和服務端。
Server Console
這里我提供了2種也是最常用的2種服務端方式,並且提供了不同的處理方式。
1 class Program 2 { 3 private static OctMQServer _server; 4 static ServerType _type; 5 static void Main(string[] args) 6 { 7 AppDomain.CurrentDomain.UnhandledException += CurrentDomain_UnhandledException; 8 CreateCmd(); 9 10 } 11 12 /// <summary> 13 /// 創建mq對象 14 /// </summary> 15 static void Create() 16 { 17 _server = new OctMQServer(); 18 _server.OnReceive += server_OnReceive; 19 _server.Init(5555, _type); 20 21 } 22 23 /// <summary> 24 /// 選擇類型 25 /// </summary> 26 private static void CreateCmd() 27 { 28 Csl.Wl(ConsoleColor.Red, "請選擇您要創建的MQ服務端類型"); 29 Csl.Wl(ConsoleColor.Yellow, "1.PUB 2.REP"); 30 var key = System.Console.ReadLine(); 31 switch (key) 32 { 33 case "1": 34 { 35 _type = ServerType.Pub; 36 Create(); 37 Cmd(); 38 } 39 40 break; 41 case "2": 42 _type = ServerType.Response; 43 Create(); 44 Cmd(); 45 break; 46 default: 47 { 48 CreateCmd(); 49 50 } 51 break; 52 } 53 } 54 55 static void CurrentDomain_UnhandledException(object sender, UnhandledExceptionEventArgs e) 56 { 57 Csl.WlEx((Exception)e.ExceptionObject); 58 } 59 60 /// <summary> 61 /// 接收消息 62 /// </summary> 63 private static void Cmd() 64 { 65 if (_type == ServerType.Pub) 66 { 67 Csl.Wl(ConsoleColor.Red, "請輸入您要發個訂閱者的信息主題與信息用空格分開"); 68 } 69 else 70 { 71 Csl.Wl(ConsoleColor.Red, "等待消息"); 72 } 73 var cmd = System.Console.ReadLine(); 74 75 switch (cmd) 76 { 77 case "exit": 78 Csl.Wl("正在關閉應用程序。。。等待最后一個心跳執行完成。。。"); 79 _server.Dispose(); 80 break; 81 82 default: 83 { 84 var str = cmd.Split(' '); 85 var msg = _server.CreateMessage(); 86 msg.Append(str[0],Encoding.UTF8); 87 msg.Append(str[1],Encoding.UTF8); 88 _server.Send(msg); 89 Cmd(); 90 break; 91 } 92 return; 93 } 94 } 95 96 static void server_OnReceive(object sender, DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e) 97 { 98 var msg = e.Arg2; 99 var server = e.Arg1; 100 Csl.Wl(msg.Pop().ConvertToString(Encoding.UTF8)); 101 server.Send("你好,您的請求已處理,並返回消息及處理結果",Encoding.UTF8); 102 } 103 }
Client Form
客戶端,我使用winform來處理,並且配合控制台使用,這個用法有些巧妙,不會的同學可以私密我,嘿嘿,先上截圖,也是可以同時處理兩種方式,給個demo,方便大家在實際項目中使用:
響應式:
訂閱者式:
不會做gif ,我就逐步說吧,從訂閱者模式中我們可以看到,我的打開順序1-》2->3,先打開1,訂閱了t的主題,發了2個消息,內容1和內容2,第一個程序均收到,這時我啟動另外一個程序,同樣訂閱t這個主題,發現消息是通過輪詢的方式分別向兩個訂閱者發送,這樣,我們在處理一些比較耗時的業務邏輯,並且不會因為並發出現問題時,就可以使用多個訂閱者,分別處理業務從而大大的提高我們的系統性能。
然后打開第三個,訂閱y這個主題,這時發送y的主題消息,前2個訂閱者就無法收到了,這樣我們還可以區分業務,進行多進程的處理,更高的提高可用性和可擴展性,並結合高性能的緩存解決方案處理高並發的業務邏輯。
貼出客戶端代碼:
1 public partial class Form1 : Form 2 { 3 public Form1() 4 { 5 InitializeComponent(); 6 Csl.Init(); 7 } 8 9 /// <summary> 10 /// mq客戶端 11 /// </summary> 12 private OctMQClient _client; 13 14 /// <summary> 15 /// 訂閱者模式連接 16 /// </summary> 17 /// <param name="sender"></param> 18 /// <param name="e"></param> 19 private void btnConn_Click(object sender, EventArgs e) 20 { 21 _client = new OctMQClient(); 22 _client.OnReceive += _client_OnReceive; 23 24 _client.Init(txtip.Text,int.Parse(txtport.Text),ClientType.Sub); 25 var sub = (SubscriberSocket) _client.Client; 26 sub.Subscribe(txtTop.Text); 27 _client.StartAsyncReceive(); 28 29 } 30 31 /// <summary> 32 /// 訂閱者模式受到消息 33 /// </summary> 34 /// <param name="sender"></param> 35 /// <param name="e"></param> 36 void _client_OnReceive(object sender, Core.Args.DataEventArgs<NetMQ.NetMQSocket, NetMQ.NetMQMessage> e) 37 { 38 var msg = e.Arg2; 39 Csl.Wl("主題:"+msg.Pop().ConvertToString(Encoding.UTF8)); 40 Csl.Wl("內容:" + msg.Pop().ConvertToString(Encoding.UTF8)); 41 } 42 43 /// <summary> 44 /// 發送響應消息 45 /// </summary> 46 /// <param name="sender"></param> 47 /// <param name="e"></param> 48 private void btnSend_Click(object sender, EventArgs e) 49 { 50 using (_client = new OctMQClient()) 51 { 52 _client.Init(txtip.Text, int.Parse(txtport.Text), ClientType.Request); 53 var content = txtContent.Text; 54 var msg = _client.CreateMessage(); 55 msg.Append(content, Encoding.UTF8); 56 _client.Send(msg); 57 var rmsg = _client.ReceiveMessage(); 58 var reqStr = rmsg.Pop().ConvertToString(Encoding.UTF8); 59 Csl.Wl(reqStr); 60 } 61 62 } 63 64 /// <summary> 65 /// 釋放資源 66 /// </summary> 67 /// <param name="e"></param> 68 protected override void OnClosed(EventArgs e) 69 { 70 base.OnClosed(e); 71 if (_client != null) 72 { 73 _client.Dispose(); 74 } 75 } 76 }
好了,大家先消化一下,等系列寫完了,我會提交到github上。下一期,會寫一些並發情況下的應用。