之前使用MQ的時候是通過封裝成dll發布Nuget包來使用,消息的發布和消費都耦合在使用的站點和服務里,這樣會造成兩個問題:
1.增加服務和站點的壓力,因為每次消息的消費就意味着接口的調用,這部分的壓力都加在了使用的站點和服務的機器上。
2.增加修改的復雜性,如果我們需要加兩條消費日志,都需要再發布一個版本重新通過dll引用。
所以我們需要做以下兩方面的工作:
1.MQ的接收拆分為Windows服務,通過zokeerper實現主從防止單點故障。
2.MQ的消費這里做成單獨的WebApi服務。
這樣做的好處有以下幾方面:
1.解耦。MQ的消費從使用的站點和服務中被拆分出來,減輕服務壓力。
2.增加程序的可維護和可調試性。
3.單獨部署提高吞吐量。
首先我們先來看下MQ的消費服務端,其實就是把之前調接口的方法單獨放到了WebApi中,這樣可以單獨部署,減輕服務器壓力:
/// <summary> /// MQ消費到指定的服務接口 /// </summary> [HttpPost] public async Task<ConsumerProcessEventResponse> ConsumerProcessEventAsync([FromBody]ConsumerProcessEventRequest request) { ConsumerProcessEventResponse response = new ConsumerProcessEventResponse(); try { _logger.LogInformation($"MQ准備執行ConsumerProcessEvent方法,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { //獲取綁定該routingKey的服務地址集合 var subscriptions = await StackRedis.Current.GetAllList(request.RoutingKey); if (!subscriptions.Any()) { //如果Redis中不存在 則從數據庫中查詢 加入Redis中 var queryRoutingKeyApiUrlResponse = _apiHelperService.PostAsync<QueryRoutingKeyApiUrlResponse>(ServiceAddress.QueryRoutingKeyApiUrlAsync, new QueryRoutingKeyApiUrlRequest { RoutingKey = request.RoutingKey }); if (queryRoutingKeyApiUrlResponse.Result != null && queryRoutingKeyApiUrlResponse.Result.ApiUrlList.Any()) { subscriptions = queryRoutingKeyApiUrlResponse.Result.ApiUrlList; Task.Run(() => { StackRedis.Current.SetLists(request.RoutingKey, queryRoutingKeyApiUrlResponse.Result.ApiUrlList); }); } } if(subscriptions!=null && subscriptions.Any()) { foreach (var apiUrl in subscriptions) { Task.Run(() => { _logger.LogInformation(request.MQBodyMessage); }); //這里需要做判斷 假如MQ要發送到多個服務接口 其中一個消費失敗 應該將其單獨記錄到數據庫 而不影響這個消息的確認
//先做個備注 以后添加這個處理
await _apiHelperService.PostAsync(apiUrl, request.MQBodyMessage); } _logger.LogInformation($"MQ執行ProcessEvent方法完成,RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } } } catch(Exception ex) { response.Successful = false; response.Message = ex.Message; _logger.LogError(ex, $"MQ消費失敗 RoutingKey:{request.RoutingKey} Message:{request.MQBodyMessage}"); } return response; }
這個WebApi只有這一個方法,就是根據RoutingKey查找對應的MQ配置,然后根據配置的接口地址調用指定的接口,比較簡單哈,之前也寫過,就不細說了。
我們來看接收MQ消息的Windows服務端,MQ首次使用都需要重新綁定Routingkey、隊列和交換器,所以我在Monitor服務里寫了一個綁定的方法,在Windows服務端啟動的時候調用一次:
public class MQConsumerService { private readonly IApiHelperService _apiHelperService; private ILog _logger; public MQConsumerService(IApiHelperService apiHelperService,ILog logger) { _apiHelperService = apiHelperService; _logger = logger; } /// <summary> /// 發送MQ到MQ消費服務端 /// </summary> /// <param name="routingKey"></param> /// <param name="message"></param> public void ProcessEvent(string routingKey, string message) { try { _logger.Info($"MQ准備執行ProcessEvent方法,RoutingKey:{routingKey} Message:{message}"); _apiHelperService.PostAsync<ConsumerProcessEventResponse>(ServiceUrls.ConsumerProcessEvent,new ConsumerProcessEventRequest { RoutingKey=routingKey,MQBodyMessage=message}); } catch(Exception ex) { _logger.Error($"MQ發送消費服務端失敗 RoutingKey:{routingKey} Message:{message}",ex); } } /// <summary> /// MQ初始化 調用隊列交換器綁定接口 /// </summary> /// <returns></returns> public async Task MQSubscribeAsync() { try { var response= await _apiHelperService.PostAsync<MQSubscribeResponse>(ServiceUrls.MQSubscribe, new MQSubscribeRequest()); if(!response.Successful) { _logger.Error($"MQ綁定RoutingKey隊列失敗: {response.Message}"); } } catch(Exception ex) { _logger.Error($"MQ綁定RoutingKey隊列失敗",ex); } } }
這里為了簡單起見,交換器和隊列使用的都是同一個,路由方式是“direct”模式,之后會繼續修改的,先跑起來再說:
static void Main(string[] args) { //交換器(Exchange) const string BROKER_NAME = "mi_event_bus"; //隊列(Queue) var SubscriptionClientName = "RabbitMQ_Bus_MI"; //log4net日志加載 ILoggerRepository repository = LogManager.CreateRepository("MI.WinService.MQConsumer"); XmlConfigurator.Configure(repository, new FileInfo("log4net.config")); ILog log = LogManager.GetLogger(repository.Name, "MI.WinService.MQConsumer"); //依賴注入加載 IServiceCollection serviceCollection = new ServiceCollection(); //WebApi調用類 serviceCollection.AddTransient<IApiHelperService, ApiHelperService>(); var serviceProvider = serviceCollection.AddHttpClient().BuildServiceProvider(); serviceProvider.GetService<ILogger>(); var apiHelperService = serviceProvider.GetService<IApiHelperService>(); //MQ消費類(發送MQ消息調用接口、綁定隊列交換器) MQConsumerService consumerService = new MQConsumerService(apiHelperService,log); //MQ連接類 ConnectionFactory factory = new ConnectionFactory { UserName = "", Password = "", HostName = "" }; var connection = factory.CreateConnection(); var channel = connection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: SubscriptionClientName, durable: true, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { //發送到MQ消費服務端 var message = Encoding.UTF8.GetString(ea.Body); log.Info($"MQ准備消費消息 RoutingKey:{ea.RoutingKey} Message:{message}"); //發送到MQ消費服務端MQStationServer Task result= Task.Run(() => { consumerService.ProcessEvent(ea.RoutingKey, message); }); if(!result.IsFaulted) { //確認ack channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(SubscriptionClientName, false, consumer); Console.WriteLine("消費者已啟動!"); //綁定隊列RoutingKey Task taskResult= Task.Run(async() => { await consumerService.MQSubscribeAsync(); }); taskResult.Wait(); Console.WriteLine("隊列RoutingKey綁定完成!"); Console.ReadKey(); channel.Dispose(); connection.Close(); }
最后梳理下消費端消費MQ流程:
MQ發布后,Windows服務端會受到MQ消息,然后通過調用接口將消息發送到MQ消費服務端,通過RoutingKey從數據庫查找對應的MQ和接口配置,調用指定接口,當然,這里只是簡單的代碼列子,想用在生產中必須要做好完善的日志調用記錄、性能監控、健康檢查以及服務器層面的集群防止單點故障。