RabbitMQ是什么,怎么使用我就不介紹了,大家可以到園子里搜一下教程。本篇的重點在於實現服務與服務之間的異步通信。
首先說一下為什么要使用消息隊列來實現服務通信:1.提高接口並發能力。 2.保證服務各方數據最終一致。 3.解耦。
使用消息隊列通信的優點就是直接調用的缺點,比如在直接調用過程中發生未知錯誤,很可能就會出現數據不一致的問題,這個時候就需要人工修補數據,如果有過這個經歷的同學一定是可憐的,人工修補數據簡直痛苦!!再比如高並發情況下接口直接掛點,這就更直白了,接口掛了,功能就掛了,事故報告寫起來!!而消息隊列可以輕松解決上面兩個問題,接口發生錯誤,不要緊,MQ重試一下,再不行,人工重試MQ;在使用消息隊列的時候,請求實際是被串行化,簡單說就是排隊,所以再也不用擔心因為並發導致數據不一致或者接口直接掛掉的問題。
我現在公司使用的消息隊列排隊的請求最高的有上萬個,所以完全不需要擔心MQ的性能。
OK,我們來實現一下微服務里如何使用消息隊列,主要思路是這樣的:
【提供消費者注冊界面,用於綁定RoutingKey和隊列;消息發布后,根據RoutingKey去Redis中查找對應的服務地址,然后異步調用。】
上面這句話就是消息隊列的主體思路,也是我司現在使用的方式,話不多說,代碼敲起來。
首先看下我們的項目結構:
首先我們需要先建三個這樣的類庫,這里面有些東西是用不到的,當然最最主要的就是標記出來的消息隊列部分,現在暫時提供了兩個方法,分別是發布(Publish)和訂閱(Subscribe)。
首先新增消息·隊列接口類IEventBus,這個將來用於在業務系統中注入使用,這里提供了發布訂閱方法:
public interface IEventBus { void Publish(string RoutingKey, object Model); void Subscribe(string QueueName, string RoutingKey); }
新增RabbitMQ操作接口類IRabbitMQPersistentConnection,這個用來檢查RabbitMQ的連接和釋放:
public interface IRabbitMQPersistentConnection : IDisposable { bool IsConnected { get; } bool TryConnect(); IModel CreateModel(); }
新增IRabbitMQPersistentConnection的實現類DefaultRabbitMQPersistentConnection,這個是RabbitMQ連接和釋放方法的具體實現,這個沒什么可說的,大家一看就知道了,就是檢查RabbitMQ的連接狀態,沒有連接創建連接,發生錯誤的捕捉錯誤重新連接,這里用到了Polly的重新策略:
public class DefaultRabbitMQPersistentConnection:IRabbitMQPersistentConnection { private readonly IConnectionFactory _connectionFactory; private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger; private readonly int _retryCount; IConnection _connection; bool _disposed; object sync_root = new object(); public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<DefaultRabbitMQPersistentConnection> logger, int retryCount = 5) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _retryCount = retryCount; } public bool IsConnected { get { return _connection != null && _connection.IsOpen && !_disposed; } } public IModel CreateModel() { if (!IsConnected) { throw new InvalidOperationException("No RabbitMQ connections are available to perform this action"); } return _connection.CreateModel(); } public void Dispose() { if (_disposed) return; _disposed = true; try { _connection.Dispose(); } catch (IOException ex) { _logger.LogCritical(ex.ToString()); } } public bool TryConnect() { _logger.LogInformation("RabbitMQ Client is trying to connect"); lock (sync_root) { var policy = RetryPolicy.Handle<SocketException>() .Or<BrokerUnreachableException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); policy.Execute(() => { _connection = _connectionFactory .CreateConnection(); }); if (IsConnected) { _connection.ConnectionShutdown += OnConnectionShutdown; _connection.CallbackException += OnCallbackException; _connection.ConnectionBlocked += OnConnectionBlocked; _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); return true; } else { _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened"); return false; } } } private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect..."); TryConnect(); } void OnCallbackException(object sender, CallbackExceptionEventArgs e) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect..."); TryConnect(); } void OnConnectionShutdown(object sender, ShutdownEventArgs reason) { if (_disposed) return; _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect..."); TryConnect(); } }
接下來是最重要,IEventBus的實現類EventBusRabbitMQ,在這個類里我們實現了消息的發布、訂閱、消費,首先把代碼展示出來,然后一個一個的介紹:
public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "mi_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger<EventBusRabbitMQ> _logger; private readonly ILifetimeScope _autofac; private readonly IApiHelperService _apiHelperService; private readonly string AUTOFAC_SCOPE_NAME = "mi_event_bus"; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,ILogger<EventBusRabbitMQ> logger, ILifetimeScope autofac, IApiHelperService apiHelperService, string queueName=null,int retryCount=5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _retryCount = retryCount; _apiHelperService = apiHelperService; } /// <summary> /// 發布消息 /// </summary> public void Publish(string routingKey,object Model) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var policy = RetryPolicy.Handle<BrokerUnreachableException>() .Or<SocketException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); using (var channel = _persistentConnection.CreateModel()) { channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); var message = JsonConvert.SerializeObject(Model); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //持久化 channel.BasicPublish(exchange: BROKER_NAME, routingKey: routingKey, mandatory: true, basicProperties: properties, body: body); }); } } /// <summary> /// 訂閱(綁定RoutingKey和隊列) /// </summary> public void Subscribe(string QueueName, string RoutingKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey); } } /// <summary> /// 創建消費者並投遞消息 /// </summary> /// <returns></returns> private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(ea.RoutingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); }; return channel; } /// <summary> /// 發送MQ數據到指定服務接口 /// </summary> private async Task ProcessEvent(string routingKey, string message) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { //獲取綁定該routingKey的服務地址集合 var subscriptions = await StackRedis.Current.GetAllList(routingKey); foreach(var apiUrl in subscriptions) { _logger.LogInformation(message); await _apiHelperService.PostAsync(apiUrl, message); } } } public void Dispose() { _consumerChannel?.Dispose(); } }
首先是發布方法,接受一個字符串類型的RoutingKey和Object類型的MQ數據,然后根據RoutingKey將數據發布到指定的隊列,這里RoutingKey發布到隊列的方式用的是direct模式,生產環境下我們通常會使用Topic模式,后面真正使用的時候這里也會改掉;同時在MQ發布方面也采用了Polly的重試策略。
接下來是訂閱Subscribe方法,這個比較簡單,就是包RoutingKey和Queue進行綁定,這里會提供一個專門的注冊界面,用於配置RoutingKey、Queue、ExChange和服務接口地址之間的對應關系,用的就是這個方法。
using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: QueueName, exchange: BROKER_NAME, routingKey: RoutingKey); }
然后是消費者的創建和消費方式方法CreateConsumerChannel,這個是最重要一個,在這個方法里真正實現了消息的消費,消息的消費通過委托實現,我們需要關注的是下面這個地方:
var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(ea.RoutingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
解釋下這段代碼,首先創建消息通道,並為它綁定交換器Exchange和隊列Queue,然后在這條消息通道上創建消費者Consumer,為這個消費者的接受消息的委托注冊一個處理方法。
當消息被路由到當前隊列Queue上時,就會觸發這個消息的處理方法,處理完成后,自動發送ack確認。
ProcessEvent是消息的具體處理方法,大體流程是這樣的,它接受一個RoutingKey和消息數據message,根據RoutingKey從Redis中拿到對應的服務地址,我們前面說過會有一個專門的頁面用於綁定RoutingKey和服務地址的關系,拿到地址集合之后循環調用,即Api調用。
/// <summary> /// 發送MQ到指定服務接口 /// </summary> private async Task ProcessEvent(string routingKey, string message) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { //獲取綁定該routingKey的服務地址集合 var subscriptions = await StackRedis.Current.GetAllList(routingKey); foreach(var apiUrl in subscriptions) { _logger.LogInformation(message); await _apiHelperService.PostAsync(apiUrl, message); } } }
這里用到了Api調用的幫助類,前面已經寫過了,只不過把它放到了這個公共的地方,還是貼下代碼:
public interface IApiHelperService { Task<T> PostAsync<T>(string url, object Model); Task<T> GetAsync<T>(string url); Task PostAsync(string url, string requestMessage); }
public class ApiHelperService : IApiHelperService { private readonly IHttpClientFactory _httpClientFactory; private readonly ILogger<ApiHelperService> _logger; public ApiHelperService(ILogger<ApiHelperService> _logger, IHttpClientFactory _httpClientFactory) { this._httpClientFactory = _httpClientFactory; this._logger = _logger; } /// <summary> /// HttpClient實現Post請求 /// </summary> public async Task<T> PostAsync<T>(string url, object Model) { var http = _httpClientFactory.CreateClient("MI.Web"); //添加Token var token = await GetToken(); http.SetBearerToken(token); //使用FormUrlEncodedContent做HttpContent var httpContent = new StringContent(JsonConvert.SerializeObject(Model), Encoding.UTF8, "application/json"); //await異步等待回應 var response = await http.PostAsync(url, httpContent); //確保HTTP成功狀態值 response.EnsureSuccessStatusCode(); //await異步讀取 string Result = await response.Content.ReadAsStringAsync(); var Item = JsonConvert.DeserializeObject<T>(Result); return Item; } /// <summary> /// HttpClient實現Post請求(用於MQ發布功能 無返回) /// </summary> public async Task PostAsync(string url, string requestMessage) { var http = _httpClientFactory.CreateClient(); //添加Token var token = await GetToken(); http.SetBearerToken(token); //使用FormUrlEncodedContent做HttpContent var httpContent = new StringContent(requestMessage, Encoding.UTF8, "application/json"); //await異步等待回應 var response = await http.PostAsync(url, httpContent); //確保HTTP成功狀態值 response.EnsureSuccessStatusCode(); } /// <summary> /// HttpClient實現Get請求 /// </summary> public async Task<T> GetAsync<T>(string url) { var http = _httpClientFactory.CreateClient("MI.Web"); //添加Token var token = await GetToken(); http.SetBearerToken(token); //await異步等待回應 var response = await http.GetAsync(url); //確保HTTP成功狀態值 response.EnsureSuccessStatusCode(); var Result = await response.Content.ReadAsStringAsync(); var Items = JsonConvert.DeserializeObject<T>(Result); return Items; } /// <summary> /// 轉換URL /// </summary> /// <param name="str"></param> /// <returns></returns> public static string UrlEncode(string str) { StringBuilder sb = new StringBuilder(); byte[] byStr = System.Text.Encoding.UTF8.GetBytes(str); for (int i = 0; i < byStr.Length; i++) { sb.Append(@"%" + Convert.ToString(byStr[i], 16)); } return (sb.ToString()); } //獲取Token //獲取Token public async Task<string> GetToken() { var client = _httpClientFactory.CreateClient("MI.Web"); string token = await Untity.StackRedis.Current.Get("ApiToken"); if (!string.IsNullOrEmpty(token)) { return token; } try { //DiscoveryClient類:IdentityModel提供給我們通過基礎地址(如:http://localhost:5000)就可以訪問令牌服務端; //當然可以根據上面的restful api里面的url自行構建;上面就是通過基礎地址,獲取一個TokenClient;(對應restful的url:token_endpoint "http://localhost:5000/connect/token") //RequestClientCredentialsAsync方法:請求令牌; //獲取令牌后,就可以通過構建http請求訪問API接口;這里使用HttpClient構建請求,獲取內容; var cache = new DiscoveryCache("http://localhost:7000"); var disco = await cache.GetAsync(); if (disco.IsError) throw new Exception(disco.Error); var tokenResponse = await client.RequestClientCredentialsTokenAsync(new ClientCredentialsTokenRequest { Address = disco.TokenEndpoint, ClientId = "MI.Web", ClientSecret = "miwebsecret", Scope = "MI.Service" }); if (tokenResponse.IsError) { throw new Exception(tokenResponse.Error); } token = tokenResponse.AccessToken; await Untity.StackRedis.Current.Set("ApiToken", token, (int)TimeSpan.FromSeconds(tokenResponse.ExpiresIn).TotalMinutes); } catch (Exception ex) { throw new Exception(ex.Message); } return token; } }
然后Redis幫助類的代碼也貼一下,Redis這里大家可以根據自己習慣,如何使用沒什么區別:
public class StackRedis : IDisposable { #region 配置屬性 基於 StackExchange.Redis 封裝 //連接串 (注:IP:端口,屬性=,屬性=) //public string _ConnectionString = "47.99.92.76:6379,password=shenniubuxing3"; public string _ConnectionString = "47.99.92.76:6379"; //操作的庫(注:默認0庫) public int _Db = 0; #endregion #region 管理器對象 /// <summary> /// 獲取redis操作類對象 /// </summary> private static StackRedis _StackRedis; private static object _locker_StackRedis = new object(); public static StackRedis Current { get { if (_StackRedis == null) { lock (_locker_StackRedis) { _StackRedis = _StackRedis ?? new StackRedis(); return _StackRedis; } } return _StackRedis; } } /// <summary> /// 獲取並發鏈接管理器對象 /// </summary> private static ConnectionMultiplexer _redis; private static object _locker = new object(); public ConnectionMultiplexer Manager { get { if (_redis == null) { lock (_locker) { _redis = _redis ?? GetManager(_ConnectionString); return _redis; } } return _redis; } } /// <summary> /// 獲取鏈接管理器 /// </summary> /// <param name="connectionString"></param> /// <returns></returns> public ConnectionMultiplexer GetManager(string connectionString) { return ConnectionMultiplexer.Connect(connectionString); } /// <summary> /// 獲取操作數據庫對象 /// </summary> /// <returns></returns> public IDatabase GetDb() { return Manager.GetDatabase(_Db); } #endregion #region 操作方法 #region string 操作 /// <summary> /// 根據Key移除 /// </summary> /// <param name="key"></param> /// <returns></returns> public async Task<bool> Remove(string key) { var db = this.GetDb(); return await db.KeyDeleteAsync(key); } /// <summary> /// 根據key獲取string結果 /// </summary> /// <param name="key"></param> /// <returns></returns> public async Task<string> Get(string key) { var db = this.GetDb(); return await db.StringGetAsync(key); } /// <summary> /// 根據key獲取string中的對象 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <returns></returns> public async Task<T> Get<T>(string key) { var t = default(T); try { var _str = await this.Get(key); if (string.IsNullOrWhiteSpace(_str)) { return t; } t = JsonConvert.DeserializeObject<T>(_str); } catch (Exception ex) { } return t; } /// <summary> /// 存儲string數據 /// </summary> /// <param name="key"></param> /// <param name="value"></param> /// <param name="expireMinutes"></param> /// <returns></returns> public async Task<bool> Set(string key, string value, int expireMinutes = 0) { var db = this.GetDb(); if (expireMinutes > 0) { return db.StringSet(key, value, TimeSpan.FromMinutes(expireMinutes)); } return await db.StringSetAsync(key, value); } /// <summary> /// 存儲對象數據到string /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <param name="value"></param> /// <param name="expireMinutes"></param> /// <returns></returns> public async Task<bool> Set<T>(string key, T value, int expireMinutes = 0) { try { var jsonOption = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }; var _str = JsonConvert.SerializeObject(value, jsonOption); if (string.IsNullOrWhiteSpace(_str)) { return false; } return await this.Set(key, _str, expireMinutes); } catch (Exception ex) { } return false; } #endregion #region List操作(注:可以當做隊列使用) /// <summary> /// list長度 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <returns></returns> public async Task<long> GetListLen<T>(string key) { try { var db = this.GetDb(); return await db.ListLengthAsync(key); } catch (Exception ex) { } return 0; } /// <summary> /// 獲取隊列出口數據並移除 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <returns></returns> public async Task<T> GetListAndPop<T>(string key) { var t = default(T); try { var db = this.GetDb(); var _str = await db.ListRightPopAsync(key); if (string.IsNullOrWhiteSpace(_str)) { return t; } t = JsonConvert.DeserializeObject<T>(_str); } catch (Exception ex) { } return t; } /// <summary> /// 集合對象添加到list左邊 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <param name="values"></param> /// <returns></returns> public async Task<long> SetLists<T>(string key, List<T> values) { var result = 0L; try { var jsonOption = new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore }; var db = this.GetDb(); foreach (var item in values) { var _str = JsonConvert.SerializeObject(item, jsonOption); result += await db.ListLeftPushAsync(key, _str); } return result; } catch (Exception ex) { } return result; } /// <summary> /// 單個對象添加到list左邊 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="key"></param> /// <param name="value"></param> /// <returns></returns> public async Task<long> SetList<T>(string key, T value) { var result = 0L; try { result = await this.SetLists(key, new List<T> { value }); } catch (Exception ex) { } return result; } /// <summary> /// 獲取List所有數據 /// </summary> public async Task<List<string>> GetAllList(string list) { var db = this.GetDb(); var redisList = await db.ListRangeAsync(list); List<string> listMembers = new List<string>(); foreach (var item in redisList) { listMembers.Add(JsonConvert.DeserializeObject<string>(item)); } return listMembers; } #endregion #region 額外擴展 /// <summary> /// 手動回收管理器對象 /// </summary> public void Dispose() { this.Dispose(_redis); } public void Dispose(ConnectionMultiplexer con) { if (con != null) { con.Close(); con.Dispose(); } } #endregion #endregion }
OK,核心代碼部分介紹到這里,具體來看怎么使用,推送當前類庫到自己的Nuget包,不知道怎么建Nuget服務器的可以看下我之前的那篇文章。
打開MI.Web項目,在Startup中注冊RabbitMQ的相關信息:
/// <summary> /// 消息總線RabbitMQ /// </summary> private void RegisterEventBus(IServiceCollection services) { #region 加載RabbitMQ賬戶 services.AddSingleton<IRabbitMQPersistentConnection>(sp => { var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); var factory = new ConnectionFactory() { HostName = Configuration["EventBusConnection"] }; if (!string.IsNullOrEmpty(Configuration["EventBusUserName"])) { factory.UserName = Configuration["EventBusUserName"]; } if (!string.IsNullOrEmpty(Configuration["EventBusPassword"])) { factory.Password = Configuration["EventBusPassword"]; } var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); }); #endregion var subscriptionClientName = Configuration["SubscriptionClientName"]; services.AddSingleton<IEventBus, EventBusRabbitMQ.EventBusRabbitMQ>(sp => { var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ.EventBusRabbitMQ>>(); var apiHelper = sp.GetRequiredService<IApiHelperService>(); var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new EventBusRabbitMQ.EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, apiHelper, subscriptionClientName, retryCount); }); }
這里暫時還沒做出專門用於注冊RoutingKey的界面,所以暫時用在這里用方法注冊下,后面再修改,這里的RoutingKey用於用戶注冊使用:
//綁定RoutingKey與隊列 private void ConfigureEventBus(IApplicationBuilder app) { var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe(Configuration["SubscriptionClientName"], "UserRegister"); }
上面用的都是appsettings.json里的配置,貼下代碼,標藍的部分是需要用到的:
{ "Logging": { "IncludeScopes": false, "LogLevel": { "Default": "Warning" } }, "ConnectionStrings": { "ElasticSearchServerAddress": "", "Redis": "47.99.92.76:6379" }, "ServiceAddress": { "Service.Identity": "http://localhost:7000", "Service.Account": "http://localhost:7001", "Service.Ocelot": "http://localhost:7003", "Service.Picture": "http://localhost:7005" }, "MehtodName": { "Account.MiUser.SSOLogin": "/Account/MiUser/SSOLogin", //登錄 "Identity.Connect.Token": "/connect/token", //獲取token "Picture.QueryPicture.QueryStartProduct": "/Picture/QueryPicture/QueryStartProduct", //查詢明星產品 "Picture.QueryPicture.QuerySlideImg": "/Picture/QueryPicture/QuerySlideImg", //查詢輪播圖 "Picture.QueryPicture.QueryHadrWare": "/Picture/QueryPicture/QueryHadrWare" //查詢智能硬件表數據 }, "EventBusConnection": "******", //RabbitMQ地址 "EventBusUserName": "guest", "EventBusPassword": "guest", "EventBusRetryCount": 5, "SubscriptionClientName": "RabbitMQ_Bus_MI" }
OK,配置部分算是完成了,接下我們就要去發送MQ了,我們這里使用IEventBus對象調用發布方法,用於發送用戶的注冊信息,最終最調用新增用戶接口:
private readonly IEventBus _eventBus; public LoginController(IEventBus _eventBus) { this._eventBus = _eventBus; } public JsonResult RegisterUser(string UserName, string UserPwd) { try { if (!string.IsNullOrEmpty(UserName) && !string.IsNullOrEmpty(UserPwd)) { RegisterRequest request = new RegisterRequest { UserName = UserName, Password = UserPwd }; _eventBus.Publish("UserRegister", request); } } catch (Exception ex) { _logger.LogError(ex, "注冊失敗!"); } return Json(""); }
最終會新增當前傳入的用戶信息。
當然,這不是消息隊列的最終使用方式,后面會繼續修改,這里的問題在於發布和消費都耦合再了業務層,對於業務系統來說這是一種負擔,舉個例子,我們公司當前隊列消息多的能達到上百萬個,如果把消息的消費和業務系統放在一起可能會影響,所以使用的時候會把消費端單獨拿出來做成Windows服務,並添加自動重試和補償機制,畢竟RabbitMQ也不是沒有錯誤的,比如調用Api出現問題,遲遲無法返回ack確認,這個時候就會報出 wait ack timeout的錯誤。
OK,今天先到這里,我去煮包泡面吃。。。