基於Abp VNext框架設計 - 分布式消息


abp 通過IDistributedEventBus接口集成自IEventBus實現分布式事件消息的發布訂閱。

IEventBus在什么時機觸發PublishAsync?

  1. 當前UnitOfWork完成時,觸發IEventBusPublishAsync
  2. 在沒有事務環境下,同步調用IEventBusPublishAsync

abp 默認實現基於RabbitMq消息隊列Volo.Abp.EventBus.RabbitMQ實現分布式消息的發布與訂閱。

消息治理核心問題:

  1. 生產端如何保證投遞成功的消息不能丟失。
  2. Mq自身如何保證消息不丟失。
  3. 消費段如何保證消費端的消息不丟失。

基於abp 默認實現的DistributedEventBus不能滿足以下場景:

  1. Publisher 生產者無法保證消息一定能投遞到MQ。
  2. Consumer 消費端在消息消費時,出現異常時,沒有異常錯誤處理機制(確保消費失敗的消息能重新被消費)。

我們引入Masstransit,來提升abp對消息治理能力。

Masstransit提供以下開箱即用功能:

  1. Publish/Send/Request-Response等幾種消息投遞機制。
  2. 多種IOC容器支持。
  3. 異常機制。
  4. Saga事務管理。
  5. 事務活動補償機制(Courier)
  6. 消息審計
  7. 消息管道處理機制

Abp 框架下事件消息集成

  1. 使用MassTransit重新實現IDistributedEventBus
  2. 在消費端Consumer傳遞用戶身份信息。
  3. 使用Asp.Net Core Web Host 作消費端Consumer宿主。

集成MassTransit

在Module初始化時,注入MassTransit實例,並啟動。

/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{
    var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();

    var mqConnectionString = "rabbitmq://" + options.ConnectionString;


    context.Services.AddMassTransit(mtConfig =>
    {
        //inject consumers into IOC from assembly
        mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));


        mtConfig.AddBus(provider =>
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
                {
                    var host = mqConfig.Host(new Uri(mqConnectionString), h =>
                    {
                        h.Username(options.UserName);
                        h.Password(options.Password);
                    });
                // set special message serializer
                    mqConfig.UseBsonSerializer();

                    // integrated existed logger compontent
                    mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());

                    mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
                    {
                        //set rabbitmq prefetch count
                        q.PrefetchCount = 200;

                        //set message retry policy
                        q.UseMessageRetry(r => r.Interval(3, 100));

                        q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
                        EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);

                    });

                    mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
                    {
                        //set rabbitmq prefetch count
                        q.PrefetchCount = 50;
                        //q.UseRateLimit(100, TimeSpan.FromSeconds(1));
                        //q.UseConcurrencyLimit(2);

                        //set message retry policy
                        q.UseMessageRetry(r => r.Interval(3, 100));

                        q.Consumer<UserSyncEventConsumer>(provider);
                        EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
                    });

                    mqConfig.ConfigureEndpoints(provider);

                    mqConfig.UseAuditingFilter(provider, o =>
                    {
                        o.ReplaceAuditing = true;
                    });
                });

            // set authtication middleware for user identity
            bus.ConnectAuthenticationObservers(provider);

            return bus;
        });
    });
}

在MassTransit中,使用IBusControl接口 StartAsyncStopAsync 來啟動或停止。

使用IPublishEndpoint重新實現IDistributedEventBus接口,實現與abp分布式事件總線集成。

 public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
    {
        private readonly IPublishEndpoint _publishEndpoint;


        //protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
        protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }

        public MassTransitDistributedEventBus(
            IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
            IPublishEndpoint publishEndpoint)
        {
            //ServiceScopeFactory = serviceScopeFactory;
            _publishEndpoint = publishEndpoint;
            DistributedEventBusOptions = distributedEventBusOptions.Value;
            //Subscribe(distributedEventBusOptions.Value.Handlers);
        }

        /*
        *  Not Implementation
        */
      

        public Task PublishAsync<TEvent>(TEvent eventData)
            where TEvent : class
        {
            return _publishEndpoint.Publish(eventData);
        }

        public Task PublishAsync(Type eventType, object eventData)
        {
            return _publishEndpoint.Publish(eventData, eventType);
        }
    }


到此,我們實現了MassTransit與Abp集成。

事件消息傳遞User Claims

在實際業務實現過程中,我們會用消息隊列實現“削峰填谷”的效果。異步消息隊列中傳遞用戶身份信息如何實現呢?

我們先看看abp在WebApi中,如何確定當前用戶?

ICurrentUser 提供當前User Claims抽象。而ICurrentUser依賴於ICurrentPrincipalAccessor,在Asp.Net core中利用HttpContext User 來記錄當前用戶身份。

在MassTransit中,利用IPublishObserver > IConsumeObserver 生產者/消費端的觀察者,來實現傳遞已認證的用戶Claims。

    /// <summary>
    /// 生產者傳遞當前用戶Principal
    /// </summary>
 public class AuthPublishObserver : IPublishObserver
    {
        private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;
        private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;

        public AuthPublishObserver(
            ICurrentPrincipalAccessor currentPrincipalAccessor,
            IClaimsPrincipalFactory claimsPrincipalFactory)
        {
            _currentPrincipalAccessor = currentPrincipalAccessor;
            _claimsPrincipalFactory = claimsPrincipalFactory;
        }

        public Task PrePublish<T>(PublishContext<T> context) where T : class
        {
            var claimsPrincipal = _claimsPrincipalFactory
                .CreateClaimsPrincipal(
                    _currentPrincipalAccessor.Principal
                    );

            if (claimsPrincipal != null)
            {
                context.Headers.SetAuthenticationHeaders(claimsPrincipal);
            }


            return TaskUtil.Completed;

        }
        public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;
        public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;

    }


    /// <summary>
    /// 消費端從MqMessage Heads 中獲取當前用戶Principal,並賦值給HttpContext
    /// </summary>
 public class AuthConsumeObserver : IConsumeObserver
    {
        private readonly IHttpContextAccessor _httpContextAccessor;
        private readonly IServiceScopeFactory _factory;


        public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
        {
            _httpContextAccessor = httpContextAccessor;
            _factory = factory;
        }

        public Task PreConsume<T>(ConsumeContext<T> context) where T : class
        {
            if (_httpContextAccessor.HttpContext == null)
            {
                _httpContextAccessor.HttpContext = new DefaultHttpContext
                {
                    RequestServices = _factory.CreateScope().ServiceProvider
                };
            }

            var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();

            if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
            {
                var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();

                _httpContextAccessor.HttpContext.User = claimsPrincipal;
                Thread.CurrentPrincipal = claimsPrincipal;
            }

            return TaskUtil.Completed;
        }

        public Task PostConsume<T>(ConsumeContext<T> context) where T : class
        {
            _httpContextAccessor.HttpContext = null;

            return TaskUtil.Completed;
        }

        public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
        {
            _httpContextAccessor.HttpContext = null;

            return TaskUtil.Completed;
        }
    }

使用Asp.Net Core Web Host 作消費端Consumer宿主

基於以下幾點原因,我們使用Asp.Net Core Web Host 作為消息端Consumer宿主

  1. 部署在Linux環境下,Asp.Net Core Web Host 通常使用守護進程來啟動服務實例,這樣可以保證服務不被中斷。
  2. 根據abp vnext DDD 項目分層,最大程度利用Application層應用方法,復用abp vnext 框架機制。

MassTransit 深入研究

  1. 延遲消息
  2. 限流熔斷降級
  3. 批量消費
  4. Saga

References

  1. abp vnext disctributed event bus
  2. MassTransit


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM