Uwl.Admin.Core中使用RabbitMQ消息隊列:
本文負責講解RabbitMQ的使用
Uwl.Admin.Core使用的技術有:
*、Async和Await 異步編程
*、Repository + Service 倉儲模式編程;倉儲模式支持工作單元
*、Swagger 前后端文檔說明,基於RESTful風格編寫接口
*、Cors 簡單的跨域解決方案
*、JWT自定義策略授權權限驗證
*、依賴注入選擇的是官方自帶的DI注入,沒有使用第三方框架,ORM使用EF Core,數據庫使用的是Sql server,(后期會擴展MySql版本);
*、AutoMapper 自動對象映射、
*、Linq To Sql \ lambda表達式樹查詢;(表達式樹查詢是個人擴展的,表達式樹的使用方法請參考Uwl.Data.Server.MenuServer的多條件查詢)
*、登錄認證方式使用JWT認證方式,后台接口使用SwaggerUI展示,角色權限使用 自定義權限處理器PermissionHandler 繼承與微軟官方 IAuthorizationRequirement;
*、Excel導入導出使用的是Epplus第三方框架,導入導出只需要配置Attribute特性就好,不需要在自己寫列名;導出只支持List導出,暫時不支持Datatable;(Excel使用方法請參考UserController控制器)
*、Rabbit MQ消息隊列(目前暫無業務使用場景后期准備用來記錄日志)
*、Redis 輕量級分布式緩存;(Redis使用方法請參考Uwl.Data.Server.MenuServer類)
*、QuartzNet第三方任務框架;(使用方法請參考類庫Uwl.ScheduledTask.Job.TestJobOne類)
*、IdentityServer4授權模式已開發完成,未發布演示服務器代碼在github;(Identityserver4Auth分支)
RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。
RabbitMQ安裝
安裝完成之后訪問Web控制台
http://服務器ip:15672/
注意配置防火牆,默認用戶名密碼都是guest,若新建用戶一定要記得配置權限。guest僅限localhost訪問,外網無法使用此賬號!
.NET Core 使用RabbitMQ
通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/
定義生產者.
本文的代碼生產者是基礎的消息隊列生產者,源代碼請看我的開源項目 UWl.Admin.Core
public class RabbitServer: IRabbitMQ { private IConnection connection; private ConnectionFactory connectionFactory; public RabbitServer() { try { connectionFactory = new ConnectionFactory() { UserName = Appsettings.app(new string[] { "RabbitMQConfig", "UserName" }), Password = Appsettings.app(new string[] { "RabbitMQConfig", "Password" }), HostName = Appsettings.app(new string[] { "RabbitMQConfig", "HostName" }), AutomaticRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "AutomaticRecoveryEnabled" })), TopologyRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "TopologyRecoveryEnabled" })), }; } catch (Exception) { throw; } } public IConnection GetConnection() { return this.connectionFactory.CreateConnection(); } /// <summary> /// RabbitMQ指定隊列名稱模式發送消息 /// </summary> /// <param name="queuename">隊列名字</param> /// <param name="obj">傳輸數據</param> public void SendData(string queuename, object obj) { connection = GetConnection(); if (obj == null) return; if (connection == null) return; if (queuename.IsNullOrEmpty()) return; using (connection) { using (var channel= connection.CreateModel()) { //聲明一個隊列 //隊列模式 一共有四種 channel.QueueDeclare(queuename, false, false, false, null); //第一個參數:預計大小,第二個參數每次讀取幾個,第三個參數是否本地 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); //交付模式 var prop = channel.CreateBasicProperties(); // 非持久性(1)或持久性(2)。 prop.DeliveryMode = 2; //將對象轉化為json字符串 var json = JsonConvert.SerializeObject(obj); //將字符串轉換為二進制 var bytes= Encoding.UTF8.GetBytes(json); //開始傳送 channel.BasicPublish("", queuename, prop,bytes); } } } }
定義消費者.
消費者我是使用.Net Core控制台程序來寫的源代碼放到了百度網盤請自行下載 RebbitMQDemo 鏈接: https://pan.baidu.com/s/1n9CaSiAuB9t63Fh_YIU78A 提取碼: 3939
//創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "wzw", Password = "wzw", HostName = "localhost" }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //接收到的消息處理事件 EventingBasicConsumer Recipient = new EventingBasicConsumer(channel); Recipient.Received += (ch, ea) => { var RecipientMsg = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"后台處理方法收到消息:{RecipientMsg}"); //確認該消息已被處理 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"消息已經處理【{ea.DeliveryTag}】"); }; channel.BasicConsume("hello", false, Recipient); Console.WriteLine("后台處理方法已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close();
RabbitMQ消費失敗的處理
RabbitMQ采用消息應答機制,即消費者收到一個消息之后,需要發送一個應答,然后RabbitMQ才會將這個消息從隊列中刪除,如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那么RabbitMQ會將這個消息重新投遞。
使用RabbitMQ的Exchange
前面我們可以看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)
AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個消息分發到相應的隊列中。