Uwl.Admin.Core開源框架(三) 使用RabbitMQ


Uwl.Admin.Core中使用RabbitMQ消息隊列:

本文負責講解RabbitMQ的使用

Uwl.Admin.Core使用的技術有:

  *、Async和Await 異步編程

  *、Repository + Service 倉儲模式編程;倉儲模式支持工作單元

  *、Swagger 前后端文檔說明,基於RESTful風格編寫接口

  *、Cors 簡單的跨域解決方案

  *、JWT自定義策略授權權限驗證

  *、依賴注入選擇的是官方自帶的DI注入,沒有使用第三方框架,ORM使用EF Core,數據庫使用的是Sql server,(后期會擴展MySql版本);

  *、AutoMapper 自動對象映射、

  *、Linq To Sql \ lambda表達式樹查詢;(表達式樹查詢是個人擴展的,表達式樹的使用方法請參考Uwl.Data.Server.MenuServer的多條件查詢)

  *、登錄認證方式使用JWT認證方式,后台接口使用SwaggerUI展示,角色權限使用  自定義權限處理器PermissionHandler 繼承與微軟官方 IAuthorizationRequirement;

  *、Excel導入導出使用的是Epplus第三方框架,導入導出只需要配置Attribute特性就好,不需要在自己寫列名;導出只支持List導出,暫時不支持Datatable;(Excel使用方法請參考UserController控制器)

  *、Rabbit MQ消息隊列(目前暫無業務使用場景后期准備用來記錄日志)

  *、Redis 輕量級分布式緩存;(Redis使用方法請參考Uwl.Data.Server.MenuServer類)

  *、QuartzNet第三方任務框架;(使用方法請參考類庫Uwl.ScheduledTask.Job.TestJobOne類)

  *、IdentityServer4授權模式已開發完成,未發布演示服務器代碼在github;(Identityserver4Auth分支)

RabbitMQ簡介

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。

RabbitMQ安裝

請參考我的第一篇博客:

安裝完成之后訪問Web控制台

http://服務器ip:15672/ 注意配置防火牆,默認用戶名密碼都是guest,若新建用戶一定要記得配置權限。guest僅限localhost訪問,外網無法使用此賬號!

 

 .NET Core 使用RabbitMQ

通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/

定義生產者.

 

 本文的代碼生產者是基礎的消息隊列生產者,源代碼請看我的開源項目 UWl.Admin.Core

public class RabbitServer: IRabbitMQ
    {
        private IConnection connection;
        private ConnectionFactory connectionFactory;
        public RabbitServer()
        {
            try
            {
                connectionFactory = new ConnectionFactory()
                {
                    UserName = Appsettings.app(new string[] { "RabbitMQConfig", "UserName" }),
                    Password = Appsettings.app(new string[] { "RabbitMQConfig", "Password" }),
                    HostName = Appsettings.app(new string[] { "RabbitMQConfig", "HostName" }),
                    AutomaticRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "AutomaticRecoveryEnabled" })),
                    TopologyRecoveryEnabled= Convert.ToBoolean(Appsettings.app(new string[] { "RabbitMQConfig", "TopologyRecoveryEnabled" })),
                };
            }
            catch (Exception)
            {
                throw;
            }
        }

        public IConnection GetConnection()
        {
            return this.connectionFactory.CreateConnection();
        }
        /// <summary>
        /// RabbitMQ指定隊列名稱模式發送消息
        /// </summary>
        /// <param name="queuename">隊列名字</param>
        /// <param name="obj">傳輸數據</param>
        public void SendData(string queuename, object obj)
        {
            connection = GetConnection();
            if (obj == null)
                return;
            if (connection == null)
                return;
            if (queuename.IsNullOrEmpty())
                return;
            using (connection)
            {
                using (var channel= connection.CreateModel())
                {
                    //聲明一個隊列    //隊列模式   一共有四種
                    channel.QueueDeclare(queuename, false, false, false, null);
                    //第一個參數:預計大小,第二個參數每次讀取幾個,第三個參數是否本地
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //交付模式
                    var prop = channel.CreateBasicProperties();
                    // 非持久性(1)或持久性(2)。
                    prop.DeliveryMode = 2;
                    //將對象轉化為json字符串
                    var json = JsonConvert.SerializeObject(obj);
                    //將字符串轉換為二進制
                    var bytes= Encoding.UTF8.GetBytes(json);
                    //開始傳送
                    channel.BasicPublish("", queuename, prop,bytes);
                }
            }
        }
    }
定義消費者.

消費者我是使用.Net Core控制台程序來寫的源代碼放到了百度網盤請自行下載 RebbitMQDemo 鏈接: https://pan.baidu.com/s/1n9CaSiAuB9t63Fh_YIU78A 提取碼: 3939
           

//創建連接工廠
            ConnectionFactory factory = new ConnectionFactory
            {
                UserName = "wzw",
                Password = "wzw",
                HostName = "localhost"
            };
            //創建連接
            var connection = factory.CreateConnection();
            //創建通道
            var channel = connection.CreateModel();

            //接收到的消息處理事件
            EventingBasicConsumer Recipient = new EventingBasicConsumer(channel);
            Recipient.Received += (ch, ea) =>
            {
                var RecipientMsg = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine($"后台處理方法收到消息:{RecipientMsg}");
                //確認該消息已被處理
                channel.BasicAck(ea.DeliveryTag, false);
                Console.WriteLine($"消息已經處理【{ea.DeliveryTag}】");
            };
            channel.BasicConsume("hello", false, Recipient);
            Console.WriteLine("后台處理方法已啟動");
            Console.ReadKey();
            channel.Dispose();
            connection.Close();

RabbitMQ消費失敗的處理

RabbitMQ采用消息應答機制,即消費者收到一個消息之后,需要發送一個應答,然后RabbitMQ才會將這個消息從隊列中刪除,如果消費者在消費過程中出現異常,斷開連接切沒有發送應答,那么RabbitMQ會將這個消息重新投遞。

使用RabbitMQ的Exchange

前面我們可以看到生產者將消息投遞到Queue中,實際上這在RabbitMQ中這種事情永遠都不會發生。實際的情況是,生產者將消息發送到Exchange(交換器),由Exchange將消息路由到一個或多個Queue中(或者丟棄)

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將消息發送給隊列。生產者通常不知道是否一個消息會被發送到隊列中,只是將消息發送到一個交換機。先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存儲。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個消息分發到相應的隊列中。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM