有花時間去研究masstransit的saga,英文水平不過關,始終無法實現上手他的代碼編排的業務,遺憾。
本文通過rabbit和sqlserver實現下單,更新庫存,更新產品,模擬數據最終一致性。
項目結構如下,reportService可有可無,這里就相當一個鏈條,只要兩節走通了后面可以接龍,本文有用到不省略。流程:orderservice=>eComm=>reportservice 。

下面先看看order的配置,通過控制器新增訂單同時發布訂單信息到order_exchange交換機,Key是"order.created,這樣就把訂單推送到了隊列,等到庫存服務獲取訂單去更新庫存。

// POST api/<OrderController>
[HttpPost]
public async Task Post([FromBody] OrderDetail orderDetail)
{
var id = await orderCreator.Create(orderDetail);
publisher.Publish(JsonConvert.SerializeObject(new OrderRequest {
OrderId = id,
ProductId = orderDetail.ProductId,
Quantity = orderDetail.Quantity
}), "order.created", null);
}
更新庫存的代碼,然后再發送消息告訴order服務,這里有哪個try包裹,如果這里有失敗會觸發catch,發送減庫存失敗的消息。order服務消費到這條消息就會執行相應的刪除訂單操作。代碼如下:
using Ecomm.DataAccess;
using Ecomm.Models;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ecomm
{
public class OrderCreatedListener : IHostedService
{
private readonly IPublisher publisher;
private readonly ISubscriber subscriber;
private readonly IInventoryUpdator inventoryUpdator;
public OrderCreatedListener(IPublisher publisher, ISubscriber subscriber, IInventoryUpdator inventoryUpdator)
{
this.publisher = publisher;
this.subscriber = subscriber;
this.inventoryUpdator = inventoryUpdator;
}
public Task StartAsync(CancellationToken cancellationToken)
{
subscriber.Subscribe(Subscribe);
return Task.CompletedTask;
}
private bool Subscribe(string message, IDictionary<string, object> header)
{
var response = JsonConvert.DeserializeObject<OrderRequest>(message);
try
{
inventoryUpdator.Update(response.ProductId, response.Quantity).GetAwaiter().GetResult();
publisher.Publish(JsonConvert.SerializeObject(
new InventoryResponse { OrderId = response.OrderId, IsSuccess = true }
), "inventory.response", null);
}
catch (Exception)
{
publisher.Publish(JsonConvert.SerializeObject(
new InventoryResponse { OrderId = response.OrderId, IsSuccess = false }
), "inventory.response", null);
}
return true;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
using Ecomm.Models;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace OrderService
{
public class InventoryResponseListener : IHostedService
{
private readonly ISubscriber subscriber;
private readonly IOrderDeletor orderDeletor;
public InventoryResponseListener(ISubscriber subscriber, IOrderDeletor orderDeletor)
{
this.subscriber = subscriber;
this.orderDeletor = orderDeletor;
}
public Task StartAsync(CancellationToken cancellationToken)
{
subscriber.Subscribe(Subscribe);
return Task.CompletedTask;
}
private bool Subscribe(string message, IDictionary<string, object> header)
{
var response = JsonConvert.DeserializeObject<InventoryResponse>(message);
if (!response.IsSuccess)
{
orderDeletor.Delete(response.OrderId).GetAwaiter().GetResult();
}
return true;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
上面的代碼是整個服務的核心業務,也很簡單就是隊列相互通信相互確認操作是否順利,失敗就執行回歸操作,而這里我們都會寫好對應補償代碼:
using Dapper; using System; using System.Collections.Generic; using System.Data.SqlClient; using System.Linq; using System.Threading.Tasks; namespace OrderService { public class OrderDeletor : IOrderDeletor { private readonly string connectionString; public OrderDeletor(string connectionString) { this.connectionString = connectionString; } public async Task Delete(int orderId) { using var connection = new SqlConnection(connectionString); connection.Open(); using var transaction = connection.BeginTransaction(); try { await connection.ExecuteAsync("DELETE FROM OrderDetail WHERE OrderId = @orderId", new { orderId }, transaction: transaction); await connection.ExecuteAsync("DELETE FROM [Order] WHERE Id = @orderId", new { orderId }, transaction: transaction); transaction.Commit(); } catch { transaction.Rollback(); } } } }
庫存服務里有發布產品的接口,這里沒有做過多的處理,只是把產品新增放到隊列,供后面的ReportService服務獲取,該服務拿到后會執行產品數量扣除:
using Microsoft.Extensions.Hosting; using Newtonsoft.Json; using Plain.RabbitMQ; using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace ReportService { public class ReportDataCollector : IHostedService { private const int DEFAULT_QUANTITY = 100; private readonly ISubscriber subscriber; private readonly IMemoryReportStorage memoryReportStorage; public ReportDataCollector(ISubscriber subscriber, IMemoryReportStorage memoryReportStorage) { this.subscriber = subscriber; this.memoryReportStorage = memoryReportStorage; } public Task StartAsync(CancellationToken cancellationToken) { subscriber.Subscribe(Subscribe); return Task.CompletedTask; } private bool Subscribe(string message, IDictionary<string, object> header) //private bool ProcessMessage(string message, IDictionary<string, object> header) { if (message.Contains("Product")) { var product = JsonConvert.DeserializeObject<Product>(message); if (memoryReportStorage.Get().Any(r => r.ProductName == product.ProductName)) { return true; } else { memoryReportStorage.Add(new Report { ProductName = product.ProductName, Count = DEFAULT_QUANTITY }); } } else { var order = JsonConvert.DeserializeObject<Order>(message); if(memoryReportStorage.Get().Any(r => r.ProductName == order.Name)) { memoryReportStorage.Get().First(r => r.ProductName == order.Name).Count -= order.Quantity; } else { memoryReportStorage.Add(new Report { ProductName = order.Name, Count = DEFAULT_QUANTITY - order.Quantity }); } } return true; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }
到這里整個流程大概如此。只要理清楚了訂單和庫存更新這里的業務,后面差不多一樣,可以無限遞歸。代碼文末有鏈接供下載。
這里有一個地方的代碼如下,新增庫存的時候同時發布消息。假如新增完訂單后面崩掉了,這里是個原子操作最佳。
[HttpPost]
public async Task Post([FromBody] OrderDetail orderDetail)
{
var id = await orderCreator.Create(orderDetail);
publisher.Publish(JsonConvert.SerializeObject(new OrderRequest {
OrderId = id,
ProductId = orderDetail.ProductId,
Quantity = orderDetail.Quantity
}), "order.created", null);
}
很遺憾masstransit的saga還沒有整明白,那就上cap,完成業務一致性。加了點cap代碼因為之前是dapper,所以加了dbcontext和cap相關代碼有點小亂。核心代碼如下:
using DotNetCore.CAP;
using MediatR;
using OrderService.Command;
using System.Threading;
using Ecomm.Models;
using System.Collections.Generic;
namespace OrderService.Handler
{
public class InsertOrderDetailHandler : IRequestHandler<InsertOrderDetailCommand, InsertOrderDetailModel>
{
private readonly OrderDbContext context;
private readonly ICapPublisher cap;
public InsertOrderDetailHandler(OrderDbContext context, ICapPublisher cap)
{
this.context = context;
this.cap = cap;
}
public async System.Threading.Tasks.Task<InsertOrderDetailModel> Handle(InsertOrderDetailCommand request, CancellationToken cancellationToken)
{
using(var trans =context.Database.BeginTransaction(cap))
{
var order = context.Orders.Add(new Order
{
UpdatedTime = System.DateTime.Today,
UserId = request.UserId,
UserName = request.UserName
});
var orderDetail = context.OrderDetails.Add(new OrderDetail
{
OrderId = order.Entity.Id,
ProductId = request.ProductId,
Quantity = request.Quantity,
ProductName = request.ProductName,
});
context.SaveChanges();
cap.Publish<OrderRequest>("order.created", new OrderRequest
{
OrderId = order.Entity.Id,
ProductId = orderDetail.Entity.ProductId,
Quantity = orderDetail.Entity.Quantity
}, new Dictionary<string,string>()) ;
trans.Commit();
return new InsertOrderDetailModel { OrderDetailid = orderDetail.Entity.Id, OrderId = order.Entity.Id, Success = true };
}
}
}
}
到這里差不多要結束了,這里的代碼都可以調試運行的。因為加了cap,order服務有兩套rabbitmq的配置,有冗余,而且有點坑。調試的時候注意,Plain.RabbitMQ支持的交換機不是持久化的,而cap是持久化的,所以有點不兼容。第一次運行可以先確保Plain.RabbitMQ正常,再刪掉交換機,cap跑起來了再建持久化交換機,這樣cap消息就會被rabbitmq接收,后面就會被庫存服務消費。因為我這里cap不會自動綁定隊列,Plain.RabbitMQ是可以的。所以需要新建交換機后再綁定隊列。而且這里隊列以Plain.RabbitMQ生成的名字來綁定。要不然又可能會調試踩坑無法出坑。 用cap不注意你連消息隊列都看不到,看到了隊列也看不到消費數據,這點不知道是我不會還是cap有什么難的配置。結束。。。
上例項目demo:
liuzhixin405/SimpleOrders_Next (github.com)
超簡單微服務demo
