CAP帶你輕松玩轉Asp.Net Core消息隊列


CAP是什么?

CAP是由我們園子里的楊曉東大神開發出來的一套分布式事務的決絕方案,是.Net Core Community中的第一個千星項目(目前已經1656 Star),具有輕量級、易使用、高性能等特點。

https://github.com/dotnetcore/CAP

本博客主要針對易用性這一點,展開敘述,一起看看CAP如何結合EF Core和RabbitMQ帶領小白輕松走入分布式消息隊列的世界。忍者

准備

首先,你需要搭建一套RabbitMQ系統,搭建過程在此不再敘述,如果大家覺得麻煩,可以用我搭好的。

HostName: coderayu.cn  UserName:guest Password:guest  (僅僅可用作實驗,數據丟失不負責)

創建Asp.Net Core 項目,並引入Nuget包

你可以運行以下下命令在你的項目中安裝 CAP。

PM> Install-Package DotNetCore.CAP

如果你的消息隊列使用的是 Kafka 的話,你可以:

PM> Install-Package DotNetCore.CAP.Kafka

如果你的消息隊列使用的是 RabbitMQ 的話,你可以:

PM> Install-Package DotNetCore.CAP.RabbitMQ

CAP 提供了 Sql Server, MySql, PostgreSQL 的擴展作為數據庫存儲:

// 按需選擇安裝你正在使用的數據庫
PM> Install-Package DotNetCore.CAP.SqlServer
PM> Install-Package DotNetCore.CAP.MySql
PM> Install-Package DotNetCore.CAP.PostgreSql

創建DbContext

因為我采用的是EF Core,所以首先要創建一個DbContext上下文,代碼如下:

public class CapDbContext:DbContext
    {
        public CapDbContext(DbContextOptions options) : base(options)
        {
        }
    }

Startup配置

首先需要在ConfigureServices函數中進行相關服務的注入,對應的操作和功能解釋如下:

public void ConfigureServices(IServiceCollection services)
        {
            //注入DbContext上下文,如果用的是Mysql可能還需要添加Pomelo.EntityFrameworkCore.MySql這個Nuget包
            services.AddDbContext<CapDbContext>(options =>
                options.UseMySql("Server=127.0.0.1;Database=testcap;UserId=root;Password=123456;"));

            //配置CAP
            services.AddCap(x =>
            {
                x.UseEntityFramework<CapDbContext>();

                //啟用操作面板
                x.UseDashboard();
                //使用RabbitMQ
                x.UseRabbitMQ(rb =>
                {
                    //rabbitmq服務器地址
                    rb.HostName = "coderayu.cn";

                    rb.UserName = "guest";
                    rb.Password = "guest";

                    //指定Topic exchange名稱,不指定的話會用默認的
                    rb.ExchangeName = "cap.text.exchange";
                });

                //設置處理成功的數據在數據庫中保存的時間(秒),為保證系統新能,數據會定期清理。
                x.SucceedMessageExpiredAfter = 24 * 3600;

                //設置失敗重試次數
                x.FailedRetryCount = 5;
            });

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
        }

 

最后還要再Congiure中啟用CAP中間件

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            //啟用cap中間件
            app.UseCap();

            app.UseMvc();
        }

利用EF Core生成CAP數據庫

再程序包管理控制台中依此輸入以下命令行

PM> Add-Migration Init
PM> update-database

如果成成功執行,那么打開數據庫,就可以看到用來存儲CAP發送和接收數據的表格了。

image

表格中每列的含義如下:

image

 

消息的發送和訂閱

我們直接在ValuesController的基礎上進行改造。

在 Controller 中注入 ICapPublisher 然后使用 ICapPublisher 進行消息發送

private readonly ICapPublisher _publisher;

        public ValuesController(ICapPublisher publisher)
        {
            _publisher = publisher;
        }

發送消息

[HttpGet]
        public string Get(string message)
        {
            //"cap.test.queue"是發送的消息RouteKey,可以理解為消息管道的名稱
            _publisher.Publish("cap.test.queue",message);

            return "發送成功";
        }

訂閱消息

       //"cap.test.queue"為發送消息時的RauteKey,也可以模糊匹配
        //詳情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
        [CapSubscribe("cap.test.queue")]
        public void HandleMessage(string message)
        {
            Console.Write(DateTime.Now.ToString()+"收到消息:"+message);
        }

Run

啟動程序后,首先看到CAP啟動成功

image

緊隨其后,消費者也就是我們的訂閱方法在RabbitMQ服務器上注冊成功。

image

發送消息,發送成功,如下

image

發送后,立即在控制台看到了訂閱方法輸出的結果。

image

 

消息的失敗重試

在訂閱方法中,如果拋出異常,那么CAP就會認為該條消息處理失敗,會自動進行重試,重試次數在前方已經進行了配置。

我們把訂閱方法做一個改動,打印接收的信息到控制台中,並拋出異常

//"cap.test.queue"為發送消息時的RauteKey,也可以模糊匹配
        //詳情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html
        [CapSubscribe("cap.test.queue")]
        public void HandleMessage(string message)
        {
            Console.WriteLine(DateTime.Now.ToString()+"收到消息:"+message);
            throw new Exception("測試失敗重試");
        }

 

可以看到,立即進行了三次重試

image

可是在前面,我們設置的失敗重試次數是5次,為什么這里只重試三次嗎?是不是要叫曉東過來改BUG了呢轉動眼睛?當然不是。

觀察發現,CAP重試的前三次是立即進行的,而后面的重試,是每隔一段時間進行的,當在分布式通訊的過程中,可能出現了問題確實不會立即修復解決,可能過了一定時間,系統就自動恢復了,如網絡抖動。

 

CAP儀表盤

image

發送成功了五條消息,成功接收處理了三條,兩條處理失敗,處理失敗的任務,我們可以直接在面板中進行重新消費,可謂非常方便。

image

同時,處理失敗的消息,點擊消息的編號后,可以查看到消息的內容和異常原因。

image

 

CAP如此強大,讓消息隊列這種高大上產品操作So Easy,學會了CAP,也可以吹牛說,我也懂分布式任務處理啦大聲笑

感謝曉東開發出如此強大的項目,同時感謝.Net Core Community。

參考 CAP Github wiki

https://github.com/dotnetcore/CAP/wiki

本博客Demo代碼

https://github.com/liuzhenyulive/CAP.Demo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM