前言
最近在忙一個高考項目,看着系統順利完成了這次高考,終於可以松口氣了。看到那些即將參加高考的學生,也想起當年高三的自己。
下面分享下RabbitMQ實戰經驗,希望對大家有所幫助:
一、生產消息
關於RabbitMQ的基礎使用,這里不再介紹了,項目中使用的是Exchange中的topic模式。
先上發消息的代碼
private bool MarkErrorSend(string[] lstMsg) { try { var factory = new ConnectionFactory() { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "localhost",//ConfigurationManager.AppSettings["sHostName"], }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); try { //定義一個Direct類型交換機 channel.ExchangeDeclare( exchange: "TestTopicChange", //exchange名稱 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自動刪除,一般設成false arguments: null//一些結構化參數,比如:alternate-exchange ); //定義測試隊列 channel.QueueDeclare( queue: "Test_Queue", //隊列名稱 durable: true, //隊列磁盤持久化(要和消息持久化一起使用才有效) exclusive: false,//是否排他的,false。如果一個隊列聲明為排他隊列,該隊列首次聲明它的連接可見,並在連接斷開時自動刪除 autoDelete: false,//是否自動刪除,一般設成false arguments: null ); //將隊列綁定到交換機 string routeKey = "TestRouteKey.*";//*匹配一個單詞 channel.QueueBind( queue: "Test_Queue", exchange: "TestTopicChange", routingKey: routeKey, arguments: null ); //消息磁盤持久化,把DeliveryMode設成2(要和隊列持久化一起使用才有效) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();//發送確認機制 foreach (var itemMsg in lstMsg) { byte[] sendBytes = Encoding.UTF8.GetBytes(itemMsg); //發布消息 channel.BasicPublish( exchange: "TestTopicChange", routingKey: "TestRouteKey.one", basicProperties: properties, body: sendBytes ); } bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均發送才返回true return isAllPublished; } catch (Exception ex) { //寫錯誤日志 return false; } finally { channel.Close(); connection.Close(); } } catch { //RabbitMQ.Client.Exceptions.BrokerUnreachableException: //When the configured hostname was not reachable. return false; } }
發消息沒啥特別的。關於消息持久化的介紹這里也不再介紹,不懂的可以看上篇文章。發消息需要注意的地方是,可以選擇多條消息一起發送,最后才確定消息發送成功,這樣效率比較高;此外,需要盡量精簡每條消息的長度(樓主在這里吃過虧),不然會因消息過長從而增加發送時間。在實際項目中一次發了4萬多條數據沒有出現問題。
二、接收消息
接下來說下消費消息的過程,我使用的是單個連接多個channel,每個channel每次只取一條消息方法。有人會問單個TCP連接,多個channel會不會影響通信效率。這個理論上肯定會有影響的,看影響大不大而已。我開的channel數一般去到30左右,並沒有覺得影響效率,有可能是因為我每個channel是拿一條消息的原因。通過單個連接多個channel的方法,可以少開了很多連接。至於我為什么選每個channel每次只取一條消息,這是外界因素限制了,具體看自己需求。
接下接收消息的過程,首先定義一個RabbitMQHelper類,里面有個全局的conn連接變量,此外還有創建連接、關閉連接和驗證連接是否打開等方法。程序運行一個定時器,當
檢測到連接未打開的情況下,主動創建連接處理消息。
public class RabbitMQHelper { public IConnection conn = null; /// <summary> /// 創建RabbitMQ消息中間件連接 /// </summary> /// <returns>返回連接對象</returns> public IConnection RabbitConnection(string sHostName, ushort nChannelMax) { try { if (conn == null) { var factory = new ConnectionFactory() { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = sHostName,//ConfigurationManager.AppSettings["MQIP"], AutomaticRecoveryEnabled = false,//取消自動重連,改用定時器定時檢測連接是否存在 RequestedConnectionTimeout = 10000,//請求超時時間設成10秒,默認的為30秒 RequestedChannelMax = nChannelMax//與開的線程數保持一致 }; //創建連接 conn = factory.CreateConnection(); Console.WriteLine("RabbitMQ連接已創建!"); } return conn; } catch { Console.WriteLine("創建連接失敗,請檢查RabbitMQ是否正常運行!"); return null; } } /// <summary> /// 關閉RabbitMQ連接 /// </summary> public void Close() { try { if (conn != null) { if (conn.IsOpen) conn.Close(); conn = null; Console.WriteLine("RabbitMQ連接已關閉!"); } } catch { } } /// <summary> /// 判斷RabbitMQ連接是否打開 /// </summary> /// <returns></returns> public bool IsOpen() { try { if (conn != null) { if (conn.IsOpen) return true; } return false; } catch { return false; } } }
接下來我們看具體如何接收消息。
private static AutoResetEvent myEvent = new AutoResetEvent(false); private RabbitMQHelper rabbit = new RabbitMQHelper(); private ushort nChannel = 10;//一個連接的最大通道數和所開的線程數一致
首先初始化一個rabbit實例,然后通過RabbitConnection方法創建RabbitMQ連接。
當連接打開時候,用線程池運行接收消息的方法。注意了,這里開的線程必須和開的channel數量一致,不然會有問題(具體問題是,設了RabbitMQ連接超時時間為10秒,有時候不管用,原因未查明。RabbitMQ創建連接默認超時時間為30秒,假如在這個時間內再去調用創建的話,就有可能得到兩倍的channel;)
/// <summary> /// 單個RabbitMQ連接開多個線程,每個線程開一個channel接受消息 /// </summary> private void CreateConnecttion() { try { rabbit.RabbitConnection("localhost", nChannel); if (rabbit.conn != null) { ThreadPool.SetMinThreads(1, 1); ThreadPool.SetMaxThreads(100, 100); for (int i = 1; i <= nChannel; i++) { ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveMsg), ""); } myEvent.WaitOne();//等待所有線程工作完成后,才能關閉連接 rabbit.Close(); } } catch (Exception ex) { rabbit.Close(); Console.WriteLine(ex.Message); } }
接着就是接收消息的方法,處理消息的過程省略了。
/// <summary> /// 接收並處理消息,在一個連接中創建多個通道(channel),避免創建多個連接 /// </summary> /// <param name="con">RabbitMQ連接</param> private void ReceiveMsg(object obj) { IModel channel = null; try { #region 創建通道,定義中轉站和隊列 channel = rabbit.conn.CreateModel(); channel.ExchangeDeclare( exchange: "TestTopicChange", //exchange名稱 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自動刪除,一般設成false arguments: null//一些結構化參數,比如:alternate-exchange ); //定義閱卷隊列 channel.QueueDeclare( queue: "Test_Queue", //隊列名稱 durable: true, //隊列磁盤持久化(要和消息持久化一起使用才有效) exclusive: false,//是否排他的,false。如果一個隊列聲明為排他隊列,該隊列首次聲明它的連接可見,並在連接斷開時自動刪除 autoDelete: false, arguments: null ); #endregion channel.BasicQos(0, 1, false);//每次只接收一條消息 channel.QueueBind(queue: "Test_Queue", exchange: "TestTopicChange", routingKey: "TestRouteKey.*"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; //處理消息方法 try { bool isMark = AutoMark(message); if (isMark) { //Function.writeMarkLog(message); //確認該消息已被消費,發消息給RabbitMQ隊列 channel.BasicAck(ea.DeliveryTag, false); } else { if (MarkErrorSend(message))//把錯誤消息推到錯誤消息隊列 channel.BasicReject(ea.DeliveryTag, false); else //消費消息失敗,拒絕此消息,重回隊列,讓它可以繼續發送到其他消費者 channel.BasicReject(ea.DeliveryTag, true); } } catch (Exception ex) { try { Console.WriteLine(ex.Message); if (channel != null && channel.IsOpen)//處理RabbitMQ停止重啟而自動評閱崩潰的問題 { //消費消息失敗,拒絕此消息,重回隊列,讓它可以繼續發送到其他消費者 channel.BasicReject(ea.DeliveryTag, true); } } catch { } } }; //手動確認消息 channel.BasicConsume(queue: "Test_Queue", autoAck: false, consumer: consumer); } catch (Exception ex) { try { Console.WriteLine("接收消息方法出錯:" + ex.Message); if (channel != null && channel.IsOpen)//關閉通道 channel.Close(); if (rabbit.conn != null)//處理RabbitMQ突然停止的問題 rabbit.Close(); } catch { } } }
三、處理錯誤消息
把處理失敗的消息放到“錯誤隊列”,然后把原隊列的消息刪除(這里主要解決問題是,存在多個處理失敗或處理不了的消息時,如果把這些消息都放回原隊列,它們會繼續分發到其他線程的channel,但結果還是處理不了,就會造成一個死循環,導致后面的消息無法處理)。把第一次處理不了的消息放到“錯誤隊列”后,重新再開一個新的連接去處理“錯誤隊列”的消息。
/// <summary> /// 把處理錯誤的消息發送到“錯誤消息隊列” /// </summary> /// <param name="msg"></param> /// <returns></returns> private bool MarkErrorSend(string msg) { RabbitMQHelper MQ = new RabbitMQHelper(); MQ.RabbitConnection("localhost",1); //創建通道 var channel = MQ.conn.CreateModel(); try { //定義一個Direct類型交換機 channel.ExchangeDeclare( exchange: "ErrorTopicChange", //exchange名稱 type: ExchangeType.Topic, //Topic模式,采用路由匹配 durable: true,//exchange持久化 autoDelete: false,//是否自動刪除,一般設成false arguments: null//一些結構化參數,比如:alternate-exchange ); //定義閱卷隊列 channel.QueueDeclare( queue: "Error_Queue", //隊列名稱 durable: true, //隊列磁盤持久化(要和消息持久化一起使用才有效) exclusive: false,//是否排他的,false。如果一個隊列聲明為排他隊列,該隊列首次聲明它的連接可見,並在連接斷開時自動刪除 autoDelete: false,//是否自動刪除,一般設成false arguments: null ); //將隊列綁定到交換機 string routeKey = "ErrorRouteKey.*";//*匹配一個單詞 channel.QueueBind( queue: "Error_Queue", exchange: "ErrorTopicChange", routingKey: routeKey, arguments: null ); //消息磁盤持久化,把DeliveryMode設成2(要和隊列持久化一起使用才有效) IBasicProperties properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; channel.ConfirmSelect();//發送確認機制 byte[] sendBytes = Encoding.UTF8.GetBytes(msg); //發布消息 channel.BasicPublish( exchange: "ErrorTopicChange", routingKey: "ErrorRouteKey.one", basicProperties: properties, body: sendBytes ); bool isAllPublished = channel.WaitForConfirms();//通道(channel)里所有消息均發送才返回true return isAllPublished; } catch (Exception ex) { //寫錯誤日志 return false; } finally { channel.Close(); MQ.conn.Close(); } }
總結:RabbitMQ本身已經很穩定了,而且性能也很好,所有不穩定的因素都在我們處理消息的過程,所以可以放心使用。
Demo源碼地址:https://github.com/Bingjian-Zhu/RabbitMQHelper