在ABP core中使用RabbitMq


距上一篇博客的更新一集很久了,主要是最近做的事情比較雜,中間也有一個難點,就是在ABP中加入APP掃碼登錄,本來想些的,但是覺得這個寫出來會不會讓我們的系統被破解-_-||,所以想了想,就沒有寫。

這篇文章主要記錄一下在ABP core中使用RabbitMq,abp的版本4.30

首先自己基礎太菜,為了爬了很久的坑,最后參照的是這篇文章https://www.cnblogs.com/personball/p/7762931.html,使用了Abplus.MqMessages.RebusRabbitMqConsumer插件。本來我老早就看這篇文章了,可是一翻開里面的Github,發現是兩年前的代碼,以為早就沒有人維護,所以就沒有用,然后各種兜兜轉轉,饒了很多圈,中間包括去學習事件總線,ABP的后台任務等等,可是繞了半天,依然沒有實現,最終又繞回來,然而讓我意外的是,這個插件居然是有人維護的,我最后弄不出了的時候,去issues里提問,作者都回復得很快。

 

為什么要寫這些無關緊要的呢?就是希望看見這篇文章的人,不要再去嘗試自己寫啦(來自一個繞了一個多星期都沒有寫出demo的人的忠告),畢竟已經有大神為我們封裝好了,何必自己重復造輪子呢,當然如果你時間比較多,那個另算。

 

好了 開始我們的代碼吧,直接去這里,將作者的源碼下下來:https://github.com/personball/abplus,大致瀏覽一下里面的代碼,然后重點是作者的demo,里面寫了如何使用發布端和訂閱端,重點看以下4個文件

 

 

好了,開始搬進你的邏輯吧,中間還出了點小問題,在這里貼出來一下:

如果你像我一樣,發布端和消費端在同一個項目中,那么你可以直接使用消費端模塊即可,無需使用發布端。在abplus對接rebus的兩個模塊中,發布模塊使用的是單向連接方式,消費端使用的是雙向連接方式,即消費端模塊同時支持發布和消費

意思已經很清楚了,只需要引入消費模塊就可以了

1、在應用層添加 Abplus.MqMessages.RebusRabbitMqConsumer程序集,

2、在module上添加RebusRabbitMqConsumerModule依賴

3、添加消費配置:

            #region RabbitMq消費端配置 
            Configuration.Modules.UseAbplusRebusRabbitMqConsumer()
                .UseLogging(c => c.Log4Net())
                .ConnectTo("amqp://admin:123456@127.0.0.1:5672/")//連接
                .UseQueue("PayQueue")//隊列名稱
                .Prefetch(100)//用於控制每次拉取的資源消耗(內存,帶寬),消費速度還要看消費端自己的消息處理速度
                .SetMaxParallelism(1)//最大並行數
                .SetNumberOfWorkers(10)//設置最大工作線程數
                                       //配置其他選項
                .UseOptions(x => x.SimpleRetryStrategy(maxDeliveryAttempts: 50, secondLevelRetriesEnabled: true, errorTrackingMaxAgeMinutes: 20))
                .RegisterHandlerInAssemblys(Assembly.GetExecutingAssembly());//注冊包含Hander的處理程序??//(Assembly.Load("MLCDZ.Application"));
            #endregion RabbitMq消費端配置  Assembly.GetAssembly(typeof(PayHandler))

 其他的就是你的發布端和消費端邏輯,說一個重點:

消費端的handler不能寫在AppService里面,需要單獨寫一個類來實現 

應該說得很清楚了,不懂的請留言,最主要的是看看作者的demo。

 

補充這后面遇到的一個坑,上次寫這篇博客的時候,看到消費端確認就興高采烈的跑來寫博客了,結果在測試的時候,發現消費端沒有應答,所以隊列里的消息始終沒有被消費掉,直到后來得到QQ群友的幫助才知道,原來消費端的類和方法不能有任何授權攔截,比如[AbpAuthorize]等

如果你的報錯信息如下,請檢查你的邏輯是否帶登錄驗證:

WARN 2019-12-20 11:45:22,511 [ker 1] us.Retry.ErrorTracking.InMemErrorTracker - Unhandled exception 2 while handling message with ID "4cb46c5b-c35f-466f-964a-ae6a5b50875e"
Abp.Authorization.AbpAuthorizationException: 當前用戶沒有登錄到系統!
at Abp.Authorization.AuthorizationHelper.AuthorizeAsync(IEnumerable1 authorizeAttributes) in D:\Github\aspnetboilerplate\src\Abp\Authorization\AuthorizationHelper.cs:line 42 at Abp.Authorization.AuthorizationHelper.CheckPermissions(MethodInfo methodInfo, Type type) in D:\Github\aspnetboilerplate\src\Abp\Authorization\AuthorizationHelper.cs:line 107 at Abp.Authorization.AuthorizationHelper.AuthorizeAsync(MethodInfo methodInfo, Type type) in D:\Github\aspnetboilerplate\src\Abp\Authorization\AuthorizationHelper.cs:line 56 at Nito.AsyncEx.Synchronous.TaskExtensions.WaitAndUnwrapException(Task task) at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state) --- End of stack trace from previous location where exception was thrown --- at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot) --- End of stack trace from previous location where exception was thrown --- at Nito.AsyncEx.Synchronous.TaskExtensions.WaitAndUnwrapException(Task task) at Nito.AsyncEx.AsyncContext.Run(Func1 action)
at Abp.Authorization.AuthorizationInterceptor.Intercept(IInvocation invocation) in D:\Github\aspnetboilerplate\src\Abp\Authorization\AuthorizationInterceptor.cs:line 20
at Castle.DynamicProxy.AbstractInvocation.Proceed()
at Abp.Domain.Uow.UnitOfWorkInterceptor.PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options) in D:\Github\aspnetboilerplate\src\Abp\Domain\Uow\UnitOfWorkInterceptor.cs:line 78
at Castle.DynamicProxy.AbstractInvocation.Proceed()
at Abp.Auditing.AuditingInterceptor.PerformAsyncAuditing(IInvocation invocation, AuditInfo auditInfo) in D:\Github\aspnetboilerplate\src\Abp\Auditing\AuditingInterceptor.cs:line 86
at Castle.DynamicProxy.AbstractInvocation.Proceed()
at Castle.DynamicProxy.AbstractInvocation.Proceed()
at Castle.Proxies.IHandleMessages1Proxy.Handle(TestMessage message) at Rebus.Pipeline.Receive.HandlerInvoker1.Invoke() in C:\projects-rebus\Rebus\Rebus\Pipeline\Receive\HandlerInvoker.cs:line 154
at Rebus.Pipeline.Receive.DispatchIncomingMessageStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Pipeline\Receive\DispatchIncomingMessageStep.cs:line 67 at Rebus.Sagas.LoadSagaDataStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Sagas\LoadSagaDataStep.cs:line 66
at Rebus.Pipeline.Receive.ActivateHandlersStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Pipeline\Receive\ActivateHandlersStep.cs:line 47 at Rebus.Pipeline.Receive.HandleRoutingSlipsStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Pipeline\Receive\HandleRoutingSlipsStep.cs:line 42
at Rebus.Retry.Simple.FailedMessageWrapperStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Retry\Simple\FailedMessageWrapperStep.cs:line 42 at Rebus.Pipeline.Receive.DeserializeIncomingMessageStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Pipeline\Receive\DeserializeIncomingMessageStep.cs:line 34
at Rebus.Pipeline.Receive.HandleDeferredMessagesStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Pipeline\Receive\HandleDeferredMessagesStep.cs:line 121 at Rebus.Retry.FailFast.FailFastStep.Process(IncomingStepContext context, Func1 next) in C:\projects-rebus\Rebus\Rebus\Retry\FailFast\FailFastStep.cs:line 41
at Rebus.Retry.Simple.SimpleRetryStrategyStep.DispatchWithTrackerIdentifier(Func`1 next, String identifierToTrackMessageBy, ITransactionContext transactionContext, String messageId, String secondLevelMessageId) in C:\projects-rebus\Rebus\Rebus\Retry\Simple\SimpleRetryStrategyStep.cs:line 120


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM