RabbitMQ是基於高級消息隊列的AMQP協議的技術實現,是一個開源產品,其本身屬於一個“半成品”的消息中間件,提供了豐富的開發文檔和多種客戶端API(如JAVA\.NET等)的開發組件,RabbitMQ服務器本身主要承擔通訊和傳輸的功能,支持多種消息架構和模式,實踐中需要采用哪些消息架構和模式,完全取決於客戶端的設計和開發。 在企業實踐中,設計或開發不當會造成嚴重的后果,特別是涉及到跨系統的交互和集成,對系統穩定性和可靠性等方面性能要求很高,本文總結了的幾點實踐經驗,供各位同學參考。
一、關於RabbitMQ部署總結。
如果使用早期在Windows下部署的RabbitMQ版本(因為早期的Erlang/OTP是基於32位的,Erlang/OTP R15B01以后開始支持64位Windows),生產環境下是不支持64位的,32位的在性能和吞吐量方面有很大影響,如果目前使用32位的應該升級到64位的,以提高系統處理能力。
二、關於RabbitMQ消息持久化的總結。
在OA與SAP等其他系統進行交互和集成的時候,發現供應商的客戶端開發代碼中沒有進行持久化,如果RabbitMQ服務器或服務進行重啟后,將導致消息的丟失,從而造成對業務的影響。而自行開發和集成的代碼中已進行了消息持久化,因此通知供應商進行修改代碼並給出了相應C#示例代碼供參考:
private const string EXCHANGE_NAME = "xxx.bpms.ehr"; static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory { HostName = "localhost", VirtualHost=@"/", UserName = "admin", Password = "12345678" }; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { //交換機持久化 channel.ExchangeDeclare(EXCHANGE_NAME, "topic",true); //隊列持久化 channel.QueueDeclare("xxx.bpms.ehr", true, false, false, null); channel.QueueBind("xxx.bpms.ehr", EXCHANGE_NAME, "EHR.EmpMessageUpdate", null); byte[] payload = File.ReadAllBytes(@"c:\employeeSec1.xls"); Stream s = new MemoryStream(payload); SystemComponent.ADHelper.XlsToDataTable xtd = new SystemComponent.ADHelper.XlsToDataTable(); SystemComponent.ADHelper.FileConvert fct = new SystemComponent.ADHelper.FileConvert(); DataTable dt = xtd.RenderFromExcel(s); string ls_json = JsonConvert.SerializeObject(dt, new DataTableConverter()); IMapMessageBuilder mapmsg = new MapMessageBuilder(channel); //數據持久化 ((IBasicProperties)mapmsg.GetContentHeader()).DeliveryMode = 2; channel.BasicPublish(EXCHANGE_NAME, "EHR.EmpMessageUpdate", (IBasicProperties)mapmsg.GetContentHeade(),System.Text.Encoding.UTF8.GetBytes(ls_json)); } } }
消息持久化需要同時進行交換機持久化、隊列持久化、數據持久,也可以在RabbitMQ的管理界面中查看是否進行了持久化,如下圖:
三、關於RabbitMQ的發布者/訂閱者模式的開發總結。
在實際開發中發現,發布者和訂閱者的兩者的聲明交換機和隊列參數需要保持一致(無論是相同的客戶端開發語言還是不同的客戶端開發語言都適用),如果發布者加入了持久化參數,而訂閱者沒有加入,則會導致交互不成功,無法消費相應消息,這點需要特別注意,以確保運行成功。