abp 通過IDistributedEventBus
接口集成自IEventBus實現分布式事件消息的發布訂閱。
IEventBus
在什么時機觸發PublishAsync
?
- 當前UnitOfWork完成時,觸發
IEventBus
的PublishAsync
- 在沒有事務環境下,同步調用
IEventBus
的PublishAsync
abp 默認實現基於RabbitMq消息隊列Volo.Abp.EventBus.RabbitMQ
實現分布式消息的發布與訂閱。
消息治理核心問題:
- 生產端如何保證投遞成功的消息不能丟失。
- Mq自身如何保證消息不丟失。
- 消費段如何保證消費端的消息不丟失。
基於abp 默認實現的DistributedEventBus不能滿足以下場景:
- Publisher 生產者無法保證消息一定能投遞到MQ。
- Consumer 消費端在消息消費時,出現異常時,沒有異常錯誤處理機制(確保消費失敗的消息能重新被消費)。
我們引入Masstransit,來提升abp對消息治理能力。
Masstransit提供以下開箱即用功能:
- Publish/Send/Request-Response等幾種消息投遞機制。
- 多種IOC容器支持。
- 異常機制。
- Saga事務管理。
- 事務活動補償機制(Courier)
- 消息審計
- 消息管道處理機制
Abp 框架下事件消息集成
- 使用MassTransit重新實現
IDistributedEventBus
。 - 在消費端Consumer傳遞用戶身份信息。
- 使用Asp.Net Core Web Host 作消費端Consumer宿主。
集成MassTransit
在Module初始化時,注入MassTransit實例,並啟動。
/// <summary>
/// 配置DistributedEventBus
/// </summary>
/// <param name="context"></param>
/// <param name="configuration"></param>
/// <param name="hostingEnvironment"></param>
private void ConfigureDistributedEventBus(ServiceConfigurationContext context, IConfiguration configuration, IWebHostEnvironment hostingEnvironment)
{
var options = context.Services.GetConfiguration().GetSection("Rabbitmq").Get<MassTransitEventBusOptions>();
var mqConnectionString = "rabbitmq://" + options.ConnectionString;
context.Services.AddMassTransit(mtConfig =>
{
//inject consumers into IOC from assembly
mtConfig.AddConsumers(typeof(AuthCenterEventBusHostModule));
mtConfig.AddBus(provider =>
{
var bus = Bus.Factory.CreateUsingRabbitMq(mqConfig =>
{
var host = mqConfig.Host(new Uri(mqConnectionString), h =>
{
h.Username(options.UserName);
h.Password(options.Password);
});
// set special message serializer
mqConfig.UseBsonSerializer();
// integrated existed logger compontent
mqConfig.UseExtensionsLogging(provider.GetService<ILoggerFactory>());
mqConfig.ReceiveEndpoint(host, "authcenter-queue", q =>
{
//set rabbitmq prefetch count
q.PrefetchCount = 200;
//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));
q.Consumer<SmsTokenValidationCreatedEventConsumer>(provider);
EndpointConvention.Map<SmsTokenValidationCreatedEvent>(q.InputAddress);
});
mqConfig.ReceiveEndpoint(host, "user-synchronization", q =>
{
//set rabbitmq prefetch count
q.PrefetchCount = 50;
//q.UseRateLimit(100, TimeSpan.FromSeconds(1));
//q.UseConcurrencyLimit(2);
//set message retry policy
q.UseMessageRetry(r => r.Interval(3, 100));
q.Consumer<UserSyncEventConsumer>(provider);
EndpointConvention.Map<UserSyncEvent>(q.InputAddress);
});
mqConfig.ConfigureEndpoints(provider);
mqConfig.UseAuditingFilter(provider, o =>
{
o.ReplaceAuditing = true;
});
});
// set authtication middleware for user identity
bus.ConnectAuthenticationObservers(provider);
return bus;
});
});
}
在MassTransit中,使用IBusControl
接口 StartAsync
或 StopAsync
來啟動或停止。
使用IPublishEndpoint
重新實現IDistributedEventBus
接口,實現與abp分布式事件總線集成。
public class MassTransitDistributedEventBus : IDistributedEventBus, ISingletonDependency
{
private readonly IPublishEndpoint _publishEndpoint;
//protected IHybridServiceScopeFactory ServiceScopeFactory { get; }
protected AbpDistributedEventBusOptions DistributedEventBusOptions { get; }
public MassTransitDistributedEventBus(
IOptions<AbpDistributedEventBusOptions> distributedEventBusOptions,
IPublishEndpoint publishEndpoint)
{
//ServiceScopeFactory = serviceScopeFactory;
_publishEndpoint = publishEndpoint;
DistributedEventBusOptions = distributedEventBusOptions.Value;
//Subscribe(distributedEventBusOptions.Value.Handlers);
}
/*
* Not Implementation
*/
public Task PublishAsync<TEvent>(TEvent eventData)
where TEvent : class
{
return _publishEndpoint.Publish(eventData);
}
public Task PublishAsync(Type eventType, object eventData)
{
return _publishEndpoint.Publish(eventData, eventType);
}
}
到此,我們實現了MassTransit與Abp集成。
事件消息傳遞User Claims
在實際業務實現過程中,我們會用消息隊列實現“削峰填谷”的效果。異步消息隊列中傳遞用戶身份信息如何實現呢?
我們先看看abp在WebApi中,如何確定當前用戶?
ICurrentUser
提供當前User Claims抽象。而ICurrentUser
依賴於ICurrentPrincipalAccessor
,在Asp.Net core中利用HttpContext User 來記錄當前用戶身份。
在MassTransit中,利用IPublishObserver
> IConsumeObserver
生產者/消費端的觀察者,來實現傳遞已認證的用戶Claims。
/// <summary>
/// 生產者傳遞當前用戶Principal
/// </summary>
public class AuthPublishObserver : IPublishObserver
{
private readonly ICurrentPrincipalAccessor _currentPrincipalAccessor;
private readonly IClaimsPrincipalFactory _claimsPrincipalFactory;
public AuthPublishObserver(
ICurrentPrincipalAccessor currentPrincipalAccessor,
IClaimsPrincipalFactory claimsPrincipalFactory)
{
_currentPrincipalAccessor = currentPrincipalAccessor;
_claimsPrincipalFactory = claimsPrincipalFactory;
}
public Task PrePublish<T>(PublishContext<T> context) where T : class
{
var claimsPrincipal = _claimsPrincipalFactory
.CreateClaimsPrincipal(
_currentPrincipalAccessor.Principal
);
if (claimsPrincipal != null)
{
context.Headers.SetAuthenticationHeaders(claimsPrincipal);
}
return TaskUtil.Completed;
}
public Task PostPublish<T>(PublishContext<T> context) where T : class => TaskUtil.Completed;
public Task PublishFault<T>(PublishContext<T> context, Exception exception) where T : class => TaskUtil.Completed;
}
/// <summary>
/// 消費端從MqMessage Heads 中獲取當前用戶Principal,並賦值給HttpContext
/// </summary>
public class AuthConsumeObserver : IConsumeObserver
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly IServiceScopeFactory _factory;
public AuthConsumeObserver(IHttpContextAccessor httpContextAccessor, IServiceScopeFactory factory)
{
_httpContextAccessor = httpContextAccessor;
_factory = factory;
}
public Task PreConsume<T>(ConsumeContext<T> context) where T : class
{
if (_httpContextAccessor.HttpContext == null)
{
_httpContextAccessor.HttpContext = new DefaultHttpContext
{
RequestServices = _factory.CreateScope().ServiceProvider
};
}
var abpClaimsPrincipal = context.Headers.TryGetAbpClaimsPrincipal();
if (abpClaimsPrincipal != null && abpClaimsPrincipal.IsAuthenticated)
{
var claimsPrincipal = abpClaimsPrincipal.ToClaimsPrincipal();
_httpContextAccessor.HttpContext.User = claimsPrincipal;
Thread.CurrentPrincipal = claimsPrincipal;
}
return TaskUtil.Completed;
}
public Task PostConsume<T>(ConsumeContext<T> context) where T : class
{
_httpContextAccessor.HttpContext = null;
return TaskUtil.Completed;
}
public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
{
_httpContextAccessor.HttpContext = null;
return TaskUtil.Completed;
}
}
使用Asp.Net Core Web Host 作消費端Consumer宿主
基於以下幾點原因,我們使用Asp.Net Core Web Host 作為消息端Consumer宿主
- 部署在Linux環境下,Asp.Net Core Web Host 通常使用守護進程來啟動服務實例,這樣可以保證服務不被中斷。
- 根據abp vnext DDD 項目分層,最大程度利用Application層應用方法,復用abp vnext 框架機制。