基礎拾遺
前言
消息隊列,在高並發環境下,由於來不及同步處理,請求往往會發生堵塞,比如說雙十一很多人進行下單,購買物品這是對於數據的操作是非常之大的,不管是是insert還是update是不是都有及時操作數據庫,那么就有可能造成數據庫思索移除什么堆積阻塞。那么我們這時是不是加入異步,nosql是不是能減輕其壓力,那么這中間劍氣的橋梁就是mq了,當然她的使用場景有很多,我們接下來把社么是消息隊列了解清楚它是怎么一回事之后,希望大家能在自己的項目中靈活應用即可。
消息隊列(MQ)
我們先從圖文上說一下它的使用場景,異步處理,應用解耦,流量削鋒和消息通訊四個場景。因為以前開發過商城所以就以下載訂單來敘述一下,他的適應場景吧。
異步處理
比如我們下載訂單后發送郵件與短信給使用者(簡單舉例一般不會哈)。那么我們在寫程序一般怎么處理呢?(1)把下單信息存入數據庫中,調用郵件,短信接口,發送(並行發送或者一個一個發送),返回界面。但是我們計算一下如果每個操作時間為30ms那么最少也需要60ms,多的情況是90ms,
那么如果我們加入消息隊列將是一個怎樣的情況呢,我們先把下單信息存入數據庫,同時把信息放到消息隊列。然后就不用管它了。這樣的話所用時間就是30ms+1ms(存消息隊列)。其實放消息隊列中還是要管的的,但那是消費者的事和下單這個生產這無關。
應用解耦
還是商城下載訂單的問題,當我們商城下載訂單,然后公司內部erp中庫存管理相應庫存進行同步。一般我們怎么處理,下載完訂單,調用erp系統,然后處理erp數據,接着把erp數據庫中的信息進行同步到商城,這個時候處理上面提到的效率,還有一個問題,需要解決:如果兩個系統不能同時訪問,你會怎么做。那么我們就要對兩個系統進行解耦了對不對。這個時候消息隊列就有了用武之地。如下圖:其實消息隊列在這個功能下,我們的erp系統也有寫入的時候,在這不再累述業務,大家了解消息隊列的用途即可。
流量削鋒
做過商城的應該都會遇到這個問題,當舉行活動是擁擠大量的用戶,可能會是系統崩潰,這時候流量控制,和異常處理是一件特別重要的工作。當然請不要說在這其他方法,我們不對其進行討論,我們盡對消息隊列的使用做簡單介紹。
消息通訊
消息通訊是指,消息隊列一般都內置了高效的通信機制,因此也可以用在純的消息通訊。比如實現點對點消息隊列,或者聊天室等
以上我們大致說了一下他的使用場景,那么不知道大家有沒有了解到它到底是個什么東西?
其實吧消息隊列就是一個生產者,把相應消息(對象)放到消息隊列(中間件中),然后它就什么都不用管了,接下來消費者(或者叫訂閱者)去消息隊列中間件中去獲取訂閱的信息,它自己再去處理。能解決的問題咱們從上面的場景應該已經了解到了,解耦,提高效率。那么重點來了消息隊列中間件又是什么呢?它都有哪些,又是怎么實現的呢?下面我們就來了解其中的一個中間件RabbitMq。
RabbitMq
大家大致知道什么是消息隊列了,那么它的實現是什么樣的呢?現在基本上也知道它實現重要的一環是消息對立中間件,rabbitmq,就是其中之一,其中還包括:Active MQ,Rocket Mq,Kafka,Zero MQ甚至也有人用redis來實現。
從我的角度來說我去了解了兩個AcctionMQ與RabbitMq這兩種最終選擇了它,也簡單做了相應的封裝,來我先來介紹一下RabbitMq.
RabbitMQ是一個消息代理 - 一個消息系統的媒介。它可以為你的應用提供一個通用的消息發送和接收平台,並且保證消息在傳輸過程中的安全。它提供的內部機制包括持久性機制、投遞確認、發布者證實和高可用性機制,多協議,集群,聯合我們可以在實現的過程中針對於性能與可靠性進行相應權衡。
看一下:rabbitmq可視化工具如下(此可視化web的操作請大家自行查詢):
其實消息隊列的協議是AMQP,有很多對此的介紹在這不再累述。結果上面的了解我們大致知道它是個什么東西,不過我們也要在此提一下,幾個概念。消息、隊列、路由(包括點對點和發布/訂閱),生產者,消費者,具體解釋我覺得不需要了,就是你理解的字面意思。
其中隊列我們一般用P來表示,消費者一般用C,隊列(存消息的集合)用q。路由是R.多個消費者可以訪問多個q。接下來開始我們的實現了。
RabbitMq的代碼實現
RabbitMq連接
首先看一下配置文件信息:
<appSettings> <!--rabbitMQ--> <add key="serveraddress" value="amqp://192.168.0.76:5672/"/> <add key="virtualhost" value="erpadminvirtualhost"/> <add key="username" value="tx_junpin"/> <add key="password" value="abc.1234%"/>
以上分別是訪問服務地址,虛擬地址(可在可視化上手動添加,記得要加一條數據進去,然后刪除,好比初始haunted一樣),用戶,密碼。其中web訪問地址一般為端口后改為“15672”.
連接關鍵數據准備好之后就是c# 中代碼的實現了
private RabbitConsumerConfig RBGetinfo; private ConnectionFactory cf = new ConnectionFactory(); private IConnection conn; //建立聯接 /// <summary> /// 初始化Rabbit連接 /// </summary> /// <param name="rbinfo"></param> public RabbitConsumer(RabbitConsumerConfig rbinfo) { RBGetinfo = rbinfo; cf = new ConnectionFactory() { UserName = RBGetinfo.UserName, Password = RBGetinfo.Password, VirtualHost = RBGetinfo.VirtualHost, RequestedHeartbeat = 0, Uri = RBGetinfo.ServerAddress }; conn = cf.CreateConnection(); }
以上ConnectionFactory 內部為中間件提供的連接工廠。方便與AMQP代理相關聯的Connection。用興趣的小伙伴請F12去看代碼吧。
調用代碼封裝

/// <summary> /// 隊列出列的方法,傳入處理隊列中body的方法,並傳入隊列名稱 /// </summary> /// <param name="messageProcessAction">要執行的方法(委托)</param> /// <param name="queuename">隊列名稱</param> /// <param name="count">獲取數據條數</param> public void ConsumeMessage(Action<string> messageProcessAction, string queuename, ushort count) { if (string.IsNullOrEmpty(queuename)) { throw new ArgumentNullException("queuename"); } CheckConn(); using (IModel ch = conn.CreateModel()) { //第二種取法QueueingBasicConsumer基於訂閱模式 QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch); ch.BasicQos(0, count, true); ch.BasicConsume(queuename, false, consumer); while (true) { string message = ""; try { BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); IBasicProperties props = e.BasicProperties; byte[] body = e.Body; message = System.Text.Encoding.UTF8.GetString(body); messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("\0\0\0body\0\n", "").Replace("\0", "").ToString()); ch.BasicAck(e.DeliveryTag, false); } catch (Exception ex) { throw new RabbitException() { InternalException = ex, QueueName = queuename, RabbitInfo = RBGetinfo.ToString(), CurrentMessage = message }; } } } }
其中 CheckConn()判斷是否連接如果沒連接繼續連接誒:

private void CheckConn() { if (RBGetinfo != null && !IsOpen) { cf = new ConnectionFactory() { UserName = RBGetinfo.UserName, Password = RBGetinfo.Password, VirtualHost = RBGetinfo.VirtualHost, RequestedHeartbeat = 0, Uri = RBGetinfo.ServerAddress }; conn = cf.CreateConnection(); } }
可能大家看到注釋了,是的,RabbitMQ Consumer 獲取消息有兩種方式(poll、subscribe) 。-----訂閱與輪詢。我們用的是訂閱模式。寫到者突然件想有時間還是要把上面提到的那個幾個概念再梳理一下吧。
其中委托調用的方法:
public void StockTBCExecute(string body) { logger.Error("StockTBCExecute" + body); }
你有可能會問。我可不可以定義委托方法為多個參數?我只能說,你看一下代碼:
byte[] body = e.Body; message = System.Text.Encoding.UTF8.GetString(body); messageProcessAction.Invoke(System.Text.Encoding.UTF8.GetString(body).Replace("\0\0\0body\0\n", "").Replace("\0", "").ToString());
至於能否擴展你們自己去研究吧。
向中間件插入數據

public class RabbitMQManager { private static readonly string _serverAddress; private static readonly string _virtualHost; private static readonly string _userName; private static readonly string _password; private static readonly ILog _logger = LogManager.GetLogger(typeof(RabbitMQManager)); private static RabbitProducer _rabbitProducer; static RabbitMQManager() { _serverAddress = ConfigurationManager.AppSettings["serveraddress"]; _virtualHost = ConfigurationManager.AppSettings["virtualhost"]; _userName = ConfigurationManager.AppSettings["username"]; _password = ConfigurationManager.AppSettings["password"]; } /// <summary> /// 交換鏈接信息 /// </summary> /// <param name="routingKey">路由關鍵字</param> /// <param name="queueName">隊列名稱</param> /// <param name="message">消息內容</param> public static void SendRabbitMQ(string routingKey, string queueName, string message) { RabbitProducerConfig _rabbitConfig = new RabbitProducerConfig() { ServerAddress = _serverAddress, VirtualHost = _virtualHost, UserName = _userName, Password = _password, Exchange = "erp.service", ExchangeType = "direct", RoutingKey = routingKey }; if (_rabbitProducer == null || !_rabbitProducer.IsOpen) { _rabbitProducer = new RabbitProducer(_rabbitConfig); } try { _rabbitProducer.ProduceMessage(message, queueName); } catch (Exception ex) { _logger.Error(ex); } finally { _rabbitProducer.Close(); } } }
以上代碼好像也沒有什么好解釋的,這里面用到的路由與於隊列參數,基本上我使用的一個隊列會對應一個路由,但是 rabbitmq並非只有這種方式。
那么就在這多說一點吧。
RabbitMQ三種路由方式
Direct Exchange(直接路由)
任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue.(我封裝的方法是這種)
1
.一般情況可以使用rabbitMQ自帶的Exchange:"(該Exchange的名字為空字符串,下文稱其為
default
Exchange)。
2
.這種模式下不需要將Exchange進行任何綁定(binding)操作
3
.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。
4
.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
Fanout Exchange(廣播路由)
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。
1
.可以理解為路由表的模式
2
.這種模式不需要RouteKey
3
.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
4
.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄
Topic Exchange(主題訂閱模式路由)
任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上
1
.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RouteKey),Exchange會將消息轉發到所有關注主題能與RouteKey模糊匹配的隊列。
2
.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。
3
.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。
4
.“#”表示
0
個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。
5
.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息。
最后的最后源碼
源碼:https://github.com/wyl1924/RabbitMQ
不管你是否了解上面我說的,你可以直接用下面的方法來使用我封裝的這個類庫:
插入指定隊列一條數據:
RabbitMQManager.SendRabbitMQ(RoutKey.RoutKey_stock_eshop, Queuen.Queuen_Stock_Eshop, "0108ZLY036");
獲取隊列中的數據(先進先出):
public void Execute1() { while (true) { try { if (rc1 == null || !rc1.IsOpen) { rc1 = new RabbitConsumer(rcc); } rc1.ConsumeMessage(StockEshopExecute, RabbitMQ.RabbitMqConst.Queuen.Queuen_Stock_Eshop, 1); } catch (Exception ex) { logger.ErrorFormat("Execute1,異常:{0}", ex.Message); } } }
總結:
大致講的是怎么在項目中使用,其中有很多細節與需要注意的東西詳細闡述,大家可自己詳細的去了解,針對於代碼,我已經傳到github上去,如果有什么問題大家可以給我提出來,有什么需要討論的,請在公告來中找到QQ與我聯系。