前言
asp.net core版本選擇2.2,只是因為個人習慣了vs2017,代碼以及設計皆可移植到vs2019,用asp.net core 3.0以及以上運行起來
項目類似選擇web api,基礎設施選擇entity frame core + Masstransit + aspectCore
先贅述一下思路業務,中間通訊以及容錯/重試交給masstransit,部分流程的解耦交給aspectCore來完成,這部分包括,錯誤后通過masstransit發布給錯誤處理的模塊,最后落盤到ef core
整個過程,業務處理的層之間可以通過Masstransit的通訊,可采用rpc的模式,也可以同異步的發布訂閱通訊
整體設計是可單體可分布式的理念,可以根據項目變化,可以自行配置,拆分成完成單體到分布式的過程,完成業務分解,但是對於業務使用而言,都是一樣的,無感知。
剛開始的文章會有很多代碼段說明,屬於前置知識的鋪墊,可能會有些啰嗦,后續文章會忽略掉無關的大片代碼段。
業務演示,我們選擇傳統的銀行轉賬業務以及電商的支付到下單。
整體crud的設計圖
包括了業務層和基礎設施層的設計
當前文章的設計圖如下
整體采用Mq異步的發布訂閱,訂閱之間也通過發布訂閱通知,完成最終一致性
業務實體
public class BaseModel { [Key] public int Id { get; set; } public DateTime CreateTime { get; set; } }
public class UserInfo: BaseModel { public string NickName { get; set; } public decimal Money { get; set; } public DateTime LastOptions { get; set; } }
public class PayOrder: BaseModel { public int SourceId { get; set; } public int TargetId { get; set; } public decimal Money { get; set; } }
配置EF Core
public class TransactionContexts:DbContext { public DbSet<PayOrder> PayOrders { get; set; } public DbSet<UserInfo> UserInfos { get; set; } public TransactionContexts(DbContextOptions<TransactionContexts> options):base(options) { } }
配置數據源偷懶就用InMemory了
services.AddDbContext<TransactionContexts>(build=> { build.UseInMemoryDatabase("TransactionContexts"); });
實例編寫
建立一個交易的Command
internal class PayOrderCommand { public int SourceId { get; set; } public int TargetId { get; set; } public decimal Money { get; set; } }
一個交易的Event
internal class PayOrderEvent: PayOrderCommand { }
Command對外,Event對內,主要是用於項目區分外部流程和內部流程的區別,代碼本身是沒有硬編碼要求的
構建一個交易的服務,對外公開一個轉賬的接口
public interface ITransactionService { void TransferAccounts(int sourceId, int targetId, decimal money); }
這個接口完成Publish/Subscribe模式的交易
Publish端
internal class TransactionService: ITransactionService { private IBusControl busControl; public TransactionService(IBusControl busControl) { this.busControl = busControl; } public async void TransferAccounts(int sourceId, int targetId, decimal money) { await busControl.Publish( new PayOrderCommand { SourceId = sourceId, TargetId = targetId, Money = money } ); } }
很常見的發布一個命令
Subscribe端
internal class TransactionConsumer : IConsumer<PayOrderCommand>, IConsumer<PayOrderEvent> { private TransactionContexts transactionContexts; public TransactionConsumer(TransactionContexts transactionContexts) { this.transactionContexts = transactionContexts; } public async Task Consume(ConsumeContext<PayOrderCommand> context) { var value = context.Message; await Console.Out.WriteLineAsync($"PayOrderCommand Before:{DateTime.Now} SourceId:{value.SourceId} TargetId:{value.TargetId} Money:{value.Money}"); await transactionContexts.PayOrders.AddAsync(new PayOrder { Id = 1, SourceId = value.SourceId, TargetId = value.TargetId, Money = value.Money }); await transactionContexts.SaveChangesAsync(); await context.Publish(new PayOrderEvent { SourceId = value.SourceId, TargetId = value.TargetId, Money = value.Money }); await Console.Out.WriteLineAsync($"PayOrderCommand After:{DateTime.Now}"); } public async Task Consume(ConsumeContext<PayOrderEvent> context) { var value = context.Message; await Console.Out.WriteLineAsync($"PayOrderEvent Before:{DateTime.Now} SourceId:{value.SourceId} TargetId:{value.TargetId} Money:{value.Money}"); var source = transactionContexts.UserInfos.First(user => user.Id == value.SourceId); if (source.Money < value.Money) throw new Exception(); var target = transactionContexts.UserInfos.First(user => user.Id == value.TargetId); source.Money -= value.Money; target.Money += value.Money; transactionContexts.UserInfos.Update(source); transactionContexts.UserInfos.Update(target); await transactionContexts.SaveChangesAsync(); await Console.Out.WriteLineAsync($"PayOrderEvent After:{DateTime.Now}"); } }
配置依賴注入流程
MassTransit的通訊選擇MassTransit.RabbitMQ,這個庫,也支持很多MQ,個人圖方便就選的rabbitmq,后期要更換,修改一下依賴注入的配置即可
services.AddScoped<TransactionConsumer>(); services.AddMassTransit(c => {
c.AddConsumer<TransactionConsumer>();
c.AddBus(serviceProvider => { return Bus.Factory.CreateUsingRabbitMq(cfg => { var host = cfg.Host(new Uri("rabbitmq://localhost/"), hst => { hst.Username("guest"); hst.Password("guest"); }); cfg.ReceiveEndpoint("Transaction", config => { config.ConfigureConsumer<TransactionConsumer>(serviceProvider); }); }); }); });
這樣就完成了引入Masstransit做數據通訊部分,Masstransit支持Rpc模式,也支持Publish/Subscribe,后續的文章會混搭Rpc模式和發布訂閱的模式,主要根據業務場景的選擇做調整
編寫演示例子
在Configure里面配置初始化數據
using (var serviceScoped = app.ApplicationServices.CreateScope()) { var serviceProvider = serviceScoped.ServiceProvider; var context = serviceProvider.GetRequiredService<TransactionContexts>(); context.UserInfos.Add(new UserInfo { Id = 1, NickName = "Form", CreateTime = DateTime.Now, LastOptions = DateTime.Now, Money = 5000 }); context.UserInfos.Add(new UserInfo { Id = 2, NickName = "To", CreateTime = DateTime.Now, LastOptions = DateTime.Now, Money = 5000 }); context.SaveChanges(); } #endregion
給Configure方法增加一個注入的接口
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { //。。。 var busControl = app.ApplicationServices.GetRequiredService<IBusControl>(); lifetime.ApplicationStarted.Register(busControl.Start); lifetime.ApplicationStopped.Register(busControl.Stop); }
依舊是省事兒的Configure方法里面寫的一個管道
app.Run(async (context) =>
{
var serviceProvider = context.RequestServices; var bus = serviceProvider.GetRequiredService<IBusControl>(); await bus.Publish(new PayOrderCommand { SourceId = 1, TargetId = 2, Money = 2000 }); await context.Response.WriteAsync("Hello World!"); });
后話
寫在默認的index管道,會觸發一個有意思的BUG,會兩次執行,但是主鍵是唯一的,這樣重復執行會拋出異常,提示主鍵已存在
這個例子是先鋪墊一下,很多有意思的實現還沒開展
1、某個業務需要執行過程和下游強一致性,要么一起完成,要么當場失敗
2、某個業務完成最終一致性,這個過程,如果失敗需要重試/限流/熔斷
3、某個業務完成最終一致性,失敗了則觸發補償
先鋪墊一下,后空余時間逐一編寫示例演示
現在的代碼一點都不優雅,還有很強的面向於具體實現的,后續會逐步高度抽象化,從相對麻煩的代碼,變成純crud拿來主義,業務層 和后續代碼的流程無感
打個小廣告
如果有技術交流可以加NCC的群 24791014、436035237,我在群里,有任何關於asp.net core/Masstransit的問題或者建議都可以與我交流,非常歡迎
示例代碼: