1. RabbitMQ
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過寫和檢索出入列隊的針對應用程序的數據(消息)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較為成熟的MQ產品有MSMQ,ActiveMQ,RabbitMQ,IBM WEBSPHERE MQ 等。
RabbitMQ是使用Erlang開發的,開源的,一個在高級消息隊列協議(AMQP)基礎上完整的,可復用的企業消息系統,遵循Mozilla Public License開源協議。
RabbitMQ支持大多數開發語言如Java,Ruby,Python,.Net,C/C++,Erlang等,各種語言的客戶端可以從官網上下載。
2. RabbitMQ的一些概念
- 連接(Connection),與RabbitMQ Server建立的一個連接,由ConnectionFactory創建,每個connection只與一個物理的Server進行連接,此連接是基於Socket進行連接的,這個可以相似的理解為像一個DB Connection。AMQP一般使用TCP鏈接來保證消息傳輸的可靠性。
- 通道 (Channel),在C#客戶端里應該是叫Model,其他客戶端基本都叫Channel。建立在Connection基礎上的一個通道,相對於Connection來說,它是輕量級的。它就像是Hibernate里面的Session一樣。Channel 主要進行相關定義,發送消息,獲取消息,事務處理等。Channel可以在多線程中使用,但是必須保證任何時候只有一個線程執行命令。一個Connection可以有多個Channel。客戶端程序有時候會是一個多線程程序,每一個線程都想要和RabbitMQ進行連接,但是又不想共享一個連接,這種需求還是比較普遍的。因為一個Connection就是一個TCP鏈接,RabbitMQ在設計的時候不希望與每一個客戶端保持多個TCP連接,但這確實是有些客戶端的需求,所以在設計中引入了Channel的概念,每一個Channel之間沒有任何聯系,是完全分離的。多個Channel來共享一個Connection。
- 交換器(Exchange),發送消息的實體。
- 隊列(Queue),接收消息的實體。
- 綁定器(Bind),將交換器和隊列連接起來,並且封裝消息的路由信息。
3.資源文件
4.發送消息
1 /// <summary> 2 /// </summary> 3 /// <param name="deptId">部門id</param> 4 /// <param name="userCode">用戶名</param> 5 /// <returns></returns> 6 [Generated] 7 protected override String SendMQ(long deptId, string userCode, string clientIP) 8 { 9 var factory = new ConnectionFactory(); 10 factory.HostName = "182.94.73.104"; //MQ服務端IP 11 factory.Port = 15673; //MQ 服務端端口 12 factory.UserName = "login_mq_user"; 13 factory.Password = "chinaiss_xyz@345Q1uTfcx.vb"; 14 //factory.HostName = "localhost"; 15 //factory.UserName = "yy"; 16 //factory.Password = "hello!"; 17 18 19 string ticket = System.Guid.NewGuid().ToString(); //對應URL中的token參數 20 21 //MQINFOPoco MQobj = new MQINFOPoco(); 22 //MQobj.clientIP = clientIP; 23 //MQobj.section = deptId; 24 //MQobj.userId = userCode; 25 //MQobj.loginTime = GetTimeStamp(); 26 //MQobj.timeLive = 480; 27 //MQobj.token = ticket; 28 var loginTime = GetTimeStamp(); 29 30 using (var connection = factory.CreateConnection()) 31 { 32 using (var channel = connection.CreateModel()) 33 { 34 channel.QueueDeclare("login_queue", false, false, false, null); //MQ隊列名稱login_queue 35 //string message = ObjectToJson(MQobj); 36 StringBuilder message = new StringBuilder(@"{""clientIP"":"); 37 message.AppendFormat("\"{0}\"", clientIP); 38 message.AppendFormat(",\"section\":\"{0}\"", deptId); 39 message.AppendFormat(",\"userId\":\"{0}\"", userCode); 40 //message.Append(",\"userId\":\"hxj\""); 41 message.AppendFormat(",\"loginTime\":{0}", loginTime); 42 message.Append(",\"timeLive\":480"); 43 message.AppendFormat(",\"token\":\"{0}\"", ticket); 44 message.Append("}"); 45 46 var properties = channel.CreateBasicProperties(); 47 properties.SetPersistent(true); 48 var body = Encoding.UTF8.GetBytes(message.ToString()); 49 channel.BasicPublish("", "login_queue", properties, body); 50 } 51 } 52 53 return ticket; 54 }