之前工作時記錄的一份入門文檔,簡單介紹下rabbitmq跟cap的使用方法
以下我的理解說法不知規不規范,只是用我最通俗的理解寫出來
RabbitMQ是一種底層隊列的實現(Kafka也是一種隊列),CAP提供了一種通用隊列發布、訂閱使用方法。
可以理解為SqlServer、MySql跟EF的關系吧(EF比作為CAP),你不通過EF,也可以用SqlClient相關類來使用SqlServer,但EF提供一種通用的代碼使用方法,同樣的代碼可以同時用於SqlServer、MySql,代碼使用者不用關心我底層是用了SqlServer還是MySql。
RabbitMQ簡單介紹
我先說下RabbitMQ原生的使用方法,然后再說下怎么跟CAP結合
說到隊列,先理解下以下幾個概念
- 生產者:也可以說是發布者,主要是發布消息,發送給交換器;
- 消費者:也可以說是訂閱者,從隊列中訂閱消息進行處理並返回應答;
- 交換器:可能連接多個隊列,將生產者發布的消息發送到隊列中;
- 隊列:存放生產者產生的消息,供消費者進行訂閱處理;
消息從發布到訂閱的流程步驟:
生產者發布消息給交換器(傳遞一個key值),交換器在它綁定的隊列中根據key值及交換器模式找到匹配的隊列發送消息,訂閱了此隊列的消費者就可以獲取消息進行處理,並返回應答;
交換器模式
- direct 消息發送到RouteKey完全匹配的隊列中
- fanout 消息轉發到交換器綁定的所有隊列中
- topic 消息發送到RouteKey模糊匹配的隊列中
- header 會用headers屬性來進行匹配,性能最差(實際使用中很少)
topic匹配規則:隊列的key為TestRouteKey.#,可以匹配到 TestRouteKey.A.B,隊列的key為TestRouteKey.*,可以匹配到TestRouteKey.A
以下是需要注意理解的點
- 生產者也可以直接發布消息到隊列中;
- 如果交換器沒有綁定任何隊列,那發布的消息將直接丟棄;
- 一個消息只能被一個消費者獲取,要實現消息同時被多個消費者獲取,要使用交換器綁定多個隊列;
- 消費者獲取消息后,如果處理過程中失敗了沒有返回應答,那消息會在隊列中重新發送;
以下演示,可以創建兩個控制台程序,然后在Main里面寫相關代碼進行測試
生產者發布消息代碼
安裝包 RabbitMQ.Client
//連接工廠 var factory = new ConnectionFactory(){ UserName="", Password="", HostName="", Port=0 }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //聲明交換器,模式為direct channel.ExchangeDeclare("exchangeName","direct"); //聲明隊列 channel.QueueDeclare("queueName",durable:true); //將交換器跟隊列進行綁定 channel.QueueBind("queueName","exchangeName","routeKey",null); //發布消息 channel.BasicPublish("exchangeName","routeKey",null,Encoding.UTF8.GetBytes("hello world")); channel.Close(); connection.Close();
消費者訂閱消息
var factory = new ConnectionFactory { UserName = "", Password = "", HostName = "", }; //創建個連接 var connection = factory.CreateConnection(); //創建個通道 var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); //定義事件消費者,及消費接收事件(返回應答) consumer.Received += (o, e) => { var message = Encoding.UTF8.GetString(e.Body.ToArray()); Console.WriteLine($"收到消息:{message}"); channel.BasicAck(e.DeliveryTag, false); }; //啟動消費者,第二個參數是代表是否自動應答,false就得手動調用BasicAck方法 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消費者已啟動"); Console.ReadKey(); channel.Close(); connection.Close();
CAP組件簡單介紹
上面簡單的介紹完RabbitMQ使用方法,下面再來簡單說下CAP是干什么的
分布式有一個CAP原則,C(一致性),A(可用性)跟P(容錯性),那這個CAP組件就是提供了一個開箱既用的解決方案
CAP可用於微服務分布式事務解決方案,就是可以搭建不同站點,使用CAP,連接同一個RabbitMQ,部署在不同的服務器上,實現分布式部署。
那要實現CAP,需要一個數據庫來記錄事件,需要一個隊列來存放事件消息。
CAP更詳情的文檔可查看它的官網,重點有中文的 http://cap.dotnetcore.xyz/
創建一個WebApi初始項目來演示一下。
安裝包 DotNetCore.CAP
安裝包DotNetCore.CAP.SqlServer,這是提供Sqlserver來記錄事件的包
安裝包 DotNetCore.CAP.RabbitMQ,這是提供RabbitMQ來存放事件消息的包
安裝包 DotNetCore.CAP.Dashboard,這是提供一個Web管理后台可查看發布、訂閱消息情況
在Startup.cs的ConfigureServices方法中注入
services.AddCap(o=>{ o.UseSqlServer(""); o.UseRabbitMQ(mq => { mq.HostName = "";//RabbitMQ服務器地址 mq.Port=5672; mq.UserName = "admin"; mq.Password = "admin"; }); o.UseDashboard(); //添加監控儀表盤,通過http://localhost/cap訪問 o.FailedRetryInterval = 30;//失敗后的重拾間隔,默認60秒 o.FailedRetryCount = 10;//失敗后的重試次數,默認50次;在FailedRetryInterval默認60秒的情況下,即默認重試50*60秒(50分鍾)之后放棄失敗重試 o.SucceedMessageExpiredAfter = 60 * 60; //設置成功信息的刪除時間默認24*3600秒 });
然后在Controllers目錄下創建一個測試控制器
[ApiController] [Route("[controller]/[action]")] public class TestController : ControllerBase { private readonly ICapPublisher _capPublisher; public TestController(ICapPublisher capPublisher) { _capPublisher = capPublisher; } [HttpPost] public void Test1() { //發布消息,消息被訂閱處理后,會回調到Test.Callback _capPublisher.Publish<string>("Test.Event", "Hello,World","Test.Callback"); } [NonAction] [CapSubscribe("Test.Event")] //訂閱Test.Event事件 public string Test2(string message) { //進行訂閱消息處理 Console.WriteLine(message); return "OK"; } [NonAction] [CapSubscribe("Test.Callback")] public void TestCallback(string result) { //發布消息完成后的回調 Console.WriteLine(result); } }
好了,上面就簡單的介紹了RabbitMQ跟CAP組件的使用方法,本來還在想這些東西適用於哪些場景,然后今天項目上線后出現問題了,里面涉及到兩個系統的調用,一個系統A因為接口被頻繁地調用超時,導致另一個系統B一直顯示出錯,我就發現這個場景就很適合用這個CAP了。
系統A的崩潰不應影響到系統B,而系統A崩潰時也可以自動進行重試,當系統B發布消息后,也不用等待系統A,顯示處理中,等系統A處理成功后再通知系統B,B再顯示成功就可以了。
