一. CAP框架簡介
1. 什么是事件總線?
事件總線(EventBus)是一種機制,它允許不同的組件彼此通信而不彼此了解。 組件可以將事件發送到Eventbus,而無需知道是誰來接聽或有多少其他人來接聽。組件也可以偵聽Eventbus上的事件,而無需知道誰發送了事件。 這樣,組件可以相互通信而無需相互依賴。同樣,很容易替換一個組件,只要新組件了解正在發送和接收的事件,其他組件就永遠不會知道.
使用事件總線的目的:將微服務系統各組件之間進行解耦。
2. CAP框架簡介
CAP 是一個在分布式系統中(SOA,MicroService)實現事件總線(EventBus)和 最終一致性(分布式事務)的一個開源的 C# 庫,她具有輕量級,高性能,易使用等特點。
CAP 以 NuGet 包的形式提供,對項目無任何入侵,你仍然可以以你喜愛的方式來構建分布式系統。
CAP 具有 Event Bus 的所有功能,並且CAP提供了更加簡化的方式來處理EventBus中的發布/訂閱。
CAP 具有消息持久化的功能,也就是當你的服務進行重啟或者宕機時,她可以保證消息的可靠性。
CAP 實現了分布式事務中的最終一致性,你不用再去處理這些瑣碎的細節。
CAP 提供了基於 Microsoft DI 的 API 服務,它可以和你的 ASP.NET Core 系統進行無縫結合,並且能夠和你的業務代碼集成支持強一致性的事務處理。
CAP 是開源免費的.
參考:
官網地址:https://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/
GitHub: https://github.com/dotnetcore/CAP/tree/master/samples
官方博客:https://cap.dotnetcore.xyz/user-guide/zh/getting-started/introduction/
3. 優勢
相對於直接集成消息隊列,異步消息傳遞最強大的優勢之一是可靠性,系統的一個部分中的故障不會傳播,也不會導致整個系統崩潰。 在 CAP 內部會將消息進行存儲,以保證消息的可靠性,並配合重試等策略以達到各個服務之間的數據最終一致性。
相對於其他的 Service Bus 或者 Event Bus, CAP 擁有自己的特色,它不要求使用者發送消息或者處理消息的時候實現或者繼承任何接口,擁有非常高的靈活性。我們一直堅信約定大於配置,所以CAP使用起來非常簡單,對於新手非常友好,並且擁有輕量級。CAP 采用模塊化設計,具有高度的可擴展性。你有許多選項可以選擇,包括消息隊列,存儲,序列化方式等,系統的許多元素內容可以替換為自定義實現。
4. Cap框架支持的消息隊列 和 存儲介質
(1).消息隊列有:RabbitMQ、Kafka、Azure Service Bus、Amazon SQS、In-Memory Queue.
(2).存儲介質有:In-Memory、SQLServer、MySQL、PostgreSql、MongonDB
5. Cap框架結構圖
剖析:客戶端調用微服務1→在本地事務中執行相關業務+發送消息存儲到publish表中 →通過CAP框架開啟新線程→CAP框架把消息發送到MQ中→MQ主動通過CAP框架調用微服務2→微服務2接收到消息並且本地業務執行成功,反饋ACK消息確認→MQ標記/刪除消息。
PS:上述流程是針對最終一致性的事務來寫的,如果僅僅是為了實現事件總線,則第二步直接發送消息存儲到publish表中即可,無需開啟本地事務來保證原子性。
整套流程涉及到4個角色:發布者、消息隊列、訂閱者、存儲器。(PS:發布者和訂閱者各自對應一個存儲器,當然可以在一個DB中,發布者對應published表,訂閱者對應received表,通常情況下發布者和訂閱者各自是一個服務,所以各自對應一個存儲器)
注:下圖實線是用戶代碼,虛線是CAP框架內部實線,還有這里是推模式,消息隊列主動調用用戶服務代碼(默認推模式是單線程的,所以訂閱者的方法不需要考慮並發問題的;可以配置多個線程,但多個線程則無法保證消費順序了,且訂閱者方法內可能存在並發問題了。)。
下圖為官方GitHub中的圖
PS:以上內容均參考官方文檔和博客。
二. RabbitMq的安裝和操作
1. 安裝步驟
(1). 安裝Erl運行環境
A. 去官網:https://www.erlang.org/downloads 下載,然后安裝,安裝過程,直接默認選項,下一步到底,安裝完成即可。
PS:這里用的版本為:【23.1】,安裝路徑為 D:\Program Files\erl-23.1
B. 安裝完成后需要配置環境變量,這里以win10的配置為例:先配置 ERLANG_HOME = D:\Program Files\erl-23.1,再把 %ERLANG_HOME%\bin 加到Path變量中,如下圖:
C. 在命令行中輸入erl,如下圖,說明安裝成功。
(2). 安裝RabbitMQ的服務端。
A. 去官網 https://www.rabbitmq.com/download.html 進行下載,並且安裝,安裝過程如下圖,一路下一步,直到安裝完成。
PS:這里用的版本為:【3.8.9】,安裝路徑為 D:\Program Files\RabbitMQ Server
B. 安裝完成后,查看windows服務已啟動,如下圖:
C.進入安裝目錄,找到sbin文件夾,這里我的是 D:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.9\sbin ,在該路徑下運行指令:【rabbitmq-plugins enable rabbitmq_management】,安裝可視化插件。
PS:如果指令不好用,需要在前面加個 ./ 表示當前路徑。
然后通過地址:http://127.0.0.1:15672 訪問RabbitMq的后台管理系統,賬號和密碼都是guest。
截止到此,RabbitMq已經安裝完成,並且已經以服務的方式啟動了,大功告成!!!
2. 基本操作
(1). 啟動/關閉RabbitMq服務(兩種模式)
A. 服務的模式啟動:(推薦)
services.msc 查看服務,有一個RabbitMQ服務,右鍵啟動或關閉即可即可。或者通過下面指令啟動關閉服務【net start rabbitmq】【net stop rabbitmq】(這兩指令不需要配置全局環境變量)。
B. 指令的模式啟動:
前提:下面指令都需要在 RabbitMQ Server/rabbitmq_server-3.8.3/sbin 目錄下進行. (也可以配置一下環境變量就不需要在該目錄下執行了,全局執行指令即可)
必須先把已經安裝的RabbitMQ關閉 或者 刪除。
在安裝目錄下的sbin目錄下運行指令【rabbitmq-server】,服務則啟動,這種模式關閉窗口,服務則停止,如下圖:
在安裝目錄下的sbin目錄下運行指令【rabbitmq-server -detached】,服務則在后台啟動,關閉窗口,服務則不停止,需要運行【rabbitmqctl stop】來停止服務,如下圖:
PS:如果指令不好用,需要在前面加個 ./ 表示當前路徑,eg 【./rabbitmq-server】。
(2).查看RabbitMq狀態
【rabbitmqctl status】
PS: 如果上述的這幾個指令不想每次都到安裝目錄下執行,則可以配置一些全局環境
新增:RabbitMq_Home = E:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.9
path中添加:%RabbitMq_Home%\sbin
測試:
三. CAP之EventBus實戰-快速上手
(目標:快速實現單服務使用CAP框架----基於內存存儲和內存消息隊列)
1. 新建項目,配置命令啟動
新建項目Publisher1,具有發布者和訂閱者兩種角色,配置通過指令設置端口啟動,使用端口 9001。
public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { // 添加命令行支持(寫在上面的Main里也可以) new ConfigurationBuilder().AddCommandLine(args).Build(); webBuilder.UseStartup<Startup>(); });
2. 通過Nuget給該項目安裝下面程序集
【DotNetCore.CAP 5.0.3】 :CAP的主程序集
【DotNetCore.CAP.InMemoryStorage 5.0.3】:內存存儲
【Savorboard.CAP.InMemoryMessageQueue 3.1.1】:內存消息隊列
3. 編寫發布者和訂閱者相關的方法
(1).發布者方法:SendMsg, 先需要通過構造函數注入ICapPublisher _capBus,然后方法內部調用_capBus.Publish("ypfkey2", DateTime.Now);進行消息發送。
代碼如下:
[Route("api/[controller]/[action]")] [ApiController] public class SimpleUseController : ControllerBase { private ICapPublisher _capBus; private ILogger _log; public SimpleUseController(ICapPublisher capBus, ILogger<SimpleUseController> log) { this._capBus = capBus; this._log = log; } /// <summary> /// 發布者調用的方法1 /// </summary> /// <returns></returns> public IActionResult SendMsg() { var nowTime = DateTime.Now; _capBus.Publish("ypfkey2", nowTime); _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); return Content("發送成功"); } }
注意:Publish的第一個參數是一個key,就是通過這個key來和訂閱者建立起聯系;第二個參數可以是任意類型.
(2).訂閱者方法:ReceiveMsg,通過[CapSubscribe("ypfkey2")]來和發布者建立起聯系,同時加一個[NonAction]特性,排除Controller中的調用。 (這里是基於內存存儲,所以發布者和訂閱者必須在一個項目中)
代碼如下:
[Route("api/[controller]/[action]")] [ApiController] public class SimpleUseController : ControllerBase { private ICapPublisher _capBus; private ILogger _log; public SimpleUseController(ICapPublisher capBus, ILogger<SimpleUseController> log) { this._capBus = capBus; this._log = log; } /// <summary> /// 訂閱者的方法 /// </summary> /// <param name="time"></param> [NonAction] [CapSubscribe("ypfkey2")] public void ReceiveMsg(DateTime time) { //1. 正常接收 _log.LogInformation($"我是訂閱者,收到的內容為:{time}"); } }
注意:這里是‘推模式’,消息隊列 主動發送請求 調用訂閱者的方法, 二者通過ypfkey2這個標記來通信。
5. 在ConfigureService中添加代碼如下代碼進行注冊
代碼如下:
services.AddCap(x => { x.UseInMemoryStorage(); //內存存儲 x.UseInMemoryMessageQueue(); //內存消息隊列 });
6.啟動項目,並測試
啟動指令:【dotnet Publisher1.dll --urls="http://*:9001" --ip="127.0.0.1" --port=9001】
PostMan發送Get請求 http://localhost:9001/api/SimpleUse/SendMsg , 在控制台中先看到發布者的方法中的內容 ,然后看到訂閱者方法的內容,說明搭建成功。
如圖:
四. CAP之EventBus實戰-深入探討
(目標:多服務使用CAP框架---基於SQLServer/MySQL存儲 和 RabbitMq消息隊列)
1. 新建發布者和訂閱者服務,並配置命令啟動
新建發布者服務:Publisher1 訂閱者服務:Subscriber1,這兩項目均為.Net 5.0,配置通過指令設置端口啟動,分別使用端口 9001和 9011。
代碼如下:
public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { // 添加命令行支持(寫在上面的Main里也可以) new ConfigurationBuilder().AddCommandLine(args).Build(); webBuilder.UseStartup<Startup>(); });
啟動指令如下:
【dotnet Publisher1.dll --urls="http://*:9001" --ip="127.0.0.1" --port=9001】
【dotnet Subscriber1.dll --urls="http://*:9011" --ip="127.0.0.1" --port=9011】
PS:這里的發布者和訂閱者項目是兩個微服務,屬於不同的項目,所以不能使用內存消息隊列,不通的。
2. 通過Nuget給兩個項目安裝程序集
【DotNetCore.CAP 5.0.3】:CAP主框架
【DotNetCore.CAP.RabbitMQ 5.0.3】:RabbitMq消息隊列
【DotNetCore.CAP.SqlServer 5.0.3】:SQLServer存儲
【DotNetCore.CAP.MySql 5.0.3】:MySQL存儲
注:SQLServer和MySQL程序集不要同時安裝!!!! 容易命名相同引用混亂。
3. 編寫發布者方法
Publisher1發布者項目中准備SendMsg方法,注入ICapPublisher _capBus,調用_capBus.Publish("ypfkey1", DateTime.Now);進行消息發送。
注意:Publish的第一個參數是一個key,就是通過這個key來和訂閱者建立起聯系;第二個參數可以是任意類型.
代碼如下:
[Route("api/[controller]/[action]")] [ApiController] public class PubController : ControllerBase { private ICapPublisher _capBus; private ILogger _log;
public PubController(ICapPublisher capBus, ILogger<PubController> log) { this._capBus = capBus; this._log = log; } /// <summary> /// 發布者調用的方法1(無事務) /// </summary> /// <returns></returns> public IActionResult SendMsg() { var nowTime = DateTime.Now; _capBus.Publish("ypfkey1", nowTime); _log.LogInformation($"我是發布者,發布的內容為:{nowTime}"); return Content("發送成功"); } }
4. 編寫訂閱者方法
SubSctiber1訂閱者項目中准備ReceiveMsg方法, 通過[CapSubscribe("ypfkey1")]來和發布者建立起聯系,同時加一個[NonAction]特性,排除Controller中的調用.
注意:這里是‘推模式’,消息隊列 主動發送請求 調用訂閱者的方法。
代碼如下:
[Route("api/[controller]/[action]")] [ApiController] public class SubController : ControllerBase { private ILogger _log; public SubController(ILogger<SubController> log) { this._log = log; } /// <summary> /// 訂閱者的方法 /// </summary> /// <param name="time"></param> [NonAction] [CapSubscribe("ypfkey1")] public void ReceiveMsg(DateTime time) { //1. 正常接收 { _log.LogInformation($"我是訂閱者,收到的內容為:{time}"); } } }
5. 使用RabbitMQ作為消息隊列
(1). 啟動RabbitMq服務 (見上面的二)
(2). 兩個項目都在ConfigureService注冊RabbitMq服務
代碼如下:
//注冊cap事件 services.AddCap(x => { //-----------------------------一.聲明存儲類型---------------------------------
//1. 使用xxx存儲 //-----------------------------二.聲明消息隊列類型---------------------------------
//1.使用RabbitMq隊列存儲 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; //rb.QueueMessageExpires = 24 * 3600 * 10; //隊列中消息自動刪除時間(默認10天) }); });
6. 使用SQLServer作為消息存儲
前提:先刪掉【DotNetCore.CAP.MySql】,否則存在調用二義性。
(1). 通過Nuget安裝【DotNetCore.CAP.SqlServer 5.0.3】
(2).安裝EFCore-SQLServer相關的程序集
【Microsoft.EntityFrameworkCore 5.0.6】【Microsoft.EntityFrameworkCore.SqlServer 5.0.6】
【Microsoft.EntityFrameworkCore.Tools 5.0.6】【Microsoft.EntityFrameworkCore.Design 5.0.6】
(3). 在SQLServer中新建兩個空的數據庫分別是PubDB和SubDB,分別服務於發布者和訂閱者
A.在Publisher1項目中執行下面映射指令:
【Scaffold-DbContext "Server=localhost;Database=PubDB;User ID=sa;Password=123456;" Microsoft.EntityFrameworkCore.SqlServer -OutputDir Models -UseDatabaseNames -DataAnnotations -NoPluralize】
B.在Subscriber1項目中執行下面映射指令:
【Scaffold-DbContext "Server=localhost;Database=SubDB;User ID=sa;Password=123456;" Microsoft.EntityFrameworkCore.SqlServer -OutputDir Models -UseDatabaseNames -DataAnnotations -NoPluralize】
(4).兩個項目都修改ConfigureService代碼
A. EFCore自身注入:
services.AddDbContext<PubDBContext>(option => option.UseSqlServer(Configuration.GetConnectionString("EFStr")));
B. Cap框架依賴SQLServer,(下面選擇一種即可)
① x.UseEntityFramework<PubDBContext>(); //使用EFCore
② x.UseSqlServer(Configuration.GetConnectionString("EFStr")); //使用ADO.NET (不需要依賴EF上下文,沒有數據庫的話,事先建好數據庫即可)
發布者代碼:(訂閱者代碼類似,上下文改為 SubDbContext)
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); //數據庫連接(SQLServer) services.AddDbContext<PubDBContext>(option => option.UseSqlServer(Configuration.GetConnectionString("EFStr")));//注冊cap事件 services.AddCap(x => { //-----------------------------一.聲明存儲類型---------------------------------
//1. 使用SQLServer存儲 //還需要配合上面EF上下文的注入 services.AddDbContext x.UseEntityFramework<PubDBContext>(); //EFCore配置 //x.UseSqlServer(Configuration.GetConnectionString("EFStr")); //ADO.Net配置
//-----------------------------二.聲明消息隊列類型---------------------------------//1.使用RabbitMq隊列存儲 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; //rb.QueueMessageExpires = 24 * 3600 * 10; //隊列中消息自動刪除時間(默認10天) }); //-----------------------------三.添加后台監控,用於人工干預---------------------------------//-----------------------------四.通用配置--------------------------------- }); }
發布者連接字符串:(訂閱者代碼類似,數據庫改為:SubDB)
"ConnectionStrings": {
"EFStr": "Server=localhost;Database=PubDB;User ID=sa;Password=123456;",
}
(5). 啟動項目並測試
啟動指令:【dotnet Publisher1.dll --urls="http://*:9001" --ip="127.0.0.1" --port=9001】 【dotnet Subscriber1.dll --urls="http://*:9011" --ip="127.0.0.1" --port=9011】
然后用PostMan發送Get請求 http://localhost:9001/api/Pub/SendMsg ,訂閱者收到發布者發送的消息,如下圖:測試通過。
另外,PubDB和SubDB都各自生成兩張表,因為現在發布者和訂閱者是兩個服務,所有發布者只使用PubDB數據庫中的cap.Published表,訂閱者只使用SubDB數據庫中的cap.Received表。
A.cap.Published (Publisher1中只用該表)
B.cap.Received (Subscriber1中只用該表)
(6).分析表結構:
Published表:Name為標記key,Content為發送的內容,Retries為重試次數,ExpriesAt過期時間、StatusName狀態
Received表:含義同上,多了個Group組
特別注意:CAP框架發消息給訂閱者默認是單線程執行的,上一個線程執行結束下一個線程才能開始,所以訂閱者中不需要考慮並發問題,Received表中數據默認狀態為Schedued,執行完成,才會變為Succeed或Failed。
7.使用MySQL作為消息存儲
前提:先刪掉【DotNetCore.CAP.SqlServer】,否則存在調用二義性
(1).通過Nuget安裝【DotNetCore.CAP.MySql 5.0.3】
(2).安裝EFCore-MySQL相關程序集
【Microsoft.EntityFrameworkCore 5.0.6】【Microsoft.EntityFrameworkCore.Design 5.0.6】
【Microsoft.EntityFrameworkCore.Tools 5.0.6】【Pomelo.EntityFrameworkCore.MySql】
(3).在MySQL數據庫中新建兩個空的數據庫分別是PubDB和SubDB,
A.在Publisher1項目中執行下面映射指令:
【Scaffold-DbContext "Server=localhost;Database=PubDB;User ID=root;Password=123456;" Pomelo.EntityFrameworkCore.MySql -OutputDir Models -UseDatabaseNames -DataAnnotations -NoPluralize】
B.在Subscriber1項目中執行下面映射指令:
【Scaffold-DbContext "Server=localhost;Database=SubDB;User ID=root;Password=123456;" Pomelo.EntityFrameworkCore.MySql -OutputDir Models -UseDatabaseNames -DataAnnotations -NoPluralize】
(4).修改ConfigureService代碼
A. EFCore自身注入:
services.AddDbContext<PubDBContext>(option => option.UseMySql(Configuration.GetConnectionString("EFStrMySQL"), Microsoft.EntityFrameworkCore.ServerVersion.Parse("5.7.28-mysql")));
B. Cap框架依賴MySQL:(下面選擇一種即可)
x.UseEntityFramework<PubDBContext>(); //基於EFCore
x.UseMySql(Configuration.GetConnectionString("EFStrMySQL")); //基於ADO.Net (不需要依賴EF上下文,沒有數據庫的話事先建好即可)
發布者代碼:(訂閱者代碼類似,上下文改為 SubDbContext)
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); //數據庫連接(MySQL) services.AddDbContext<PubDBContext>(option => option.UseMySql(Configuration.GetConnectionString("EFStrMySQL"), Microsoft.EntityFrameworkCore.ServerVersion.Parse("5.7.28-mysql"))); //注冊cap事件 services.AddCap(x => { //-----------------------------一.聲明存儲類型--------------------------------- //1. 使用MySQL存儲 x.UseEntityFramework<PubDBContext>(); //x.UseMySql(Configuration.GetConnectionString("EFStrMySQL")); //-----------------------------二.聲明消息隊列類型---------------------------------//1.使用RabbitMq隊列存儲 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; //rb.QueueMessageExpires = 24 * 3600 * 10; //隊列中消息自動刪除時間(默認10天) }); //-----------------------------三.添加后台監控,用於人工干預---------------------------------//-----------------------------四.通用配置--------------------------------- }); }
發布者連接字符串:(訂閱者代碼類似,數據庫改為:SubDB)
"ConnectionStrings": { "EFStrMySQL": "Server=localhost;Database=PubDB;User ID=root;Password=123456;" }
(5).測試:PostMan發送Get請求 http://localhost:9001/api/Pub/SendMsg ,測試通過,同時在數據庫中自動生成兩張表,其它運行效果與SQLServer完全相同啊。
A.cap.Published (Publisher1中只用該表)
B.cap.Received (Subscriber1中只用該表)
!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。