Rebus消息總線


這里主要講一下我基於Rebus寫的一個ABP框架的模塊


 
目錄結構

對於Rebus網上的資料很少,其實我對於服務總線也不是很理解 。。個人理解的就是像ABP中的EventBus那樣的,但是集成了一些消息隊列像MSMQ,RabbitMQ等。

廢話不多說,下面主要講下幾個主要的文件

 
RebusRabbitMqModule

這個呢就是ABP的模塊寫法,詳細的可以去ABP官網看下,這里主要的代碼是在這里

            var moduleConfig = IocManager.Resolve<IRebusRabbitMqModuleConfig>(); if (moduleConfig.Enabled) { var rebusConfig = Configure.With(new CastleWindsorContainerAdapter(IocManager.IocContainer)); if (moduleConfig.LoggingConfigurer != null) { //配置Rebus用哪種工具來記錄日志,我這里用的Log4net rebusConfig.Logging(moduleConfig.LoggingConfigurer); } rebusConfig.Serialization(moduleConfig.SerializerConfigurer); if (moduleConfig.OptionsConfigurer != null) { //自定義配置 rebusConfig.Options(moduleConfig.OptionsConfigurer); } rebusConfig.Options(c => { c.SetMaxParallelism(moduleConfig.MaxParallelism); c.SetNumberOfWorkers(moduleConfig.NumberOfWorkers); }); if (moduleConfig.MessageAuditingEnabled) { //消息審計隊列名稱 rebusConfig.Options(o => o.EnableMessageAuditing(moduleConfig.MessageAuditingQueueName)); } var mqMessageTypes = new List<Type>(); //通過反射取到所有繼承IHandleMessages的類進行消息訂閱 foreach (var assembly in moduleConfig.AssemblysIncludeRebusMqMessageHandlers) { IocManager.IocContainer.AutoRegisterHandlersFromAssembly(assembly); mqMessageTypes.AddRange(assembly.GetTypes() .Where(t => t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>))) .SelectMany(t => t.GetInterfaces()) .Distinct() .SelectMany(t => t.GetGenericArguments()) .Distinct()); } //這個就是配置使用RabbitMq進行消息通信的方法,具體的去看Rebus上的文檔 _bus = rebusConfig.Transport(c => c.UseRabbitMq(moduleConfig.ConnectString, moduleConfig.QueueName)).Start(); //Subscribe messages mqMessageTypes = mqMessageTypes.Distinct().ToList(); foreach (var mqMessageType in mqMessageTypes) { _bus.Subscribe(mqMessageType); } } 

模塊的使用

找到你需要引用模塊的地方,如下圖


 
添加依賴

然后進行對應的配置


 
image.png

里面的RabbitMqUrl是你本地RabbitMq的訪問地址,比如我的是amqp://guest:guest@127.0.0.1:5672/

接下來就是如何使用進行消息的發送和處理

像其它的EventBus一樣,需要先建立EventData

 public class Test { public string Name { get; set; } } 

然后是EventHander

public class TestHandler : EventDataConsumerHandlerBase<Test> { public override Task Handle(Test message) { //這里就是寫你需要對message進行怎樣的處理 return base.Handle(message); } } 

最后就是發布,我這里是在AppService里面進行消息的發送

 #region 構造函數 private readonly IRepository<BaseItem> _baseItemRepository; private readonly ICacheManager _cacheManager; private readonly IMqMessagePublisher _iMqMessagePublisher;//依賴注入 public BaseItemAppService(IRepository<BaseItem> baseItemRepository, ICacheManager cacheManager, IMqMessagePublisher iMqMessagePublisher) : base(baseItemRepository) { this._cacheManager = cacheManager; this._baseItemRepository = baseItemRepository; _iMqMessagePublisher = iMqMessagePublisher; } #endregion #region 增刪改查 protected override IQueryable<BaseItem> CreateFilteredQuery(BaseItemSearchDto input) { //消息的發布 _iMqMessagePublisher.Publish(new Test { Name = "123" }); return base.CreateFilteredQuery(input) .WhereIf(input.DisplayName.IsNotNullOrEmpty(), m => m.DisplayName.Contains(input.DisplayName)) .WhereIf(input.TypeId != null, m => m.TypeId == input.TypeId); } #endregion 

這樣就可以了

我講的不是很詳細(我表達能力不行),大家可以去直接看我的代碼,我就做了簡單的封裝不是很麻煩。大家可能會問我集成這個之后的業務場景是什么?我就把我用到的講下吧:

  • 訂單的處理:同一時間可能有很多個訂單,這時可以把他發布到隊列中一個一個推送進行處理,而且未標記完成的訂單,就算你程序報錯了,他也會自動重新推送好像默認是5次
  • 掃碼入庫的時候:倉庫管理員掃碼入庫的速度是很快的,但后台程序需要一個比對操作比較耗時,這時候也可以把掃的碼加入到隊列中慢慢處理。

Rebus我研究不多,有些隱藏的好東西我可能暫時沒加入,希望能多提點提點。。還有服務總線是啥?

最后附上幾個地址:



作者:邵佳楠
鏈接:https://www.jianshu.com/p/5961bc5e556d
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM