微服務架構:事件總線、消息隊列CAP.RabbitMQ


參考:

CAP官方中文文檔(重點)

RabbitMQ中文文檔

RabbitMQ官網英文文檔

事件總線--文檔

事件總線

什么是事件總線

就是用來管理所有的事件的一種機制就稱作為事件總線,包括事件發布,事件存儲,事件訂閱,事件處理的統稱

作用

事件總線是一種機制,它允許不同的組件彼此通信而不彼此了解。 組件可以將事件發送到Eventbus,而無需知道是誰來接聽或有多少其他人來接聽。 組件也可以偵聽Eventbus上的事件,而無需知道誰發送了事件。 這樣,組件可以相互通信而無需相互依賴。 同樣,很容易替換一個組件。 只要新組件了解正在發送和接收的事件,其他組件就永遠不會知道

CAP框架

CAP 是一個EventBus,同時也是一個在微服務或者SOA系統中解決分布式事務問題的一個框架。它有助於創建可擴展,可靠並且易於更改的微服務系統。

CAP 支持以下幾種運輸方式:

存儲方式分為內存和數據庫

RabbitMQ

RabbitMQ是消息代理:它接受並轉發消息。您可以將其視為郵局:將您要發布的郵件放在郵箱中時,可以確保Mailperson先生或女士最終將郵件傳遞給收件人。以此類推,RabbitMQ是一個郵箱,一個郵局和一個郵遞員。

RabbitMQ與郵局之間的主要區別在於,它不處理紙張,而是接收,存儲和轉發數據消息的二進制斑點。

六種隊列模式:先進先出,后進后出

發布訂閱模式:一般是聚合服務發布消息給RabbitMQ,RabbitMQ再發消息發動訂閱者服務,通過RabbitMQ解耦,兩個服務不直接通信,是RabbitMQ調用服務,不再是聚合服務調用服務

  • 發布(Publish):程序把請求消息發布給RabbitMQ后直接返回,不用再等請求執行完畢再返回,對應數據庫發布表:Published
  • 訂閱(Subscrib):程序訂閱RabbitMQ后,RabbitMQ把消息逐一推送給訂閱者,也就是RabbitMQ調用訂閱者服務,對應數據庫訂閱表:Received

連接參數:

  • HostName: 宿主地址
  • UserName: 用戶名
  • Password: 密碼
  • VirtualHost: 虛擬主機
  • Port: 端口號
  • TopicExchangeName: CAP默認Exchange名稱
  • QueueMessageExpires: 隊列中消息自動刪除時間
  • ConnectionFactoryOptions: 本機連接工廠選項

微服務中使用CAP框架的RabbitMQ方式

環境准備 

RabbitMQ的開發語言環境:Erlang下載地址
RabbitMQ下載地址

RabbitMQ啟動

1、在安裝目錄下添加可視化插件命令

rabbitmq-plugins enable rabbitmq_management

2、在安裝目錄sbin文件夾下啟動

rabbitmq-server

3、sbin文件夾下查看rabbitmq狀態,最后兩行就是端口號,默認是15672

rabbitmqctl status

4、在瀏覽器輸入http://127.0.0.1:15672

訪問rabbitmq后台系統,默認賬號和密碼都是:guest

在項目中使用RabbitMQ

MicroService.Core項目安裝NuGet包

  • CAP: Nuget DotNetCore.CAP
  • CAP傳輸器:Nuget DotNetCore.CAP.RabbitMQ
  • CAPSql持久化:DotNetCore.CAP.SqlServer
  • CAP內存持久化:DotNetCore.CAP.InMemoryStorage

在聚合服務 MicroService.AggregateService服務中startup.cs中使用CAP

            // 添加事件總線cap
            services.AddCap(x => {
                // 使用內存存儲消息(消息發送失敗處理)
                x.UseInMemoryStorage();
//使用EntityFramework進行存儲操作,指定數據庫上下文就好,不用配置 //x.UseEntityFramework<AggregateContext>(); // 使用RabbitMQ進行事件中心處理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); });

 

 在聚合服務MicroService.AggregateService服務中AggregateController.cs中注入且使用ICapPublisher,把消息發布給RabbitMQ

using DotNetCore.CAP;
private readonly ICapPublisher capPublisher; public TeamsController(ICapPublisher capPublisher) { this.capPublisher = capPublisher; } //異步發布對象消息 capPublisher.PublishAsync<Video>("video.event.1", video);

 

 

在視頻服務MicroService.VideoService服務startup.cs中使用CAP

            // 添加事件總線cap
            services.AddCap(x => {
                // 使用RabbitMQ進行事件中心處理
                x.UseRabbitMQ(rb => {
                    rb.HostName = "localhost";
                    rb.UserName = "guest";
                    rb.Password = "guest";
                    rb.Port = 5672;
                    rb.VirtualHost = "/";
                });
            });    

在視頻服務MicroService.VideoService服務VideoController.cs 的PostVideo方法上添加過濾器特性[CapSubscribe] ,從RabbitMQ中訂閱,且處理這些請求

using DotNetCore.CAP;

        /// <summary>
        /// 視頻添加:
        /// 從消息隊列獲取請求
        /// </summary>
        /// <param name="Video"></param>
        /// <returns></returns>
        [NonAction]  //指示控制器方法不是動作方法,不讓控制器調用
        [CapSubscribe("video.*")]  //訂閱消息:RabbitMQ推送消息給訂閱者
        public ActionResult<Video> PostVideo(Video Video)
        {
            Console.WriteLine($"接受到視頻事件消息");
            videoService.Create(Video);
            return CreatedAtAction("GetVideo", new { id = Video.Id }, Video);
        }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM